Go 流式处理

Go 语言处理流式响应时，需要逐个解析换行分隔的 JSON 对象（NDJSON），从而实现实时输出效果。

流式聊天

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
)

// Message is a single chat message exchanged with the model.
type Message struct {
    Role    string `json:"role"`    // "system", "user", or "assistant"
    Content string `json:"content"` // message text
}

// ChatRequest is the JSON body sent to the Ollama /api/chat endpoint.
type ChatRequest struct {
    Model    string    `json:"model"`
    Messages []Message `json:"messages"`
    Stream   bool      `json:"stream"` // true requests a streamed response (one JSON object per chunk)
}

// ChatStreamResponse is one chunk of a streamed /api/chat response.
type ChatStreamResponse struct {
    Message Message `json:"message"` // partial assistant message for this chunk
    Done    bool    `json:"done"`    // true on the final chunk of the stream
}

// streamChat sends a streaming chat request to the local Ollama server and
// prints each content chunk to stdout as it arrives.
//
// The response body is a stream of back-to-back JSON objects; json.Decoder
// reads them one at a time until io.EOF.
func streamChat(model string, messages []Message) error {
    req := ChatRequest{
        Model:    model,
        Messages: messages,
        Stream:   true,
    }

    body, err := json.Marshal(req)
    if err != nil {
        return fmt.Errorf("encoding chat request: %w", err)
    }

    resp, err := http.Post(
        "http://localhost:11434/api/chat",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // A non-200 status means the body carries an error payload, not chunks.
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("chat request: unexpected status %s", resp.Status)
    }

    decoder := json.NewDecoder(resp.Body)
    for {
        var chunk ChatStreamResponse
        if err := decoder.Decode(&chunk); err != nil {
            if err == io.EOF {
                break
            }
            return fmt.Errorf("decoding chunk: %w", err)
        }

        if chunk.Message.Content != "" {
            fmt.Print(chunk.Message.Content)
        }
    }

    fmt.Println()
    return nil
}

// Entry point: streams a short poem from the local llama3.2 model.
func main() {
    // Report the error instead of silently discarding it.
    if err := streamChat("llama3.2", []Message{
        {Role: "user", Content: "写一首诗"},
    }); err != nil {
        fmt.Println("error:", err)
    }
}

流式生成

// GenerateRequest is the JSON body sent to the Ollama /api/generate endpoint.
type GenerateRequest struct {
    Model  string `json:"model"`
    Prompt string `json:"prompt"`
    Stream bool   `json:"stream"` // true requests a streamed response (one JSON object per chunk)
}

// GenerateStreamResponse is one chunk of a streamed /api/generate response.
type GenerateStreamResponse struct {
    Response string `json:"response"` // partial generated text for this chunk
    Done     bool   `json:"done"`     // true on the final chunk of the stream
}

// streamGenerate sends a streaming generate request to the local Ollama
// server and prints each response fragment to stdout as it arrives.
func streamGenerate(model, prompt string) error {
    req := GenerateRequest{
        Model:  model,
        Prompt: prompt,
        Stream: true,
    }

    body, err := json.Marshal(req)
    if err != nil {
        return fmt.Errorf("encoding generate request: %w", err)
    }

    resp, err := http.Post(
        "http://localhost:11434/api/generate",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // A non-200 status means the body carries an error payload, not chunks.
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("generate request: unexpected status %s", resp.Status)
    }

    decoder := json.NewDecoder(resp.Body)
    for {
        var chunk GenerateStreamResponse
        if err := decoder.Decode(&chunk); err != nil {
            if err == io.EOF {
                break
            }
            return fmt.Errorf("decoding chunk: %w", err)
        }

        if chunk.Response != "" {
            fmt.Print(chunk.Response)
        }
    }

    fmt.Println()
    return nil
}

// Entry point: streams a code-generation answer from the local llama3.2 model.
func main() {
    // Report the error instead of silently discarding it.
    if err := streamGenerate("llama3.2", "用 Go 写一个快速排序"); err != nil {
        fmt.Println("error:", err)
    }
}

带回调的流式处理

// streamChatWithCallback streams a chat completion, invoking callback (if
// non-nil) with each content chunk as it arrives, and returns the fully
// concatenated reply text.
func streamChatWithCallback(
    model string,
    messages []Message,
    callback func(string),
) (string, error) {
    req := ChatRequest{
        Model:    model,
        Messages: messages,
        Stream:   true,
    }

    body, err := json.Marshal(req)
    if err != nil {
        return "", fmt.Errorf("encoding chat request: %w", err)
    }

    resp, err := http.Post(
        "http://localhost:11434/api/chat",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    // A non-200 status means the body carries an error payload, not chunks.
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("chat request: unexpected status %s", resp.Status)
    }

    // Accumulate in a buffer instead of `s += chunk` (quadratic for long replies).
    var full bytes.Buffer
    decoder := json.NewDecoder(resp.Body)

    for {
        var chunk ChatStreamResponse
        if err := decoder.Decode(&chunk); err != nil {
            if err == io.EOF {
                break
            }
            return "", fmt.Errorf("decoding chunk: %w", err)
        }

        if chunk.Message.Content != "" {
            full.WriteString(chunk.Message.Content)
            if callback != nil {
                callback(chunk.Message.Content)
            }
        }
    }

    return full.String(), nil
}

// Entry point: streams a poem while echoing chunks, then reports its length.
func main() {
    result, err := streamChatWithCallback("llama3.2", []Message{
        {Role: "user", Content: "写一首诗"},
    }, func(text string) {
        fmt.Print(text)
    })
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println()
    // len(string) counts UTF-8 bytes; count runes so CJK text is measured
    // per character.
    fmt.Printf("总长度: %d\n", len([]rune(result)))
}

