监控网页实现

This commit is contained in:
Z
2026-04-09 00:14:57 +08:00
parent 6b1c7242c7
commit 1cce5634b1
17 changed files with 1186 additions and 38 deletions

View File

@@ -0,0 +1,333 @@
package api
import (
"encoding/json"
"net/http"
"strconv"
"strings"
"github.com/gin-gonic/gin"
"hightube/internal/chat"
"hightube/internal/db"
"hightube/internal/model"
"hightube/internal/monitor"
"hightube/internal/stream"
"hightube/internal/utils"
)
var adminRTMP *stream.RTMPServer
func BindAdminDependencies(rtmpSrv *stream.RTMPServer) {
adminRTMP = rtmpSrv
}
func GetAdminOverview(c *gin.Context) {
stats := monitor.GetSnapshot()
chatStats := chat.StatsSnapshot{}
if chat.MainHub != nil {
chatStats = chat.MainHub.GetStatsSnapshot()
}
activeCount := 0
activePaths := []string{}
if adminRTMP != nil {
activeCount = adminRTMP.ActiveStreamCount()
activePaths = adminRTMP.ActiveStreamPaths()
}
c.JSON(http.StatusOK, gin.H{
"system": stats,
"stream": gin.H{
"active_stream_count": activeCount,
"active_stream_paths": activePaths,
},
"chat": chatStats,
})
}
func GetAdminHealth(c *gin.Context) {
type dbHealth struct {
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
}
health := gin.H{
"api": true,
"rtmp": adminRTMP != nil,
}
dbOK := dbHealth{OK: true}
if err := db.DB.Exec("SELECT 1").Error; err != nil {
dbOK.OK = false
dbOK.Error = err.Error()
}
health["db"] = dbOK
c.JSON(http.StatusOK, health)
}
func ListAdminLogs(c *gin.Context) {
level := c.Query("level")
keyword := c.Query("keyword")
page := parseIntWithDefault(c.Query("page"), 1)
pageSize := parseIntWithDefault(c.Query("page_size"), 20)
items, total := monitor.Query(level, keyword, page, pageSize)
c.JSON(http.StatusOK, gin.H{
"items": items,
"total": total,
"page": page,
"page_size": pageSize,
})
}
func StreamAdminLogs(c *gin.Context) {
if !authorizeAdminTokenFromQuery(c) {
return
}
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Accel-Buffering", "no")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming unsupported"})
return
}
ch := monitor.Subscribe()
defer monitor.Unsubscribe(ch)
for {
select {
case entry := <-ch:
payload, _ := json.Marshal(entry)
_, _ = c.Writer.Write([]byte("event: log\n"))
_, _ = c.Writer.Write([]byte("data: " + string(payload) + "\n\n"))
flusher.Flush()
case <-c.Request.Context().Done():
return
}
}
}
func authorizeAdminTokenFromQuery(c *gin.Context) bool {
token := strings.TrimSpace(c.Query("token"))
if token == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "token is required"})
return false
}
claims, err := utils.ParseToken(token)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
return false
}
userID, err := strconv.ParseUint(claims.Subject, 10, 32)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token subject"})
return false
}
var user model.User
if err := db.DB.First(&user, uint(userID)).Error; err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "user not found"})
return false
}
if !user.Enabled || user.Role != "admin" {
c.JSON(http.StatusForbidden, gin.H{"error": "admin access required"})
return false
}
return true
}
type updateRoleRequest struct {
Role string `json:"role" binding:"required"`
}
func UpdateUserRole(c *gin.Context) {
userID, ok := parsePathUint(c.Param("id"))
if !ok {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"})
return
}
var req updateRoleRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
role := strings.ToLower(strings.TrimSpace(req.Role))
if role != "admin" && role != "user" {
c.JSON(http.StatusBadRequest, gin.H{"error": "role must be admin or user"})
return
}
if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("role", role).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update role"})
return
}
operator, _ := c.Get("username")
monitor.Auditf("admin=%v updated user_id=%d role=%s", operator, userID, role)
c.JSON(http.StatusOK, gin.H{"message": "role updated"})
}
type updateEnabledRequest struct {
Enabled bool `json:"enabled"`
}
func UpdateUserEnabled(c *gin.Context) {
userID, ok := parsePathUint(c.Param("id"))
if !ok {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"})
return
}
var req updateEnabledRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("enabled", req.Enabled).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update enabled status"})
return
}
operator, _ := c.Get("username")
monitor.Auditf("admin=%v updated user_id=%d enabled=%v", operator, userID, req.Enabled)
c.JSON(http.StatusOK, gin.H{"message": "enabled status updated"})
}
type resetPasswordRequest struct {
NewPassword string `json:"new_password" binding:"required"`
}
func ResetUserPassword(c *gin.Context) {
userID, ok := parsePathUint(c.Param("id"))
if !ok {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"})
return
}
var req resetPasswordRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
hash, err := utils.HashPassword(req.NewPassword)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to hash password"})
return
}
if err := db.DB.Model(&model.User{}).Where("id = ?", userID).Update("password", hash).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to reset password"})
return
}
operator, _ := c.Get("username")
monitor.Auditf("admin=%v reset password for user_id=%d", operator, userID)
c.JSON(http.StatusOK, gin.H{"message": "password reset"})
}
func DeleteUser(c *gin.Context) {
userID, ok := parsePathUint(c.Param("id"))
if !ok {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"})
return
}
operatorID, _ := c.Get("user_id")
if opID, ok := operatorID.(uint); ok && opID == userID {
c.JSON(http.StatusBadRequest, gin.H{"error": "cannot delete current admin account"})
return
}
if err := db.DB.Where("user_id = ?", userID).Delete(&model.Room{}).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete rooms"})
return
}
if err := db.DB.Delete(&model.User{}, userID).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete user"})
return
}
operator, _ := c.Get("username")
monitor.Auditf("admin=%v deleted user_id=%d", operator, userID)
c.JSON(http.StatusOK, gin.H{"message": "user deleted"})
}
func ListUsers(c *gin.Context) {
keyword := strings.TrimSpace(c.Query("keyword"))
page := parseIntWithDefault(c.Query("page"), 1)
pageSize := parseIntWithDefault(c.Query("page_size"), 20)
if pageSize > 200 {
pageSize = 200
}
offset := (page - 1) * pageSize
if offset < 0 {
offset = 0
}
query := db.DB.Model(&model.User{})
if keyword != "" {
query = query.Where("username LIKE ?", "%"+keyword+"%")
}
var total int64
if err := query.Count(&total).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to count users"})
return
}
var users []model.User
if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&users).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query users"})
return
}
items := make([]gin.H, 0, len(users))
for _, u := range users {
items = append(items, gin.H{
"id": u.ID,
"username": u.Username,
"role": u.Role,
"enabled": u.Enabled,
"created_at": u.CreatedAt,
"updated_at": u.UpdatedAt,
})
}
c.JSON(http.StatusOK, gin.H{
"items": items,
"total": total,
"page": page,
"page_size": pageSize,
})
}
func parseIntWithDefault(v string, def int) int {
i, err := strconv.Atoi(v)
if err != nil || i <= 0 {
return def
}
return i
}
func parsePathUint(v string) (uint, bool) {
u64, err := strconv.ParseUint(v, 10, 32)
if err != nil {
return 0, false
}
return uint(u64), true
}

