SSE 服务器推送

Server-Sent Events（SSE）是一种由服务器通过 HTTP 长连接主动向客户端推送数据的技术。它比 WebSocket 更简单，适合服务器到客户端的单向实时更新。

基本 SSE 实现

package main

import (
    "fmt"
    "time"
    
    "github.com/gin-gonic/gin"
)

// main starts a Gin server on :8080 whose /events endpoint streams ten
// SSE messages to the client, one per second.
func main() {
    r := gin.Default()

    r.GET("/events", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")

        ctx := c.Request.Context()
        for i := 0; i < 10; i++ {
            // A failed write means the connection is unusable; stop streaming.
            if _, err := fmt.Fprintf(c.Writer, "data: 消息 %d\n\n", i); err != nil {
                return
            }
            // Flush so the event reaches the client now instead of sitting in a buffer.
            c.Writer.Flush()

            // Pause one second between events, but return at once when the
            // request context is cancelled (e.g. the client went away).
            select {
            case <-ctx.Done():
                return
            case <-time.After(time.Second):
            }
        }
    })

    // Run only returns on failure (for example, the port is already in use).
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

实时通知示例

// Notification is a single notification message. The struct tags define the
// field names used when the value is encoded as JSON.
type Notification struct {
    // ID is encoded as "id".
    ID      int    `json:"id"`
    // Type is encoded as "type".
    Type    string `json:"type"`
    // Message is encoded as "message".
    Message string `json:"message"`
}

// notificationChan queues generated notifications; it buffers up to 100
// values before a send blocks.
var notificationChan = make(chan Notification, 100)

// init starts a background goroutine that, every two seconds, pushes a new
// "info" notification with the next sequential ID (starting at 1) onto
// notificationChan. The goroutine never exits.
func init() {
    produce := func() {
        for seq := 1; ; seq++ {
            time.Sleep(2 * time.Second)

            next := Notification{
                ID:      seq,
                Type:    "info",
                Message: fmt.Sprintf("通知消息 %d", seq),
            }
            notificationChan <- next
        }
    }
    go produce()
}

// main starts a Gin server on :8080 whose /notifications endpoint forwards
// values from notificationChan to the client as JSON-encoded SSE events.
//
// NOTE: every connected handler receives from the same channel, so each
// notification is delivered to exactly one of the connected clients.
func main() {
    r := gin.Default()

    r.GET("/notifications", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")

        ctx := c.Request.Context()
        for {
            select {
            case notification := <-notificationChan:
                data, err := json.Marshal(notification)
                if err != nil {
                    // Skip a value that cannot be encoded instead of
                    // sending an event with an empty data field.
                    continue
                }
                // A failed write means the connection is unusable; stop streaming.
                if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", data); err != nil {
                    return
                }
                c.Writer.Flush()

            case <-ctx.Done():
                // The request was cancelled (e.g. the client disconnected).
                return
            }
        }
    })

    // Run only returns on failure (for example, the port is already in use).
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

带事件类型

// GET /events-typed streams one SSE event per second, each carrying an
// explicit "event:" field followed by a JSON "data:" payload with an
// increasing id. The stream runs until the client disconnects.
r.GET("/events-typed", func(c *gin.Context) {
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")

    ctx := c.Request.Context()
    for i := 0; ; i++ {
        // A failed write means the connection is unusable; stop streaming.
        if _, err := fmt.Fprintf(c.Writer, "event: message\n"); err != nil {
            return
        }
        // The blank line (second "\n") terminates the event.
        if _, err := fmt.Fprintf(c.Writer, "data: {\"id\": %d, \"text\": \"消息内容\"}\n\n", i); err != nil {
            return
        }
        c.Writer.Flush()

        // Without this check the loop would keep running after the client
        // has gone, leaking one goroutine per closed connection.
        select {
        case <-ctx.Done():
            return
        case <-time.After(time.Second):
        }
    }
})

重连机制

// GET /events-reconnect streams one numbered SSE event per second and
// supports resuming: each event carries an "id:" field, and a reconnecting
// client reports the last ID it saw in the Last-Event-ID request header.
// The stream runs until the client disconnects.
r.GET("/events-reconnect", func(c *gin.Context) {
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")

    // Last-Event-ID names the last event the client already received, so
    // resume with the one after it. A missing, malformed or negative value
    // starts the stream from 0.
    startID := 0
    if lastEventID := c.GetHeader("Last-Event-ID"); lastEventID != "" {
        if last, err := strconv.Atoi(lastEventID); err == nil && last >= 0 {
            startID = last + 1
        }
    }

    ctx := c.Request.Context()
    for i := startID; ; i++ {
        // A failed write means the connection is unusable; stop streaming.
        if _, err := fmt.Fprintf(c.Writer, "id: %d\n", i); err != nil {
            return
        }
        if _, err := fmt.Fprintf(c.Writer, "data: 消息 %d\n\n", i); err != nil {
            return
        }
        c.Writer.Flush()

        // Without this check the loop would keep running after the client
        // has gone, leaking one goroutine per closed connection.
        select {
        case <-ctx.Done():
            return
        case <-time.After(time.Second):
        }
    }
})

