diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index 6491e47..db88312 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -1,16 +1,16 @@ package main import ( - "log" - "hightube/internal/api" "hightube/internal/chat" "hightube/internal/db" + "hightube/internal/monitor" "hightube/internal/stream" ) func main() { - log.Println("Starting Hightube Server v1.0.0-Beta3.7...") + monitor.Init(2000) + monitor.Infof("Starting Hightube Server v1.0.0-Beta3.7") // Initialize Database and run auto-migrations db.InitDB() @@ -23,15 +23,15 @@ func main() { // Start the API server in a goroutine so it doesn't block the RTMP server go func() { r := api.SetupRouter(srv) - log.Println("[INFO] API Server is listening on :8080...") + monitor.Infof("API server listening on :8080") if err := r.Run(":8080"); err != nil { - log.Fatalf("Failed to start API server: %v", err) + monitor.Errorf("Failed to start API server: %v", err) } }() // Setup and start the RTMP server - log.Println("[INFO] Ready to receive RTMP streams from OBS.") + monitor.Infof("Ready to receive RTMP streams from OBS") if err := srv.Start(":1935"); err != nil { - log.Fatalf("Failed to start RTMP server: %v", err) + monitor.Errorf("Failed to start RTMP server: %v", err) } } diff --git a/backend/internal/api/admin.go b/backend/internal/api/admin.go new file mode 100644 index 0000000..eed6783 --- /dev/null +++ b/backend/internal/api/admin.go @@ -0,0 +1,333 @@ +package api + +import ( + "encoding/json" + "net/http" + "strconv" + "strings" + + "github.com/gin-gonic/gin" + + "hightube/internal/chat" + "hightube/internal/db" + "hightube/internal/model" + "hightube/internal/monitor" + "hightube/internal/stream" + "hightube/internal/utils" +) + +var adminRTMP *stream.RTMPServer + +func BindAdminDependencies(rtmpSrv *stream.RTMPServer) { + adminRTMP = rtmpSrv +} + +func GetAdminOverview(c *gin.Context) { + stats := monitor.GetSnapshot() + chatStats := chat.StatsSnapshot{} + if chat.MainHub != nil { + chatStats = chat.MainHub.GetStatsSnapshot() + } + + activeCount := 0 + activePaths := []string{} + if adminRTMP != nil { + activeCount = adminRTMP.ActiveStreamCount() + activePaths = adminRTMP.ActiveStreamPaths() + } + + c.JSON(http.StatusOK, gin.H{ + "system": stats, + "stream": gin.H{ + "active_stream_count": activeCount, + "active_stream_paths": activePaths, + }, + "chat": chatStats, + }) +} + +func GetAdminHealth(c *gin.Context) { + type dbHealth struct { + OK bool `json:"ok"` + Error string `json:"error,omitempty"` + } + + health := gin.H{ + "api": true, + "rtmp": adminRTMP != nil, + } + + dbOK := dbHealth{OK: true} + if err := db.DB.Exec("SELECT 1").Error; err != nil { + dbOK.OK = false + dbOK.Error = err.Error() + } + health["db"] = dbOK + + c.JSON(http.StatusOK, health) +} + +func ListAdminLogs(c *gin.Context) { + level := c.Query("level") + keyword := c.Query("keyword") + page := parseIntWithDefault(c.Query("page"), 1) + pageSize := parseIntWithDefault(c.Query("page_size"), 20) + + items, total := monitor.Query(level, keyword, page, pageSize) + c.JSON(http.StatusOK, gin.H{ + "items": items, + "total": total, + "page": page, + "page_size": pageSize, + }) +} + +func StreamAdminLogs(c *gin.Context) { + if !authorizeAdminTokenFromQuery(c) { + return + } + + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") + + flusher, ok := c.Writer.(http.Flusher) + if !ok { + c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming unsupported"}) + return + } + + ch := monitor.Subscribe() + defer monitor.Unsubscribe(ch) + + for { + select { + case entry := <-ch: + payload, _ := json.Marshal(entry) + _, _ = c.Writer.Write([]byte("event: log\n")) + _, _ = c.Writer.Write([]byte("data: " + string(payload) + "\n\n")) + flusher.Flush() + case <-c.Request.Context().Done(): + return + } + } +} + +func authorizeAdminTokenFromQuery(c *gin.Context) bool { + token := strings.TrimSpace(c.Query("token")) + if token == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "token is required"}) + return false + } + + claims, err := utils.ParseToken(token) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) + return false + } + + userID, err := strconv.ParseUint(claims.Subject, 10, 32) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token subject"}) + return false + } + + var user model.User + if err := db.DB.First(&user, uint(userID)).Error; err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "user not found"}) + return false + } + + if !user.Enabled || user.Role != "admin" { + c.JSON(http.StatusForbidden, gin.H{"error": "admin access required"}) + return false + } + + return true +} + +type updateRoleRequest struct { + Role string `json:"role" binding:"required"` +} + +func UpdateUserRole(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + var req updateRoleRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + role := strings.ToLower(strings.TrimSpace(req.Role)) + if role != "admin" && role != "user" { + c.JSON(http.StatusBadRequest, gin.H{"error": "role must be admin or user"}) + return + } + + if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("role", role).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update role"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v updated user_id=%d role=%s", operator, userID, role) + c.JSON(http.StatusOK, gin.H{"message": "role updated"}) +} + +type updateEnabledRequest struct { + Enabled bool `json:"enabled"` +} + +func UpdateUserEnabled(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + var req updateEnabledRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("enabled", req.Enabled).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update enabled status"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v updated user_id=%d enabled=%v", operator, userID, req.Enabled) + c.JSON(http.StatusOK, gin.H{"message": "enabled status updated"}) +} + +type resetPasswordRequest struct { + NewPassword string `json:"new_password" binding:"required"` +} + +func ResetUserPassword(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + var req resetPasswordRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + hash, err := utils.HashPassword(req.NewPassword) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to hash password"}) + return + } + + if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("password", hash).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to reset password"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v reset password for user_id=%d", operator, userID) + c.JSON(http.StatusOK, gin.H{"message": "password reset"}) +} + +func DeleteUser(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + operatorID, _ := c.Get("user_id") + if opID, ok := operatorID.(uint); ok && opID == userID { + c.JSON(http.StatusBadRequest, gin.H{"error": "cannot delete current admin account"}) + return + } + + if err := db.DB.Where("user_id = ?", userID).Delete(&model.Room{}).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete rooms"}) + return + } + if err := db.DB.Delete(&model.User{}, userID).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete user"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v deleted user_id=%d", operator, userID) + c.JSON(http.StatusOK, gin.H{"message": "user deleted"}) +} + +func ListUsers(c *gin.Context) { + keyword := strings.TrimSpace(c.Query("keyword")) + page := parseIntWithDefault(c.Query("page"), 1) + pageSize := parseIntWithDefault(c.Query("page_size"), 20) + if pageSize > 200 { + pageSize = 200 + } + offset := (page - 1) * pageSize + if offset < 0 { + offset = 0 + } + + query := db.DB.Model(&model.User{}) + if keyword != "" { + query = query.Where("username LIKE ?", "%"+keyword+"%") + } + + var total int64 + if err := query.Count(&total).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count users"}) + return + } + + var users []model.User + if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&users).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query users"}) + return + } + + items := make([]gin.H, 0, len(users)) + for _, u := range users { + items = append(items, gin.H{ + "id": u.ID, + "username": u.Username, + "role": u.Role, + "enabled": u.Enabled, + "created_at": u.CreatedAt, + "updated_at": u.UpdatedAt, + }) + } + + c.JSON(http.StatusOK, gin.H{ + "items": items, + "total": total, + "page": page, + "page_size": pageSize, + }) +} + +func parseIntWithDefault(v string, def int) int { + i, err := strconv.Atoi(v) + if err != nil || i <= 0 { + return def + } + return i +} + +func parsePathUint(v string) (uint, bool) { + u64, err := strconv.ParseUint(v, 10, 32) + if err != nil { + return 0, false + } + return uint(u64), true +} diff --git a/backend/internal/api/admin_ui.go b/backend/internal/api/admin_ui.go new file mode 100644 index 0000000..ca1a936 --- /dev/null +++ b/backend/internal/api/admin_ui.go @@ -0,0 +1,21 @@ +package api + +import ( + "embed" + "io/fs" + "net/http" + + "github.com/gin-gonic/gin" +) + +//go:embed static/admin/* +var adminFS embed.FS + +func AdminPage(c *gin.Context) { + content, err := fs.ReadFile(adminFS, "static/admin/index.html") + if err != nil { + c.String(http.StatusInternalServerError, "failed to load admin page") + return + } + c.Data(http.StatusOK, "text/html; charset=utf-8", content) +} diff --git a/backend/internal/api/auth.go b/backend/internal/api/auth.go index 9fec6b8..b99023f 100644 --- a/backend/internal/api/auth.go +++ b/backend/internal/api/auth.go @@ -50,6 +50,8 @@ func Register(c *gin.Context) { user := model.User{ Username: req.Username, Password: hashedPassword, + Role: "user", + Enabled: true, } if err := db.DB.Create(&user).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create user"}) @@ -89,7 +91,12 @@ func Login(c *gin.Context) { return } - token, err := utils.GenerateToken(user.ID) + if !user.Enabled { + c.JSON(http.StatusForbidden, gin.H{"error": "Account is disabled"}) + return + } + + token, err := utils.GenerateToken(user.ID, user.Username, user.Role) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to generate token"}) return @@ -98,6 +105,8 @@ func Login(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "token": token, "username": user.Username, + "role": user.Role, + "enabled": user.Enabled, }) } diff --git a/backend/internal/api/chat_handler.go b/backend/internal/api/chat_handler.go index c293e55..17c811f 100644 --- a/backend/internal/api/chat_handler.go +++ b/backend/internal/api/chat_handler.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/websocket" "hightube/internal/chat" + "hightube/internal/monitor" ) var upgrader = websocket.Upgrader{ @@ -23,7 +24,7 @@ func WSHandler(c *gin.Context) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { - fmt.Printf("[WS ERROR] Failed to upgrade: %v\n", err) + monitor.Errorf("WebSocket upgrade failed: %v", err) return } @@ -48,4 +49,6 @@ func WSHandler(c *gin.Context) { Content: fmt.Sprintf("%s joined the room", username), RoomID: roomID, }) + + monitor.Infof("WebSocket client joined room_id=%s username=%s", roomID, username) } diff --git a/backend/internal/api/middleware.go b/backend/internal/api/middleware.go index 3c3af8e..7fea0a1 100644 --- a/backend/internal/api/middleware.go +++ b/backend/internal/api/middleware.go @@ -7,6 +7,9 @@ import ( "github.com/gin-gonic/gin" + "hightube/internal/db" + "hightube/internal/model" + "hightube/internal/monitor" "hightube/internal/utils" ) @@ -28,19 +31,57 @@ func AuthMiddleware() gin.HandlerFunc { } tokenStr := parts[1] - userIDStr, err := utils.ParseToken(tokenStr) + claims, err := utils.ParseToken(tokenStr) if err != nil { c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or expired token"}) c.Abort() return } - userID, _ := strconv.ParseUint(userIDStr, 10, 32) + userID, _ := strconv.ParseUint(claims.Subject, 10, 32) + + var user model.User + if err := db.DB.First(&user, uint(userID)).Error; err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "User not found"}) + c.Abort() + return + } + + if !user.Enabled { + c.JSON(http.StatusForbidden, gin.H{"error": "Account is disabled"}) + c.Abort() + return + } + c.Set("user_id", uint(userID)) + c.Set("username", user.Username) + c.Set("role", user.Role) c.Next() } } +func AdminMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + role, ok := c.Get("role") + if !ok || role != "admin" { + c.JSON(http.StatusForbidden, gin.H{"error": "admin access required"}) + c.Abort() + return + } + c.Next() + } +} + +func RequestMetricsMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + monitor.IncrementRequestCount() + c.Next() + if c.Writer.Status() >= http.StatusBadRequest { + monitor.IncrementErrorCount() + } + } +} + // CORSMiddleware handles cross-origin requests from web clients func CORSMiddleware() gin.HandlerFunc { return func(c *gin.Context) { diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index 0f11d08..46045f7 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -12,9 +12,10 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { gin.SetMode(gin.ReleaseMode) r := gin.Default() + BindAdminDependencies(streamServer) // Use CORS middleware to allow web access - r.Use(CORSMiddleware()) + r.Use(CORSMiddleware(), RequestMetricsMiddleware()) // 清除代理信任警告 "[WARNING] You trusted all proxies" r.SetTrustedProxies(nil) @@ -24,9 +25,11 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { r.POST("/api/login", Login) r.GET("/api/rooms/active", GetActiveRooms) r.GET("/live/:room_id", streamServer.HandleHTTPFLV) - + // WebSocket endpoint for live chat r.GET("/api/ws/room/:room_id", WSHandler) + r.GET("/admin", AdminPage) + r.GET("/api/admin/logs/stream", StreamAdminLogs) // Protected routes (require JWT) authGroup := r.Group("/api") @@ -34,6 +37,19 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { { authGroup.GET("/room/my", GetMyRoom) authGroup.POST("/user/change-password", ChangePassword) + + adminGroup := authGroup.Group("/admin") + adminGroup.Use(AdminMiddleware()) + { + adminGroup.GET("/overview", GetAdminOverview) + adminGroup.GET("/health", GetAdminHealth) + adminGroup.GET("/logs", ListAdminLogs) + adminGroup.GET("/users", ListUsers) + adminGroup.PATCH("/users/:id/role", UpdateUserRole) + adminGroup.PATCH("/users/:id/enabled", UpdateUserEnabled) + adminGroup.POST("/users/:id/reset-password", ResetUserPassword) + adminGroup.DELETE("/users/:id", DeleteUser) + } } return r diff --git a/backend/internal/api/static/admin/index.html b/backend/internal/api/static/admin/index.html new file mode 100644 index 0000000..3c41435 --- /dev/null +++ b/backend/internal/api/static/admin/index.html @@ -0,0 +1,378 @@ + + + + + + Hightube Admin Console + + + + + + +
+
+

Hightube Admin Console

+
+ + +
+
+ +
+

系统状态

+
+
+
+ +
+

在线状态

+
+
+ +
+

实时日志

+
+ + + +
+
+
+ +
+

操作说明

+ +
+ +
+

用户管理

+
+ + +
+ + + + + + + + + + + + +
ID用户名角色状态创建时间操作
+
+
+ + + + diff --git a/backend/internal/chat/hub.go b/backend/internal/chat/hub.go index e99d1b7..5473a4a 100644 --- a/backend/internal/chat/hub.go +++ b/backend/internal/chat/hub.go @@ -40,6 +40,12 @@ type Hub struct { mutex sync.RWMutex } +type StatsSnapshot struct { + RoomCount int `json:"room_count"` + TotalConnectedClient int `json:"total_connected_client"` + RoomClients map[string]int `json:"room_clients"` +} + func NewHub() *Hub { return &Hub{ broadcast: make(chan Message), @@ -195,3 +201,22 @@ func InitChat() { MainHub = NewHub() go MainHub.Run() } + +func (h *Hub) GetStatsSnapshot() StatsSnapshot { + h.mutex.RLock() + defer h.mutex.RUnlock() + + roomClients := make(map[string]int, len(h.rooms)) + totalClients := 0 + for roomID, clients := range h.rooms { + count := len(clients) + roomClients[roomID] = count + totalClients += count + } + + return StatsSnapshot{ + RoomCount: len(h.rooms), + TotalConnectedClient: totalClients, + RoomClients: roomClients, + } +} diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index a2b211b..ee5280b 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -10,6 +10,8 @@ import ( "gorm.io/gorm/logger" "hightube/internal/model" + "hightube/internal/monitor" + "hightube/internal/utils" ) var DB *gorm.DB @@ -19,10 +21,10 @@ func InitDB() { newLogger := logger.New( log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer logger.Config{ - SlowThreshold: time.Second, // Slow SQL threshold - LogLevel: logger.Warn, // Log level - IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger - Colorful: true, // Disable color + SlowThreshold: time.Second, // Slow SQL threshold + LogLevel: logger.Warn, // Log level + IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger + Colorful: true, // Disable color }, ) @@ -44,5 +46,65 @@ func InitDB() { // Phase 3.5 Fix: Reset all rooms to inactive on startup using explicit map to ensure false is updated DB.Model(&model.Room{}).Where("1 = 1").Updates(map[string]interface{}{"is_active": false}) - log.Println("Database initialized successfully.") + ensureAdminUser() + + monitor.Infof("Database initialized successfully") +} + +func ensureAdminUser() { + adminUsername := os.Getenv("HIGHTUBE_ADMIN_USER") + if adminUsername == "" { + adminUsername = "admin" + } + + adminPassword := os.Getenv("HIGHTUBE_ADMIN_PASS") + if adminPassword == "" { + adminPassword = "admin123456" + } + + var user model.User + err := DB.Where("username = ?", adminUsername).First(&user).Error + if err == nil { + updates := map[string]interface{}{} + if user.Role != "admin" { + updates["role"] = "admin" + } + if !user.Enabled { + updates["enabled"] = true + } + if len(updates) > 0 { + DB.Model(&user).Updates(updates) + monitor.Warnf("Admin account normalized for username=%s", adminUsername) + } + return + } + + hash, hashErr := utils.HashPassword(adminPassword) + if hashErr != nil { + monitor.Errorf("Failed to hash default admin password: %v", hashErr) + return + } + + newAdmin := model.User{ + Username: adminUsername, + Password: hash, + Role: "admin", + Enabled: true, + } + if createErr := DB.Create(&newAdmin).Error; createErr != nil { + monitor.Errorf("Failed to create admin account: %v", createErr) + return + } + + room := model.Room{ + UserID: newAdmin.ID, + Title: newAdmin.Username + "'s Live Room", + StreamKey: utils.GenerateStreamKey(), + IsActive: false, + } + if roomErr := DB.Create(&room).Error; roomErr != nil { + monitor.Warnf("Failed to create default admin room: %v", roomErr) + } + + monitor.Warnf("Default admin created: username=%s password=%s", adminUsername, adminPassword) } diff --git a/backend/internal/model/user.go b/backend/internal/model/user.go index 73b50fa..79c3d75 100644 --- a/backend/internal/model/user.go +++ b/backend/internal/model/user.go @@ -9,4 +9,6 @@ type User struct { gorm.Model Username string `gorm:"uniqueIndex;not null"` Password string `gorm:"not null"` // Hashed password + Role string `gorm:"type:varchar(20);not null;default:user"` + Enabled bool `gorm:"not null;default:true"` } diff --git a/backend/internal/monitor/disk_other.go b/backend/internal/monitor/disk_other.go new file mode 100644 index 0000000..f541806 --- /dev/null +++ b/backend/internal/monitor/disk_other.go @@ -0,0 +1,7 @@ +//go:build !windows + +package monitor + +func getDiskSpaceGB() (float64, float64) { + return 0, 0 +} diff --git a/backend/internal/monitor/disk_windows.go b/backend/internal/monitor/disk_windows.go new file mode 100644 index 0000000..e8b654b --- /dev/null +++ b/backend/internal/monitor/disk_windows.go @@ -0,0 +1,38 @@ +//go:build windows + +package monitor + +import ( + "os" + "path/filepath" + + "golang.org/x/sys/windows" +) + +func getDiskSpaceGB() (float64, float64) { + wd, err := os.Getwd() + if err != nil { + return 0, 0 + } + + vol := filepath.VolumeName(wd) + if vol == "" { + return 0, 0 + } + root := vol + `\\` + + pathPtr, err := windows.UTF16PtrFromString(root) + if err != nil { + return 0, 0 + } + + var freeBytesAvailable uint64 + var totalBytes uint64 + var totalFreeBytes uint64 + if err := windows.GetDiskFreeSpaceEx(pathPtr, &freeBytesAvailable, &totalBytes, &totalFreeBytes); err != nil { + return 0, 0 + } + + const gb = 1024.0 * 1024.0 * 1024.0 + return float64(totalBytes) / gb, float64(totalFreeBytes) / gb +} diff --git a/backend/internal/monitor/logs.go b/backend/internal/monitor/logs.go new file mode 100644 index 0000000..da41aeb --- /dev/null +++ b/backend/internal/monitor/logs.go @@ -0,0 +1,130 @@ +package monitor + +import ( + "fmt" + "log" + "strings" + "sync" + "time" +) + +type LogEntry struct { + Time string `json:"time"` + Level string `json:"level"` + Message string `json:"message"` +} + +type logHub struct { + mutex sync.RWMutex + entries []LogEntry + maxEntries int + subscribers map[chan LogEntry]struct{} +} + +var hub = &logHub{ + maxEntries: 1000, + subscribers: make(map[chan LogEntry]struct{}), +} + +func Init(maxEntries int) { + if maxEntries > 0 { + hub.maxEntries = maxEntries + } +} + +func Infof(format string, args ...interface{}) { + appendEntry("info", fmt.Sprintf(format, args...)) +} + +func Warnf(format string, args ...interface{}) { + appendEntry("warn", fmt.Sprintf(format, args...)) +} + +func Errorf(format string, args ...interface{}) { + appendEntry("error", fmt.Sprintf(format, args...)) +} + +func Auditf(format string, args ...interface{}) { + appendEntry("audit", fmt.Sprintf(format, args...)) +} + +func appendEntry(level, message string) { + entry := LogEntry{ + Time: time.Now().Format(time.RFC3339), + Level: strings.ToLower(level), + Message: message, + } + + log.Printf("[%s] %s", strings.ToUpper(entry.Level), entry.Message) + + hub.mutex.Lock() + hub.entries = append(hub.entries, entry) + if len(hub.entries) > hub.maxEntries { + hub.entries = hub.entries[len(hub.entries)-hub.maxEntries:] + } + + for ch := range hub.subscribers { + select { + case ch <- entry: + default: + } + } + hub.mutex.Unlock() +} + +func Subscribe() chan LogEntry { + ch := make(chan LogEntry, 100) + hub.mutex.Lock() + hub.subscribers[ch] = struct{}{} + hub.mutex.Unlock() + return ch +} + +func Unsubscribe(ch chan LogEntry) { + hub.mutex.Lock() + if _, ok := hub.subscribers[ch]; ok { + delete(hub.subscribers, ch) + close(ch) + } + hub.mutex.Unlock() +} + +func Query(level, keyword string, page, pageSize int) ([]LogEntry, int) { + if page < 1 { + page = 1 + } + if pageSize < 1 { + pageSize = 20 + } + if pageSize > 200 { + pageSize = 200 + } + + level = strings.TrimSpace(strings.ToLower(level)) + keyword = strings.TrimSpace(strings.ToLower(keyword)) + + hub.mutex.RLock() + defer hub.mutex.RUnlock() + + filtered := make([]LogEntry, 0, len(hub.entries)) + for _, e := range hub.entries { + if level != "" && e.Level != level { + continue + } + if keyword != "" && !strings.Contains(strings.ToLower(e.Message), keyword) { + continue + } + filtered = append(filtered, e) + } + + total := len(filtered) + start := (page - 1) * pageSize + if start >= total { + return []LogEntry{}, total + } + end := start + pageSize + if end > total { + end = total + } + return filtered[start:end], total +} diff --git a/backend/internal/monitor/metrics.go b/backend/internal/monitor/metrics.go new file mode 100644 index 0000000..4e12d51 --- /dev/null +++ b/backend/internal/monitor/metrics.go @@ -0,0 +1,55 @@ +package monitor + +import ( + "runtime" + "sync/atomic" + "time" +) + +var startedAt = time.Now() + +var totalRequests uint64 +var totalErrors uint64 + +type Snapshot struct { + UptimeSeconds int64 `json:"uptime_seconds"` + Goroutines int `json:"goroutines"` + MemoryAllocMB float64 `json:"memory_alloc_mb"` + MemorySysMB float64 `json:"memory_sys_mb"` + CPUCores int `json:"cpu_cores"` + DiskTotalGB float64 `json:"disk_total_gb"` + DiskFreeGB float64 `json:"disk_free_gb"` + RequestsTotal uint64 `json:"requests_total"` + ErrorsTotal uint64 `json:"errors_total"` +} + +func IncrementRequestCount() { + atomic.AddUint64(&totalRequests, 1) +} + +func IncrementErrorCount() { + atomic.AddUint64(&totalErrors, 1) +} + +func GetSnapshot() Snapshot { + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + + diskTotal, diskFree := getDiskSpaceGB() + + return Snapshot{ + UptimeSeconds: int64(time.Since(startedAt).Seconds()), + Goroutines: runtime.NumGoroutine(), + MemoryAllocMB: bytesToMB(mem.Alloc), + MemorySysMB: bytesToMB(mem.Sys), + CPUCores: runtime.NumCPU(), + DiskTotalGB: diskTotal, + DiskFreeGB: diskFree, + RequestsTotal: atomic.LoadUint64(&totalRequests), + ErrorsTotal: atomic.LoadUint64(&totalErrors), + } +} + +func bytesToMB(v uint64) float64 { + return float64(v) / 1024.0 / 1024.0 +} diff --git a/backend/internal/stream/server.go b/backend/internal/stream/server.go index c62719a..4b397d5 100644 --- a/backend/internal/stream/server.go +++ b/backend/internal/stream/server.go @@ -17,6 +17,7 @@ import ( "hightube/internal/chat" "hightube/internal/db" "hightube/internal/model" + "hightube/internal/monitor" ) func init() { @@ -51,12 +52,12 @@ func NewRTMPServer() *RTMPServer { // Triggered when a broadcaster (e.g., OBS) starts publishing s.server.HandlePublish = func(conn *rtmp.Conn) { streamPath := conn.URL.Path // Expected format: /live/{stream_key} - fmt.Printf("[INFO] OBS is attempting to publish to: %s\n", streamPath) + monitor.Infof("OBS publish attempt: %s", streamPath) // Extract stream key from path parts := strings.Split(streamPath, "/") if len(parts) < 3 || parts[1] != "live" { - fmt.Printf("[WARN] Invalid publish path format: %s\n", streamPath) + monitor.Warnf("Invalid publish path format: %s", streamPath) return } streamKey := parts[2] @@ -64,16 +65,16 @@ func NewRTMPServer() *RTMPServer { // Authenticate stream key var room model.Room if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil { - fmt.Printf("[WARN] Authentication failed, invalid stream key: %s\n", streamKey) + monitor.Warnf("Invalid stream key: %s", streamKey) return // Reject connection } - fmt.Printf("[INFO] Stream authenticated for Room ID: %d\n", room.ID) + monitor.Infof("Stream authenticated for room_id=%d", room.ID) // 1. Get audio/video stream metadata streams, err := conn.Streams() if err != nil { - fmt.Printf("[ERROR] Failed to parse stream headers: %v\n", err) + monitor.Errorf("Failed to parse stream headers: %v", err) return } @@ -101,7 +102,7 @@ func NewRTMPServer() *RTMPServer { // Clear chat history for this room chat.MainHub.ClearRoomHistory(fmt.Sprintf("%d", room.ID)) - fmt.Printf("[INFO] Publishing ended for Room ID: %d\n", room.ID) + monitor.Infof("Publishing ended for room_id=%d", room.ID) }() // 4. Continuously copy data packets to our broadcast queue @@ -111,7 +112,7 @@ func NewRTMPServer() *RTMPServer { // Triggered when a viewer (e.g., VLC) requests playback s.server.HandlePlay = func(conn *rtmp.Conn) { streamPath := conn.URL.Path // Expected format: /live/{room_id} - fmt.Printf("[INFO] VLC is pulling stream from: %s\n", streamPath) + monitor.Infof("RTMP play requested: %s", streamPath) // 1. Look for the requested room's data queue s.mutex.RLock() @@ -119,7 +120,7 @@ func NewRTMPServer() *RTMPServer { s.mutex.RUnlock() if !ok { - fmt.Printf("[WARN] Stream not found or inactive: %s\n", streamPath) + monitor.Warnf("Stream not found or inactive: %s", streamPath) return } @@ -129,7 +130,7 @@ func NewRTMPServer() *RTMPServer { conn.WriteHeader(streams) // 3. Cleanup on end - defer fmt.Printf("[INFO] Playback ended: %s\n", streamPath) + defer monitor.Infof("Playback ended: %s", streamPath) // 4. Continuously copy data packets to the viewer err := avutil.CopyPackets(conn, cursor) @@ -137,9 +138,9 @@ func NewRTMPServer() *RTMPServer { // 如果是客户端主动断开连接引起的错误,不将其作为严重错误打印 errStr := err.Error() if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { - fmt.Printf("[INFO] Viewer disconnected normally: %s\n", streamPath) + monitor.Infof("Viewer disconnected: %s", streamPath) } else { - fmt.Printf("[ERROR] Error occurred during playback: %v\n", err) + monitor.Errorf("Playback error on %s: %v", streamPath, err) } } } @@ -150,7 +151,7 @@ func NewRTMPServer() *RTMPServer { // Start launches the RTMP server func (s *RTMPServer) Start(addr string) error { s.server.Addr = addr - fmt.Printf("[INFO] RTMP Server is listening on %s...\n", addr) + monitor.Infof("RTMP server listening on %s", addr) return s.server.ListenAndServe() } @@ -190,9 +191,26 @@ func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) { if err := avutil.CopyFile(muxer, cursor); err != nil && err != io.EOF { errStr := err.Error() if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { - fmt.Printf("[INFO] HTTP-FLV viewer disconnected normally: %s\n", streamPath) + monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath) return } - fmt.Printf("[ERROR] HTTP-FLV playback error on %s: %v\n", streamPath, err) + monitor.Errorf("HTTP-FLV playback error on %s: %v", streamPath, err) } } + +func (s *RTMPServer) ActiveStreamCount() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.channels) +} + +func (s *RTMPServer) ActiveStreamPaths() []string { + s.mutex.RLock() + defer s.mutex.RUnlock() + + paths := make([]string, 0, len(s.channels)) + for path := range s.channels { + paths = append(paths, path) + } + return paths +} diff --git a/backend/internal/utils/utils.go b/backend/internal/utils/utils.go index f6f8bd5..9ed8172 100644 --- a/backend/internal/utils/utils.go +++ b/backend/internal/utils/utils.go @@ -13,26 +13,36 @@ import ( // In production, load this from environment variables var jwtKey = []byte("hightube_super_secret_key_MVP_only") +type TokenClaims struct { + Username string `json:"username"` + Role string `json:"role"` + jwt.RegisteredClaims +} + // GenerateToken generates a JWT token for a given user ID -func GenerateToken(userID uint) (string, error) { - claims := &jwt.RegisteredClaims{ +func GenerateToken(userID uint, username, role string) (string, error) { + claims := &TokenClaims{ + Username: username, + Role: role, + RegisteredClaims: jwt.RegisteredClaims{ Subject: fmt.Sprintf("%d", userID), ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)), + }, } token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) return token.SignedString(jwtKey) } // ParseToken parses the JWT string and returns the user ID (Subject) -func ParseToken(tokenStr string) (string, error) { - claims := &jwt.RegisteredClaims{} +func ParseToken(tokenStr string) (*TokenClaims, error) { + claims := &TokenClaims{} token, err := jwt.ParseWithClaims(tokenStr, claims, func(t *jwt.Token) (interface{}, error) { return jwtKey, nil }) if err != nil || !token.Valid { - return "", err + return nil, err } - return claims.Subject, nil + return claims, nil } // HashPassword creates a bcrypt hash of the password