View File

@@ -0,0 +1,21 @@
package api
import (
"embed"
"io/fs"
"net/http"
"github.com/gin-gonic/gin"
)
//go:embed static/admin/*
var adminFS embed.FS
func AdminPage(c *gin.Context) {
content, err := fs.ReadFile(adminFS, "static/admin/index.html")
if err != nil {
c.String(http.StatusInternalServerError, "failed to load admin page")
return
}
c.Data(http.StatusOK, "text/html; charset=utf-8", content)
}

View File

@@ -50,6 +50,8 @@ func Register(c *gin.Context) {
user := model.User{
Username: req.Username,
Password: hashedPassword,
Role: "user",
Enabled: true,
}
if err := db.DB.Create(&user).Error; err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create user"})
@@ -89,7 +91,12 @@ func Login(c *gin.Context) {
return
}
token, err := utils.GenerateToken(user.ID)
if !user.Enabled {
c.JSON(http.StatusForbidden, gin.H{"error": "Account is disabled"})
return
}
token, err := utils.GenerateToken(user.ID, user.Username, user.Role)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to generate token"})
return
@@ -98,6 +105,8 @@ func Login(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"token": token,
"username": user.Username,
"role": user.Role,
"enabled": user.Enabled,
})
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/gorilla/websocket"
"hightube/internal/chat"
"hightube/internal/monitor"
)
var upgrader = websocket.Upgrader{
@@ -23,7 +24,7 @@ func WSHandler(c *gin.Context) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
fmt.Printf("[WS ERROR] Failed to upgrade: %v\n", err)
monitor.Errorf("WebSocket upgrade failed: %v", err)
return
}
@@ -48,4 +49,6 @@ func WSHandler(c *gin.Context) {
Content: fmt.Sprintf("%s joined the room", username),
RoomID: roomID,
})
monitor.Infof("WebSocket client joined room_id=%s username=%s", roomID, username)
}

View File