广播消息

// Client is one subscriber registered with a Broadcaster.
type Client struct {
    // Channel carries the messages destined for this client.
    Channel chan string
}

// Broadcaster fans messages out to a set of registered clients. All of its
// channels are serviced by the Run loop.
type Broadcaster struct {
    // Clients is the set of currently registered clients.
    Clients   map[*Client]bool
    // Register receives clients to be added to Clients.
    Register  chan *Client
    // Remove receives clients to be removed from Clients.
    Remove    chan *Client
    // Messages receives the messages to deliver to every registered client.
    Messages  chan string
}

// NewBroadcaster returns a Broadcaster with an empty client set, unbuffered
// Register and Remove channels, and a Messages channel buffered to 100.
func NewBroadcaster() *Broadcaster {
    b := new(Broadcaster)
    b.Clients = map[*Client]bool{}
    b.Register = make(chan *Client)
    b.Remove = make(chan *Client)
    b.Messages = make(chan string, 100)
    return b
}

// Run is the broadcaster's event loop. It never returns and is meant to be
// started in its own goroutine. Run is the only code that touches b.Clients,
// so the map needs no further locking.
//
//   - Register adds a client to the set.
//   - Remove drops a client and closes its channel.
//   - Messages delivers a message to every registered client; a client whose
//     channel cannot accept the message right away is dropped and its
//     channel closed.
func (b *Broadcaster) Run() {
    for {
        select {
        case client := <-b.Register:
            b.Clients[client] = true

        case client := <-b.Remove:
            b.drop(client)

        case msg := <-b.Messages:
            for client := range b.Clients {
                select {
                case client.Channel <- msg:
                default:
                    // The client's buffer is full; drop it rather than
                    // block delivery to everyone else.
                    b.drop(client)
                }
            }
        }
    }
}

// drop unregisters client and closes its channel. It does nothing when the
// client is not registered, so a client that was already dropped for being
// slow can still be sent on Remove without closing its channel twice
// (closing a closed channel panics).
func (b *Broadcaster) drop(client *Client) {
    if _, registered := b.Clients[client]; !registered {
        return
    }
    delete(b.Clients, client)
    close(client.Channel)
}

// broadcaster is the hub shared by the /stream and /broadcast handlers.
var broadcaster = NewBroadcaster()

// main starts the broadcaster loop and a Gin server on :8080 with two routes:
// GET /stream subscribes the caller to broadcast messages as SSE events, and
// POST /broadcast publishes the "message" field of its JSON body to all
// subscribers.
func main() {
    go broadcaster.Run()

    r := gin.Default()

    r.GET("/stream", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")

        client := &Client{Channel: make(chan string, 10)}
        broadcaster.Register <- client

        // dropped records that the broadcaster has already closed our
        // channel, in which case it must not be asked to remove us again.
        dropped := false
        defer func() {
            if !dropped {
                broadcaster.Remove <- client
            }
        }()

        ctx := c.Request.Context()
        for {
            select {
            case msg, ok := <-client.Channel:
                if !ok {
                    // A closed channel yields zero values forever; end the
                    // stream instead of spinning and writing empty events.
                    dropped = true
                    return
                }
                // A failed write means the connection is unusable; stop streaming.
                if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", msg); err != nil {
                    return
                }
                c.Writer.Flush()

            case <-ctx.Done():
                // The request was cancelled (e.g. the client disconnected).
                return
            }
        }
    })

    r.POST("/broadcast", func(c *gin.Context) {
        var msg struct {
            Message string `json:"message"`
        }
        // On a malformed body BindJSON has already aborted the request with
        // a 400 response, so do not broadcast or write a second response.
        if err := c.BindJSON(&msg); err != nil {
            return
        }
        broadcaster.Messages <- msg.Message
        c.JSON(200, gin.H{"status": "sent"})
    })

    // Run only returns on failure (for example, the port is already in use).
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

前端接收示例

// Open an SSE connection to the /events endpoint.
const eventSource = new EventSource('/events');

// Called for events that have no "event:" field or whose type is "message".
eventSource.onmessage = function(event) {
    console.log('Received:', event.data);
};

// Called when the connection fails. The connection is deliberately NOT
// closed here: EventSource reconnects on its own after a dropped connection,
// and calling close() would turn every transient network error into a
// permanent disconnect. Call eventSource.close() only when the page no
// longer wants the stream.
eventSource.onerror = function(error) {
    console.log('Error:', error);
};

// Listener for a named event type. To handle a custom type, pass the name the
// server sends in its "event:" field; 'message' is also the default type, so
// this listener fires alongside onmessage.
eventSource.addEventListener('message', function(event) {
    console.log('Custom event:', event.data);
});

小结

SSE 是实现服务器推送的一种简单方式，适合单向实时更新。相比 WebSocket，SSE 更简单：它基于普通的 HTTP 连接，并且浏览器的 EventSource 在连接断开后会自动重连。适合通知、实时数据更新等场景。