Streaming Responses

Models take time to generate text. If you wait for the entire output before returning anything, the user may stare at a blank screen for several seconds before the first character appears. Streaming responses solve this.

What Is a Streaming Response?

Regular request: send → wait → the complete result returned all at once

Streaming request: send → partial results keep arriving → until done

By analogy: a regular request is like downloading an entire video before watching it; a streaming request is like watching it online, playing as it downloads.

Enabling Streaming

It takes a single parameter in the request (for Ollama's API, streaming is in fact the default, so setting it just makes the intent explicit):

curl http://localhost:11434/api/generate -d '{
  "model": "llama3.2",
  "prompt": "写一首关于春天的诗",
  "stream": true
}'

The response is a sequence of JSON objects, one per line:

{"model":"llama3.2","created_at":"2024-01-15T10:00:00Z","response":"春","done":false}
{"model":"llama3.2","created_at":"2024-01-15T10:00:00Z","response":"风","done":false}
{"model":"llama3.2","created_at":"2024-01-15T10:00:00Z","response":"吹","done":false}
{"model":"llama3.2","created_at":"2024-01-15T10:00:00Z","response":"绿","done":false}
...
{"model":"llama3.2","created_at":"2024-01-15T10:00:01Z","response":"","done":true,"context":[1,2,3],"total_duration":1234567890}

Each object contains:

Field — Description
response — the text fragment generated in this chunk
done — whether generation has finished
context — the conversation context (final chunk only)
total_duration — total generation time in nanoseconds (final chunk only)
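Putting the fields together: a consumer concatenates the response fragments and reads the statistics off the final done chunk. A minimal standalone sketch using the example lines above (no server needed; durations are in nanoseconds):

```python
import json

# The NDJSON lines from the example above (abbreviated).
lines = [
    '{"model":"llama3.2","response":"春","done":false}',
    '{"model":"llama3.2","response":"风","done":false}',
    '{"model":"llama3.2","response":"吹","done":false}',
    '{"model":"llama3.2","response":"绿","done":false}',
    '{"model":"llama3.2","response":"","done":true,"total_duration":1234567890}',
]

def consume(lines):
    """Concatenate the fragments; return (full_text, seconds)."""
    parts, seconds = [], None
    for line in lines:
        chunk = json.loads(line)
        parts.append(chunk["response"])
        if chunk["done"]:
            # total_duration is reported in nanoseconds.
            seconds = chunk["total_duration"] / 1e9
    return "".join(parts), seconds

text, seconds = consume(lines)
print(text)     # 春风吹绿
print(seconds)  # 1.23456789
```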

Disabling Streaming

curl http://localhost:11434/api/generate -d '{
  "model": "llama3.2",
  "prompt": "写一首关于春天的诗",
  "stream": false
}'

A single JSON object is returned:

{
  "model": "llama3.2",
  "created_at": "2024-01-15T10:00:05Z",
  "response": "春风吹绿柳梢头,\n桃花含笑映溪流。\n燕子归来寻旧垒,\n一帘春色满西楼。",
  "done": true,
  "context": [1, 2, 3, ...],
  "total_duration": 5000000000
}
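In the non-streaming case, handling the body is a single json.loads. A sketch against the example payload above (response and context truncated here to keep it short):

```python
import json

# A shortened version of the single JSON body shown above.
body = '''{
  "model": "llama3.2",
  "response": "春风吹绿柳梢头",
  "done": true,
  "context": [1, 2, 3],
  "total_duration": 5000000000
}'''

result = json.loads(body)
print(result["response"])              # the whole text at once
print(result["total_duration"] / 1e9)  # 5.0 (seconds)
```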

Handling a Stream in Python

With the requests library

import requests
import json

url = "http://localhost:11434/api/generate"
data = {
    "model": "llama3.2",
    "prompt": "写一首关于春天的诗",
    "stream": True
}

response = requests.post(url, json=data, stream=True)

for line in response.iter_lines():
    if line:
        chunk = json.loads(line)
        print(chunk["response"], end="", flush=True)
        if chunk["done"]:
            print("\nDone!")

Asynchronous handling with httpx

import httpx
import json
import asyncio

async def stream_generate():
    url = "http://localhost:11434/api/generate"
    data = {
        "model": "llama3.2",
        "prompt": "写一首关于春天的诗",
        "stream": True
    }
    
    async with httpx.AsyncClient() as client:
        async with client.stream("POST", url, json=data) as response:
            async for line in response.aiter_lines():
                if line:
                    chunk = json.loads(line)
                    print(chunk["response"], end="", flush=True)

asyncio.run(stream_generate())

With the official SDK

import ollama

stream = ollama.generate(
    model="llama3.2",
    prompt="写一首关于春天的诗",
    stream=True
)

for chunk in stream:
    print(chunk["response"], end="", flush=True)
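Often you want both the live display and the assembled final text. A small sketch that does both, simulated here with a list of chunks in the same shape the streaming calls yield (no server needed):

```python
# Simulated chunks, shaped like the streamed objects above.
fake_stream = [
    {"response": "春", "done": False},
    {"response": "风", "done": False},
    {"response": "", "done": True},
]

def display_and_collect(stream):
    """Print fragments as they arrive and return the assembled text."""
    parts = []
    for chunk in stream:
        print(chunk["response"], end="", flush=True)
        parts.append(chunk["response"])
    return "".join(parts)

full_text = display_and_collect(fake_stream)
```

With the real SDK you would pass the iterator returned by ollama.generate(..., stream=True) instead of fake_stream.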

Handling a Stream in JavaScript

In the browser

async function streamGenerate() {
    const response = await fetch('http://localhost:11434/api/generate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
            model: 'llama3.2',
            prompt: '写一首关于春天的诗',
            stream: true
        })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // A network chunk may end mid-line, so keep the trailing
        // partial line in the buffer until the rest arrives.
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();

        for (const line of lines) {
            if (!line.trim()) continue;
            const chunk = JSON.parse(line);
            // process.stdout does not exist in the browser; append the
            // text to the page (or update your own element) instead.
            document.body.append(chunk.response);
        }
    }
}

streamGenerate();

In Node.js

const http = require('http');

const data = JSON.stringify({
    model: 'llama3.2',
    prompt: '写一首关于春天的诗',
    stream: true
});

const req = http.request({
    hostname: 'localhost',
    port: 11434,
    path: '/api/generate',
    method: 'POST',
    headers: { 'Content-Type': 'application/json' }
}, res => {
    let buffer = '';
    
    res.on('data', chunk => {
        buffer += chunk;
        const lines = buffer.split('\n');
        buffer = lines.pop();
        
        for (const line of lines) {
            if (line.trim()) {
                const data = JSON.parse(line);
                process.stdout.write(data.response);
            }
        }
    });
});

req.write(data);
req.end();
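The buffer-split-pop pattern in the Node.js example above is the important part: a network chunk can end in the middle of a JSON line, so the trailing partial line must be held back until the rest arrives. The same technique in Python, fed two chunks that deliberately split a line (a standalone sketch, no server):

```python
import json

def iter_ndjson(byte_chunks):
    """Yield parsed objects from NDJSON arriving in arbitrary byte chunks."""
    buffer = ""
    for chunk in byte_chunks:
        buffer += chunk.decode("utf-8")
        lines = buffer.split("\n")
        buffer = lines.pop()  # keep the trailing partial line for later
        for line in lines:
            if line.strip():
                yield json.loads(line)
    if buffer.strip():  # the final line may lack a trailing newline
        yield json.loads(buffer)

# One JSON line split across two "network" chunks.
chunks = [b'{"response":"Spr', b'ing","done":false}\n{"response":"!","done":true}\n']
objs = list(iter_ndjson(chunks))
print([o["response"] for o in objs])  # ['Spring', '!']
```

Note this simple version decodes each byte chunk independently, which is fine for ASCII but can split a multi-byte character (e.g. Chinese text); for that case, run the bytes through an incremental decoder (codecs.getincrementaldecoder) first.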

Handling a Stream in Go

package main

import (
    "bufio"
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

type GenerateRequest struct {
    Model  string `json:"model"`
    Prompt string `json:"prompt"`
    Stream bool   `json:"stream"`
}

type GenerateResponse struct {
    Response string `json:"response"`
    Done     bool   `json:"done"`
}

func main() {
    reqBody := GenerateRequest{
        Model:  "llama3.2",
        Prompt: "写一首关于春天的诗",
        Stream: true,
    }

    // Marshal cannot fail for this struct, so the error is ignored.
    jsonData, _ := json.Marshal(reqBody)

    resp, err := http.Post(
        "http://localhost:11434/api/generate",
        "application/json",
        bytes.NewReader(jsonData),
    )
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        var chunk GenerateResponse
        if err := json.Unmarshal(scanner.Bytes(), &chunk); err != nil {
            continue // skip blank or malformed lines
        }
        fmt.Print(chunk.Response)
    }
}

Streaming with the Chat API

The chat endpoint supports streaming in the same way:

curl http://localhost:11434/api/chat -d '{
  "model": "llama3.2",
  "messages": [
    {"role": "user", "content": "讲个笑话"}
  ],
  "stream": true
}'

Response format:

{"model":"llama3.2","created_at":"2024-01-15T10:00:00Z","message":{"role":"assistant","content":"有"},"done":false}
{"model":"llama3.2","created_at":"2024-01-15T10:00:00Z","message":{"role":"assistant","content":"一"},"done":false}
...

Python example:

import ollama

stream = ollama.chat(
    model="llama3.2",
    messages=[{"role": "user", "content": "讲个笑话"}],
    stream=True
)

for chunk in stream:
    print(chunk["message"]["content"], end="", flush=True)
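In a multi-turn chat you also need the assistant's full reply back in the history for the next turn. A sketch that accumulates the streamed content and appends it to messages, simulated here with chunks shaped like the chat stream above (no server needed):

```python
messages = [{"role": "user", "content": "讲个笑话"}]

# Simulated chat chunks in the shape shown above.
fake_stream = [
    {"message": {"role": "assistant", "content": "有"}, "done": False},
    {"message": {"role": "assistant", "content": "一"}, "done": False},
    {"message": {"role": "assistant", "content": ""}, "done": True},
]

parts = []
for chunk in fake_stream:
    print(chunk["message"]["content"], end="", flush=True)
    parts.append(chunk["message"]["content"])

# Append the assembled reply so the next turn has full context.
messages.append({"role": "assistant", "content": "".join(parts)})
```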

Streaming Caveats

Connection timeouts

A streaming request can stay open for a long time, so set an appropriate timeout:

# requests
response = requests.post(url, json=data, stream=True, timeout=60)

# httpx
async with httpx.AsyncClient(timeout=60) as client:
    ...

Error handling

Errors can occur mid-stream and should be handled:

for line in response.iter_lines():
    if line:
        try:
            chunk = json.loads(line)
            if "error" in chunk:
                print(f"Error: {chunk['error']}")
                break
            print(chunk["response"], end="")
        except json.JSONDecodeError:
            continue

Handling interruption

The user may cancel the request partway through:

import signal

interrupted = False

def handler(signum, frame):
    global interrupted
    interrupted = True

signal.signal(signal.SIGINT, handler)

for line in response.iter_lines():
    if interrupted:
        break
    # process the line as usual
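A simpler alternative to a signal handler is to catch KeyboardInterrupt around the read loop and keep whatever arrived before the cancel. Sketched here against a simulated stream that raises midway, standing in for Ctrl-C during a real request:

```python
import json

def fake_stream():
    """Stand-in for response.iter_lines(); raises as if Ctrl-C hit mid-stream."""
    yield '{"response":"Spring","done":false}'
    yield '{"response":" rain","done":false}'
    raise KeyboardInterrupt

parts = []
try:
    for line in fake_stream():
        chunk = json.loads(line)
        parts.append(chunk["response"])
except KeyboardInterrupt:
    # Keep the partial output; with requests you would also call
    # response.close() here to release the connection.
    pass

partial = "".join(parts)
print(partial)  # Spring rain
```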

Practical Use Cases

Command-line tools

Show generated content in real time for a better user experience.

Web chat interfaces

A ChatGPT-style typing effect: the user sees the response start immediately.

API proxies

Forward the streamed response through the proxy layer to preserve real-time delivery.