diff --git a/backend/internal/api/room.go b/backend/internal/api/room.go index 0a3ce92..ed3a5cd 100644 --- a/backend/internal/api/room.go +++ b/backend/internal/api/room.go @@ -48,3 +48,18 @@ func GetActiveRooms(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"active_rooms": result}) } + +func GetRoomPlaybackOptions(c *gin.Context) { + roomID := c.Param("room_id") + qualities := []string{"source"} + if adminRTMP != nil { + if available := adminRTMP.AvailablePlaybackQualities(roomID); len(available) > 0 { + qualities = available + } + } + + c.JSON(http.StatusOK, gin.H{ + "room_id": roomID, + "qualities": qualities, + }) +} diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index 1a514a2..0e57d67 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -37,6 +37,7 @@ func SetupRouter(streamServer *stream.RTMPServer) *gin.Engine { authGroup.Use(AuthMiddleware()) { authGroup.GET("/room/my", GetMyRoom) + authGroup.GET("/rooms/:room_id/playback-options", GetRoomPlaybackOptions) authGroup.POST("/user/change-password", ChangePassword) adminGroup := authGroup.Group("/admin") diff --git a/backend/internal/stream/server.go b/backend/internal/stream/server.go index 4b397d5..e09988e 100644 --- a/backend/internal/stream/server.go +++ b/backend/internal/stream/server.go @@ -1,11 +1,16 @@ package stream import ( + "context" + "crypto/rand" + "encoding/hex" "fmt" "io" "net/http" + "os/exec" "strings" "sync" + "time" "github.com/gin-gonic/gin" "github.com/nareix/joy4/av/avutil" @@ -27,9 +32,38 @@ func init() { // RTMPServer manages all active live streams type RTMPServer struct { - server *rtmp.Server - channels map[string]*pubsub.Queue - mutex sync.RWMutex + server *rtmp.Server + channels map[string]*pubsub.Queue + transcoders map[string][]*variantTranscoder + internalPublishKey string + mutex sync.RWMutex +} + +type variantTranscoder struct { + quality string + cancel context.CancelFunc + cmd *exec.Cmd +} + +type qualityProfile struct { + scale string + videoBitrate string + audioBitrate string +} + +var qualityOrder = []string{"source", "720p", "480p"} + +var supportedQualities = map[string]qualityProfile{ + "720p": { + scale: "1280:-2", + videoBitrate: "2500k", + audioBitrate: "128k", + }, + "480p": { + scale: "854:-2", + videoBitrate: "1200k", + audioBitrate: "96k", + }, } type writeFlusher struct { @@ -45,32 +79,29 @@ func (w writeFlusher) Flush() error { // NewRTMPServer creates and initializes a new media server func NewRTMPServer() *RTMPServer { s := &RTMPServer{ - channels: make(map[string]*pubsub.Queue), - server: &rtmp.Server{}, + channels: make(map[string]*pubsub.Queue), + transcoders: make(map[string][]*variantTranscoder), + internalPublishKey: generateInternalPublishKey(), + server: &rtmp.Server{}, } // Triggered when a broadcaster (e.g., OBS) starts publishing s.server.HandlePublish = func(conn *rtmp.Conn) { - streamPath := conn.URL.Path // Expected format: /live/{stream_key} + streamPath := conn.URL.Path // Expected format: /live/{stream_key} or /variant/{room_id}/{quality}/{token} monitor.Infof("OBS publish attempt: %s", streamPath) - // Extract stream key from path parts := strings.Split(streamPath, "/") - if len(parts) < 3 || parts[1] != "live" { + if len(parts) < 3 { monitor.Warnf("Invalid publish path format: %s", streamPath) return } - streamKey := parts[2] - // Authenticate stream key - var room model.Room - if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil { - monitor.Warnf("Invalid stream key: %s", streamKey) - return // Reject connection + roomID, channelPath, isSource, ok := s.resolvePublishPath(parts) + if !ok { + monitor.Warnf("Invalid publish key/path: %s", streamPath) + return } - monitor.Infof("Stream authenticated for room_id=%d", room.ID) - // 1. Get audio/video stream metadata streams, err := conn.Streams() if err != nil { @@ -78,31 +109,40 @@ func NewRTMPServer() *RTMPServer { return } - // 2. Map the active stream by Room ID so viewers can use /live/{room_id} - roomLivePath := fmt.Sprintf("/live/%d", room.ID) - + monitor.Infof("Stream authenticated for room_id=%s path=%s", roomID, channelPath) + s.mutex.Lock() q := pubsub.NewQueue() q.WriteHeader(streams) - s.channels[roomLivePath] = q + s.channels[channelPath] = q s.mutex.Unlock() - // Mark room as active in DB (using map to ensure true/false is correctly updated) - db.DB.Model(&room).Updates(map[string]interface{}{"is_active": true}) + if isSource { + roomIDUint := parseRoomID(roomID) + if roomIDUint != 0 { + db.DB.Model(&model.Room{}).Where("id = ?", roomIDUint).Updates(map[string]interface{}{"is_active": true}) + } + s.startVariantTranscoders(roomID) + } // 3. Cleanup on end defer func() { s.mutex.Lock() - delete(s.channels, roomLivePath) + delete(s.channels, channelPath) s.mutex.Unlock() q.Close() - // Explicitly set is_active to false using map - db.DB.Model(&room).Updates(map[string]interface{}{"is_active": false}) - - // Clear chat history for this room - chat.MainHub.ClearRoomHistory(fmt.Sprintf("%d", room.ID)) - - monitor.Infof("Publishing ended for room_id=%d", room.ID) + + if isSource { + s.stopVariantTranscoders(roomID) + roomIDUint := parseRoomID(roomID) + if roomIDUint != 0 { + db.DB.Model(&model.Room{}).Where("id = ?", roomIDUint).Updates(map[string]interface{}{"is_active": false}) + } + chat.MainHub.ClearRoomHistory(roomID) + monitor.Infof("Publishing ended for room_id=%s", roomID) + } else { + monitor.Infof("Variant publishing ended for room_id=%s path=%s", roomID, channelPath) + } }() // 4. Continuously copy data packets to our broadcast queue @@ -158,6 +198,9 @@ func (s *RTMPServer) Start(addr string) error { // HandleHTTPFLV serves browser-compatible HTTP-FLV playback for web clients. func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) { streamPath := fmt.Sprintf("/live/%s", c.Param("room_id")) + if quality := normalizeQuality(c.Query("quality")); quality != "" { + streamPath = fmt.Sprintf("%s/%s", streamPath, quality) + } s.mutex.RLock() q, ok := s.channels[streamPath] @@ -198,10 +241,149 @@ func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) { } } +func (s *RTMPServer) resolvePublishPath(parts []string) (roomID string, channelPath string, isSource bool, ok bool) { + if parts[1] == "live" && len(parts) == 3 { + var room model.Room + if err := db.DB.Where("stream_key = ?", parts[2]).First(&room).Error; err != nil { + return "", "", false, false + } + roomID = fmt.Sprintf("%d", room.ID) + channelPath = fmt.Sprintf("/live/%s", roomID) + return roomID, channelPath, true, true + } + + if parts[1] == "variant" && len(parts) == 5 { + roomID = parts[2] + quality := normalizeQuality(parts[3]) + token := parts[4] + if quality == "" || token != s.internalPublishKey { + return "", "", false, false + } + return roomID, fmt.Sprintf("/live/%s/%s", roomID, quality), false, true + } + + return "", "", false, false +} + +func (s *RTMPServer) startVariantTranscoders(roomID string) { + s.stopVariantTranscoders(roomID) + + launch := make([]*variantTranscoder, 0, len(supportedQualities)) + for quality, profile := range supportedQualities { + ctx, cancel := context.WithCancel(context.Background()) + inputURL := fmt.Sprintf("rtmp://127.0.0.1:1935/live/%s", roomID) + outputURL := fmt.Sprintf("rtmp://127.0.0.1:1935/variant/%s/%s/%s", roomID, quality, s.internalPublishKey) + cmd := exec.CommandContext( + ctx, + "ffmpeg", + "-nostdin", + "-loglevel", "error", + "-i", inputURL, + "-vf", "scale="+profile.scale+":force_original_aspect_ratio=decrease", + "-c:v", "libx264", + "-preset", "veryfast", + "-tune", "zerolatency", + "-g", "48", + "-keyint_min", "48", + "-sc_threshold", "0", + "-b:v", profile.videoBitrate, + "-maxrate", profile.videoBitrate, + "-bufsize", profile.videoBitrate, + "-c:a", "aac", + "-b:a", profile.audioBitrate, + "-ar", "44100", + "-ac", "2", + "-f", "flv", + outputURL, + ) + + transcoder := &variantTranscoder{ + quality: quality, + cancel: cancel, + cmd: cmd, + } + launch = append(launch, transcoder) + + go func(roomID string, tr *variantTranscoder) { + time.Sleep(2 * time.Second) + monitor.Infof("Starting transcoder room_id=%s quality=%s", roomID, tr.quality) + if err := tr.cmd.Start(); err != nil { + monitor.Errorf("Failed to start transcoder room_id=%s quality=%s: %v", roomID, tr.quality, err) + return + } + if err := tr.cmd.Wait(); err != nil && ctx.Err() == nil { + monitor.Warnf("Transcoder exited room_id=%s quality=%s: %v", roomID, tr.quality, err) + } + }(roomID, transcoder) + } + + s.mutex.Lock() + s.transcoders[roomID] = launch + s.mutex.Unlock() +} + +func (s *RTMPServer) stopVariantTranscoders(roomID string) { + s.mutex.Lock() + transcoders := s.transcoders[roomID] + delete(s.transcoders, roomID) + s.mutex.Unlock() + + for _, transcoder := range transcoders { + transcoder.cancel() + } +} + +func normalizeQuality(value string) string { + value = strings.ToLower(strings.TrimSpace(value)) + if _, ok := supportedQualities[value]; ok { + return value + } + return "" +} + +func parseRoomID(value string) uint { + var roomID uint + _, _ = fmt.Sscanf(value, "%d", &roomID) + return roomID +} + +func generateInternalPublishKey() string { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err != nil { + return "internal_publish_fallback_key" + } + return hex.EncodeToString(buf) +} + func (s *RTMPServer) ActiveStreamCount() int { s.mutex.RLock() defer s.mutex.RUnlock() - return len(s.channels) + + count := 0 + for path := range s.channels { + if strings.Count(path, "/") == 2 { + count++ + } + } + return count +} + +func (s *RTMPServer) AvailablePlaybackQualities(roomID string) []string { + s.mutex.RLock() + defer s.mutex.RUnlock() + + basePath := fmt.Sprintf("/live/%s", roomID) + available := make([]string, 0, len(qualityOrder)) + for _, quality := range qualityOrder { + streamPath := basePath + if quality != "source" { + streamPath = fmt.Sprintf("%s/%s", basePath, quality) + } + if _, ok := s.channels[streamPath]; ok { + available = append(available, quality) + } + } + return available } func (s *RTMPServer) ActiveStreamPaths() []string { @@ -210,7 +392,9 @@ func (s *RTMPServer) ActiveStreamPaths() []string { paths := make([]string, 0, len(s.channels)) for path := range s.channels { - paths = append(paths, path) + if strings.Count(path, "/") == 2 { + paths = append(paths, path) + } } return paths } diff --git a/frontend/lib/pages/player_page.dart b/frontend/lib/pages/player_page.dart index 15dd36e..20d7b3a 100644 --- a/frontend/lib/pages/player_page.dart +++ b/frontend/lib/pages/player_page.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:flutter/foundation.dart'; import 'package:flutter/material.dart'; @@ -8,6 +9,7 @@ import 'package:video_player/video_player.dart'; import '../providers/auth_provider.dart'; import '../providers/settings_provider.dart'; +import '../services/api_service.dart'; import '../services/chat_service.dart'; import '../widgets/web_stream_player.dart'; @@ -24,7 +26,7 @@ class PlayerPage extends StatefulWidget { }); @override - _PlayerPageState createState() => _PlayerPageState(); + State createState() => _PlayerPageState(); } class _PlayerPageState extends State { @@ -42,11 +44,13 @@ class _PlayerPageState extends State { bool _controlsVisible = true; int _playerVersion = 0; String _selectedResolution = 'Source'; + List _availableResolutions = const ['Source']; Timer? _controlsHideTimer; @override void initState() { super.initState(); + _loadPlaybackOptions(); if (!kIsWeb) { _initializePlayer(); } @@ -55,9 +59,8 @@ class _PlayerPageState extends State { } Future _initializePlayer() async { - _controller = VideoPlayerController.networkUrl( - Uri.parse(widget.playbackUrl), - ); + final playbackUrl = _currentPlaybackUrl(); + _controller = VideoPlayerController.networkUrl(Uri.parse(playbackUrl)); try { await _controller!.initialize(); _controller!.play(); @@ -81,6 +84,55 @@ class _PlayerPageState extends State { } } + String _currentPlaybackUrl() { + final settings = context.read(); + final quality = _selectedResolution == 'Source' + ? null + : _selectedResolution.toLowerCase(); + return settings.playbackUrl(widget.roomId, quality: quality); + } + + Future _loadPlaybackOptions() async { + final settings = context.read(); + final auth = context.read(); + final api = ApiService(settings, auth.token); + + try { + final response = await api.getPlaybackOptions(widget.roomId); + if (response.statusCode != 200) { + return; + } + + final data = jsonDecode(response.body) as Map; + final rawQualities = + (data['qualities'] as List? ?? const ['source']) + .map((item) => item.toString().trim().toLowerCase()) + .where((item) => item.isNotEmpty) + .toList(); + + final normalized = ['Source']; + for (final quality in rawQualities) { + if (quality == 'source') { + continue; + } + normalized.add(quality); + } + + if (!mounted) { + return; + } + + setState(() { + _availableResolutions = normalized.toSet().toList(); + if (!_availableResolutions.contains(_selectedResolution)) { + _selectedResolution = 'Source'; + } + }); + } catch (_) { + // Keep source-only playback when the capability probe fails. + } + } + void _initializeChat() { final settings = context.read(); final auth = context.read(); @@ -141,6 +193,8 @@ class _PlayerPageState extends State { return; } + await _loadPlaybackOptions(); + setState(() { _isRefreshing = true; _isError = false; @@ -223,22 +277,30 @@ class _PlayerPageState extends State { Future _selectResolution() async { _showControls(); + await _loadPlaybackOptions(); + if (!mounted) { + return; + } + final nextResolution = await showModalBottomSheet( context: context, builder: (context) { const options = ['Source', '720p', '480p']; + final available = _availableResolutions.toSet(); return SafeArea( child: Column( mainAxisSize: MainAxisSize.min, children: [ - const ListTile( + ListTile( title: Text('Playback Resolution'), subtitle: Text( - 'Current backend only provides the source stream. Lower resolutions are reserved for future multi-bitrate output.', + available.length > 1 + ? 'Select an available transcoded stream.' + : 'Only the source stream is available right now.', ), ), ...options.map((option) { - final enabled = option == 'Source'; + final enabled = available.contains(option); return ListTile( enabled: enabled, leading: Icon( @@ -249,7 +311,7 @@ class _PlayerPageState extends State { title: Text(option), subtitle: enabled ? const Text('Available now') - : const Text('Requires backend transcoding support'), + : const Text('Waiting for backend transcoding output'), onTap: enabled ? () => Navigator.pop(context, option) : null, ); }), @@ -376,7 +438,7 @@ class _PlayerPageState extends State { : kIsWeb ? WebStreamPlayer( key: ValueKey('web-player-$_playerVersion'), - streamUrl: widget.playbackUrl, + streamUrl: _currentPlaybackUrl(), ) : _controller != null && _controller!.value.isInitialized ? AspectRatio( diff --git a/frontend/lib/pages/settings_page.dart b/frontend/lib/pages/settings_page.dart index 87bbaba..d2fb3df 100644 --- a/frontend/lib/pages/settings_page.dart +++ b/frontend/lib/pages/settings_page.dart @@ -307,11 +307,11 @@ class _SettingsPageState extends State { style: TextStyle(fontSize: 20, fontWeight: FontWeight.bold), ), Text( - "Version: 1.0.0-beta3.5", + "Version: 1.0.0-beta4.1", style: TextStyle(color: Colors.grey), ), Text( - "Author: Highground-Soft & Minimax", + "Author: Highground-Soft", style: TextStyle(color: Colors.grey), ), SizedBox(height: 20), diff --git a/frontend/lib/providers/settings_provider.dart b/frontend/lib/providers/settings_provider.dart index d28e239..53558c1 100644 --- a/frontend/lib/providers/settings_provider.dart +++ b/frontend/lib/providers/settings_provider.dart @@ -62,12 +62,26 @@ class SettingsProvider with ChangeNotifier { return "rtmp://${uri.host}:1935/live"; } - String playbackUrl(String roomId) { + String playbackUrl(String roomId, {String? quality}) { final uri = Uri.parse(_baseUrl); + final normalizedQuality = quality?.trim().toLowerCase(); + if (kIsWeb) { - return uri.replace(path: '/live/$roomId').toString(); + return uri + .replace( + path: '/live/$roomId', + queryParameters: + normalizedQuality == null || normalizedQuality.isEmpty + ? null + : {'quality': normalizedQuality}, + ) + .toString(); } - return "$rtmpUrl/$roomId"; + + if (normalizedQuality == null || normalizedQuality.isEmpty) { + return "$rtmpUrl/$roomId"; + } + return "$rtmpUrl/$roomId/$normalizedQuality"; } ThemeMode _themeModeFromString(String value) { diff --git a/frontend/lib/services/api_service.dart b/frontend/lib/services/api_service.dart index 640a80d..dec796c 100644 --- a/frontend/lib/services/api_service.dart +++ b/frontend/lib/services/api_service.dart @@ -43,11 +43,24 @@ class ApiService { ); } - Future changePassword(String oldPassword, String newPassword) async { + Future getPlaybackOptions(String roomId) async { + return await http.get( + Uri.parse("${settings.baseUrl}/api/rooms/$roomId/playback-options"), + headers: _headers, + ); + } + + Future changePassword( + String oldPassword, + String newPassword, + ) async { return await http.post( Uri.parse("${settings.baseUrl}/api/user/change-password"), headers: _headers, - body: jsonEncode({"old_password": oldPassword, "new_password": newPassword}), + body: jsonEncode({ + "old_password": oldPassword, + "new_password": newPassword, + }), ); } }