Server-Sent Events（SSE）是一种由服务器主动向客户端推送数据的技术，比 WebSocket 更简单，适合单向的实时更新。
package main
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
)
// main starts a Gin server on :8080 whose /events endpoint streams ten
// Server-Sent Events, one per second, and then ends the response.
func main() {
	r := gin.Default()
	r.GET("/events", func(c *gin.Context) {
		c.Header("Content-Type", "text/event-stream")
		c.Header("Cache-Control", "no-cache")
		c.Header("Connection", "keep-alive")

		ctx := c.Request.Context()
		for i := 0; i < 10; i++ {
			// Stop streaming as soon as a write fails.
			if _, err := fmt.Fprintf(c.Writer, "data: 消息 %d\n\n", i); err != nil {
				return
			}
			c.Writer.Flush()

			// Pause for a second, but return early if the request is
			// cancelled (for example because the client went away).
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
			}
		}
	})
	if err := r.Run(":8080"); err != nil {
		fmt.Println("server error:", err)
	}
}
// Notification is the message pushed to SSE clients. The /notifications
// handler JSON-encodes it using the field tags below.
type Notification struct {
// ID is set by the producer goroutine from an incrementing counter.
ID int `json:"id"`
// Type is a category label; the producer in this file always uses "info".
Type string `json:"type"`
// Message is the human-readable notification text.
Message string `json:"message"`
}
// notificationChan carries notifications from the producer goroutine to the
// /notifications handler. It buffers up to 100 entries; a send blocks once
// the buffer is full.
var notificationChan = make(chan Notification, 100)
// init launches a background goroutine that, every two seconds, sends an
// "info" Notification with an incrementing ID (starting at 1) on
// notificationChan. The loop has no exit condition.
func init() {
	produce := func() {
		for id := 1; ; id++ {
			time.Sleep(2 * time.Second)
			notificationChan <- Notification{
				ID:      id,
				Type:    "info",
				Message: fmt.Sprintf("通知消息 %d", id),
			}
		}
	}
	go produce()
}
// main starts a Gin server on :8080 whose /notifications endpoint streams
// every Notification it receives from notificationChan as a JSON-encoded
// Server-Sent Event, until the request context is cancelled or a write fails.
//
// NOTE(review): every connected client receives from the same channel, so
// each notification reaches only one of them. Use a broadcaster if all
// clients must see every notification.
func main() {
	r := gin.Default()
	r.GET("/notifications", func(c *gin.Context) {
		c.Header("Content-Type", "text/event-stream")
		c.Header("Cache-Control", "no-cache")
		c.Header("Connection", "keep-alive")

		ctx := c.Request.Context()
		for {
			select {
			case notification := <-notificationChan:
				data, err := json.Marshal(notification)
				if err != nil {
					// Skip a notification that cannot be encoded instead of
					// sending an empty event.
					fmt.Println("marshal notification:", err)
					continue
				}
				if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", data); err != nil {
					return
				}
				c.Writer.Flush()
			case <-ctx.Done():
				return
			}
		}
	})
	if err := r.Run(":8080"); err != nil {
		fmt.Println("server error:", err)
	}
}
// GET /events-typed streams one Server-Sent Event per second. Each frame
// carries an explicit "event: message" line and a small JSON payload with an
// incrementing id. Streaming stops when the request context is cancelled or
// a write fails, so the handler does not loop forever after a disconnect.
r.GET("/events-typed", func(c *gin.Context) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")

	ctx := c.Request.Context()
	for i := 0; ; i++ {
		if _, err := fmt.Fprintf(c.Writer, "event: message\n"); err != nil {
			return
		}
		if _, err := fmt.Fprintf(c.Writer, "data: {\"id\": %d, \"text\": \"消息内容\"}\n\n", i); err != nil {
			return
		}
		c.Writer.Flush()

		// Pause for a second, returning early on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}
})
// GET /events-reconnect streams one Server-Sent Event per second, tagging
// each with an "id:" field. When the request carries a Last-Event-ID header,
// the stream resumes with the event after that ID; otherwise it starts at 0.
// Streaming stops when the request context is cancelled or a write fails.
r.GET("/events-reconnect", func(c *gin.Context) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")

	nextID := 0
	if lastEventID := c.GetHeader("Last-Event-ID"); lastEventID != "" {
		// Last-Event-ID names the last event the client already received,
		// so resume with the following one. A malformed value restarts
		// the stream from 0.
		if last, err := strconv.Atoi(lastEventID); err == nil {
			nextID = last + 1
		}
	}

	ctx := c.Request.Context()
	for i := nextID; ; i++ {
		if _, err := fmt.Fprintf(c.Writer, "id: %d\n", i); err != nil {
			return
		}
		if _, err := fmt.Fprintf(c.Writer, "data: 消息 %d\n\n", i); err != nil {
			return
		}
		c.Writer.Flush()

		// Pause for a second, returning early on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}
})
// Client represents one connected SSE subscriber.
type Client struct {
// Channel receives the messages broadcast to this client. The broadcaster
// closes it when the client is removed.
Channel chan string
}
// Broadcaster fans messages out to all registered clients. Its Run method
// services the three channels below.
type Broadcaster struct {
// Clients is the set of currently registered clients. Within this file it
// is read and written only by Run.
Clients map[*Client]bool
// Register receives clients to add to Clients.
Register chan *Client
// Remove receives clients to delete from Clients.
Remove chan *Client
// Messages receives the strings to deliver to every registered client.
Messages chan string
}
// NewBroadcaster returns a Broadcaster with an empty client set, unbuffered
// Register and Remove channels, and a Messages channel buffered to 100.
func NewBroadcaster() *Broadcaster {
	b := new(Broadcaster)
	b.Clients = map[*Client]bool{}
	b.Register = make(chan *Client)
	b.Remove = make(chan *Client)
	b.Messages = make(chan string, 100)
	return b
}
// Run is the broadcaster's event loop and never returns. It adds clients
// arriving on Register, drops clients arriving on Remove, and forwards every
// string from Messages to each registered client. A client whose channel
// cannot accept a message immediately is dropped rather than blocking the
// loop.
func (b *Broadcaster) Run() {
	for {
		select {
		case client := <-b.Register:
			b.Clients[client] = true
		case client := <-b.Remove:
			b.drop(client)
		case msg := <-b.Messages:
			for client := range b.Clients {
				select {
				case client.Channel <- msg:
				default:
					// Slow consumer: drop it instead of stalling everyone.
					b.drop(client)
				}
			}
		}
	}
}

