package chat import ( "encoding/json" "sync" "time" "github.com/gorilla/websocket" ) const ( writeWait = 10 * time.Second pongWait = 60 * time.Second pingPeriod = (pongWait * 9) / 10 maxMessageSize = 512 ) type Message struct { Type string `json:"type"` // "chat", "system", "danmaku" Username string `json:"username"` Content string `json:"content"` RoomID string `json:"room_id"` } type Client struct { Hub *Hub Conn *websocket.Conn Send chan []byte RoomID string Username string } type Hub struct { rooms map[string]map[*Client]bool broadcast chan Message register chan *Client unregister chan *Client mutex sync.RWMutex } func NewHub() *Hub { return &Hub{ broadcast: make(chan Message), register: make(chan *Client), unregister: make(chan *Client), rooms: make(map[string]map[*Client]bool), } } func (h *Hub) Run() { for { select { case client := <-h.register: h.mutex.Lock() if h.rooms[client.RoomID] == nil { h.rooms[client.RoomID] = make(map[*Client]bool) } h.rooms[client.RoomID][client] = true h.mutex.Unlock() case client := <-h.unregister: h.mutex.Lock() if rooms, ok := h.rooms[client.RoomID]; ok { if _, ok := rooms[client]; ok { delete(rooms, client) close(client.Send) if len(rooms) == 0 { delete(h.rooms, client.RoomID) } } } h.mutex.Unlock() case message := <-h.broadcast: h.mutex.RLock() clients := h.rooms[message.RoomID] if clients != nil { msgBytes, _ := json.Marshal(message) for client := range clients { select { case client.Send <- msgBytes: default: close(client.Send) delete(clients, client) } } } h.mutex.RUnlock() } } } func (h *Hub) RegisterClient(c *Client) { h.register <- c } // BroadcastToRoom sends a message to the broadcast channel func (h *Hub) BroadcastToRoom(msg Message) { h.broadcast <- msg } func (c *Client) ReadPump() { defer func() { c.Hub.unregister <- c c.Conn.Close() }() c.Conn.SetReadLimit(maxMessageSize) c.Conn.SetReadDeadline(time.Now().Add(pongWait)) c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { _, message, err := c.Conn.ReadMessage() if err != nil { break } var msg Message if err := json.Unmarshal(message, &msg); err == nil { msg.RoomID = c.RoomID msg.Username = c.Username c.Hub.broadcast <- msg } } } func (c *Client) WritePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.Conn.Close() }() for { select { case message, ok := <-c.Send: c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.Conn.NextWriter(websocket.TextMessage) if err != nil { return } w.Write(message) if err := w.Close(); err != nil { return } case <-ticker.C: c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } var MainHub *Hub func InitChat() { MainHub = NewHub() go MainHub.Run() }