diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index fc92a2c..b822a5c 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -1,6 +1,9 @@ package main import ( + "net/http" + "time" + "hightube/internal/api" "hightube/internal/chat" "hightube/internal/db" @@ -23,9 +26,16 @@ func main() { // Start the API server in a goroutine so it doesn't block the RTMP server go func() { r := api.SetupRouter(srv) + httpServer := &http.Server{ + Addr: ":8080", + Handler: r, + ReadHeaderTimeout: 5 * time.Second, + IdleTimeout: 60 * time.Second, + MaxHeaderBytes: 1 << 20, + } monitor.Infof("API server listening on :8080") monitor.Infof("Web console listening on :8080/admin") - if err := r.Run(":8080"); err != nil { + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { monitor.Errorf("Failed to start API server: %v", err) } }() diff --git a/backend/internal/api/admin.go b/backend/internal/api/admin.go index 9cb716a..0e92822 100644 --- a/backend/internal/api/admin.go +++ b/backend/internal/api/admin.go @@ -29,8 +29,8 @@ func AdminLogin(c *gin.Context) { return } - var user model.User - if err := db.DB.Where("username = ?", strings.TrimSpace(req.Username)).First(&user).Error; err != nil { + user, err := db.LoadUserByUsername(strings.TrimSpace(req.Username)) + if err != nil { c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid username or password"}) return } @@ -191,7 +191,7 @@ func UpdateUserRole(c *gin.Context) { return } - if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("role", role).Error; err != nil { + if err := db.UpdateUserRole(userID, role); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update role"}) return } @@ -218,7 +218,7 @@ func UpdateUserEnabled(c *gin.Context) { return } - if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("enabled", req.Enabled).Error; err != nil { + if err := db.UpdateUserEnabled(userID, req.Enabled); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update enabled status"}) return } @@ -251,7 +251,7 @@ func ResetUserPassword(c *gin.Context) { return } - if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("password", hash).Error; err != nil { + if err := db.UpdateUserPassword(userID, hash); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to reset password"}) return } @@ -274,11 +274,7 @@ func DeleteUser(c *gin.Context) { return } - if err := db.DB.Where("user_id = ?", userID).Delete(&model.Room{}).Error; err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete rooms"}) - return - } - if err := db.DB.Delete(&model.User{}, userID).Error; err != nil { + if err := db.DeleteUserCascade(userID); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete user"}) return } diff --git a/backend/internal/api/auth.go b/backend/internal/api/auth.go index c9196d5..9b6a274 100644 --- a/backend/internal/api/auth.go +++ b/backend/internal/api/auth.go @@ -1,11 +1,13 @@ package api import ( + "errors" "net/http" "os" "strings" "github.com/gin-gonic/gin" + "gorm.io/gorm" "hightube/internal/db" "hightube/internal/model" @@ -41,10 +43,12 @@ func Register(c *gin.Context) { } // Check if user exists - var existingUser model.User - if err := db.DB.Where("username = ?", req.Username).First(&existingUser).Error; err == nil { + if _, err := db.LoadUserByUsername(req.Username); err == nil { c.JSON(http.StatusConflict, gin.H{"error": "Username already exists"}) return + } else if !errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to validate username"}) + return } // Hash password @@ -61,20 +65,13 @@ func Register(c *gin.Context) { Role: "user", Enabled: true, } - if err := db.DB.Create(&user).Error; err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create user"}) - return - } - - // Create a default live room for the new user room := model.Room{ - UserID: user.ID, Title: user.Username + "'s Live Room", StreamKey: utils.GenerateStreamKey(), IsActive: false, } - if err := db.DB.Create(&room).Error; err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create room for user"}) + if err := db.CreateUserAndRoom(&user, &room); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create user"}) return } @@ -87,9 +84,10 @@ func Login(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } + req.Username = strings.TrimSpace(req.Username) - var user model.User - if err := db.DB.Where("username = ?", req.Username).First(&user).Error; err != nil { + user, err := db.LoadUserByUsername(req.Username) + if err != nil { c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid username or password"}) return } @@ -127,8 +125,8 @@ func ChangePassword(c *gin.Context) { return } - var user model.User - if err := db.DB.First(&user, userID).Error; err != nil { + user, err := db.LoadUserByID(userID.(uint)) + if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": "User not found"}) return } @@ -147,7 +145,7 @@ func ChangePassword(c *gin.Context) { } // Update user - if err := db.DB.Model(&user).Update("password", hashedPassword).Error; err != nil { + if err := db.UpdateUserPassword(user.ID, hashedPassword); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update password"}) return } diff --git a/backend/internal/api/chat_handler.go b/backend/internal/api/chat_handler.go index 17c811f..e107d09 100644 --- a/backend/internal/api/chat_handler.go +++ b/backend/internal/api/chat_handler.go @@ -12,6 +12,8 @@ import ( ) var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true // Allow all connections }, diff --git a/backend/internal/api/middleware.go b/backend/internal/api/middleware.go index 897f111..c00e1fd 100644 --- a/backend/internal/api/middleware.go +++ b/backend/internal/api/middleware.go @@ -89,8 +89,8 @@ func authenticateRequest(c *gin.Context) (*model.User, error) { return nil, errInvalidToken } - var user model.User - if err := db.DB.First(&user, uint(userID)).Error; err != nil { + user, err := db.LoadUserByID(uint(userID)) + if err != nil { return nil, errUserNotFound } diff --git a/backend/internal/api/room.go b/backend/internal/api/room.go index ed3a5cd..3503dfb 100644 --- a/backend/internal/api/room.go +++ b/backend/internal/api/room.go @@ -6,15 +6,14 @@ import ( "github.com/gin-gonic/gin" "hightube/internal/db" - "hightube/internal/model" ) // GetMyRoom returns the room details for the currently authenticated user func GetMyRoom(c *gin.Context) { userID, _ := c.Get("user_id") - var room model.Room - if err := db.DB.Where("user_id = ?", userID).First(&room).Error; err != nil { + room, err := db.LoadRoomByUserID(userID.(uint)) + if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": "Room not found"}) return } @@ -29,9 +28,8 @@ func GetMyRoom(c *gin.Context) { // GetActiveRooms returns a list of all currently active live rooms func GetActiveRooms(c *gin.Context) { - var rooms []model.Room - // Fetch rooms where is_active is true - if err := db.DB.Where("is_active = ?", true).Find(&rooms).Error; err != nil { + rooms, err := db.ListActiveRooms() + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch active rooms"}) return } diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index c91fadd..88bd741 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -11,7 +11,8 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { // 设置为发布模式,消除 "[WARNING] Running in debug mode" 警告 gin.SetMode(gin.ReleaseMode) - r := gin.Default() + r := gin.New() + r.Use(gin.Recovery()) BindAdminDependencies(streamServer) // Use CORS middleware to allow web access diff --git a/backend/internal/chat/hub.go b/backend/internal/chat/hub.go index 23d5737..37fdbd4 100644 --- a/backend/internal/chat/hub.go +++ b/backend/internal/chat/hub.go @@ -9,10 +9,12 @@ import ( ) const ( - writeWait = 10 * time.Second - pongWait = 60 * time.Second - pingPeriod = (pongWait * 9) / 10 - maxMessageSize = 512 + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + maxMessageSize = 512 + roomQueueBufferSize = 2048 + historyLimit = 100 ) type Message struct { @@ -32,12 +34,23 @@ type Client struct { } type Hub struct { - rooms map[string]map[*Client]bool - roomsHistory map[string][]Message - broadcast chan Message + mutex sync.RWMutex + rooms map[string]*roomHub +} + +type roomHub struct { + roomID string + manager *Hub register chan *Client unregister chan *Client - mutex sync.RWMutex + broadcast chan Message + clearHistory chan struct{} + stop chan struct{} + stopOnce sync.Once + + mutex sync.RWMutex + clients map[*Client]struct{} + history []Message } type StatsSnapshot struct { @@ -48,118 +61,179 @@ type StatsSnapshot struct { func NewHub() *Hub { return &Hub{ - broadcast: make(chan Message), - register: make(chan *Client), - unregister: make(chan *Client), - rooms: make(map[string]map[*Client]bool), - roomsHistory: make(map[string][]Message), + rooms: make(map[string]*roomHub), } } -func (h *Hub) Run() { +func (h *Hub) getRoom(roomID string) *roomHub { + h.mutex.RLock() + room := h.rooms[roomID] + h.mutex.RUnlock() + return room +} + +func (h *Hub) getOrCreateRoom(roomID string) *roomHub { + if room := h.getRoom(roomID); room != nil { + return room + } + + h.mutex.Lock() + defer h.mutex.Unlock() + + if room := h.rooms[roomID]; room != nil { + return room + } + + room := &roomHub{ + roomID: roomID, + manager: h, + register: make(chan *Client, roomQueueBufferSize), + unregister: make(chan *Client, roomQueueBufferSize), + broadcast: make(chan Message, roomQueueBufferSize), + clearHistory: make(chan struct{}, 1), + stop: make(chan struct{}), + clients: make(map[*Client]struct{}), + } + h.rooms[roomID] = room + go room.run() + return room +} + +func (h *Hub) deleteRoomIfIdle(room *roomHub) { + room.mutex.RLock() + idle := len(room.clients) == 0 && len(room.history) == 0 + room.mutex.RUnlock() + if !idle { + return + } + + h.mutex.Lock() + if current := h.rooms[room.roomID]; current == room { + delete(h.rooms, room.roomID) + room.stopOnce.Do(func() { + close(room.stop) + }) + } + h.mutex.Unlock() +} + +func (r *roomHub) run() { for { select { - case client := <-h.register: - h.mutex.Lock() - if h.rooms[client.RoomID] == nil { - h.rooms[client.RoomID] = make(map[*Client]bool) - } - h.rooms[client.RoomID][client] = true - - // Copy existing history to send outside the lock - var historyCopy []Message - if history, ok := h.roomsHistory[client.RoomID]; ok { - historyCopy = make([]Message, len(history)) - copy(historyCopy, history) - } - h.mutex.Unlock() - - // Send history outside the lock - for _, msg := range historyCopy { - msg.IsHistory = true - msgBytes, err := json.Marshal(msg) - if err == nil { - select { - case client.Send <- msgBytes: - default: - } - } - } - - case client := <-h.unregister: - h.mutex.Lock() - if rooms, ok := h.rooms[client.RoomID]; ok { - if _, ok := rooms[client]; ok { - delete(rooms, client) - close(client.Send) - // We no longer delete the room from h.rooms here if we want history to persist - // even if everyone leaves (as long as it's active in DB). - // But we should clean up if the room is empty and we want to save memory. - // However, the history is what matters. - if len(rooms) == 0 { - delete(h.rooms, client.RoomID) - } - } - } - h.mutex.Unlock() - - case message := <-h.broadcast: - // Marshal message outside the lock to optimize performance - msgBytes, err := json.Marshal(message) - if err != nil { - continue - } - - h.mutex.Lock() - // Only store "chat" and "danmaku" messages in history - if message.Type == "chat" || message.Type == "danmaku" { - h.roomsHistory[message.RoomID] = append(h.roomsHistory[message.RoomID], message) - // Limit history size to avoid memory leak (e.g., last 100 messages) - if len(h.roomsHistory[message.RoomID]) > 100 { - h.roomsHistory[message.RoomID] = h.roomsHistory[message.RoomID][1:] - } - } - - clients := h.rooms[message.RoomID] - if clients != nil { - for client := range clients { - select { - case client.Send <- msgBytes: - default: - close(client.Send) - delete(clients, client) - } - } - } - h.mutex.Unlock() + case client := <-r.register: + r.handleRegister(client) + case client := <-r.unregister: + r.handleUnregister(client) + case message := <-r.broadcast: + r.handleBroadcast(message) + case <-r.clearHistory: + r.handleClearHistory() + case <-r.stop: + return } } } -// ClearRoomHistory removes history for a room, should be called when stream ends +func (r *roomHub) handleRegister(client *Client) { + r.mutex.RLock() + historyCopy := make([]Message, len(r.history)) + copy(historyCopy, r.history) + r.mutex.RUnlock() + + for _, msg := range historyCopy { + msg.IsHistory = true + msgBytes, err := json.Marshal(msg) + if err != nil { + continue + } + select { + case client.Send <- msgBytes: + default: + } + } + + r.mutex.Lock() + r.clients[client] = struct{}{} + r.mutex.Unlock() +} + +func (r *roomHub) handleUnregister(client *Client) { + r.mutex.Lock() + if _, ok := r.clients[client]; ok { + delete(r.clients, client) + close(client.Send) + } + r.mutex.Unlock() + + r.manager.deleteRoomIfIdle(r) +} + +func (r *roomHub) handleBroadcast(message Message) { + msgBytes, err := json.Marshal(message) + if err != nil { + return + } + + r.mutex.Lock() + if message.Type == "chat" || message.Type == "danmaku" { + r.history = append(r.history, message) + if len(r.history) > historyLimit { + r.history = r.history[1:] + } + } + + for client := range r.clients { + select { + case client.Send <- msgBytes: + default: + close(client.Send) + delete(r.clients, client) + } + } + r.mutex.Unlock() +} + +func (r *roomHub) handleClearHistory() { + r.mutex.Lock() + r.history = nil + r.mutex.Unlock() + r.manager.deleteRoomIfIdle(r) +} + func (h *Hub) ClearRoomHistory(roomID string) { - h.mutex.Lock() - defer h.mutex.Unlock() - delete(h.roomsHistory, roomID) + if room := h.getRoom(roomID); room != nil { + select { + case room.clearHistory <- struct{}{}: + default: + } + } } func (h *Hub) RegisterClient(c *Client) { - h.register <- c + h.getOrCreateRoom(c.RoomID).register <- c +} + +func (h *Hub) UnregisterClient(c *Client) { + if room := h.getRoom(c.RoomID); room != nil { + room.unregister <- c + } } -// BroadcastToRoom sends a message to the broadcast channel func (h *Hub) BroadcastToRoom(msg Message) { - h.broadcast <- msg + h.getOrCreateRoom(msg.RoomID).broadcast <- msg } func (c *Client) ReadPump() { defer func() { - c.Hub.unregister <- c + c.Hub.UnregisterClient(c) c.Conn.Close() }() c.Conn.SetReadLimit(maxMessageSize) c.Conn.SetReadDeadline(time.Now().Add(pongWait)) - c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + c.Conn.SetPongHandler(func(string) error { + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) for { _, message, err := c.Conn.ReadMessage() if err != nil { @@ -169,7 +243,7 @@ func (c *Client) ReadPump() { if err := json.Unmarshal(message, &msg); err == nil { msg.RoomID = c.RoomID msg.Username = c.Username - c.Hub.broadcast <- msg + c.Hub.BroadcastToRoom(msg) } } } @@ -192,7 +266,10 @@ func (c *Client) WritePump() { if err != nil { return } - w.Write(message) + if _, err := w.Write(message); err != nil { + _ = w.Close() + return + } if err := w.Close(); err != nil { return } @@ -209,23 +286,29 @@ var MainHub *Hub func InitChat() { MainHub = NewHub() - go MainHub.Run() } func (h *Hub) GetStatsSnapshot() StatsSnapshot { h.mutex.RLock() - defer h.mutex.RUnlock() - + rooms := make([]*roomHub, 0, len(h.rooms)) roomClients := make(map[string]int, len(h.rooms)) + for roomID, room := range h.rooms { + rooms = append(rooms, room) + roomClients[roomID] = 0 + } + h.mutex.RUnlock() + totalClients := 0 - for roomID, clients := range h.rooms { - count := len(clients) - roomClients[roomID] = count + for _, room := range rooms { + room.mutex.RLock() + count := len(room.clients) + room.mutex.RUnlock() + roomClients[room.roomID] = count totalClients += count } return StatsSnapshot{ - RoomCount: len(h.rooms), + RoomCount: len(rooms), TotalConnectedClient: totalClients, RoomClients: roomClients, } diff --git a/backend/internal/db/cache.go b/backend/internal/db/cache.go new file mode 100644 index 0000000..99762b8 --- /dev/null +++ b/backend/internal/db/cache.go @@ -0,0 +1,332 @@ +package db + +import ( + "errors" + "strings" + "sync" + + "gorm.io/gorm" + + "hightube/internal/model" +) + +type userCache struct { + mutex sync.RWMutex + byID map[uint]model.User + byUsername map[string]uint +} + +type roomCache struct { + mutex sync.RWMutex + byID map[uint]model.Room + byUserID map[uint]uint + byStreamKey map[string]uint + activeRoomIDs map[uint]struct{} + activeRoomsLoaded bool +} + +var users = &userCache{ + byID: make(map[uint]model.User), + byUsername: make(map[string]uint), +} + +var rooms = &roomCache{ + byID: make(map[uint]model.Room), + byUserID: make(map[uint]uint), + byStreamKey: make(map[string]uint), + activeRoomIDs: make(map[uint]struct{}), +} + +func cacheUser(user model.User) { + usernameKey := strings.ToLower(strings.TrimSpace(user.Username)) + + users.mutex.Lock() + users.byID[user.ID] = user + if usernameKey != "" { + users.byUsername[usernameKey] = user.ID + } + users.mutex.Unlock() +} + +func removeUserFromCache(user model.User) { + usernameKey := strings.ToLower(strings.TrimSpace(user.Username)) + + users.mutex.Lock() + delete(users.byID, user.ID) + if usernameKey != "" { + delete(users.byUsername, usernameKey) + } + users.mutex.Unlock() +} + +func cacheRoom(room model.Room) { + rooms.mutex.Lock() + rooms.byID[room.ID] = room + rooms.byUserID[room.UserID] = room.ID + if room.StreamKey != "" { + rooms.byStreamKey[room.StreamKey] = room.ID + } + if room.IsActive { + rooms.activeRoomIDs[room.ID] = struct{}{} + } else { + delete(rooms.activeRoomIDs, room.ID) + } + rooms.mutex.Unlock() +} + +func removeRoomFromCache(room model.Room) { + rooms.mutex.Lock() + delete(rooms.byID, room.ID) + delete(rooms.byUserID, room.UserID) + if room.StreamKey != "" { + delete(rooms.byStreamKey, room.StreamKey) + } + delete(rooms.activeRoomIDs, room.ID) + rooms.mutex.Unlock() +} + +func LoadUserByID(id uint) (model.User, error) { + users.mutex.RLock() + if user, ok := users.byID[id]; ok { + users.mutex.RUnlock() + return user, nil + } + users.mutex.RUnlock() + + var user model.User + if err := DB.First(&user, id).Error; err != nil { + return model.User{}, err + } + cacheUser(user) + return user, nil +} + +func LoadUserByUsername(username string) (model.User, error) { + key := strings.ToLower(strings.TrimSpace(username)) + if key == "" { + return model.User{}, gorm.ErrRecordNotFound + } + + users.mutex.RLock() + if id, ok := users.byUsername[key]; ok { + if user, found := users.byID[id]; found { + users.mutex.RUnlock() + return user, nil + } + } + users.mutex.RUnlock() + + var user model.User + if err := DB.Where("username = ?", strings.TrimSpace(username)).First(&user).Error; err != nil { + return model.User{}, err + } + cacheUser(user) + return user, nil +} + +func LoadRoomByUserID(userID uint) (model.Room, error) { + rooms.mutex.RLock() + if roomID, ok := rooms.byUserID[userID]; ok { + if room, found := rooms.byID[roomID]; found { + rooms.mutex.RUnlock() + return room, nil + } + } + rooms.mutex.RUnlock() + + var room model.Room + if err := DB.Where("user_id = ?", userID).First(&room).Error; err != nil { + return model.Room{}, err + } + cacheRoom(room) + return room, nil +} + +func LoadRoomByStreamKey(streamKey string) (model.Room, error) { + rooms.mutex.RLock() + if roomID, ok := rooms.byStreamKey[streamKey]; ok { + if room, found := rooms.byID[roomID]; found { + rooms.mutex.RUnlock() + return room, nil + } + } + rooms.mutex.RUnlock() + + var room model.Room + if err := DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil { + return model.Room{}, err + } + cacheRoom(room) + return room, nil +} + +func ListActiveRooms() ([]model.Room, error) { + rooms.mutex.RLock() + loaded := rooms.activeRoomsLoaded + if loaded { + result := make([]model.Room, 0, len(rooms.activeRoomIDs)) + for roomID := range rooms.activeRoomIDs { + if room, ok := rooms.byID[roomID]; ok { + result = append(result, room) + } + } + complete := len(result) == len(rooms.activeRoomIDs) + rooms.mutex.RUnlock() + if complete { + return result, nil + } + } else { + rooms.mutex.RUnlock() + } + + var result []model.Room + if err := DB.Where("is_active = ?", true).Find(&result).Error; err != nil { + return nil, err + } + + rooms.mutex.Lock() + for _, room := range result { + rooms.byID[room.ID] = room + rooms.byUserID[room.UserID] = room.ID + if room.StreamKey != "" { + rooms.byStreamKey[room.StreamKey] = room.ID + } + rooms.activeRoomIDs[room.ID] = struct{}{} + } + rooms.activeRoomsLoaded = true + rooms.mutex.Unlock() + + return result, nil +} + +func CreateUserAndRoom(user *model.User, room *model.Room) error { + if user == nil || room == nil { + return errors.New("user and room are required") + } + + if err := DB.Transaction(func(tx *gorm.DB) error { + if err := tx.Create(user).Error; err != nil { + return err + } + room.UserID = user.ID + if err := tx.Create(room).Error; err != nil { + return err + } + return nil + }); err != nil { + return err + } + + cacheUser(*user) + cacheRoom(*room) + return nil +} + +func UpdateUserRole(userID uint, role string) error { + if err := DB.Model(&model.User{}).Where("id = ?", userID).Update("role", role).Error; err != nil { + return err + } + + user, err := LoadUserByID(userID) + if err != nil { + return err + } + user.Role = role + cacheUser(user) + return nil +} + +func UpdateUserEnabled(userID uint, enabled bool) error { + if err := DB.Model(&model.User{}).Where("id = ?", userID).Update("enabled", enabled).Error; err != nil { + return err + } + + user, err := LoadUserByID(userID) + if err != nil { + return err + } + user.Enabled = enabled + cacheUser(user) + return nil +} + +func UpdateUserPassword(userID uint, hash string) error { + if err := DB.Model(&model.User{}).Where("id = ?", userID).Update("password", hash).Error; err != nil { + return err + } + + user, err := LoadUserByID(userID) + if err != nil { + return err + } + user.Password = hash + cacheUser(user) + return nil +} + +func SetRoomActive(roomID uint, active bool) error { + if err := DB.Model(&model.Room{}).Where("id = ?", roomID).Update("is_active", active).Error; err != nil { + return err + } + + rooms.mutex.Lock() + room, ok := rooms.byID[roomID] + if ok { + room.IsActive = active + rooms.byID[roomID] = room + if active { + rooms.activeRoomIDs[roomID] = struct{}{} + } else { + delete(rooms.activeRoomIDs, roomID) + } + rooms.activeRoomsLoaded = true + rooms.mutex.Unlock() + return nil + } + rooms.mutex.Unlock() + + if !active { + rooms.mutex.Lock() + delete(rooms.activeRoomIDs, roomID) + rooms.activeRoomsLoaded = true + rooms.mutex.Unlock() + return nil + } + + var roomFromDB model.Room + if err := DB.First(&roomFromDB, roomID).Error; err != nil { + return err + } + roomFromDB.IsActive = active + cacheRoom(roomFromDB) + + rooms.mutex.Lock() + rooms.activeRoomsLoaded = true + rooms.mutex.Unlock() + return nil +} + +func DeleteUserCascade(userID uint) error { + user, userErr := LoadUserByID(userID) + room, roomErr := LoadRoomByUserID(userID) + + if err := DB.Transaction(func(tx *gorm.DB) error { + if err := tx.Where("user_id = ?", userID).Delete(&model.Room{}).Error; err != nil { + return err + } + if err := tx.Delete(&model.User{}, userID).Error; err != nil { + return err + } + return nil + }); err != nil { + return err + } + + if roomErr == nil { + removeRoomFromCache(room) + } + if userErr == nil { + removeUserFromCache(user) + } + return nil +} diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index cde01cc..8e89686 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -3,6 +3,7 @@ package db import ( "log" "os" + "runtime" "time" "gorm.io/driver/sqlite" @@ -37,15 +38,32 @@ func InitDB() { log.Fatalf("Failed to connect database: %v", err) } - // Configure connection pool settings sqlDB, err := DB.DB() if err != nil { log.Fatalf("Failed to get database instance: %v", err) } - sqlDB.SetMaxOpenConns(10) - sqlDB.SetMaxIdleConns(5) + maxOpen := runtime.NumCPU()*2 + 1 + if maxOpen < 4 { + maxOpen = 4 + } + if maxOpen > 32 { + maxOpen = 32 + } + sqlDB.SetMaxOpenConns(maxOpen) + sqlDB.SetMaxIdleConns(maxOpen) + sqlDB.SetConnMaxIdleTime(10 * time.Minute) sqlDB.SetConnMaxLifetime(time.Hour) + for _, pragma := range []string{ + "PRAGMA synchronous=NORMAL", + "PRAGMA temp_store=MEMORY", + "PRAGMA foreign_keys=ON", + } { + if execErr := DB.Exec(pragma).Error; execErr != nil { + log.Fatalf("Failed to apply %s: %v", pragma, execErr) + } + } + // Auto-migrate the schema err = DB.AutoMigrate(&model.User{}, &model.Room{}) if err != nil { @@ -57,7 +75,7 @@ func InitDB() { ensureAdminUser() - monitor.Infof("Database initialized successfully with WAL mode and connection pooling") + monitor.Infof("Database initialized successfully with WAL mode and tuned SQLite pragmas") } func ensureAdminUser() { @@ -77,14 +95,17 @@ func ensureAdminUser() { updates := map[string]interface{}{} if user.Role != "admin" { updates["role"] = "admin" + user.Role = "admin" } if !user.Enabled { updates["enabled"] = true + user.Enabled = true } if len(updates) > 0 { DB.Model(&user).Updates(updates) monitor.Warnf("Admin account normalized for username=%s", adminUsername) } + cacheUser(user) return } @@ -115,5 +136,10 @@ func ensureAdminUser() { monitor.Warnf("Failed to create default admin room: %v", roomErr) } + cacheUser(newAdmin) + if room.ID != 0 { + cacheRoom(room) + } + monitor.Warnf("Default admin created for username=%s; change the password after first login", adminUsername) } diff --git a/backend/internal/model/room.go b/backend/internal/model/room.go index 51cdbb8..22cb2d3 100644 --- a/backend/internal/model/room.go +++ b/backend/internal/model/room.go @@ -10,5 +10,5 @@ type Room struct { UserID uint `gorm:"uniqueIndex;not null"` Title string `gorm:"default:'My Live Room'"` StreamKey string `gorm:"uniqueIndex;not null"` // Secret key for OBS streaming - IsActive bool `gorm:"default:false"` // Whether the stream is currently active + IsActive bool `gorm:"index;default:false"` // Whether the stream is currently active } diff --git a/backend/internal/stream/server.go b/backend/internal/stream/server.go index 089eb99..d974963 100644 --- a/backend/internal/stream/server.go +++ b/backend/internal/stream/server.go @@ -25,7 +25,6 @@ import ( "hightube/internal/chat" "hightube/internal/db" - "hightube/internal/model" "hightube/internal/monitor" ) @@ -147,7 +146,9 @@ func NewRTMPServer() *RTMPServer { if isSource { roomIDUint := parseRoomID(roomID) if roomIDUint != 0 { - db.DB.Model(&model.Room{}).Where("id = ?", roomIDUint).Updates(map[string]interface{}{"is_active": true}) + if err := db.SetRoomActive(roomIDUint, true); err != nil { + monitor.Warnf("Failed to mark room active room_id=%s: %v", roomID, err) + } } s.startVariantTranscoders(roomID) s.startThumbnailCapture(roomID) @@ -165,7 +166,9 @@ func NewRTMPServer() *RTMPServer { s.stopThumbnailCapture(roomID) roomIDUint := parseRoomID(roomID) if roomIDUint != 0 { - db.DB.Model(&model.Room{}).Where("id = ?", roomIDUint).Updates(map[string]interface{}{"is_active": false}) + if err := db.SetRoomActive(roomIDUint, false); err != nil { + monitor.Warnf("Failed to mark room inactive room_id=%s: %v", roomID, err) + } } chat.MainHub.ClearRoomHistory(roomID) monitor.Infof("Publishing ended for room_id=%s", roomID) @@ -317,8 +320,8 @@ func (s *RTMPServer) HandleThumbnail(c *gin.Context) { func (s *RTMPServer) resolvePublishPath(parts []string) (roomID string, channelPath string, isSource bool, ok bool) { if parts[1] == "live" && len(parts) == 3 { - var room model.Room - if err := db.DB.Where("stream_key = ?", parts[2]).First(&room).Error; err != nil { + room, err := db.LoadRoomByStreamKey(parts[2]) + if err != nil { return "", "", false, false } roomID = fmt.Sprintf("%d", room.ID)