From 1cce5634b1c4097363047141e240e142559d9a0d Mon Sep 17 00:00:00 2001 From: Z <424062803@qq.com> Date: Thu, 9 Apr 2026 00:14:57 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9B=91=E6=8E=A7=E7=BD=91=E9=A1=B5=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/cmd/server/main.go | 14 +- backend/internal/api/admin.go | 333 ++++++++++++++++ backend/internal/api/admin_ui.go | 21 ++ backend/internal/api/auth.go | 11 +- backend/internal/api/chat_handler.go | 5 +- backend/internal/api/middleware.go | 45 ++- backend/internal/api/router.go | 20 +- backend/internal/api/static/admin/index.html | 378 +++++++++++++++++++ backend/internal/chat/hub.go | 25 ++ backend/internal/db/db.go | 72 +++- backend/internal/model/user.go | 2 + backend/internal/monitor/disk_other.go | 7 + backend/internal/monitor/disk_windows.go | 38 ++ backend/internal/monitor/logs.go | 130 +++++++ backend/internal/monitor/metrics.go | 55 +++ backend/internal/stream/server.go | 46 ++- backend/internal/utils/utils.go | 22 +- 17 files changed, 1186 insertions(+), 38 deletions(-) create mode 100644 backend/internal/api/admin.go create mode 100644 backend/internal/api/admin_ui.go create mode 100644 backend/internal/api/static/admin/index.html create mode 100644 backend/internal/monitor/disk_other.go create mode 100644 backend/internal/monitor/disk_windows.go create mode 100644 backend/internal/monitor/logs.go create mode 100644 backend/internal/monitor/metrics.go diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index 6491e47..db88312 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -1,16 +1,16 @@ package main import ( - "log" - "hightube/internal/api" "hightube/internal/chat" "hightube/internal/db" + "hightube/internal/monitor" "hightube/internal/stream" ) func main() { - log.Println("Starting Hightube Server v1.0.0-Beta3.7...") + monitor.Init(2000) + monitor.Infof("Starting Hightube Server v1.0.0-Beta3.7") // Initialize Database and run auto-migrations db.InitDB() @@ -23,15 +23,15 @@ func main() { // Start the API server in a goroutine so it doesn't block the RTMP server go func() { r := api.SetupRouter(srv) - log.Println("[INFO] API Server is listening on :8080...") + monitor.Infof("API server listening on :8080") if err := r.Run(":8080"); err != nil { - log.Fatalf("Failed to start API server: %v", err) + monitor.Errorf("Failed to start API server: %v", err) } }() // Setup and start the RTMP server - log.Println("[INFO] Ready to receive RTMP streams from OBS.") + monitor.Infof("Ready to receive RTMP streams from OBS") if err := srv.Start(":1935"); err != nil { - log.Fatalf("Failed to start RTMP server: %v", err) + monitor.Errorf("Failed to start RTMP server: %v", err) } } diff --git a/backend/internal/api/admin.go b/backend/internal/api/admin.go new file mode 100644 index 0000000..eed6783 --- /dev/null +++ b/backend/internal/api/admin.go @@ -0,0 +1,333 @@ +package api + +import ( + "encoding/json" + "net/http" + "strconv" + "strings" + + "github.com/gin-gonic/gin" + + "hightube/internal/chat" + "hightube/internal/db" + "hightube/internal/model" + "hightube/internal/monitor" + "hightube/internal/stream" + "hightube/internal/utils" +) + +var adminRTMP *stream.RTMPServer + +func BindAdminDependencies(rtmpSrv *stream.RTMPServer) { + adminRTMP = rtmpSrv +} + +func GetAdminOverview(c *gin.Context) { + stats := monitor.GetSnapshot() + chatStats := chat.StatsSnapshot{} + if chat.MainHub != nil { + chatStats = chat.MainHub.GetStatsSnapshot() + } + + activeCount := 0 + activePaths := []string{} + if adminRTMP != nil { + activeCount = adminRTMP.ActiveStreamCount() + activePaths = adminRTMP.ActiveStreamPaths() + } + + c.JSON(http.StatusOK, gin.H{ + "system": stats, + "stream": gin.H{ + "active_stream_count": activeCount, + "active_stream_paths": activePaths, + }, + "chat": chatStats, + }) +} + +func GetAdminHealth(c *gin.Context) { + type dbHealth struct { + OK bool `json:"ok"` + Error string `json:"error,omitempty"` + } + + health := gin.H{ + "api": true, + "rtmp": adminRTMP != nil, + } + + dbOK := dbHealth{OK: true} + if err := db.DB.Exec("SELECT 1").Error; err != nil { + dbOK.OK = false + dbOK.Error = err.Error() + } + health["db"] = dbOK + + c.JSON(http.StatusOK, health) +} + +func ListAdminLogs(c *gin.Context) { + level := c.Query("level") + keyword := c.Query("keyword") + page := parseIntWithDefault(c.Query("page"), 1) + pageSize := parseIntWithDefault(c.Query("page_size"), 20) + + items, total := monitor.Query(level, keyword, page, pageSize) + c.JSON(http.StatusOK, gin.H{ + "items": items, + "total": total, + "page": page, + "page_size": pageSize, + }) +} + +func StreamAdminLogs(c *gin.Context) { + if !authorizeAdminTokenFromQuery(c) { + return + } + + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") + + flusher, ok := c.Writer.(http.Flusher) + if !ok { + c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming unsupported"}) + return + } + + ch := monitor.Subscribe() + defer monitor.Unsubscribe(ch) + + for { + select { + case entry := <-ch: + payload, _ := json.Marshal(entry) + _, _ = c.Writer.Write([]byte("event: log\n")) + _, _ = c.Writer.Write([]byte("data: " + string(payload) + "\n\n")) + flusher.Flush() + case <-c.Request.Context().Done(): + return + } + } +} + +func authorizeAdminTokenFromQuery(c *gin.Context) bool { + token := strings.TrimSpace(c.Query("token")) + if token == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "token is required"}) + return false + } + + claims, err := utils.ParseToken(token) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) + return false + } + + userID, err := strconv.ParseUint(claims.Subject, 10, 32) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token subject"}) + return false + } + + var user model.User + if err := db.DB.First(&user, uint(userID)).Error; err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "user not found"}) + return false + } + + if !user.Enabled || user.Role != "admin" { + c.JSON(http.StatusForbidden, gin.H{"error": "admin access required"}) + return false + } + + return true +} + +type updateRoleRequest struct { + Role string `json:"role" binding:"required"` +} + +func UpdateUserRole(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + var req updateRoleRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + role := strings.ToLower(strings.TrimSpace(req.Role)) + if role != "admin" && role != "user" { + c.JSON(http.StatusBadRequest, gin.H{"error": "role must be admin or user"}) + return + } + + if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("role", role).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update role"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v updated user_id=%d role=%s", operator, userID, role) + c.JSON(http.StatusOK, gin.H{"message": "role updated"}) +} + +type updateEnabledRequest struct { + Enabled bool `json:"enabled"` +} + +func UpdateUserEnabled(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + var req updateEnabledRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("enabled", req.Enabled).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update enabled status"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v updated user_id=%d enabled=%v", operator, userID, req.Enabled) + c.JSON(http.StatusOK, gin.H{"message": "enabled status updated"}) +} + +type resetPasswordRequest struct { + NewPassword string `json:"new_password" binding:"required"` +} + +func ResetUserPassword(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + var req resetPasswordRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + hash, err := utils.HashPassword(req.NewPassword) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to hash password"}) + return + } + + if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("password", hash).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to reset password"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v reset password for user_id=%d", operator, userID) + c.JSON(http.StatusOK, gin.H{"message": "password reset"}) +} + +func DeleteUser(c *gin.Context) { + userID, ok := parsePathUint(c.Param("id")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"}) + return + } + + operatorID, _ := c.Get("user_id") + if opID, ok := operatorID.(uint); ok && opID == userID { + c.JSON(http.StatusBadRequest, gin.H{"error": "cannot delete current admin account"}) + return + } + + if err := db.DB.Where("user_id = ?", userID).Delete(&model.Room{}).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete rooms"}) + return + } + if err := db.DB.Delete(&model.User{}, userID).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete user"}) + return + } + + operator, _ := c.Get("username") + monitor.Auditf("admin=%v deleted user_id=%d", operator, userID) + c.JSON(http.StatusOK, gin.H{"message": "user deleted"}) +} + +func ListUsers(c *gin.Context) { + keyword := strings.TrimSpace(c.Query("keyword")) + page := parseIntWithDefault(c.Query("page"), 1) + pageSize := parseIntWithDefault(c.Query("page_size"), 20) + if pageSize > 200 { + pageSize = 200 + } + offset := (page - 1) * pageSize + if offset < 0 { + offset = 0 + } + + query := db.DB.Model(&model.User{}) + if keyword != "" { + query = query.Where("username LIKE ?", "%"+keyword+"%") + } + + var total int64 + if err := query.Count(&total).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count users"}) + return + } + + var users []model.User + if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&users).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query users"}) + return + } + + items := make([]gin.H, 0, len(users)) + for _, u := range users { + items = append(items, gin.H{ + "id": u.ID, + "username": u.Username, + "role": u.Role, + "enabled": u.Enabled, + "created_at": u.CreatedAt, + "updated_at": u.UpdatedAt, + }) + } + + c.JSON(http.StatusOK, gin.H{ + "items": items, + "total": total, + "page": page, + "page_size": pageSize, + }) +} + +func parseIntWithDefault(v string, def int) int { + i, err := strconv.Atoi(v) + if err != nil || i <= 0 { + return def + } + return i +} + +func parsePathUint(v string) (uint, bool) { + u64, err := strconv.ParseUint(v, 10, 32) + if err != nil { + return 0, false + } + return uint(u64), true +} diff --git a/backend/internal/api/admin_ui.go b/backend/internal/api/admin_ui.go new file mode 100644 index 0000000..ca1a936 --- /dev/null +++ b/backend/internal/api/admin_ui.go @@ -0,0 +1,21 @@ +package api + +import ( + "embed" + "io/fs" + "net/http" + + "github.com/gin-gonic/gin" +) + +//go:embed static/admin/* +var adminFS embed.FS + +func AdminPage(c *gin.Context) { + content, err := fs.ReadFile(adminFS, "static/admin/index.html") + if err != nil { + c.String(http.StatusInternalServerError, "failed to load admin page") + return + } + c.Data(http.StatusOK, "text/html; charset=utf-8", content) +} diff --git a/backend/internal/api/auth.go b/backend/internal/api/auth.go index 9fec6b8..b99023f 100644 --- a/backend/internal/api/auth.go +++ b/backend/internal/api/auth.go @@ -50,6 +50,8 @@ func Register(c *gin.Context) { user := model.User{ Username: req.Username, Password: hashedPassword, + Role: "user", + Enabled: true, } if err := db.DB.Create(&user).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create user"}) @@ -89,7 +91,12 @@ func Login(c *gin.Context) { return } - token, err := utils.GenerateToken(user.ID) + if !user.Enabled { + c.JSON(http.StatusForbidden, gin.H{"error": "Account is disabled"}) + return + } + + token, err := utils.GenerateToken(user.ID, user.Username, user.Role) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to generate token"}) return @@ -98,6 +105,8 @@ func Login(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "token": token, "username": user.Username, + "role": user.Role, + "enabled": user.Enabled, }) } diff --git a/backend/internal/api/chat_handler.go b/backend/internal/api/chat_handler.go index c293e55..17c811f 100644 --- a/backend/internal/api/chat_handler.go +++ b/backend/internal/api/chat_handler.go @@ -8,6 +8,7 @@ import ( "github.com/gorilla/websocket" "hightube/internal/chat" + "hightube/internal/monitor" ) var upgrader = websocket.Upgrader{ @@ -23,7 +24,7 @@ func WSHandler(c *gin.Context) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { - fmt.Printf("[WS ERROR] Failed to upgrade: %v\n", err) + monitor.Errorf("WebSocket upgrade failed: %v", err) return } @@ -48,4 +49,6 @@ func WSHandler(c *gin.Context) { Content: fmt.Sprintf("%s joined the room", username), RoomID: roomID, }) + + monitor.Infof("WebSocket client joined room_id=%s username=%s", roomID, username) } diff --git a/backend/internal/api/middleware.go b/backend/internal/api/middleware.go index 3c3af8e..7fea0a1 100644 --- a/backend/internal/api/middleware.go +++ b/backend/internal/api/middleware.go @@ -7,6 +7,9 @@ import ( "github.com/gin-gonic/gin" + "hightube/internal/db" + "hightube/internal/model" + "hightube/internal/monitor" "hightube/internal/utils" ) @@ -28,19 +31,57 @@ func AuthMiddleware() gin.HandlerFunc { } tokenStr := parts[1] - userIDStr, err := utils.ParseToken(tokenStr) + claims, err := utils.ParseToken(tokenStr) if err != nil { c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or expired token"}) c.Abort() return } - userID, _ := strconv.ParseUint(userIDStr, 10, 32) + userID, _ := strconv.ParseUint(claims.Subject, 10, 32) + + var user model.User + if err := db.DB.First(&user, uint(userID)).Error; err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "User not found"}) + c.Abort() + return + } + + if !user.Enabled { + c.JSON(http.StatusForbidden, gin.H{"error": "Account is disabled"}) + c.Abort() + return + } + c.Set("user_id", uint(userID)) + c.Set("username", user.Username) + c.Set("role", user.Role) c.Next() } } +func AdminMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + role, ok := c.Get("role") + if !ok || role != "admin" { + c.JSON(http.StatusForbidden, gin.H{"error": "admin access required"}) + c.Abort() + return + } + c.Next() + } +} + +func RequestMetricsMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + monitor.IncrementRequestCount() + c.Next() + if c.Writer.Status() >= http.StatusBadRequest { + monitor.IncrementErrorCount() + } + } +} + // CORSMiddleware handles cross-origin requests from web clients func CORSMiddleware() gin.HandlerFunc { return func(c *gin.Context) { diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index 0f11d08..46045f7 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -12,9 +12,10 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { gin.SetMode(gin.ReleaseMode) r := gin.Default() + BindAdminDependencies(streamServer) // Use CORS middleware to allow web access - r.Use(CORSMiddleware()) + r.Use(CORSMiddleware(), RequestMetricsMiddleware()) // 清除代理信任警告 "[WARNING] You trusted all proxies" r.SetTrustedProxies(nil) @@ -24,9 +25,11 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { r.POST("/api/login", Login) r.GET("/api/rooms/active", GetActiveRooms) r.GET("/live/:room_id", streamServer.HandleHTTPFLV) - + // WebSocket endpoint for live chat r.GET("/api/ws/room/:room_id", WSHandler) + r.GET("/admin", AdminPage) + r.GET("/api/admin/logs/stream", StreamAdminLogs) // Protected routes (require JWT) authGroup := r.Group("/api") @@ -34,6 +37,19 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { { authGroup.GET("/room/my", GetMyRoom) authGroup.POST("/user/change-password", ChangePassword) + + adminGroup := authGroup.Group("/admin") + adminGroup.Use(AdminMiddleware()) + { + adminGroup.GET("/overview", GetAdminOverview) + adminGroup.GET("/health", GetAdminHealth) + adminGroup.GET("/logs", ListAdminLogs) + adminGroup.GET("/users", ListUsers) + adminGroup.PATCH("/users/:id/role", UpdateUserRole) + adminGroup.PATCH("/users/:id/enabled", UpdateUserEnabled) + adminGroup.POST("/users/:id/reset-password", ResetUserPassword) + adminGroup.DELETE("/users/:id", DeleteUser) + } } return r diff --git a/backend/internal/api/static/admin/index.html b/backend/internal/api/static/admin/index.html new file mode 100644 index 0000000..3c41435 --- /dev/null +++ b/backend/internal/api/static/admin/index.html @@ -0,0 +1,378 @@ + + +
+ + +| ID | +用户名 | +角色 | +状态 | +创建时间 | +操作 | +
|---|