Python 流式处理

流式处理让模型响应实时显示,而不是等待全部生成完毕。这对用户体验很重要。

基本流式聊天

import ollama

# Ask for a streaming response: chunks arrive while the model is still generating.
response = ollama.chat(
    model='llama3.2',
    messages=[{'role': 'user', 'content': '写一首诗'}],
    stream=True,
)

# Echo each piece immediately; flush so text shows up without buffering delays.
for part in response:
    text = part['message']['content']
    if text:
        print(text, end='', flush=True)

基本流式生成

# generate() streams as well; each chunk carries its text under the 'response' key.
gen_stream = ollama.generate(
    model='llama3.2',
    prompt='用 Python 写一个快速排序',
    stream=True,
)

for part in gen_stream:
    piece = part['response']
    if piece:
        print(piece, end='', flush=True)

封装流式处理

def stream_chat(model, messages, callback=None):
    """Stream a chat completion, invoking *callback* for each text chunk.

    Returns the complete concatenated response after the stream ends.
    """
    pieces = []
    for chunk in ollama.chat(model=model, messages=messages, stream=True):
        text = chunk['message']['content']
        if not text:
            continue
        pieces.append(text)
        if callback:
            callback(text)
    return ''.join(pieces)

def print_callback(text):
    """Echo one streamed chunk to stdout, unbuffered and without a newline."""
    print(text, end='', flush=True)

# Drive the helper: the reply is echoed live and also returned as one string.
result = stream_chat(
    'llama3.2',
    [{'role': 'user', 'content': '写一首诗'}],
    callback=print_callback,
)

带字符统计的流式输出

import ollama
import sys

def stream_with_progress(prompt, model='llama3.2'):
    """Stream a chat reply to stdout and report the total character count."""
    print(f"模型: {model}")
    print("-" * 40)

    total_chars = 0
    for chunk in ollama.chat(
        model=model,
        messages=[{'role': 'user', 'content': prompt}],
        stream=True,
    ):
        piece = chunk['message']['content']
        if piece:
            print(piece, end='', flush=True)
            total_chars += len(piece)

    print()
    print("-" * 40)
    print(f"共输出 {total_chars} 个字符")

stream_with_progress("介绍一下 Python")

流式对话类

class StreamingChat:
    """Multi-turn chat session that prints the assistant's reply as it streams."""

    def __init__(self, model='llama3.2', system=None):
        self.model = model
        # Full conversation history; replayed to the model on every turn.
        self.messages = [{'role': 'system', 'content': system}] if system else []

    def send(self, content):
        """Send one user message, stream the reply, and record both turns.

        Returns the assistant's full reply text.
        """
        self.messages.append({'role': 'user', 'content': content})

        response = ollama.chat(
            model=self.model,
            messages=self.messages,
            stream=True,
        )

        parts = []
        print("助手: ", end='', flush=True)

        for chunk in response:
            piece = chunk['message']['content']
            if piece:
                print(piece, end='', flush=True)
                parts.append(piece)

        print()
        reply = ''.join(parts)
        self.messages.append({'role': 'assistant', 'content': reply})
        return reply

# Usage: each send() streams the reply and extends the shared history.
chat = StreamingChat(system='你是一个友好的助手')
for user_turn in ('你好', '写一首关于春天的诗'):
    chat.send(user_turn)

处理流式响应的元数据

# The final chunk (done=True) carries timing and token-count metadata.
meta_stream = ollama.chat(
    model='llama3.2',
    messages=[{'role': 'user', 'content': '你好'}],
    stream=True,
)

for part in meta_stream:
    text = part['message']['content']
    if text:
        print(text, end='')

    if part.get('done'):
        print("\n---")
        print(f"总耗时: {part.get('total_duration', 0) / 1e9:.2f}秒")
        print(f"生成token数: {part.get('eval_count', 0)}")

取消流式请求

import ollama
import threading
import time

class CancellableStream:
    """Streams a chat reply and lets another thread abort it mid-generation.

    The flag is one-shot: once cancel() has been called, any stream() call
    stops immediately; create a new instance to stream again.
    """

    def __init__(self):
        # Set from any thread to request that the active stream stop.
        self.cancelled = False

    def cancel(self):
        """Request cancellation of the in-progress (or next) stream."""
        self.cancelled = True

    def stream(self, model, messages):
        """Stream a reply, echoing chunks until completion or cancellation.

        Returns whatever text arrived before the stream ended.
        """
        response = ollama.chat(model=model, messages=messages, stream=True)

        full_response = ''
        try:
            for chunk in response:
                if self.cancelled:
                    print("\n[已取消]")
                    break

                content = chunk['message']['content']
                if content:
                    print(content, end='', flush=True)
                    full_response += content
        finally:
            # Bug fix: breaking out of the loop on cancellation used to leave
            # the underlying HTTP response stream open until garbage
            # collection; close it explicitly to release the connection now.
            close = getattr(response, 'close', None)
            if callable(close):
                close()

        return full_response

# Usage: a background thread requests cancellation after three seconds.
cs = CancellableStream()

def _cancel_after_delay():
    time.sleep(3)
    cs.cancel()

threading.Thread(target=_cancel_after_delay, daemon=True).start()
cs.stream('llama3.2', [{'role': 'user', 'content': '写一篇长文章'}])

Web 框架集成示例

FastAPI 流式响应

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import ollama
import json

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(message: str):
    """Relay model output to the client as Server-Sent Events."""

    def event_source():
        chunks = ollama.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': message}],
            stream=True,
        )

        for part in chunks:
            text = part['message']['content']
            if text:
                yield f"data: {json.dumps({'content': text})}\n\n"

        # Sentinel event so the client knows the stream is finished.
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")

Flask 流式响应

from flask import Flask, Response, request
import ollama
import json

app = Flask(__name__)

@app.route('/chat/stream', methods=['POST'])
def chat_stream():
    """SSE endpoint: forward model chunks to the client as they arrive."""
    payload = request.json
    user_message = payload.get('message', '')

    def event_source():
        for part in ollama.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': user_message}],
            stream=True,
        ):
            text = part['message']['content']
            if text:
                yield f"data: {json.dumps({'content': text})}\n\n"

    return Response(event_source(), mimetype='text/event-stream')