diff --git a/backend/internal/chat/hub.go b/backend/internal/chat/hub.go index 5473a4a..23d5737 100644 --- a/backend/internal/chat/hub.go +++ b/backend/internal/chat/hub.go @@ -66,20 +66,25 @@ func (h *Hub) Run() { } h.rooms[client.RoomID][client] = true - // Send existing history to the newly joined client + // Copy existing history to send outside the lock + var historyCopy []Message if history, ok := h.roomsHistory[client.RoomID]; ok { - for _, msg := range history { - msg.IsHistory = true - msgBytes, _ := json.Marshal(msg) - // Use select to avoid blocking if client's send channel is full + historyCopy = make([]Message, len(history)) + copy(historyCopy, history) + } + h.mutex.Unlock() + + // Send history outside the lock + for _, msg := range historyCopy { + msg.IsHistory = true + msgBytes, err := json.Marshal(msg) + if err == nil { select { case client.Send <- msgBytes: default: - // If send fails, we could potentially log or ignore } } } - h.mutex.Unlock() case client := <-h.unregister: h.mutex.Lock() @@ -99,6 +104,12 @@ func (h *Hub) Run() { h.mutex.Unlock() case message := <-h.broadcast: + // Marshal message outside the lock to optimize performance + msgBytes, err := json.Marshal(message) + if err != nil { + continue + } + h.mutex.Lock() // Only store "chat" and "danmaku" messages in history if message.Type == "chat" || message.Type == "danmaku" { @@ -111,7 +122,6 @@ func (h *Hub) Run() { clients := h.rooms[message.RoomID] if clients != nil { - msgBytes, _ := json.Marshal(message) for client := range clients { select { case client.Send <- msgBytes: diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index abb1bc7..cde01cc 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -29,14 +29,23 @@ func InitDB() { ) var err error - // Use SQLite database stored in a local file named "hightube.db" - DB, err = gorm.Open(sqlite.Open("hightube.db"), &gorm.Config{ + // Use SQLite database stored in a local file named "hightube.db" with WAL mode and busy timeout enabled + DB, err = gorm.Open(sqlite.Open("hightube.db?_journal_mode=WAL&_busy_timeout=5000"), &gorm.Config{ Logger: newLogger, }) if err != nil { log.Fatalf("Failed to connect database: %v", err) } + // Configure connection pool settings + sqlDB, err := DB.DB() + if err != nil { + log.Fatalf("Failed to get database instance: %v", err) + } + sqlDB.SetMaxOpenConns(10) + sqlDB.SetMaxIdleConns(5) + sqlDB.SetConnMaxLifetime(time.Hour) + // Auto-migrate the schema err = DB.AutoMigrate(&model.User{}, &model.Room{}) if err != nil { @@ -48,7 +57,7 @@ func InitDB() { ensureAdminUser() - monitor.Infof("Database initialized successfully") + monitor.Infof("Database initialized successfully with WAL mode and connection pooling") } func ensureAdminUser() { diff --git a/backend/internal/monitor/metrics.go b/backend/internal/monitor/metrics.go index 4e12d51..5a252db 100644 --- a/backend/internal/monitor/metrics.go +++ b/backend/internal/monitor/metrics.go @@ -2,6 +2,7 @@ package monitor import ( "runtime" + "sync" "sync/atomic" "time" ) @@ -23,21 +24,32 @@ type Snapshot struct { ErrorsTotal uint64 `json:"errors_total"` } -func IncrementRequestCount() { - atomic.AddUint64(&totalRequests, 1) +var ( + cachedSnapshot Snapshot + snapshotMutex sync.RWMutex +) + +func init() { + // Initialize the snapshot once on startup + updateSnapshot() + + // Update the snapshot in the background every 2 seconds to avoid STW runtime.ReadMemStats in request threads + go func() { + ticker := time.NewTicker(2 * time.Second) + for range ticker.C { + updateSnapshot() + } + }() } -func IncrementErrorCount() { - atomic.AddUint64(&totalErrors, 1) -} - -func GetSnapshot() Snapshot { +func updateSnapshot() { var mem runtime.MemStats runtime.ReadMemStats(&mem) diskTotal, diskFree := getDiskSpaceGB() - return Snapshot{ + snapshotMutex.Lock() + cachedSnapshot = Snapshot{ UptimeSeconds: int64(time.Since(startedAt).Seconds()), Goroutines: runtime.NumGoroutine(), MemoryAllocMB: bytesToMB(mem.Alloc), @@ -48,6 +60,28 @@ func GetSnapshot() Snapshot { RequestsTotal: atomic.LoadUint64(&totalRequests), ErrorsTotal: atomic.LoadUint64(&totalErrors), } + snapshotMutex.Unlock() +} + +func IncrementRequestCount() { + atomic.AddUint64(&totalRequests, 1) +} + +func IncrementErrorCount() { + atomic.AddUint64(&totalErrors, 1) +} + +func GetSnapshot() Snapshot { + snapshotMutex.RLock() + defer snapshotMutex.RUnlock() + + // Return the cached snapshot, overlaying volatile/cheap fields in real-time + s := cachedSnapshot + s.UptimeSeconds = int64(time.Since(startedAt).Seconds()) + s.Goroutines = runtime.NumGoroutine() + s.RequestsTotal = atomic.LoadUint64(&totalRequests) + s.ErrorsTotal = atomic.LoadUint64(&totalErrors) + return s } func bytesToMB(v uint64) float64 { diff --git a/backend/internal/stream/server.go b/backend/internal/stream/server.go index c4c03c1..089eb99 100644 --- a/backend/internal/stream/server.go +++ b/backend/internal/stream/server.go @@ -1,6 +1,7 @@ package stream import ( + "bufio" "context" "crypto/rand" "encoding/hex" @@ -15,6 +16,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/nareix/joy4/av" "github.com/nareix/joy4/av/avutil" "github.com/nareix/joy4/av/pubsub" "github.com/nareix/joy4/format" @@ -82,6 +84,23 @@ func (w writeFlusher) Flush() error { return nil } +type bufferedWriteFlusher struct { + bufw *bufio.Writer + httpFlusher http.Flusher +} + +func (w *bufferedWriteFlusher) Write(p []byte) (n int, err error) { + return w.bufw.Write(p) +} + +func (w *bufferedWriteFlusher) Flush() error { + if err := w.bufw.Flush(); err != nil { + return err + } + w.httpFlusher.Flush() + return nil +} + // NewRTMPServer creates and initializes a new media server func NewRTMPServer() *RTMPServer { s := &RTMPServer{ @@ -235,13 +254,47 @@ func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) { c.Status(http.StatusOK) flusher.Flush() - muxer := flv.NewMuxerWriteFlusher(writeFlusher{ + // Coalesce the 3 internal write calls of WriteTag using a 4KB bufio.Writer + bufWriter := bufio.NewWriterSize(c.Writer, 4096) + bwf := &bufferedWriteFlusher{ + bufw: bufWriter, httpFlusher: flusher, - Writer: c.Writer, - }) + } + + muxer := flv.NewMuxerWriteFlusher(bwf) cursor := q.Latest() - if err := avutil.CopyFile(muxer, cursor); err != nil && err != io.EOF { + // Write header first + streams, err := cursor.Streams() + if err != nil { + monitor.Errorf("HTTP-FLV failed to get cursor streams: %v", err) + return + } + if err = muxer.WriteHeader(streams); err != nil { + monitor.Errorf("HTTP-FLV failed to write header: %v", err) + return + } + if err = bwf.Flush(); err != nil { + return + } + + // Read and write packet loop with per-packet flushing for low latency + for { + var pkt av.Packet + pkt, err = cursor.ReadPacket() + if err != nil { + break + } + if err = muxer.WritePacket(pkt); err != nil { + break + } + // Flush immediately so the frame is sent to the client (grouped write syscall) + if err = bwf.Flush(); err != nil { + break + } + } + + if err != nil && err != io.EOF { errStr := err.Error() if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath) @@ -299,10 +352,11 @@ func (s *RTMPServer) startVariantTranscoders(roomID string) { "ffmpeg", "-nostdin", "-loglevel", "error", + "-fflags", "nobuffer", "-i", inputURL, "-vf", "scale="+profile.scale+":force_original_aspect_ratio=decrease", "-c:v", "libx264", - "-preset", "veryfast", + "-preset", "ultrafast", "-tune", "zerolatency", "-g", "48", "-keyint_min", "48", diff --git a/frontend/web/flv_player.html b/frontend/web/flv_player.html index 9f31335..25eba12 100644 --- a/frontend/web/flv_player.html +++ b/frontend/web/flv_player.html @@ -81,6 +81,7 @@ isLive: true, }, { enableWorker: false, + enableStashBuffer: false, stashInitialSize: 128, }); @@ -88,6 +89,24 @@ player.load(); player.play().catch(() => {}); + // Live latency auto-catchup logic + video.addEventListener('timeupdate', function() { + if (video.buffered.length > 0) { + const end = video.buffered.end(video.buffered.length - 1); + const diff = end - video.currentTime; + if (diff > 5) { + // If way behind, jump directly close to the live edge + video.currentTime = end - 1.0; + } else if (diff > 1.5) { + // Speed up slightly to catch up + video.playbackRate = 1.15; + } else { + // Reset to normal speed + video.playbackRate = 1.0; + } + } + }); + player.on(flvjs.Events.ERROR, function(errorType, detail, info) { const parts = ['Live stream failed to load.']; if (errorType) parts.push('type=' + errorType);