perf: 优化后端性能与直播延迟,包含数据库WAL模式、流媒体写缓冲、转码 Preset 优化、聊天锁和指标采集优化,以及前端自动追帧功能

This commit is contained in:
2026-06-15 15:08:07 +08:00
parent 261b1ab169
commit e0a6923984
5 changed files with 150 additions and 24 deletions

View File

@@ -66,20 +66,25 @@ func (h *Hub) Run() {
}
h.rooms[client.RoomID][client] = true
// Send existing history to the newly joined client
// Copy existing history to send outside the lock
var historyCopy []Message
if history, ok := h.roomsHistory[client.RoomID]; ok {
for _, msg := range history {
msg.IsHistory = true
msgBytes, _ := json.Marshal(msg)
// Use select to avoid blocking if client's send channel is full
historyCopy = make([]Message, len(history))
copy(historyCopy, history)
}
h.mutex.Unlock()
// Send history outside the lock
for _, msg := range historyCopy {
msg.IsHistory = true
msgBytes, err := json.Marshal(msg)
if err == nil {
select {
case client.Send <- msgBytes:
default:
// If send fails, we could potentially log or ignore
}
}
}
h.mutex.Unlock()
case client := <-h.unregister:
h.mutex.Lock()
@@ -99,6 +104,12 @@ func (h *Hub) Run() {
h.mutex.Unlock()
case message := <-h.broadcast:
// Marshal message outside the lock to optimize performance
msgBytes, err := json.Marshal(message)
if err != nil {
continue
}
h.mutex.Lock()
// Only store "chat" and "danmaku" messages in history
if message.Type == "chat" || message.Type == "danmaku" {
@@ -111,7 +122,6 @@ func (h *Hub) Run() {
clients := h.rooms[message.RoomID]
if clients != nil {
msgBytes, _ := json.Marshal(message)
for client := range clients {
select {
case client.Send <- msgBytes:

View File

@@ -29,14 +29,23 @@ func InitDB() {
)
var err error
// Use SQLite database stored in a local file named "hightube.db"
DB, err = gorm.Open(sqlite.Open("hightube.db"), &gorm.Config{
// Use SQLite database stored in a local file named "hightube.db" with WAL mode and busy timeout enabled
DB, err = gorm.Open(sqlite.Open("hightube.db?_journal_mode=WAL&_busy_timeout=5000"), &gorm.Config{
Logger: newLogger,
})
if err != nil {
log.Fatalf("Failed to connect database: %v", err)
}
// Configure connection pool settings
sqlDB, err := DB.DB()
if err != nil {
log.Fatalf("Failed to get database instance: %v", err)
}
sqlDB.SetMaxOpenConns(10)
sqlDB.SetMaxIdleConns(5)
sqlDB.SetConnMaxLifetime(time.Hour)
// Auto-migrate the schema
err = DB.AutoMigrate(&model.User{}, &model.Room{})
if err != nil {
@@ -48,7 +57,7 @@ func InitDB() {
ensureAdminUser()
monitor.Infof("Database initialized successfully")
monitor.Infof("Database initialized successfully with WAL mode and connection pooling")
}
func ensureAdminUser() {

View File

@@ -2,6 +2,7 @@ package monitor
import (
"runtime"
"sync"
"sync/atomic"
"time"
)
@@ -23,21 +24,32 @@ type Snapshot struct {
ErrorsTotal uint64 `json:"errors_total"`
}
func IncrementRequestCount() {
atomic.AddUint64(&totalRequests, 1)
var (
cachedSnapshot Snapshot
snapshotMutex sync.RWMutex
)
func init() {
// Initialize the snapshot once on startup
updateSnapshot()
// Update the snapshot in the background every 2 seconds to avoid STW runtime.ReadMemStats in request threads
go func() {
ticker := time.NewTicker(2 * time.Second)
for range ticker.C {
updateSnapshot()
}
}()
}
func IncrementErrorCount() {
atomic.AddUint64(&totalErrors, 1)
}
func GetSnapshot() Snapshot {
func updateSnapshot() {
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
diskTotal, diskFree := getDiskSpaceGB()
return Snapshot{
snapshotMutex.Lock()
cachedSnapshot = Snapshot{
UptimeSeconds: int64(time.Since(startedAt).Seconds()),
Goroutines: runtime.NumGoroutine(),
MemoryAllocMB: bytesToMB(mem.Alloc),
@@ -48,6 +60,28 @@ func GetSnapshot() Snapshot {
RequestsTotal: atomic.LoadUint64(&totalRequests),
ErrorsTotal: atomic.LoadUint64(&totalErrors),
}
snapshotMutex.Unlock()
}
func IncrementRequestCount() {
atomic.AddUint64(&totalRequests, 1)
}
func IncrementErrorCount() {
atomic.AddUint64(&totalErrors, 1)
}
func GetSnapshot() Snapshot {
snapshotMutex.RLock()
defer snapshotMutex.RUnlock()
// Return the cached snapshot, overlaying volatile/cheap fields in real-time
s := cachedSnapshot
s.UptimeSeconds = int64(time.Since(startedAt).Seconds())
s.Goroutines = runtime.NumGoroutine()
s.RequestsTotal = atomic.LoadUint64(&totalRequests)
s.ErrorsTotal = atomic.LoadUint64(&totalErrors)
return s
}
func bytesToMB(v uint64) float64 {

View File

@@ -1,6 +1,7 @@
package stream
import (
"bufio"
"context"
"crypto/rand"
"encoding/hex"
@@ -15,6 +16,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/av/pubsub"
"github.com/nareix/joy4/format"
@@ -82,6 +84,23 @@ func (w writeFlusher) Flush() error {
return nil
}
type bufferedWriteFlusher struct {
bufw *bufio.Writer
httpFlusher http.Flusher
}
func (w *bufferedWriteFlusher) Write(p []byte) (n int, err error) {
return w.bufw.Write(p)
}
func (w *bufferedWriteFlusher) Flush() error {
if err := w.bufw.Flush(); err != nil {
return err
}
w.httpFlusher.Flush()
return nil
}
// NewRTMPServer creates and initializes a new media server
func NewRTMPServer() *RTMPServer {
s := &RTMPServer{
@@ -235,13 +254,47 @@ func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) {
c.Status(http.StatusOK)
flusher.Flush()
muxer := flv.NewMuxerWriteFlusher(writeFlusher{
// Coalesce the 3 internal write calls of WriteTag using a 4KB bufio.Writer
bufWriter := bufio.NewWriterSize(c.Writer, 4096)
bwf := &bufferedWriteFlusher{
bufw: bufWriter,
httpFlusher: flusher,
Writer: c.Writer,
})
}
muxer := flv.NewMuxerWriteFlusher(bwf)
cursor := q.Latest()
if err := avutil.CopyFile(muxer, cursor); err != nil && err != io.EOF {
// Write header first
streams, err := cursor.Streams()
if err != nil {
monitor.Errorf("HTTP-FLV failed to get cursor streams: %v", err)
return
}
if err = muxer.WriteHeader(streams); err != nil {
monitor.Errorf("HTTP-FLV failed to write header: %v", err)
return
}
if err = bwf.Flush(); err != nil {
return
}
// Read and write packet loop with per-packet flushing for low latency
for {
var pkt av.Packet
pkt, err = cursor.ReadPacket()
if err != nil {
break
}
if err = muxer.WritePacket(pkt); err != nil {
break
}
// Flush immediately so the frame is sent to the client (grouped write syscall)
if err = bwf.Flush(); err != nil {
break
}
}
if err != nil && err != io.EOF {
errStr := err.Error()
if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") {
monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath)
@@ -299,10 +352,11 @@ func (s *RTMPServer) startVariantTranscoders(roomID string) {
"ffmpeg",
"-nostdin",
"-loglevel", "error",
"-fflags", "nobuffer",
"-i", inputURL,
"-vf", "scale="+profile.scale+":force_original_aspect_ratio=decrease",
"-c:v", "libx264",
"-preset", "veryfast",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-g", "48",
"-keyint_min", "48",

View File

@@ -81,6 +81,7 @@
isLive: true,
}, {
enableWorker: false,
enableStashBuffer: false,
stashInitialSize: 128,
});
@@ -88,6 +89,24 @@
player.load();
player.play().catch(() => {});
// Live latency auto-catchup logic
video.addEventListener('timeupdate', function() {
if (video.buffered.length > 0) {
const end = video.buffered.end(video.buffered.length - 1);
const diff = end - video.currentTime;
if (diff > 5) {
// If way behind, jump directly close to the live edge
video.currentTime = end - 1.0;
} else if (diff > 1.5) {
// Speed up slightly to catch up
video.playbackRate = 1.15;
} else {
// Reset to normal speed
video.playbackRate = 1.0;
}
}
});
player.on(flvjs.Events.ERROR, function(errorType, detail, info) {
const parts = ['Live stream failed to load.'];
if (errorType) parts.push('type=' + errorType);