@@ -7,6 +7,9 @@ import (
"github.com/gin-gonic/gin"
"hightube/internal/db"
"hightube/internal/model"
"hightube/internal/monitor"
"hightube/internal/utils"
)
@@ -28,19 +31,57 @@ func AuthMiddleware() gin.HandlerFunc {
}
tokenStr := parts[1]
userIDStr, err := utils.ParseToken(tokenStr)
claims, err := utils.ParseToken(tokenStr)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or expired token"})
c.Abort()
return
}
userID, _ := strconv.ParseUint(userIDStr, 10, 32)
userID, _ := strconv.ParseUint(claims.Subject, 10, 32)
var user model.User
if err := db.DB.First(&user, uint(userID)).Error; err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "User not found"})
c.Abort()
return
}
if !user.Enabled {
c.JSON(http.StatusForbidden, gin.H{"error": "Account is disabled"})
c.Abort()
return
}
c.Set("user_id", uint(userID))
c.Set("username", user.Username)
c.Set("role", user.Role)
c.Next()
}
}
func AdminMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
role, ok := c.Get("role")
if !ok || role != "admin" {
c.JSON(http.StatusForbidden, gin.H{"error": "admin access required"})
c.Abort()
return
}
c.Next()
}
}
func RequestMetricsMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
monitor.IncrementRequestCount()
c.Next()
if c.Writer.Status() >= http.StatusBadRequest {
monitor.IncrementErrorCount()
}
}
}
// CORSMiddleware handles cross-origin requests from web clients
func CORSMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {

View File

@@ -12,9 +12,10 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine {
gin.SetMode(gin.ReleaseMode)
r := gin.Default()
BindAdminDependencies(streamServer)
// Use CORS middleware to allow web access
r.Use(CORSMiddleware())
r.Use(CORSMiddleware(), RequestMetricsMiddleware())
// 清除代理信任警告 "[WARNING] You trusted all proxies"
r.SetTrustedProxies(nil)
@@ -24,9 +25,11 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine {
r.POST("/api/login", Login)
r.GET("/api/rooms/active", GetActiveRooms)
r.GET("/live/:room_id", streamServer.HandleHTTPFLV)
// WebSocket endpoint for live chat
r.GET("/api/ws/room/:room_id", WSHandler)
r.GET("/admin", AdminPage)
r.GET("/api/admin/logs/stream", StreamAdminLogs)
// Protected routes (require JWT)
authGroup := r.Group("/api")
@@ -34,6 +37,19 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine {
{
authGroup.GET("/room/my", GetMyRoom)
authGroup.POST("/user/change-password", ChangePassword)
adminGroup := authGroup.Group("/admin")
adminGroup.Use(AdminMiddleware())
{
adminGroup.GET("/overview", GetAdminOverview)
adminGroup.GET("/health", GetAdminHealth)
adminGroup.GET("/logs", ListAdminLogs)
adminGroup.GET("/users", ListUsers)
adminGroup.PATCH("/users/:id/role", UpdateUserRole)
adminGroup.PATCH("/users/:id/enabled", UpdateUserEnabled)
adminGroup.POST("/users/:id/reset-password", ResetUserPassword)
adminGroup.DELETE("/users/:id", DeleteUser)
}
}
return r

View File

@@ -0,0 +1,378 @@
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>Hightube Admin Console</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Space+Grotesk:wght@400;500;700&family=IBM+Plex+Mono:wght@400;600&display=swap" rel="stylesheet">
<style>
:root {
--bg: #0f1c2e;
--bg2: #102944;
--card: #f7f6f3;
--ink: #12263d;
--accent: #ff6b35;
--accent2: #0ea5a3;
--danger: #dc2626;
--ok: #15803d;
--line: #d9d5cc;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: "Space Grotesk", "Segoe UI", sans-serif;
color: var(--ink);
background:
radial-gradient(1200px 600px at -10% -10%, #2a4b70 0%, transparent 65%),
radial-gradient(1000px 500px at 110% 0%, #1f4f6d 0%, transparent 65%),
linear-gradient(140deg, var(--bg) 0%, var(--bg2) 100%);
min-height: 100vh;
padding: 20px;
}
.layout {
max-width: 1280px;
margin: 0 auto;
display: grid;
gap: 16px;
grid-template-columns: repeat(12, 1fr);
animation: appear 500ms ease-out;
}
@keyframes appear {
from { opacity: 0; transform: translateY(8px); }
to { opacity: 1; transform: translateY(0); }
}
.card {
background: var(--card);
border: 1px solid var(--line);
border-radius: 16px;
box-shadow: 0 12px 30px rgba(0,0,0,0.22);
padding: 14px;
}
.full { grid-column: 1 / -1; }
.col4 { grid-column: span 4; }
.col6 { grid-column: span 6; }
.col8 { grid-column: span 8; }
h1 { margin: 0; color: #f8fafc; letter-spacing: .02em; }
h2 { margin: 0 0 10px; font-size: 18px; }
.topbar {
grid-column: 1 / -1;
display: flex;
gap: 10px;
align-items: center;
justify-content: space-between;
margin-bottom: 6px;
}
.tokenbox {
display: flex;
gap: 8px;
width: min(800px, 100%);
}
input, select, button {
border-radius: 10px;
border: 1px solid #c8c2b7;
padding: 8px 10px;
font-size: 14px;
font-family: inherit;
}
input { width: 100%; }
button {
cursor: pointer;
color: #fff;
background: linear-gradient(120deg, var(--accent), #f97316);
border: none;
font-weight: 700;
}
button.secondary {
background: #184f77;
}
button.danger {
background: var(--danger);
}
.stats {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 10px;
}
.metric {
border: 1px dashed #c9c2b4;
padding: 8px;
border-radius: 12px;
}
.metric .k { font-size: 12px; opacity: .75; }
.metric .v { font-size: 22px; font-weight: 700; }
.mono { font-family: "IBM Plex Mono", monospace; font-size: 12px; }
#logs {
height: 280px;
overflow: auto;
background: #0e1624;
color: #d9f6ea;
border-radius: 10px;
padding: 10px;
white-space: pre-wrap;
font-family: "IBM Plex Mono", monospace;
border: 1px solid #1f3552;
}
table {
width: 100%;
border-collapse: collapse;
font-size: 14px;
}
th, td {
text-align: left;
border-bottom: 1px solid var(--line);
padding: 8px 6px;
}
.actions { display: flex; gap: 6px; flex-wrap: wrap; }
.pill {
display: inline-block;
padding: 2px 8px;
border-radius: 999px;
font-size: 12px;
font-weight: 700;
}
.pill.ok { background: #dcfce7; color: var(--ok); }
.pill.off { background: #fee2e2; color: var(--danger); }
.pill.admin { background: #dbeafe; color: #1d4ed8; }
.pill.user { background: #e2e8f0; color: #334155; }
@media (max-width: 960px) {
.col4, .col6, .col8 { grid-column: 1 / -1; }
.stats { grid-template-columns: repeat(2, 1fr); }
.topbar { flex-direction: column; align-items: stretch; }
}
</style>
</head>
<body>
<div class="layout">
<div class="topbar">
<h1>Hightube Admin Console</h1>
<div class="tokenbox">
<input id="token" placeholder="粘贴 admin JWT token来自 /api/login" />
<button onclick="connectAll()">连接</button>
</div>
</div>
<div class="card col8">
<h2>系统状态</h2>
<div class="stats" id="stats"></div>
<div class="mono" id="health"></div>
</div>
<div class="card col4">
<h2>在线状态</h2>
<div id="online"></div>
</div>
<div class="card col8">
<h2>实时日志</h2>
<div style="display:flex; gap:8px; margin-bottom:8px;">
<button class="secondary" onclick="loadHistory()">加载历史日志</button>
<select id="logLevel">
<option value="">全部级别</option>
<option value="info">info</option>
<option value="warn">warn</option>
<option value="error">error</option>
<option value="audit">audit</option>
</select>
<input id="logKeyword" placeholder="关键词过滤" />
</div>
<div id="logs"></div>
</div>
<div class="card col4">
<h2>操作说明</h2>
<ul>
<li>先在上方输入 admin token 并点击连接。</li>
<li>用户管理支持搜索、角色切换、启用禁用、重置密码、删除。</li>
<li>日志区会持续接收后端实时事件。</li>
</ul>
</div>
<div class="card full">
<h2>用户管理</h2>
<div style="display:flex; gap:8px; margin-bottom:8px;">
<input id="userKeyword" placeholder="按用户名搜索" />
<button class="secondary" onclick="loadUsers()">查询</button>
</div>
<table>
<thead>
<tr>
<th>ID</th>
<th>用户名</th>
<th>角色</th>
<th>状态</th>
<th>创建时间</th>
<th>操作</th>
</tr>
</thead>
<tbody id="usersBody"></tbody>
</table>
</div>
</div>
<script>
let evt = null;
function authHeaders() {
const token = document.getElementById('token').value.trim();
return {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/json'
};
}
function addLogLine(text) {
const box = document.getElementById('logs');
box.textContent += text + '\n';
box.scrollTop = box.scrollHeight;
}
async function connectAll() {
await Promise.all([loadOverview(), loadHealth(), loadUsers(), loadHistory()]);
connectLiveLogs();
setInterval(loadOverview, 1000);
setInterval(loadHealth, 1000);
}
async function loadOverview() {
const resp = await fetch('/api/admin/overview', { headers: authHeaders() });
if (!resp.ok) {
addLogLine('[error] 概览拉取失败,请检查 token 是否为 admin。');
return;
}
const data = await resp.json();
const sys = data.system || {};
const stream = data.stream || {};
const chat = data.chat || {};
document.getElementById('stats').innerHTML = `
<div class="metric"><div class="k">运行时长(秒)</div><div class="v">${sys.uptime_seconds ?? '-'}</div></div>
<div class="metric"><div class="k">请求总量</div><div class="v">${sys.requests_total ?? '-'}</div></div>
<div class="metric"><div class="k">错误总量</div><div class="v">${sys.errors_total ?? '-'}</div></div>
<div class="metric"><div class="k">Goroutines</div><div class="v">${sys.goroutines ?? '-'}</div></div>
<div class="metric"><div class="k">内存Alloc(MB)</div><div class="v">${(sys.memory_alloc_mb || 0).toFixed(1)}</div></div>
<div class="metric"><div class="k">内存Sys(MB)</div><div class="v">${(sys.memory_sys_mb || 0).toFixed(1)}</div></div>
<div class="metric"><div class="k">CPU核心数</div><div class="v">${sys.cpu_cores ?? '-'}</div></div>
<div class="metric"><div class="k">磁盘剩余/总量(GB)</div><div class="v">${(sys.disk_free_gb || 0).toFixed(1)} / ${(sys.disk_total_gb || 0).toFixed(1)}</div></div>
`;
document.getElementById('online').innerHTML = `
<p>活跃流数量:<b>${stream.active_stream_count ?? 0}</b></p>
<p>活跃聊天室:<b>${chat.room_count ?? 0}</b></p>
<p>在线聊天连接:<b>${chat.total_connected_client ?? 0}</b></p>
<div class="mono">流路径: ${(stream.active_stream_paths || []).join(', ') || '无'}</div>
`;
}
async function loadHealth() {
const resp = await fetch('/api/admin/health', { headers: authHeaders() });
if (!resp.ok) return;
const h = await resp.json();
const dbOk = h.db && h.db.ok;
document.getElementById('health').innerHTML =
`API: <span class="pill ${h.api ? 'ok' : 'off'}">${h.api ? 'UP' : 'DOWN'}</span> ` +
`RTMP: <span class="pill ${h.rtmp ? 'ok' : 'off'}">${h.rtmp ? 'UP' : 'DOWN'}</span> ` +
`DB: <span class="pill ${dbOk ? 'ok' : 'off'}">${dbOk ? 'UP' : 'DOWN'}</span>`;
}
async function loadHistory() {
const level = encodeURIComponent(document.getElementById('logLevel').value || '');
const keyword = encodeURIComponent(document.getElementById('logKeyword').value || '');
const resp = await fetch(`/api/admin/logs?page=1&page_size=100&level=${level}&keyword=${keyword}`, {
headers: authHeaders()
});
if (!resp.ok) return;
const data = await resp.json();
const items = data.items || [];
const box = document.getElementById('logs');
box.textContent = '';
items.forEach(it => addLogLine(`[${it.time}] [${it.level}] ${it.message}`));
}
function connectLiveLogs() {
if (evt) evt.close();
const token = document.getElementById('token').value.trim();
evt = new EventSource('/api/admin/logs/stream?token=' + encodeURIComponent(token));
evt.onmessage = () => {};
evt.addEventListener('log', (e) => {
const item = JSON.parse(e.data);
addLogLine(`[${item.time}] [${item.level}] ${item.message}`);
});
evt.onerror = () => {
addLogLine('[warn] 实时日志连接断开,稍后可重连。');
};
}
async function loadUsers() {
const keyword = encodeURIComponent(document.getElementById('userKeyword').value || '');
const resp = await fetch(`/api/admin/users?page=1&page_size=50&keyword=${keyword}`, {
headers: authHeaders()
});
if (!resp.ok) {
addLogLine('[error] 用户列表拉取失败。');
return;
}
const data = await resp.json();
const body = document.getElementById('usersBody');
body.innerHTML = '';
(data.items || []).forEach(u => {
const tr = document.createElement('tr');
tr.innerHTML = `
<td>${u.id}</td>
<td>${u.username}</td>
<td><span class="pill ${u.role === 'admin' ? 'admin' : 'user'}">${u.role}</span></td>
<td><span class="pill ${u.enabled ? 'ok' : 'off'}">${u.enabled ? 'enabled' : 'disabled'}</span></td>
<td>${new Date(u.created_at).toLocaleString()}</td>
<td class="actions">
<button class="secondary" onclick="toggleRole(${u.id}, '${u.role}')">切换角色</button>
<button class="secondary" onclick="toggleEnabled(${u.id}, ${u.enabled})">启用/禁用</button>
<button class="secondary" onclick="resetPwd(${u.id})">重置密码</button>
<button class="danger" onclick="deleteUser(${u.id})">删除</button>
</td>
`;
body.appendChild(tr);
});
}
async function toggleRole(id, role) {
const next = role === 'admin' ? 'user' : 'admin';
await fetch(`/api/admin/users/${id}/role`, {
method: 'PATCH',
headers: authHeaders(),
body: JSON.stringify({ role: next })
});
loadUsers();
}
async function toggleEnabled(id, enabled) {
await fetch(`/api/admin/users/${id}/enabled`, {
method: 'PATCH',
headers: authHeaders(),
body: JSON.stringify({ enabled: !enabled })
});
loadUsers();
}
async function resetPwd(id) {
const newPwd = prompt('输入新密码(至少 6 位)');
if (!newPwd) return;
await fetch(`/api/admin/users/${id}/reset-password`, {
method: 'POST',
headers: authHeaders(),
body: JSON.stringify({ new_password: newPwd })
});
addLogLine(`[audit] user ${id} password reset requested`);
}
async function deleteUser(id) {
if (!confirm('确认删除该用户?')) return;
await fetch(`/api/admin/users/${id}`, {
method: 'DELETE',
headers: authHeaders()
});
loadUsers();
}
</script>
</body>
</html>

View File

@@ -40,6 +40,12 @@ type Hub struct {
mutex sync.RWMutex
}
type StatsSnapshot struct {
RoomCount int `json:"room_count"`
TotalConnectedClient int `json:"total_connected_client"`
RoomClients map[string]int `json:"room_clients"`
}
func NewHub() *Hub {
return &Hub{
broadcast: make(chan Message),
@@ -195,3 +201,22 @@ func InitChat() {
MainHub = NewHub()
go MainHub.Run()
}
func (h *Hub) GetStatsSnapshot() StatsSnapshot {
h.mutex.RLock()
defer h.mutex.RUnlock()
roomClients := make(map[string]int, len(h.rooms))
totalClients := 0
for roomID, clients := range h.rooms {
count := len(clients)
roomClients[roomID] = count
totalClients += count
}
return StatsSnapshot{
RoomCount: len(h.rooms),
TotalConnectedClient: totalClients,
RoomClients: roomClients,
}
}

View File

@@ -10,6 +10,8 @@ import (
"gorm.io/gorm/logger"
"hightube/internal/model"
"hightube/internal/monitor"
"hightube/internal/utils"
)
var DB *gorm.DB
@@ -19,10 +21,10 @@ func InitDB() {
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Warn, // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Warn, // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)
@@ -44,5 +46,65 @@ func InitDB() {
// Phase 3.5 Fix: Reset all rooms to inactive on startup using explicit map to ensure false is updated
DB.Model(&model.Room{}).Where("1 = 1").Updates(map[string]interface{}{"is_active": false})
log.Println("Database initialized successfully.")
ensureAdminUser()
monitor.Infof("Database initialized successfully")
}
func ensureAdminUser() {
adminUsername := os.Getenv("HIGHTUBE_ADMIN_USER")
if adminUsername == "" {
adminUsername = "admin"
}
adminPassword := os.Getenv("HIGHTUBE_ADMIN_PASS")
if adminPassword == "" {
adminPassword = "admin123456"
}
var user model.User
err := DB.Where("username = ?", adminUsername).First(&user).Error
if err == nil {
updates := map[string]interface{}{}
if user.Role != "admin" {
updates["role"] = "admin"
}
if !user.Enabled {
updates["enabled"] = true
}
if len(updates) > 0 {
DB.Model(&user).Updates(updates)
monitor.Warnf("Admin account normalized for username=%s", adminUsername)
}
return
}
hash, hashErr := utils.HashPassword(adminPassword)
if hashErr != nil {
monitor.Errorf("Failed to hash default admin password: %v", hashErr)
return
}
newAdmin := model.User{
Username: adminUsername,
Password: hash,
Role: "admin",
Enabled: true,
}
if createErr := DB.Create(&newAdmin).Error; createErr != nil {
monitor.Errorf("Failed to create admin account: %v", createErr)
return
}
room := model.Room{
UserID: newAdmin.ID,
Title: newAdmin.Username + "'s Live Room",
StreamKey: utils.GenerateStreamKey(),
IsActive: false,
}
if roomErr := DB.Create(&room).Error; roomErr != nil {
monitor.Warnf("Failed to create default admin room: %v", roomErr)
}
monitor.Warnf("Default admin created: username=%s password=%s", adminUsername, adminPassword)
}

View File

@@ -9,4 +9,6 @@ type User struct {
gorm.Model
Username string `gorm:"uniqueIndex;not null"`
Password string `gorm:"not null"` // Hashed password
Role string `gorm:"type:varchar(20);not null;default:user"`
Enabled bool `gorm:"not null;default:true"`
}

View File

@@ -0,0 +1,7 @@
//go:build !windows
package monitor
func getDiskSpaceGB() (float64, float64) {
return 0, 0
}

View File

@@ -0,0 +1,38 @@
//go:build windows
package monitor
import (
"os"
"path/filepath"
"golang.org/x/sys/windows"
)
func getDiskSpaceGB() (float64, float64) {
wd, err := os.Getwd()
if err != nil {
return 0, 0
}
vol := filepath.VolumeName(wd)
if vol == "" {
return 0, 0
}
root := vol + `\\`
pathPtr, err := windows.UTF16PtrFromString(root)
if err != nil {
return 0, 0
}
var freeBytesAvailable uint64
var totalBytes uint64
var totalFreeBytes uint64
if err := windows.GetDiskFreeSpaceEx(pathPtr, &freeBytesAvailable, &totalBytes, &totalFreeBytes); err != nil {
return 0, 0
}
const gb = 1024.0 * 1024.0 * 1024.0
return float64(totalBytes) / gb, float64(totalFreeBytes) / gb
}

View File

@@ -0,0 +1,130 @@
package monitor
import (
"fmt"
"log"
"strings"
"sync"
"time"
)
type LogEntry struct {
Time string `json:"time"`
Level string `json:"level"`
Message string `json:"message"`
}
type logHub struct {
mutex sync.RWMutex
entries []LogEntry
maxEntries int
subscribers map[chan LogEntry]struct{}
}
var hub = &logHub{
maxEntries: 1000,
subscribers: make(map[chan LogEntry]struct{}),
}
func Init(maxEntries int) {
if maxEntries > 0 {
hub.maxEntries = maxEntries
}
}
func Infof(format string, args ...interface{}) {
appendEntry("info", fmt.Sprintf(format, args...))
}
func Warnf(format string, args ...interface{}) {
appendEntry("warn", fmt.Sprintf(format, args...))
}
func Errorf(format string, args ...interface{}) {
appendEntry("error", fmt.Sprintf(format, args...))
}
func Auditf(format string, args ...interface{}) {
appendEntry("audit", fmt.Sprintf(format, args...))
}
func appendEntry(level, message string) {
entry := LogEntry{
Time: time.Now().Format(time.RFC3339),
Level: strings.ToLower(level),
Message: message,
}
log.Printf("[%s] %s", strings.ToUpper(entry.Level), entry.Message)
hub.mutex.Lock()
hub.entries = append(hub.entries, entry)
if len(hub.entries) > hub.maxEntries {
hub.entries = hub.entries[len(hub.entries)-hub.maxEntries:]
}
for ch := range hub.subscribers {
select {
case ch <- entry:
default:
}
}
hub.mutex.Unlock()
}
func Subscribe() chan LogEntry {
ch := make(chan LogEntry, 100)
hub.mutex.Lock()
hub.subscribers[ch] = struct{}{}
hub.mutex.Unlock()
return ch
}
func Unsubscribe(ch chan LogEntry) {
hub.mutex.Lock()
if _, ok := hub.subscribers[ch]; ok {
delete(hub.subscribers, ch)
close(ch)
}
hub.mutex.Unlock()
}
func Query(level, keyword string, page, pageSize int) ([]LogEntry, int) {
if page < 1 {
page = 1
}
if pageSize < 1 {
pageSize = 20
}
if pageSize > 200 {
pageSize = 200
}
level = strings.TrimSpace(strings.ToLower(level))
keyword = strings.TrimSpace(strings.ToLower(keyword))
hub.mutex.RLock()
defer hub.mutex.RUnlock()
filtered := make([]LogEntry, 0, len(hub.entries))
for _, e := range hub.entries {
if level != "" && e.Level != level {
continue
}
if keyword != "" && !strings.Contains(strings.ToLower(e.Message), keyword) {
continue
}
filtered = append(filtered, e)
}
total := len(filtered)
start := (page - 1) * pageSize
if start >= total {
return []LogEntry{}, total
}
end := start + pageSize
if end > total {
end = total
}
return filtered[start:end], total
}

View File

@@ -0,0 +1,55 @@
package monitor
import (
"runtime"
"sync/atomic"
"time"
)
var startedAt = time.Now()
var totalRequests uint64
var totalErrors uint64
type Snapshot struct {
UptimeSeconds int64 `json:"uptime_seconds"`
Goroutines int `json:"goroutines"`
MemoryAllocMB float64 `json:"memory_alloc_mb"`
MemorySysMB float64 `json:"memory_sys_mb"`
CPUCores int `json:"cpu_cores"`
DiskTotalGB float64 `json:"disk_total_gb"`
DiskFreeGB float64 `json:"disk_free_gb"`
RequestsTotal uint64 `json:"requests_total"`
ErrorsTotal uint64 `json:"errors_total"`
}
func IncrementRequestCount() {
atomic.AddUint64(&totalRequests, 1)
}
func IncrementErrorCount() {
atomic.AddUint64(&totalErrors, 1)
}
func GetSnapshot() Snapshot {
var mem runtime.MemStats
runtime.ReadMemStats(&mem)
diskTotal, diskFree := getDiskSpaceGB()
return Snapshot{
UptimeSeconds: int64(time.Since(startedAt).Seconds()),
Goroutines: runtime.NumGoroutine(),
MemoryAllocMB: bytesToMB(mem.Alloc),
MemorySysMB: bytesToMB(mem.Sys),
CPUCores: runtime.NumCPU(),
DiskTotalGB: diskTotal,
DiskFreeGB: diskFree,
RequestsTotal: atomic.LoadUint64(&totalRequests),
ErrorsTotal: atomic.LoadUint64(&totalErrors),
}
}
func bytesToMB(v uint64) float64 {
return float64(v) / 1024.0 / 1024.0
}

View File

@@ -17,6 +17,7 @@ import (
"hightube/internal/chat"
"hightube/internal/db"
"hightube/internal/model"
"hightube/internal/monitor"
)
func init() {
@@ -51,12 +52,12 @@ func NewRTMPServer() *RTMPServer {
// Triggered when a broadcaster (e.g., OBS) starts publishing
s.server.HandlePublish = func(conn *rtmp.Conn) {
streamPath := conn.URL.Path // Expected format: /live/{stream_key}
fmt.Printf("[INFO] OBS is attempting to publish to: %s\n", streamPath)
monitor.Infof("OBS publish attempt: %s", streamPath)
// Extract stream key from path
parts := strings.Split(streamPath, "/")
if len(parts) < 3 || parts[1] != "live" {
fmt.Printf("[WARN] Invalid publish path format: %s\n", streamPath)
monitor.Warnf("Invalid publish path format: %s", streamPath)
return
}
streamKey := parts[2]
@@ -64,16 +65,16 @@ func NewRTMPServer() *RTMPServer {
// Authenticate stream key
var room model.Room
if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil {
fmt.Printf("[WARN] Authentication failed, invalid stream key: %s\n", streamKey)
monitor.Warnf("Invalid stream key: %s", streamKey)
return // Reject connection
}
fmt.Printf("[INFO] Stream authenticated for Room ID: %d\n", room.ID)
monitor.Infof("Stream authenticated for room_id=%d", room.ID)
// 1. Get audio/video stream metadata
streams, err := conn.Streams()
if err != nil {
fmt.Printf("[ERROR] Failed to parse stream headers: %v\n", err)
monitor.Errorf("Failed to parse stream headers: %v", err)
return
}
@@ -101,7 +102,7 @@ func NewRTMPServer() *RTMPServer {
// Clear chat history for this room
chat.MainHub.ClearRoomHistory(fmt.Sprintf("%d", room.ID))
fmt.Printf("[INFO] Publishing ended for Room ID: %d\n", room.ID)
monitor.Infof("Publishing ended for room_id=%d", room.ID)
}()
// 4. Continuously copy data packets to our broadcast queue
@@ -111,7 +112,7 @@ func NewRTMPServer() *RTMPServer {
// Triggered when a viewer (e.g., VLC) requests playback
s.server.HandlePlay = func(conn *rtmp.Conn) {
streamPath := conn.URL.Path // Expected format: /live/{room_id}
fmt.Printf("[INFO] VLC is pulling stream from: %s\n", streamPath)
monitor.Infof("RTMP play requested: %s", streamPath)
// 1. Look for the requested room's data queue
s.mutex.RLock()
@@ -119,7 +120,7 @@ func NewRTMPServer() *RTMPServer {
s.mutex.RUnlock()
if !ok {
fmt.Printf("[WARN] Stream not found or inactive: %s\n", streamPath)
monitor.Warnf("Stream not found or inactive: %s", streamPath)
return
}
@@ -129,7 +130,7 @@ func NewRTMPServer() *RTMPServer {
conn.WriteHeader(streams)
// 3. Cleanup on end
defer fmt.Printf("[INFO] Playback ended: %s\n", streamPath)
defer monitor.Infof("Playback ended: %s", streamPath)
// 4. Continuously copy data packets to the viewer
err := avutil.CopyPackets(conn, cursor)
@@ -137,9 +138,9 @@ func NewRTMPServer() *RTMPServer {
// 如果是客户端主动断开连接引起的错误,不将其作为严重错误打印
errStr := err.Error()
if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") {
fmt.Printf("[INFO] Viewer disconnected normally: %s\n", streamPath)
monitor.Infof("Viewer disconnected: %s", streamPath)
} else {
fmt.Printf("[ERROR] Error occurred during playback: %v\n", err)
monitor.Errorf("Playback error on %s: %v", streamPath, err)
}
}
}
@@ -150,7 +151,7 @@ func NewRTMPServer() *RTMPServer {
// Start launches the RTMP server
func (s *RTMPServer) Start(addr string) error {
s.server.Addr = addr
fmt.Printf("[INFO] RTMP Server is listening on %s...\n", addr)
monitor.Infof("RTMP server listening on %s", addr)
return s.server.ListenAndServe()
}
@@ -190,9 +191,26 @@ func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) {
if err := avutil.CopyFile(muxer, cursor); err != nil && err != io.EOF {
errStr := err.Error()
if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") {
fmt.Printf("[INFO] HTTP-FLV viewer disconnected normally: %s\n", streamPath)
monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath)
return
}
fmt.Printf("[ERROR] HTTP-FLV playback error on %s: %v\n", streamPath, err)
monitor.Errorf("HTTP-FLV playback error on %s: %v", streamPath, err)
}
}
func (s *RTMPServer) ActiveStreamCount() int {
s.mutex.RLock()
defer s.mutex.RUnlock()
return len(s.channels)
}
func (s *RTMPServer) ActiveStreamPaths() []string {
s.mutex.RLock()
defer s.mutex.RUnlock()
paths := make([]string, 0, len(s.channels))
for path := range s.channels {
paths = append(paths, path)
}
return paths
}

View File

@@ -13,26 +13,36 @@ import (
// In production, load this from environment variables
var jwtKey = []byte("hightube_super_secret_key_MVP_only")
type TokenClaims struct {
Username string `json:"username"`
Role string `json:"role"`
jwt.RegisteredClaims
}
// GenerateToken generates a JWT token for a given user ID
func GenerateToken(userID uint) (string, error) {
claims := &jwt.RegisteredClaims{
func GenerateToken(userID uint, username, role string) (string, error) {
claims := &TokenClaims{
Username: username,
Role: role,
RegisteredClaims: jwt.RegisteredClaims{
Subject: fmt.Sprintf("%d", userID),
ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(jwtKey)
}
// ParseToken parses the JWT string and returns the user ID (Subject)
func ParseToken(tokenStr string) (string, error) {
claims := &jwt.RegisteredClaims{}
func ParseToken(tokenStr string) (*TokenClaims, error) {
claims := &TokenClaims{}
token, err := jwt.ParseWithClaims(tokenStr, claims, func(t *jwt.Token) (interface{}, error) {
return jwtKey, nil
})
if err != nil || !token.Valid {
return "", err
return nil, err
}
return claims.Subject, nil
return claims, nil
}
// HashPassword creates a bcrypt hash of the password