// Package stream implements the RTMP ingest server, HTTP-FLV playback and the
// ffmpeg-based quality-variant transcoders used for live rooms.
package stream

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"os/exec"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/nareix/joy4/av/avutil"
	"github.com/nareix/joy4/av/pubsub"
	"github.com/nareix/joy4/format"
	"github.com/nareix/joy4/format/flv"
	"github.com/nareix/joy4/format/rtmp"

	"hightube/internal/chat"
	"hightube/internal/db"
	"hightube/internal/model"
	"hightube/internal/monitor"
)

const (
	// internalRTMPAddr is the loopback endpoint ffmpeg pulls the source stream
	// from and pushes variants back to.
	// NOTE(review): the port is fixed here and is not derived from the address
	// passed to Start; confirm the server always listens on 1935.
	internalRTMPAddr = "127.0.0.1:1935"

	// transcoderStartDelay is how long a transcoder waits before launching
	// ffmpeg. Presumably this gives the source stream time to become playable.
	transcoderStartDelay = 2 * time.Second
)

func init() {
	// Register all supported audio/video formats.
	format.RegisterAll()
}

// RTMPServer manages all active live streams.
//
// channels maps a playback path ("/live/{room_id}" for the source stream and
// "/live/{room_id}/{quality}" for a transcoded variant) to its packet queue.
// transcoders maps a room ID to the ffmpeg processes started for that room.
// Both maps are guarded by mutex.
type RTMPServer struct {
	server             *rtmp.Server
	channels           map[string]*pubsub.Queue
	transcoders        map[string][]*variantTranscoder
	internalPublishKey string
	mutex              sync.RWMutex
}

// variantTranscoder tracks one ffmpeg process producing a single quality.
type variantTranscoder struct {
	quality string
	cancel  context.CancelFunc
	cmd     *exec.Cmd
}

// qualityProfile holds the ffmpeg scaling and bitrate settings for a variant.
type qualityProfile struct {
	scale        string
	videoBitrate string
	audioBitrate string
}

// qualityOrder is the order in which qualities are reported to clients.
var qualityOrder = []string{"source", "720p", "480p"}

// supportedQualities lists the transcoded variants. "source" is deliberately
// absent: it is the untouched publisher stream, not a transcode.
var supportedQualities = map[string]qualityProfile{
	"720p": {
		scale:        "1280:-2",
		videoBitrate: "2500k",
		audioBitrate: "128k",
	},
	"480p": {
		scale:        "854:-2",
		videoBitrate: "1200k",
		audioBitrate: "96k",
	},
}

// writeFlusher pairs an io.Writer with an http.Flusher so it can be handed to
// flv.NewMuxerWriteFlusher.
type writeFlusher struct {
	httpFlusher http.Flusher
	io.Writer
}

// Flush flushes the underlying HTTP response and always returns nil.
func (w writeFlusher) Flush() error {
	w.httpFlusher.Flush()
	return nil
}

// NewRTMPServer creates and initializes a new media server with its publish
// and play handlers installed. Call Start to begin listening.
func NewRTMPServer() *RTMPServer {
	s := &RTMPServer{
		channels:           make(map[string]*pubsub.Queue),
		transcoders:        make(map[string][]*variantTranscoder),
		internalPublishKey: generateInternalPublishKey(),
		server:             &rtmp.Server{},
	}
	// Triggered when a broadcaster (e.g., OBS) starts publishing.
	s.server.HandlePublish = s.handlePublish
	// Triggered when a viewer (e.g., VLC) requests playback.
	s.server.HandlePlay = s.handlePlay
	return s
}

// handlePublish authenticates an incoming publish request, registers a packet
// queue for it and copies packets into that queue until the publisher stops.
//
// Expected path format: /live/{stream_key} or
// /variant/{room_id}/{quality}/{token}.
func (s *RTMPServer) handlePublish(conn *rtmp.Conn) {
	streamPath := conn.URL.Path
	// NOTE(review): this logs the full path, which contains the stream key or
	// the internal token.
	monitor.Infof("OBS publish attempt: %s", streamPath)

	parts := strings.Split(streamPath, "/")
	if len(parts) < 3 {
		monitor.Warnf("Invalid publish path format: %s", streamPath)
		return
	}

	roomID, channelPath, isSource, ok := s.resolvePublishPath(parts)
	if !ok {
		monitor.Warnf("Invalid publish key/path: %s", streamPath)
		return
	}

	// 1. Get audio/video stream metadata.
	streams, err := conn.Streams()
	if err != nil {
		monitor.Errorf("Failed to parse stream headers: %v", err)
		return
	}
	monitor.Infof("Stream authenticated for room_id=%s path=%s", roomID, channelPath)

	// 2. Publish the queue so viewers can find it.
	s.mutex.Lock()
	q := pubsub.NewQueue()
	q.WriteHeader(streams)
	s.channels[channelPath] = q
	s.mutex.Unlock()

	// 3. Cleanup on end. Registered before activation so the queue is always
	// unregistered, even if activation panics.
	defer s.finishPublish(roomID, channelPath, isSource, q)

	if isSource {
		setRoomActive(roomID, true)
		s.startVariantTranscoders(roomID)
	}

	// 4. Continuously copy data packets to our broadcast queue.
	avutil.CopyPackets(q, conn)
}

