// Package chat implements a room-based WebSocket chat hub that fans messages
// out to the clients of a room and replays recent history to new joiners.
package chat

import (
	"encoding/json"
	"log"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

const (
	// writeWait is the deadline applied to each write to the peer.
	writeWait = 10 * time.Second
	// pongWait is the read deadline; it is renewed every time a pong arrives.
	pongWait = 60 * time.Second
	// pingPeriod is the interval between pings. It is kept below pongWait so
	// a ping goes out before the read deadline expires.
	pingPeriod = (pongWait * 9) / 10
	// maxMessageSize is the read limit, in bytes, set on each connection.
	maxMessageSize = 512
	// maxHistorySize caps the number of messages retained per room.
	maxHistorySize = 100
)

// Message is the JSON payload exchanged between clients and the hub.
type Message struct {
	Type      string `json:"type"` // "chat", "system", "danmaku"
	Username  string `json:"username"`
	Content   string `json:"content"`
	RoomID    string `json:"room_id"`
	IsHistory bool   `json:"is_history"`
}

// Client ties one WebSocket connection to a room on a Hub.
type Client struct {
	Hub      *Hub
	Conn     *websocket.Conn
	Send     chan []byte // outbound payloads; closed by the hub when the client is dropped
	RoomID   string
	Username string
}

// Hub tracks the clients of every room and each room's recent history.
// All map access happens while mutex is held.
type Hub struct {
	rooms        map[string]map[*Client]bool
	roomsHistory map[string][]Message
	broadcast    chan Message
	register     chan *Client
	unregister   chan *Client
	mutex        sync.RWMutex
}

// NewHub returns a Hub with its channels and maps initialized. Call Run in
// its own goroutine before registering clients or broadcasting.
func NewHub() *Hub {
	return &Hub{
		broadcast:    make(chan Message),
		register:     make(chan *Client),
		unregister:   make(chan *Client),
		rooms:        make(map[string]map[*Client]bool),
		roomsHistory: make(map[string][]Message),
	}
}

// Run processes register, unregister and broadcast events one at a time.
// It loops forever and never returns.
func (h *Hub) Run() {
	for {
		select {
		case client := <-h.register:
			h.addClient(client)
		case client := <-h.unregister:
			h.removeClient(client)
		case message := <-h.broadcast:
			h.deliver(message)
		}
	}
}

// addClient puts client into its room and replays the room's stored history.
func (h *Hub) addClient(client *Client) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	room := h.rooms[client.RoomID]
	if room == nil {
		room = make(map[*Client]bool)
		h.rooms[client.RoomID] = room
	}
	room[client] = true

	for _, msg := range h.roomsHistory[client.RoomID] {
		// msg is a copy, so flagging it leaves the stored entry untouched.
		msg.IsHistory = true
		payload, err := json.Marshal(msg)
		if err != nil {
			log.Printf("chat: encoding history message for room %q: %v", client.RoomID, err)
			continue
		}
		// Non-blocking send: if the client's buffer is full the entry is
		// dropped rather than stalling the hub loop.
		select {
		case client.Send <- payload:
		default:
		}
	}
}

// removeClient drops client from its room and closes its Send channel.
// It is a no-op when the client is not currently a member, which prevents a
// double close after deliver has already evicted the client.
func (h *Hub) removeClient(client *Client) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	room, ok := h.rooms[client.RoomID]
	if !ok {
		return
	}
	if _, member := room[client]; !member {
		return
	}
	delete(room, client)
	close(client.Send)

	// History lives in roomsHistory, so the empty membership map can go.
	if len(room) == 0 {
		delete(h.rooms, client.RoomID)
	}
}

// deliver records message in the room history when its type is "chat" or
// "danmaku", then sends it to every client of the room. Clients whose Send
// buffer is full are evicted.
func (h *Hub) deliver(message Message) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	if message.Type == "chat" || message.Type == "danmaku" {
		history := append(h.roomsHistory[message.RoomID], message)
		if len(history) > maxHistorySize {
			history = history[len(history)-maxHistorySize:]
		}
		h.roomsHistory[message.RoomID] = history
	}

	room := h.rooms[message.RoomID]
	if len(room) == 0 {
		return
	}

	payload, err := json.Marshal(message)
	if err != nil {
		log.Printf("chat: encoding message for room %q: %v", message.RoomID, err)
		return
	}

	for client := range room {
		select {
		case client.Send <- payload:
		default:
			// The client is not draining its buffer: close the channel and
			// forget the client.
			close(client.Send)
			delete(room, client)
		}
	}

	// Mirror removeClient: do not keep an empty membership map around.
	if len(room) == 0 {
		delete(h.rooms, message.RoomID)
	}
}

// ClearRoomHistory removes the stored history for roomID. It should be
// called when the stream ends.
func (h *Hub) ClearRoomHistory(roomID string) {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	delete(h.roomsHistory, roomID)
}

// RegisterClient hands c to the hub loop. The register channel is
// unbuffered, so the call blocks until Run receives it.
func (h *Hub) RegisterClient(c *Client) {
	h.register <- c
}

// BroadcastToRoom sends msg to the broadcast channel. The channel is
// unbuffered, so the call blocks until Run receives it.
func (h *Hub) BroadcastToRoom(msg Message) {
	h.broadcast <- msg
}

// ReadPump reads messages from the connection and forwards every valid JSON
// message to the hub until a read fails. On exit it unregisters the client
// and closes the connection.
func (c *Client) ReadPump() {
	defer func() {
		c.Hub.unregister <- c
		c.Conn.Close()
	}()

	c.Conn.SetReadLimit(maxMessageSize)
	c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, raw, err := c.Conn.ReadMessage()
		if err != nil {
			return
		}

		var msg Message
		if err := json.Unmarshal(raw, &msg); err != nil {
			// Malformed payloads are ignored; the connection stays open.
			continue
		}

		// Never trust identity or routing fields supplied by the peer.
		msg.RoomID = c.RoomID
		msg.Username = c.Username
		// IsHistory is set only by the hub during replay; a live message
		// must not be able to claim it.
		msg.IsHistory = false
		// NOTE(review): Type is still client-controlled, so a peer can send
		// "system" messages — confirm whether that should be restricted.
		c.Hub.broadcast <- msg
	}
}

// WritePump writes queued payloads from Send to the connection and sends a
// ping every pingPeriod. It returns, closing the connection, when Send is
// closed or a write fails.
func (c *Client) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.Conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed Send. The close frame is best effort: the
				// connection is torn down right after either way.
				_ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}
		case <-ticker.C:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

// MainHub is the package-wide hub created by InitChat.
var MainHub *Hub

// InitChat creates MainHub and starts its event loop in a new goroutine.
func InitChat() {
	MainHub = NewHub()
	go MainHub.Run()
}