Go 语言处理流式响应需要逐条解析换行分隔的 JSON(NDJSON),从而实现实时输出效果。
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)
// Message is a single chat turn exchanged with the Ollama chat API.
type Message struct {
// Role identifies the speaker; the examples below use "system", "user", and "assistant".
Role string `json:"role"`
// Content is the text of the message.
Content string `json:"content"`
}
// ChatRequest is the JSON payload sent to Ollama's /api/chat endpoint.
type ChatRequest struct {
// Model names the model to run, e.g. "llama3.2".
Model string `json:"model"`
// Messages is the conversation history, oldest first.
Messages []Message `json:"messages"`
// Stream requests a chunked NDJSON streaming response when true.
Stream bool `json:"stream"`
}
// ChatStreamResponse is one decoded chunk of the /api/chat NDJSON stream.
type ChatStreamResponse struct {
// Message carries the incremental content for this chunk.
Message Message `json:"message"`
// Done is true on the final chunk of the stream.
Done bool `json:"done"`
}
// streamChat sends a streaming chat request to the local Ollama server and
// prints each content chunk to stdout as it arrives, producing a real-time
// typing effect. The NDJSON body is decoded object-by-object with json.Decoder.
// A trailing newline is printed once the stream ends.
func streamChat(model string, messages []Message) error {
	req := ChatRequest{
		Model:    model,
		Messages: messages,
		Stream:   true, // ask the server for chunked NDJSON output
	}
	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("encoding chat request: %w", err)
	}
	resp, err := http.Post(
		"http://localhost:11434/api/chat",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// A non-200 body is an error payload, not an NDJSON stream.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("chat API returned status %s", resp.Status)
	}
	decoder := json.NewDecoder(resp.Body)
	for {
		var chunk ChatStreamResponse
		if err := decoder.Decode(&chunk); err != nil {
			if err == io.EOF {
				break // stream finished cleanly
			}
			return err
		}
		if chunk.Message.Content != "" {
			fmt.Print(chunk.Message.Content)
		}
	}
	fmt.Println()
	return nil
}
// main streams a single chat completion and reports any failure instead of
// silently discarding the returned error.
func main() {
	if err := streamChat("llama3.2", []Message{
		{Role: "user", Content: "写一首诗"},
	}); err != nil {
		fmt.Println("streamChat error:", err)
	}
}
// GenerateRequest is the JSON payload sent to Ollama's /api/generate endpoint.
type GenerateRequest struct {
// Model names the model to run, e.g. "llama3.2".
Model string `json:"model"`
// Prompt is the raw completion prompt (no chat history).
Prompt string `json:"prompt"`
// Stream requests a chunked NDJSON streaming response when true.
Stream bool `json:"stream"`
}
// GenerateStreamResponse is one decoded chunk of the /api/generate NDJSON stream.
type GenerateStreamResponse struct {
// Response carries the incremental generated text for this chunk.
Response string `json:"response"`
// Done is true on the final chunk of the stream.
Done bool `json:"done"`
}
// streamGenerate sends a streaming completion request to the local Ollama
// server and prints each response chunk to stdout as it arrives. A trailing
// newline is printed once the stream ends.
func streamGenerate(model, prompt string) error {
	req := GenerateRequest{
		Model:  model,
		Prompt: prompt,
		Stream: true, // ask the server for chunked NDJSON output
	}
	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("encoding generate request: %w", err)
	}
	resp, err := http.Post(
		"http://localhost:11434/api/generate",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// A non-200 body is an error payload, not an NDJSON stream.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("generate API returned status %s", resp.Status)
	}
	decoder := json.NewDecoder(resp.Body)
	for {
		var chunk GenerateStreamResponse
		if err := decoder.Decode(&chunk); err != nil {
			if err == io.EOF {
				break // stream finished cleanly
			}
			return err
		}
		if chunk.Response != "" {
			fmt.Print(chunk.Response)
		}
	}
	fmt.Println()
	return nil
}
// main streams a single completion and reports any failure instead of
// silently discarding the returned error.
func main() {
	if err := streamGenerate("llama3.2", "用 Go 写一个快速排序"); err != nil {
		fmt.Println("streamGenerate error:", err)
	}
}
// streamChatWithCallback streams a chat completion, invoking callback with
// each non-empty content chunk as it arrives (callback may be nil), and
// returns the concatenated full response once the stream ends.
func streamChatWithCallback(
	model string,
	messages []Message,
	callback func(string),
) (string, error) {
	req := ChatRequest{
		Model:    model,
		Messages: messages,
		Stream:   true, // ask the server for chunked NDJSON output
	}
	body, err := json.Marshal(req)
	if err != nil {
		return "", fmt.Errorf("encoding chat request: %w", err)
	}
	resp, err := http.Post(
		"http://localhost:11434/api/chat",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	// A non-200 body is an error payload, not an NDJSON stream.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("chat API returned status %s", resp.Status)
	}
	var fullResponse string
	decoder := json.NewDecoder(resp.Body)
	for {
		var chunk ChatStreamResponse
		if err := decoder.Decode(&chunk); err != nil {
			if err == io.EOF {
				break // stream finished cleanly
			}
			return "", err
		}
		if chunk.Message.Content != "" {
			fullResponse += chunk.Message.Content
			if callback != nil {
				callback(chunk.Message.Content)
			}
		}
	}
	return fullResponse, nil
}
// main streams a chat completion via a per-chunk callback, then prints the
// total length of the assembled response. Errors are reported and stop the
// program from printing a misleading zero-length summary.
func main() {
	result, err := streamChatWithCallback("llama3.2", []Message{
		{Role: "user", Content: "写一首诗"},
	}, func(text string) {
		fmt.Print(text)
	})
	if err != nil {
		fmt.Println("streamChatWithCallback error:", err)
		return
	}
	fmt.Println()
	fmt.Printf("总长度: %d\n", len(result))
}
// ChatStreamResponseWithMeta extends the stream chunk with the statistics
// Ollama attaches to the final (Done) chunk.
type ChatStreamResponseWithMeta struct {
// Message carries the incremental content for this chunk.
Message Message `json:"message"`
// Done is true on the final chunk of the stream.
Done bool `json:"done"`
// TotalDuration is the total request duration in nanoseconds
// (divided by 1e9 below to print seconds); set on the final chunk only.
TotalDuration int64 `json:"total_duration,omitempty"`
// EvalCount is the number of generated tokens; set on the final chunk only.
EvalCount int `json:"eval_count,omitempty"`
}
// streamWithMeta streams a chat completion, printing content chunks as they
// arrive, then prints the timing/token metadata that Ollama attaches to the
// final (Done) chunk.
func streamWithMeta(model string, messages []Message) error {
	req := ChatRequest{
		Model:    model,
		Messages: messages,
		Stream:   true, // ask the server for chunked NDJSON output
	}
	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("encoding chat request: %w", err)
	}
	resp, err := http.Post(
		"http://localhost:11434/api/chat",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// A non-200 body is an error payload, not an NDJSON stream.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("chat API returned status %s", resp.Status)
	}
	decoder := json.NewDecoder(resp.Body)
	for {
		var chunk ChatStreamResponseWithMeta
		if err := decoder.Decode(&chunk); err != nil {
			if err == io.EOF {
				break // stream finished cleanly
			}
			return err
		}
		if chunk.Message.Content != "" {
			fmt.Print(chunk.Message.Content)
		}
		if chunk.Done {
			fmt.Println("\n---")
			// TotalDuration is reported in nanoseconds; convert to seconds.
			fmt.Printf("总耗时: %.2f秒\n", float64(chunk.TotalDuration)/1e9)
			fmt.Printf("生成token数: %d\n", chunk.EvalCount)
		}
	}
	return nil
}
// StreamingChatSession keeps a running conversation history and streams each
// assistant reply as it is generated.
type StreamingChatSession struct {
// Model names the model used for every request in this session.
Model string
// Messages is the full conversation history: the optional system prompt
// followed by alternating user and assistant turns.
Messages []Message
}
// NewStreamingChatSession returns a session for the given model. When system
// is non-empty it is recorded as the leading system-role message; otherwise
// the history starts empty.
func NewStreamingChatSession(model, system string) *StreamingChatSession {
	history := make([]Message, 0, 1)
	if system != "" {
		history = append(history, Message{Role: "system", Content: system})
	}
	return &StreamingChatSession{Model: model, Messages: history}
}
// Send appends a user turn to the history, streams the assistant's reply to
// stdout as it arrives, records the full reply as an assistant turn, and
// returns it. On error the history keeps the user turn but gains no
// assistant turn.
func (s *StreamingChatSession) Send(content string) (string, error) {
	s.Messages = append(s.Messages, Message{Role: "user", Content: content})
	reply, err := streamChatWithCallback(s.Model, s.Messages, func(chunk string) {
		fmt.Print(chunk)
	})
	if err != nil {
		return "", err
	}
	fmt.Println()
	s.Messages = append(s.Messages, Message{Role: "assistant", Content: reply})
	return reply, nil
}
// main runs a short two-turn conversation, stopping (and reporting) on the
// first failed exchange instead of ignoring Send's error.
func main() {
	session := NewStreamingChatSession("llama3.2", "你是一个友好的助手")
	fmt.Print("助手: ")
	if _, err := session.Send("你好"); err != nil {
		fmt.Println("send error:", err)
		return
	}
	fmt.Print("助手: ")
	if _, err := session.Send("写一首关于春天的诗"); err != nil {
		fmt.Println("send error:", err)
	}
}
package main
import (
"bytes"
"encoding/json"
"io"
"log"
"net/http"
)
// streamHandler proxies a chat request to the local Ollama server and relays
// the streamed chunks to the client as Server-Sent Events (SSE), ending with
// a "data: [DONE]" sentinel. Error paths (bad request body, non-flushing
// writer, unreachable upstream) respond before any SSE headers are written;
// the original code dereferenced a nil *http.Response when Ollama was down
// and ignored the Flusher type-assertion result.
func streamHandler(w http.ResponseWriter, r *http.Request) {
	var req struct {
		Message string `json:"message"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}
	// SSE requires incremental flushing; bail out before committing headers.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	chatReq := ChatRequest{
		Model:    "llama3.2",
		Messages: []Message{{Role: "user", Content: req.Message}},
		Stream:   true,
	}
	body, err := json.Marshal(chatReq)
	if err != nil {
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}
	resp, err := http.Post(
		"http://localhost:11434/api/chat",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		http.Error(w, "upstream unavailable", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	decoder := json.NewDecoder(resp.Body)
	for {
		var chunk ChatStreamResponse
		if err := decoder.Decode(&chunk); err != nil {
			// io.EOF is the normal end of stream; on any other decode error
			// we can no longer change the status, so just end the stream —
			// the [DONE] sentinel below tells the client it is over.
			break
		}
		if chunk.Message.Content != "" {
			data, err := json.Marshal(map[string]string{"content": chunk.Message.Content})
			if err != nil {
				continue // skip an unencodable chunk rather than abort the stream
			}
			w.Write([]byte("data: " + string(data) + "\n\n"))
			flusher.Flush()
		}
	}
	w.Write([]byte("data: [DONE]\n\n"))
	flusher.Flush()
}
// main serves the SSE chat proxy on :8080. ListenAndServe always returns a
// non-nil error (e.g. the port is taken); log it fatally instead of exiting
// silently with status 0.
func main() {
	http.HandleFunc("/chat/stream", streamHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}