// finishPublish tears down the state created by handlePublish for queue q.
//
// The channel entry, the transcoders and the room's active flag are only
// touched when q is still the queue registered under channelPath. If another
// publisher has replaced it in the meantime, that state now belongs to the
// newer publisher and must be left alone.
func (s *RTMPServer) finishPublish(roomID, channelPath string, isSource bool, q *pubsub.Queue) {
	s.mutex.Lock()
	owned := s.channels[channelPath] == q
	if owned {
		delete(s.channels, channelPath)
	}
	s.mutex.Unlock()
	q.Close()

	if !owned {
		monitor.Infof("Publisher superseded for room_id=%s path=%s; skipping teardown", roomID, channelPath)
		return
	}
	if !isSource {
		monitor.Infof("Variant publishing ended for room_id=%s path=%s", roomID, channelPath)
		return
	}

	s.stopVariantTranscoders(roomID)
	setRoomActive(roomID, false)
	chat.MainHub.ClearRoomHistory(roomID)
	monitor.Infof("Publishing ended for room_id=%s", roomID)
}

// handlePlay streams the queue registered under the requested path to an RTMP
// viewer. Expected path format: /live/{room_id}.
func (s *RTMPServer) handlePlay(conn *rtmp.Conn) {
	streamPath := conn.URL.Path
	monitor.Infof("RTMP play requested: %s", streamPath)

	// 1. Look for the requested room's data queue.
	s.mutex.RLock()
	q, ok := s.channels[streamPath]
	s.mutex.RUnlock()
	if !ok {
		monitor.Warnf("Stream not found or inactive: %s", streamPath)
		return
	}

	// 2. Get the cursor from the latest position and notify the client of the
	// stream format.
	cursor := q.Latest()
	streams, err := cursor.Streams()
	if err != nil {
		monitor.Warnf("Failed to read stream headers for %s: %v", streamPath, err)
		return
	}
	if err := conn.WriteHeader(streams); err != nil {
		if isClientDisconnect(err) {
			monitor.Infof("Viewer disconnected: %s", streamPath)
		} else {
			monitor.Errorf("Playback error on %s: %v", streamPath, err)
		}
		return
	}

	// 3. Cleanup on end.
	defer monitor.Infof("Playback ended: %s", streamPath)

	// 4. Continuously copy data packets to the viewer.
	if err := avutil.CopyPackets(conn, cursor); err != nil && err != io.EOF {
		// A disconnect initiated by the client is not reported as a serious
		// error.
		if isClientDisconnect(err) {
			monitor.Infof("Viewer disconnected: %s", streamPath)
		} else {
			monitor.Errorf("Playback error on %s: %v", streamPath, err)
		}
	}
}

// Start launches the RTMP server on addr and blocks until it stops.
func (s *RTMPServer) Start(addr string) error {
	s.server.Addr = addr
	monitor.Infof("RTMP server listening on %s", addr)
	return s.server.ListenAndServe()
}

// HandleHTTPFLV serves browser-compatible HTTP-FLV playback for web clients.
//
// The room comes from the "room_id" route parameter. An optional "quality"
// query parameter selects a transcoded variant; unknown or empty values fall
// back to the source stream. Responds 404 when the stream is not registered.
func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) {
	streamPath := fmt.Sprintf("/live/%s", c.Param("room_id"))
	if quality := normalizeQuality(c.Query("quality")); quality != "" {
		streamPath = fmt.Sprintf("%s/%s", streamPath, quality)
	}

	s.mutex.RLock()
	q, ok := s.channels[streamPath]
	s.mutex.RUnlock()
	if !ok {
		c.JSON(http.StatusNotFound, gin.H{"error": "Stream not found or inactive"})
		return
	}

	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming is not supported by the current server"})
		return
	}

	c.Header("Content-Type", "video/x-flv")
	c.Header("Transfer-Encoding", "chunked")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("Access-Control-Allow-Origin", "*")
	c.Status(http.StatusOK)
	// Push the headers out before the first packet arrives.
	flusher.Flush()

	muxer := flv.NewMuxerWriteFlusher(writeFlusher{
		httpFlusher: flusher,
		Writer:      c.Writer,
	})

	cursor := q.Latest()
	if err := avutil.CopyFile(muxer, cursor); err != nil && err != io.EOF {
		if isClientDisconnect(err) {
			monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath)
			return
		}
		monitor.Errorf("HTTP-FLV playback error on %s: %v", streamPath, err)
	}
}