处理流式元数据

// ChatStreamResponseWithMeta is a streamed /api/chat chunk including the
// generation metadata the server attaches to the final (Done) chunk.
type ChatStreamResponseWithMeta struct {
    Message       Message `json:"message"`
    Done          bool    `json:"done"`
    TotalDuration int64   `json:"total_duration,omitempty"` // nanoseconds — presumably; verify against server docs
    EvalCount     int     `json:"eval_count,omitempty"`     // number of generated tokens
}

// streamWithMeta streams a chat reply to stdout and, on the final chunk
// (Done=true), prints the timing and token-count metadata reported by the
// server.
func streamWithMeta(model string, messages []Message) error {
    req := ChatRequest{
        Model:    model,
        Messages: messages,
        Stream:   true,
    }

    body, err := json.Marshal(req)
    if err != nil {
        return fmt.Errorf("encoding chat request: %w", err)
    }

    resp, err := http.Post(
        "http://localhost:11434/api/chat",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // A non-200 status means the body carries an error payload, not chunks.
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("chat request: unexpected status %s", resp.Status)
    }

    decoder := json.NewDecoder(resp.Body)
    for {
        var chunk ChatStreamResponseWithMeta
        if err := decoder.Decode(&chunk); err != nil {
            if err == io.EOF {
                break
            }
            return fmt.Errorf("decoding chunk: %w", err)
        }

        if chunk.Message.Content != "" {
            fmt.Print(chunk.Message.Content)
        }

        if chunk.Done {
            fmt.Println("\n---")
            // TotalDuration is divided by 1e9, i.e. treated as nanoseconds.
            fmt.Printf("总耗时: %.2f秒\n", float64(chunk.TotalDuration)/1e9)
            fmt.Printf("生成token数: %d\n", chunk.EvalCount)
        }
    }

    return nil
}

流式对话会话

// StreamingChatSession holds a model name and the accumulated message
// history for a multi-turn streamed conversation.
type StreamingChatSession struct {
    Model    string
    Messages []Message // full history: optional system prompt, then user/assistant turns
}

// NewStreamingChatSession creates a session for model, optionally seeding the
// history with a system prompt when system is non-empty.
func NewStreamingChatSession(model, system string) *StreamingChatSession {
    s := &StreamingChatSession{
        Model:    model,
        Messages: []Message{},
    }
    if system == "" {
        return s
    }
    s.Messages = append(s.Messages, Message{
        Role: "system", Content: system,
    })
    return s
}

// Send appends content as a user turn, streams the assistant's reply to
// stdout, records the reply in the history, and returns its full text.
func (s *StreamingChatSession) Send(content string) (string, error) {
    userTurn := Message{Role: "user", Content: content}
    s.Messages = append(s.Messages, userTurn)

    reply, err := streamChatWithCallback(s.Model, s.Messages, func(text string) {
        fmt.Print(text)
    })
    if err != nil {
        return "", err
    }
    fmt.Println()

    assistantTurn := Message{Role: "assistant", Content: reply}
    s.Messages = append(s.Messages, assistantTurn)
    return reply, nil
}

// Entry point: runs a two-turn streamed conversation with a system prompt.
func main() {
    session := NewStreamingChatSession("llama3.2", "你是一个友好的助手")

    fmt.Print("助手: ")
    // Stop early if the first turn fails — the second would lack context.
    if _, err := session.Send("你好"); err != nil {
        fmt.Println("error:", err)
        return
    }

    fmt.Print("助手: ")
    if _, err := session.Send("写一首关于春天的诗"); err != nil {
        fmt.Println("error:", err)
    }
}

HTTP 服务流式响应

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log"
    "net/http"
)

// streamHandler proxies a chat request to the local Ollama server and relays
// the reply to the HTTP client as Server-Sent Events (SSE).
func streamHandler(w http.ResponseWriter, r *http.Request) {
    var req struct {
        Message string `json:"message"`
    }
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid JSON body", http.StatusBadRequest)
        return
    }

    // SSE requires a flushable writer; fail fast if this one is not.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    chatReq := ChatRequest{
        Model:    "llama3.2",
        Messages: []Message{{Role: "user", Content: req.Message}},
        Stream:   true,
    }

    body, err := json.Marshal(chatReq)
    if err != nil {
        http.Error(w, "internal error", http.StatusInternalServerError)
        return
    }

    resp, err := http.Post(
        "http://localhost:11434/api/chat",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        // Previously ignored: a failed Post left resp nil and the deferred
        // Close panicked. Report upstream failure before headers are sent.
        http.Error(w, "upstream unavailable", http.StatusBadGateway)
        return
    }
    defer resp.Body.Close()

    // Set stream headers only once we know we can actually stream.
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    decoder := json.NewDecoder(resp.Body)
    for {
        var chunk ChatStreamResponse
        err := decoder.Decode(&chunk)
        if err == io.EOF {
            break
        }
        if err != nil {
            // Headers are already sent, so the error can't be reported;
            // terminate the relay.
            break
        }

        if chunk.Message.Content != "" {
            data, err := json.Marshal(map[string]string{"content": chunk.Message.Content})
            if err != nil {
                continue
            }
            w.Write([]byte("data: " + string(data) + "\n\n"))
            flusher.Flush()
        }
    }

    w.Write([]byte("data: [DONE]\n\n"))
    flusher.Flush()
}

// Entry point: serves the SSE chat endpoint on :8080.
func main() {
    http.HandleFunc("/chat/stream", streamHandler)
    // ListenAndServe always returns a non-nil error; surface it instead of
    // exiting silently.
    log.Fatal(http.ListenAndServe(":8080", nil))
}