perf: 提升后端高并发承载能力
This commit is contained in:
@@ -9,10 +9,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
writeWait = 10 * time.Second
|
||||
pongWait = 60 * time.Second
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
maxMessageSize = 512
|
||||
writeWait = 10 * time.Second
|
||||
pongWait = 60 * time.Second
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
maxMessageSize = 512
|
||||
roomQueueBufferSize = 2048
|
||||
historyLimit = 100
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
@@ -32,12 +34,23 @@ type Client struct {
|
||||
}
|
||||
|
||||
type Hub struct {
|
||||
rooms map[string]map[*Client]bool
|
||||
roomsHistory map[string][]Message
|
||||
broadcast chan Message
|
||||
mutex sync.RWMutex
|
||||
rooms map[string]*roomHub
|
||||
}
|
||||
|
||||
type roomHub struct {
|
||||
roomID string
|
||||
manager *Hub
|
||||
register chan *Client
|
||||
unregister chan *Client
|
||||
mutex sync.RWMutex
|
||||
broadcast chan Message
|
||||
clearHistory chan struct{}
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
|
||||
mutex sync.RWMutex
|
||||
clients map[*Client]struct{}
|
||||
history []Message
|
||||
}
|
||||
|
||||
type StatsSnapshot struct {
|
||||
@@ -48,118 +61,179 @@ type StatsSnapshot struct {
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
broadcast: make(chan Message),
|
||||
register: make(chan *Client),
|
||||
unregister: make(chan *Client),
|
||||
rooms: make(map[string]map[*Client]bool),
|
||||
roomsHistory: make(map[string][]Message),
|
||||
rooms: make(map[string]*roomHub),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) Run() {
|
||||
func (h *Hub) getRoom(roomID string) *roomHub {
|
||||
h.mutex.RLock()
|
||||
room := h.rooms[roomID]
|
||||
h.mutex.RUnlock()
|
||||
return room
|
||||
}
|
||||
|
||||
func (h *Hub) getOrCreateRoom(roomID string) *roomHub {
|
||||
if room := h.getRoom(roomID); room != nil {
|
||||
return room
|
||||
}
|
||||
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
|
||||
if room := h.rooms[roomID]; room != nil {
|
||||
return room
|
||||
}
|
||||
|
||||
room := &roomHub{
|
||||
roomID: roomID,
|
||||
manager: h,
|
||||
register: make(chan *Client, roomQueueBufferSize),
|
||||
unregister: make(chan *Client, roomQueueBufferSize),
|
||||
broadcast: make(chan Message, roomQueueBufferSize),
|
||||
clearHistory: make(chan struct{}, 1),
|
||||
stop: make(chan struct{}),
|
||||
clients: make(map[*Client]struct{}),
|
||||
}
|
||||
h.rooms[roomID] = room
|
||||
go room.run()
|
||||
return room
|
||||
}
|
||||
|
||||
func (h *Hub) deleteRoomIfIdle(room *roomHub) {
|
||||
room.mutex.RLock()
|
||||
idle := len(room.clients) == 0 && len(room.history) == 0
|
||||
room.mutex.RUnlock()
|
||||
if !idle {
|
||||
return
|
||||
}
|
||||
|
||||
h.mutex.Lock()
|
||||
if current := h.rooms[room.roomID]; current == room {
|
||||
delete(h.rooms, room.roomID)
|
||||
room.stopOnce.Do(func() {
|
||||
close(room.stop)
|
||||
})
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (r *roomHub) run() {
|
||||
for {
|
||||
select {
|
||||
case client := <-h.register:
|
||||
h.mutex.Lock()
|
||||
if h.rooms[client.RoomID] == nil {
|
||||
h.rooms[client.RoomID] = make(map[*Client]bool)
|
||||
}
|
||||
h.rooms[client.RoomID][client] = true
|
||||
|
||||
// Copy existing history to send outside the lock
|
||||
var historyCopy []Message
|
||||
if history, ok := h.roomsHistory[client.RoomID]; ok {
|
||||
historyCopy = make([]Message, len(history))
|
||||
copy(historyCopy, history)
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
|
||||
// Send history outside the lock
|
||||
for _, msg := range historyCopy {
|
||||
msg.IsHistory = true
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
if err == nil {
|
||||
select {
|
||||
case client.Send <- msgBytes:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case client := <-h.unregister:
|
||||
h.mutex.Lock()
|
||||
if rooms, ok := h.rooms[client.RoomID]; ok {
|
||||
if _, ok := rooms[client]; ok {
|
||||
delete(rooms, client)
|
||||
close(client.Send)
|
||||
// We no longer delete the room from h.rooms here if we want history to persist
|
||||
// even if everyone leaves (as long as it's active in DB).
|
||||
// But we should clean up if the room is empty and we want to save memory.
|
||||
// However, the history is what matters.
|
||||
if len(rooms) == 0 {
|
||||
delete(h.rooms, client.RoomID)
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
|
||||
case message := <-h.broadcast:
|
||||
// Marshal message outside the lock to optimize performance
|
||||
msgBytes, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
h.mutex.Lock()
|
||||
// Only store "chat" and "danmaku" messages in history
|
||||
if message.Type == "chat" || message.Type == "danmaku" {
|
||||
h.roomsHistory[message.RoomID] = append(h.roomsHistory[message.RoomID], message)
|
||||
// Limit history size to avoid memory leak (e.g., last 100 messages)
|
||||
if len(h.roomsHistory[message.RoomID]) > 100 {
|
||||
h.roomsHistory[message.RoomID] = h.roomsHistory[message.RoomID][1:]
|
||||
}
|
||||
}
|
||||
|
||||
clients := h.rooms[message.RoomID]
|
||||
if clients != nil {
|
||||
for client := range clients {
|
||||
select {
|
||||
case client.Send <- msgBytes:
|
||||
default:
|
||||
close(client.Send)
|
||||
delete(clients, client)
|
||||
}
|
||||
}
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
case client := <-r.register:
|
||||
r.handleRegister(client)
|
||||
case client := <-r.unregister:
|
||||
r.handleUnregister(client)
|
||||
case message := <-r.broadcast:
|
||||
r.handleBroadcast(message)
|
||||
case <-r.clearHistory:
|
||||
r.handleClearHistory()
|
||||
case <-r.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ClearRoomHistory removes history for a room, should be called when stream ends
|
||||
func (r *roomHub) handleRegister(client *Client) {
|
||||
r.mutex.RLock()
|
||||
historyCopy := make([]Message, len(r.history))
|
||||
copy(historyCopy, r.history)
|
||||
r.mutex.RUnlock()
|
||||
|
||||
for _, msg := range historyCopy {
|
||||
msg.IsHistory = true
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case client.Send <- msgBytes:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
r.mutex.Lock()
|
||||
r.clients[client] = struct{}{}
|
||||
r.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (r *roomHub) handleUnregister(client *Client) {
|
||||
r.mutex.Lock()
|
||||
if _, ok := r.clients[client]; ok {
|
||||
delete(r.clients, client)
|
||||
close(client.Send)
|
||||
}
|
||||
r.mutex.Unlock()
|
||||
|
||||
r.manager.deleteRoomIfIdle(r)
|
||||
}
|
||||
|
||||
func (r *roomHub) handleBroadcast(message Message) {
|
||||
msgBytes, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.mutex.Lock()
|
||||
if message.Type == "chat" || message.Type == "danmaku" {
|
||||
r.history = append(r.history, message)
|
||||
if len(r.history) > historyLimit {
|
||||
r.history = r.history[1:]
|
||||
}
|
||||
}
|
||||
|
||||
for client := range r.clients {
|
||||
select {
|
||||
case client.Send <- msgBytes:
|
||||
default:
|
||||
close(client.Send)
|
||||
delete(r.clients, client)
|
||||
}
|
||||
}
|
||||
r.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (r *roomHub) handleClearHistory() {
|
||||
r.mutex.Lock()
|
||||
r.history = nil
|
||||
r.mutex.Unlock()
|
||||
r.manager.deleteRoomIfIdle(r)
|
||||
}
|
||||
|
||||
func (h *Hub) ClearRoomHistory(roomID string) {
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
delete(h.roomsHistory, roomID)
|
||||
if room := h.getRoom(roomID); room != nil {
|
||||
select {
|
||||
case room.clearHistory <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) RegisterClient(c *Client) {
|
||||
h.register <- c
|
||||
h.getOrCreateRoom(c.RoomID).register <- c
|
||||
}
|
||||
|
||||
func (h *Hub) UnregisterClient(c *Client) {
|
||||
if room := h.getRoom(c.RoomID); room != nil {
|
||||
room.unregister <- c
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastToRoom sends a message to the broadcast channel
|
||||
func (h *Hub) BroadcastToRoom(msg Message) {
|
||||
h.broadcast <- msg
|
||||
h.getOrCreateRoom(msg.RoomID).broadcast <- msg
|
||||
}
|
||||
|
||||
func (c *Client) ReadPump() {
|
||||
defer func() {
|
||||
c.Hub.unregister <- c
|
||||
c.Hub.UnregisterClient(c)
|
||||
c.Conn.Close()
|
||||
}()
|
||||
c.Conn.SetReadLimit(maxMessageSize)
|
||||
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
|
||||
c.Conn.SetPongHandler(func(string) error {
|
||||
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
for {
|
||||
_, message, err := c.Conn.ReadMessage()
|
||||
if err != nil {
|
||||
@@ -169,7 +243,7 @@ func (c *Client) ReadPump() {
|
||||
if err := json.Unmarshal(message, &msg); err == nil {
|
||||
msg.RoomID = c.RoomID
|
||||
msg.Username = c.Username
|
||||
c.Hub.broadcast <- msg
|
||||
c.Hub.BroadcastToRoom(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -192,7 +266,10 @@ func (c *Client) WritePump() {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write(message)
|
||||
if _, err := w.Write(message); err != nil {
|
||||
_ = w.Close()
|
||||
return
|
||||
}
|
||||
if err := w.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -209,23 +286,29 @@ var MainHub *Hub
|
||||
|
||||
func InitChat() {
|
||||
MainHub = NewHub()
|
||||
go MainHub.Run()
|
||||
}
|
||||
|
||||
func (h *Hub) GetStatsSnapshot() StatsSnapshot {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
rooms := make([]*roomHub, 0, len(h.rooms))
|
||||
roomClients := make(map[string]int, len(h.rooms))
|
||||
for roomID, room := range h.rooms {
|
||||
rooms = append(rooms, room)
|
||||
roomClients[roomID] = 0
|
||||
}
|
||||
h.mutex.RUnlock()
|
||||
|
||||
totalClients := 0
|
||||
for roomID, clients := range h.rooms {
|
||||
count := len(clients)
|
||||
roomClients[roomID] = count
|
||||
for _, room := range rooms {
|
||||
room.mutex.RLock()
|
||||
count := len(room.clients)
|
||||
room.mutex.RUnlock()
|
||||
roomClients[room.roomID] = count
|
||||
totalClients += count
|
||||
}
|
||||
|
||||
return StatsSnapshot{
|
||||
RoomCount: len(h.rooms),
|
||||
RoomCount: len(rooms),
|
||||
TotalConnectedClient: totalClients,
|
||||
RoomClients: roomClients,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user