// drop unregisters client and closes its channel. It does nothing when the
// client is not registered, so a Remove request for a client that was already
// dropped as a slow consumer cannot close the channel a second time.
func (b *Broadcaster) drop(client *Client) {
	if _, ok := b.Clients[client]; !ok {
		return
	}
	delete(b.Clients, client)
	close(client.Channel)
}
// broadcaster is the package-level Broadcaster shared by the /stream and
// /broadcast handlers. main starts its Run loop.
var broadcaster = NewBroadcaster()
// main starts the broadcaster loop and a Gin server on :8080 with two routes:
//
//   - GET /stream registers the caller with the broadcaster and streams each
//     broadcast message as a Server-Sent Event.
//   - POST /broadcast accepts {"message": "..."} and queues the message for
//     delivery to every registered client.
func main() {
	go broadcaster.Run()
	r := gin.Default()

	r.GET("/stream", func(c *gin.Context) {
		c.Header("Content-Type", "text/event-stream")
		c.Header("Cache-Control", "no-cache")
		c.Header("Connection", "keep-alive")

		client := &Client{Channel: make(chan string, 10)}
		broadcaster.Register <- client

		// When the broadcaster closes the channel it has already removed the
		// client, so only ask for removal if we are the ones leaving.
		closedByBroadcaster := false
		defer func() {
			if !closedByBroadcaster {
				broadcaster.Remove <- client
			}
		}()

		ctx := c.Request.Context()
		for {
			select {
			case msg, ok := <-client.Channel:
				if !ok {
					// A closed channel yields zero values forever; end the
					// stream instead of spinning and emitting empty events.
					closedByBroadcaster = true
					return
				}
				// NOTE(review): msg comes from the /broadcast request body;
				// embedded newlines would break the SSE framing. Consider
				// emitting one "data:" line per line of msg.
				if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", msg); err != nil {
					return
				}
				c.Writer.Flush()
			case <-ctx.Done():
				return
			}
		}
	})

	r.POST("/broadcast", func(c *gin.Context) {
		var msg struct {
			Message string `json:"message"`
		}
		if err := c.BindJSON(&msg); err != nil {
			// BindJSON has already aborted the request with a 400 response;
			// do not broadcast an empty message or write a second response.
			return
		}
		broadcaster.Messages <- msg.Message
		c.JSON(200, gin.H{"status": "sent"})
	})

	if err := r.Run(":8080"); err != nil {
		fmt.Println("server error:", err)
	}
}
// Open an SSE connection to the /events endpoint.
const eventSource = new EventSource('/events');

// Log the payload of every default ("message") event.
eventSource.onmessage = ({ data }) => {
  console.log('Received:', data);
};

// On a connection error, log it and close the stream.
eventSource.onerror = (error) => {
  console.log('Error:', error);
  eventSource.close();
};

// Additional listener for "message" events, attached via addEventListener.
eventSource.addEventListener('message', ({ data }) => {
  console.log('Custom event:', data);
});
SSE 是实现服务器推送的简单方式，适合单向实时更新。相比 WebSocket，SSE 更简单：它基于 HTTP，并且浏览器端的 EventSource 会在连接断开后自动重连。适合通知、实时数据更新等场景。