Files
Hightube/backend/internal/chat/hub.go

316 lines
6.2 KiB
Go

package chat
import (
"encoding/json"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Timing, size, and capacity settings shared by the hub and the client pumps.
const (
// writeWait is the write deadline applied before each outgoing frame in WritePump.
writeWait = 10 * time.Second
// pongWait is the read deadline set in ReadPump and renewed by its pong handler.
pongWait = 60 * time.Second
// pingPeriod is the interval between pings sent by WritePump. It is 90% of
// pongWait, so a ping is sent before the read deadline can expire.
pingPeriod = (pongWait * 9) / 10
// maxMessageSize is the read limit handed to Conn.SetReadLimit in ReadPump.
maxMessageSize = 512
// roomQueueBufferSize is the buffer capacity of each room's register,
// unregister, and broadcast channels.
roomQueueBufferSize = 2048
// historyLimit is the maximum number of messages kept in a room's history.
historyLimit = 100
)
// Message is the JSON payload exchanged with clients and relayed inside a room.
type Message struct {
// Type selects how the message is treated; only "chat" and "danmaku"
// messages are appended to the room history by handleBroadcast.
Type string `json:"type"` // "chat", "system", "danmaku"
// Username is overwritten by ReadPump with the sending client's Username.
Username string `json:"username"`
// Content is the message text as supplied by the sender.
Content string `json:"content"`
// RoomID is overwritten by ReadPump with the sending client's RoomID and is
// used by BroadcastToRoom to pick the target room.
RoomID string `json:"room_id"`
// IsHistory is set to true by handleRegister on messages replayed from the
// room history to a newly registered client.
IsHistory bool `json:"is_history"`
}
// Client ties one websocket connection to a room and a user name.
type Client struct {
// Hub is the hub the client registers with, unregisters from, and
// broadcasts through.
Hub *Hub
// Conn is the underlying websocket connection read by ReadPump and written
// by WritePump.
Conn *websocket.Conn
// Send queues serialized messages for WritePump. The room closes it when the
// client is unregistered or when a broadcast finds the queue full.
Send chan []byte
// RoomID identifies the room the client belongs to.
RoomID string
// Username is stamped onto every message the client sends.
Username string
}
// Hub owns the set of active rooms, keyed by room ID.
type Hub struct {
// mutex guards rooms.
mutex sync.RWMutex
// rooms maps a room ID to its running roomHub.
rooms map[string]*roomHub
}
// roomHub holds the state of a single room. Its run loop consumes the
// register, unregister, broadcast, and clearHistory channels until stop is
// closed.
type roomHub struct {
// roomID is the key under which the room is stored in the manager.
roomID string
// manager is the Hub that created the room; it is asked to delete the room
// once the room is idle.
manager *Hub
// register carries clients waiting to be added to the room.
register chan *Client
// unregister carries clients waiting to be removed from the room.
unregister chan *Client
// broadcast carries messages to deliver to every client in the room.
broadcast chan Message
// clearHistory signals a request to drop the stored history (capacity 1).
clearHistory chan struct{}
// stop is closed by Hub.deleteRoomIfIdle to end the run loop.
stop chan struct{}
// stopOnce ensures stop is closed at most once.
stopOnce sync.Once
// mutex guards clients and history.
mutex sync.RWMutex
// clients is the set of currently registered clients.
clients map[*Client]struct{}
// history holds the most recent "chat"/"danmaku" messages, oldest first,
// capped at historyLimit entries.
history []Message
}
// StatsSnapshot is a point-in-time summary of the hub produced by
// Hub.GetStatsSnapshot.
type StatsSnapshot struct {
// RoomCount is the number of rooms present when the snapshot was taken.
RoomCount int `json:"room_count"`
// TotalConnectedClient is the sum of the client counts of all rooms.
TotalConnectedClient int `json:"total_connected_client"`
// RoomClients maps each room ID to its number of registered clients.
RoomClients map[string]int `json:"room_clients"`
}
// NewHub returns a Hub with an empty, ready-to-use room table.
func NewHub() *Hub {
	hub := new(Hub)
	hub.rooms = make(map[string]*roomHub)
	return hub
}
// getRoom looks up roomID under the hub's read lock and returns the room, or
// nil when no such room exists.
func (h *Hub) getRoom(roomID string) *roomHub {
	h.mutex.RLock()
	defer h.mutex.RUnlock()
	return h.rooms[roomID]
}
// getOrCreateRoom returns the room stored under roomID. When none exists it
// creates one, stores it in the hub, and starts its run loop in a new
// goroutine.
func (h *Hub) getOrCreateRoom(roomID string) *roomHub {
	// Common case: the room already exists and a read lock is enough.
	if existing := h.getRoom(roomID); existing != nil {
		return existing
	}

	h.mutex.Lock()
	defer h.mutex.Unlock()

	// Look again under the write lock: another goroutine may have created the
	// room between the read-locked lookup above and acquiring this lock.
	if existing := h.rooms[roomID]; existing != nil {
		return existing
	}

	created := &roomHub{
		roomID:       roomID,
		manager:      h,
		clients:      make(map[*Client]struct{}),
		register:     make(chan *Client, roomQueueBufferSize),
		unregister:   make(chan *Client, roomQueueBufferSize),
		broadcast:    make(chan Message, roomQueueBufferSize),
		clearHistory: make(chan struct{}, 1),
		stop:         make(chan struct{}),
	}
	h.rooms[roomID] = created
	go created.run()
	return created
}
// deleteRoomIfIdle removes room from the hub and stops its run loop when the
// room has no clients, no stored history, and no registrations waiting in its
// queue. It does nothing when the hub currently maps the room ID to a
// different room.
func (h *Hub) deleteRoomIfIdle(room *roomHub) {
	room.mutex.RLock()
	idle := len(room.clients) == 0 && len(room.history) == 0
	room.mutex.RUnlock()
	if !idle {
		return
	}

	h.mutex.Lock()
	defer h.mutex.Unlock()

	if h.rooms[room.roomID] != room {
		return
	}

	// A client already queued on room.register still expects to join this
	// room. Deleting now would either drop that registration when the run loop
	// sees stop, or attach the client to a room that is no longer reachable
	// through the hub. Keep the room; the idle check runs again when that
	// client unregisters.
	// NOTE(review): a caller that has fetched the room pointer but has not yet
	// sent on room.register can still race with this deletion.
	if len(room.register) > 0 {
		return
	}

	delete(h.rooms, room.roomID)
	room.stopOnce.Do(func() {
		close(room.stop)
	})
}
// run is the room's event loop. It dispatches registrations, unregistrations,
// broadcasts, and history-clear requests to their handlers one at a time and
// returns once stop is closed.
func (r *roomHub) run() {
	for {
		select {
		case client := <-r.register:
			r.handleRegister(client)
		case client := <-r.unregister:
			// select picks randomly among ready cases, so an unregister can be
			// chosen while the same client's registration is still queued.
			// Handling it first would be a no-op and the later registration
			// would leave a dead client in the room. Apply queued
			// registrations before removing the client.
			// NOTE(review): assumes RegisterClient returns before
			// UnregisterClient is called for the same client - confirm.
			r.drainPendingRegisters()
			r.handleUnregister(client)
		case message := <-r.broadcast:
			r.handleBroadcast(message)
		case <-r.clearHistory:
			r.handleClearHistory()
		case <-r.stop:
			return
		}
	}
}

// drainPendingRegisters handles every registration currently buffered on
// r.register without blocking.
func (r *roomHub) drainPendingRegisters() {
	for {
		select {
		case client := <-r.register:
			r.handleRegister(client)
		default:
			return
		}
	}
}
// handleRegister replays the room history to client, each entry flagged with
// IsHistory, and then adds client to the room. Entries that fail to marshal
// or do not fit in client.Send are skipped.
func (r *roomHub) handleRegister(client *Client) {
	// Copy the history so the lock is not held while marshaling and sending.
	r.mutex.RLock()
	replay := append([]Message(nil), r.history...)
	r.mutex.RUnlock()

	for i := range replay {
		replay[i].IsHistory = true
		payload, err := json.Marshal(replay[i])
		if err != nil {
			continue
		}
		select {
		case client.Send <- payload:
		default:
			// Send queue is full: drop this history entry instead of blocking
			// the room loop.
		}
	}

	r.mutex.Lock()
	r.clients[client] = struct{}{}
	r.mutex.Unlock()
}
// handleUnregister removes client from the room and closes its Send channel
// if the client is currently registered, then asks the manager to delete the
// room if it has become idle.
func (r *roomHub) handleUnregister(client *Client) {
	func() {
		r.mutex.Lock()
		defer r.mutex.Unlock()

		if _, known := r.clients[client]; !known {
			// Not registered (or already dropped): Send must not be closed
			// a second time.
			return
		}
		delete(r.clients, client)
		close(client.Send)
	}()

	r.manager.deleteRoomIfIdle(r)
}
// handleBroadcast serializes message and queues it for every client in the
// room. "chat" and "danmaku" messages are also appended to the history, which
// is trimmed to historyLimit entries. A client whose Send queue is full is
// removed from the room and its Send channel is closed. A message that cannot
// be marshaled is discarded.
func (r *roomHub) handleBroadcast(message Message) {
	payload, err := json.Marshal(message)
	if err != nil {
		return
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	switch message.Type {
	case "chat", "danmaku":
		r.history = append(r.history, message)
		if len(r.history) > historyLimit {
			// Drop the oldest entry.
			r.history = r.history[1:]
		}
	}

	for recipient := range r.clients {
		select {
		case recipient.Send <- payload:
			continue
		default:
		}
		// The recipient is not draining its queue: disconnect it rather than
		// block the whole room.
		close(recipient.Send)
		delete(r.clients, recipient)
	}
}
// handleClearHistory discards the stored history and then asks the manager to
// delete the room if it has become idle.
func (r *roomHub) handleClearHistory() {
	func() {
		r.mutex.Lock()
		defer r.mutex.Unlock()
		r.history = nil
	}()

	r.manager.deleteRoomIfIdle(r)
}
// ClearRoomHistory asks the room identified by roomID to drop its history.
// It never blocks: the request is skipped when the room does not exist or
// when a clear request is already pending.
func (h *Hub) ClearRoomHistory(roomID string) {
	room := h.getRoom(roomID)
	if room == nil {
		return
	}

	select {
	case room.clearHistory <- struct{}{}:
	default:
		// The one-slot queue is already occupied; the pending request will
		// clear the history.
	}
}
// RegisterClient queues c for registration in the room named by c.RoomID,
// creating the room when it does not exist yet. The send blocks while the
// room's register queue is full.
func (h *Hub) RegisterClient(c *Client) {
	room := h.getOrCreateRoom(c.RoomID)
	room.register <- c
}
// UnregisterClient queues c for removal from the room named by c.RoomID.
// It does nothing when that room does not exist.
func (h *Hub) UnregisterClient(c *Client) {
	room := h.getRoom(c.RoomID)
	if room == nil {
		return
	}
	room.unregister <- c
}
// BroadcastToRoom queues msg for delivery in the room named by msg.RoomID,
// creating the room when it does not exist yet. The send blocks while the
// room's broadcast queue is full.
func (h *Hub) BroadcastToRoom(msg Message) {
	target := h.getOrCreateRoom(msg.RoomID)
	target.broadcast <- msg
}
// ReadPump reads messages from the client's connection until a read fails and
// forwards each one that parses as a Message to the client's room. On exit it
// unregisters the client and closes the connection.
//
// Server-controlled fields are never taken from the client: RoomID and
// Username come from the Client itself, and IsHistory is reset because it is
// only meant to mark messages replayed from the room history.
func (c *Client) ReadPump() {
	defer func() {
		c.Hub.UnregisterClient(c)
		c.Conn.Close()
	}()

	c.Conn.SetReadLimit(maxMessageSize)
	c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	// Every pong pushes the read deadline forward, so a responsive client is
	// never timed out.
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			break
		}

		var msg Message
		if err := json.Unmarshal(message, &msg); err != nil {
			// Malformed payloads are ignored; the connection stays open.
			continue
		}

		msg.RoomID = c.RoomID
		msg.Username = c.Username
		// A client must not be able to mark a live message as history.
		msg.IsHistory = false
		// NOTE(review): msg.Type is still client-controlled, so a client can
		// send "system" messages - confirm whether that is intended.
		c.Hub.BroadcastToRoom(msg)
	}
}
// WritePump writes queued messages from c.Send to the connection as text
// frames and sends a ping every pingPeriod. It returns, stopping the ticker
// and closing the connection, when c.Send is closed (after sending a close
// frame) or when any write fails.
func (c *Client) WritePump() {
	pinger := time.NewTicker(pingPeriod)
	// Deferred calls run last-in first-out: the ticker is stopped first, then
	// the connection is closed.
	defer c.Conn.Close()
	defer pinger.Stop()

	for {
		select {
		case payload, open := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !open {
				// The room closed the queue: tell the peer the connection is
				// being closed.
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			frame, err := c.Conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			// The frame writer is closed whether or not the write succeeded;
			// either failure ends the pump.
			_, writeErr := frame.Write(payload)
			closeErr := frame.Close()
			if writeErr != nil || closeErr != nil {
				return
			}

		case <-pinger.C:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if c.Conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}
		}
	}
}
// MainHub is the package-level Hub. It is nil until InitChat assigns it.
var MainHub *Hub
// InitChat assigns a freshly created Hub to MainHub, replacing any hub that
// was stored there before.
func InitChat() {
MainHub = NewHub()
}
// GetStatsSnapshot reports the number of rooms, the number of clients per
// room, and the total number of clients. The hub lock is held only while the
// room list is copied; each room is then counted under its own lock, so the
// figures for different rooms may be taken at slightly different moments.
func (h *Hub) GetStatsSnapshot() StatsSnapshot {
	h.mutex.RLock()
	listed := make([]*roomHub, 0, len(h.rooms))
	perRoom := make(map[string]int, len(h.rooms))
	for id, room := range h.rooms {
		perRoom[id] = 0
		listed = append(listed, room)
	}
	h.mutex.RUnlock()

	stats := StatsSnapshot{
		RoomCount:   len(listed),
		RoomClients: perRoom,
	}
	for _, room := range listed {
		room.mutex.RLock()
		n := len(room.clients)
		room.mutex.RUnlock()

		perRoom[room.roomID] = n
		stats.TotalConnectedClient += n
	}
	return stats
}