// resolvePublishPath authenticates a publish path that has been split on "/".
//
// Two shapes are accepted:
//   - ["", "live", streamKey]: the key is looked up in the rooms table and the
//     stream is registered as the room's source.
//   - ["", "variant", roomID, quality, token]: the quality must be supported
//     and the token must equal the server's internal publish key.
//
// ok is false for anything else, in which case the other results are zero.
func (s *RTMPServer) resolvePublishPath(parts []string) (roomID string, channelPath string, isSource bool, ok bool) {
	if len(parts) < 3 {
		return "", "", false, false
	}

	switch {
	case parts[1] == "live" && len(parts) == 3:
		streamKey := parts[2]
		// An empty key must never reach the lookup: it would match any room
		// stored with an empty stream_key.
		if streamKey == "" {
			return "", "", false, false
		}
		var room model.Room
		if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil {
			return "", "", false, false
		}
		roomID = fmt.Sprintf("%d", room.ID)
		return roomID, fmt.Sprintf("/live/%s", roomID), true, true

	case parts[1] == "variant" && len(parts) == 5:
		roomID = parts[2]
		quality := normalizeQuality(parts[3])
		token := parts[4]
		if quality == "" || token != s.internalPublishKey {
			return "", "", false, false
		}
		return roomID, fmt.Sprintf("/live/%s/%s", roomID, quality), false, true
	}

	return "", "", false, false
}

// startVariantTranscoders replaces any transcoders running for roomID with a
// fresh set, one per entry in supportedQualities. Each ffmpeg process is
// launched asynchronously after transcoderStartDelay.
func (s *RTMPServer) startVariantTranscoders(roomID string) {
	s.stopVariantTranscoders(roomID)

	launch := make([]*variantTranscoder, 0, len(supportedQualities))
	for quality, profile := range supportedQualities {
		ctx, cancel := context.WithCancel(context.Background())
		args := variantFFmpegArgs(roomID, quality, s.internalPublishKey, profile)
		tr := &variantTranscoder{
			quality: quality,
			cancel:  cancel,
			cmd:     exec.CommandContext(ctx, "ffmpeg", args...),
		}
		launch = append(launch, tr)
		go runTranscoder(ctx, roomID, tr)
	}

	s.mutex.Lock()
	displaced := s.transcoders[roomID]
	s.transcoders[roomID] = launch
	s.mutex.Unlock()

	// A concurrent start for the same room may have registered its own set
	// between the stop above and the swap; cancel it so it is not orphaned.
	for _, tr := range displaced {
		tr.cancel()
	}
}

// variantFFmpegArgs builds the ffmpeg argument list that reads the room's
// source stream over RTMP, re-encodes it with profile and publishes the result
// to the internal variant path authenticated by publishKey.
func variantFFmpegArgs(roomID, quality, publishKey string, profile qualityProfile) []string {
	inputURL := fmt.Sprintf("rtmp://%s/live/%s", internalRTMPAddr, roomID)
	outputURL := fmt.Sprintf("rtmp://%s/variant/%s/%s/%s", internalRTMPAddr, roomID, quality, publishKey)
	return []string{
		"-nostdin",
		"-loglevel", "error",
		"-i", inputURL,
		"-vf", "scale=" + profile.scale + ":force_original_aspect_ratio=decrease",
		"-c:v", "libx264",
		"-preset", "veryfast",
		"-tune", "zerolatency",
		"-g", "48",
		"-keyint_min", "48",
		"-sc_threshold", "0",
		"-b:v", profile.videoBitrate,
		"-maxrate", profile.videoBitrate,
		"-bufsize", profile.videoBitrate,
		"-c:a", "aac",
		"-b:a", profile.audioBitrate,
		"-ar", "44100",
		"-ac", "2",
		"-f", "flv",
		outputURL,
	}
}

// runTranscoder waits for the start delay, then runs tr's ffmpeg process to
// completion. It returns early, without starting ffmpeg, if ctx is cancelled
// during the delay. Exit errors caused by cancellation are not logged.
func runTranscoder(ctx context.Context, roomID string, tr *variantTranscoder) {
	// Release the context's resources once the process is gone, including
	// when ffmpeg exits on its own. CancelFuncs are safe to call repeatedly.
	defer tr.cancel()

	timer := time.NewTimer(transcoderStartDelay)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return
	case <-timer.C:
	}

	monitor.Infof("Starting transcoder room_id=%s quality=%s", roomID, tr.quality)
	if err := tr.cmd.Start(); err != nil {
		if ctx.Err() == nil {
			monitor.Errorf("Failed to start transcoder room_id=%s quality=%s: %v", roomID, tr.quality, err)
		}
		return
	}
	if err := tr.cmd.Wait(); err != nil && ctx.Err() == nil {
		monitor.Warnf("Transcoder exited room_id=%s quality=%s: %v", roomID, tr.quality, err)
	}
}

// stopVariantTranscoders unregisters and cancels every transcoder recorded for
// roomID. It is a no-op when none are registered.
func (s *RTMPServer) stopVariantTranscoders(roomID string) {
	s.mutex.Lock()
	transcoders := s.transcoders[roomID]
	delete(s.transcoders, roomID)
	s.mutex.Unlock()

	// Cancel outside the lock so process teardown never blocks other callers.
	for _, tr := range transcoders {
		tr.cancel()
	}
}

// setRoomActive writes the is_active flag of the room identified by roomID.
// It does nothing when roomID does not parse to a non-zero ID; database
// failures are logged rather than returned.
func setRoomActive(roomID string, active bool) {
	id := parseRoomID(roomID)
	if id == 0 {
		return
	}
	err := db.DB.Model(&model.Room{}).
		Where("id = ?", id).
		Updates(map[string]interface{}{"is_active": active}).Error
	if err != nil {
		monitor.Errorf("Failed to set is_active=%t for room_id=%s: %v", active, roomID, err)
	}
}

// isClientDisconnect reports whether err looks like the peer closing the
// connection, judged by the error text.
func isClientDisconnect(err error) bool {
	msg := err.Error()
	return strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset by peer")
}

// normalizeQuality trims and lower-cases value and returns it when it names a
// transcoded variant in supportedQualities. It returns "" otherwise, which
// includes "source".
func normalizeQuality(value string) string {
	value = strings.ToLower(strings.TrimSpace(value))
	if _, ok := supportedQualities[value]; ok {
		return value
	}
	return ""
}

// parseRoomID parses a decimal room ID and returns 0 when value cannot be
// parsed.
func parseRoomID(value string) uint {
	var roomID uint
	// The error is intentionally dropped: callers treat 0 as "no valid room".
	_, _ = fmt.Sscanf(value, "%d", &roomID)
	return roomID
}

// generateInternalPublishKey returns a 32-character hex token built from 16
// random bytes. Variant publishers must present it to be accepted.
func generateInternalPublishKey() string {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		// NOTE(review): this fallback is a fixed, publicly known value; if it
		// is ever used, anyone can publish variant streams. Consider failing
		// startup instead.
		return "internal_publish_fallback_key"
	}
	return hex.EncodeToString(buf)
}

// isSourcePath reports whether path has the "/live/{room_id}" shape, i.e.
// exactly two slashes, as opposed to a variant path with a quality suffix.
func isSourcePath(path string) bool {
	return strings.Count(path, "/") == 2
}

// ActiveStreamCount returns the number of registered source streams; variant
// channels are not counted.
func (s *RTMPServer) ActiveStreamCount() int {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	count := 0
	for path := range s.channels {
		if isSourcePath(path) {
			count++
		}
	}
	return count
}

// AvailablePlaybackQualities returns the qualities currently registered for
// roomID, in qualityOrder. The result is empty (never nil) when the room has
// no registered stream.
func (s *RTMPServer) AvailablePlaybackQualities(roomID string) []string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	basePath := fmt.Sprintf("/live/%s", roomID)
	available := make([]string, 0, len(qualityOrder))
	for _, quality := range qualityOrder {
		streamPath := basePath
		if quality != "source" {
			streamPath = fmt.Sprintf("%s/%s", basePath, quality)
		}
		if _, ok := s.channels[streamPath]; ok {
			available = append(available, quality)
		}
	}
	return available
}

// ActiveStreamPaths returns the paths of all registered source streams, in no
// particular order. Variant channels are excluded.
func (s *RTMPServer) ActiveStreamPaths() []string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	paths := make([]string, 0, len(s.channels))
	for path := range s.channels {
		if isSourcePath(path) {
			paths = append(paths, path)
		}
	}
	return paths
}