流式处理让模型响应实时显示,而不是等待全部生成完毕。这对用户体验很重要。
import ollama
# Basic streaming chat: echo every token as soon as it arrives instead
# of waiting for the whole reply to finish generating.
stream = ollama.chat(
    model='llama3.2',
    messages=[{'role': 'user', 'content': '写一首诗'}],
    stream=True,
)
for part in stream:
    piece = part['message']['content']
    if piece:
        print(piece, end='', flush=True)
# Same idea with the lower-level generate API: it yields chunks whose
# text lives under the 'response' key rather than 'message.content'.
stream = ollama.generate(
    model='llama3.2',
    prompt='用 Python 写一个快速排序',
    stream=True,
)
for part in stream:
    text = part['response']
    if text:
        print(text, end='', flush=True)
def stream_chat(model, messages, callback=None):
    """Stream a chat reply, invoking *callback* once per text fragment.

    Args:
        model: Name of the Ollama model to query.
        messages: Chat history in Ollama message-dict format.
        callback: Optional callable receiving each non-empty fragment.

    Returns:
        The complete concatenated response text.
    """
    parts = []
    for chunk in ollama.chat(model=model, messages=messages, stream=True):
        fragment = chunk['message']['content']
        if not fragment:
            continue
        parts.append(fragment)
        if callback:
            callback(fragment)
    return ''.join(parts)
def print_callback(text):
    """Echo one streamed fragment to stdout immediately (no newline)."""
    print(text, flush=True, end='')
# Drive stream_chat with the printing callback; `result` ends up holding
# the full reply once streaming completes.
poem_request = [{'role': 'user', 'content': '写一首诗'}]
result = stream_chat('llama3.2', poem_request, callback=print_callback)
import ollama
import sys
def stream_with_progress(prompt, model='llama3.2'):
    """Stream a reply to *prompt*, then print a character-count summary.

    Args:
        prompt: User prompt sent as a single chat message.
        model: Ollama model name (default 'llama3.2').
    """
    print(f"模型: {model}")
    print("-" * 40)
    response = ollama.chat(
        model=model,
        messages=[{'role': 'user', 'content': prompt}],
        stream=True,
    )
    char_count = 0
    for part in response:
        text = part['message']['content']
        if text:
            print(text, end='', flush=True)
            char_count += len(text)
    print()
    print("-" * 40)
    print(f"共输出 {char_count} 个字符")
stream_with_progress("介绍一下 Python")
class StreamingChat:
    """Multi-turn chat session that streams each assistant reply.

    The full conversation history accumulates in ``self.messages`` so
    the model sees every prior turn on each call to :meth:`send`.
    """

    def __init__(self, model='llama3.2', system=None):
        self.model = model
        # Conversation history in Ollama message-dict format.
        self.messages = []
        if system:
            self.messages.append({'role': 'system', 'content': system})

    def send(self, content):
        """Send one user message, stream and echo the reply, return it."""
        self.messages.append({'role': 'user', 'content': content})
        stream = ollama.chat(
            model=self.model,
            messages=self.messages,
            stream=True,
        )
        pieces = []
        print("助手: ", end='', flush=True)
        for chunk in stream:
            fragment = chunk['message']['content']
            if fragment:
                print(fragment, end='', flush=True)
                pieces.append(fragment)
        print()
        reply = ''.join(pieces)
        # Record the assistant turn so the next send() includes it.
        self.messages.append({'role': 'assistant', 'content': reply})
        return reply
# Usage: a short two-turn conversation sharing one history.
chat = StreamingChat(system='你是一个友好的助手')
chat.send('你好')
chat.send('写一首关于春天的诗')
# Streaming with final-chunk statistics: the last chunk (done=True)
# carries timing and token counts in addition to any remaining text.
stream = ollama.chat(
    model='llama3.2',
    messages=[{'role': 'user', 'content': '你好'}],
    stream=True
)
for chunk in stream:
    if chunk['message']['content']:
        # flush=True so each fragment appears immediately — without it
        # stdout buffering delays display, defeating the point of
        # streaming (every other example in this file flushes too).
        print(chunk['message']['content'], end='', flush=True)
    if chunk.get('done'):
        print("\n---")
        # total_duration is reported in nanoseconds; convert to seconds.
        print(f"总耗时: {chunk.get('total_duration', 0) / 1e9:.2f}秒")
        print(f"生成token数: {chunk.get('eval_count', 0)}")
import ollama
import threading
import time
class CancellableStream:
    """Streams a chat reply but stops early once :meth:`cancel` is called.

    Cancellation is cooperative: the flag is checked before each chunk
    is printed, so another thread may flip it at any moment.
    """

    def __init__(self):
        # Set to True (from any thread) to abort the in-progress stream.
        self.cancelled = False

    def cancel(self):
        """Request that the current stream stop at the next chunk."""
        self.cancelled = True

    def stream(self, model, messages):
        """Stream and echo a reply until it finishes or is cancelled.

        Returns the text received so far (possibly truncated).
        """
        reply = ''
        for chunk in ollama.chat(model=model, messages=messages, stream=True):
            if self.cancelled:
                print("\n[已取消]")
                break
            text = chunk['message']['content']
            if text:
                print(text, end='', flush=True)
                reply += text
        return reply
# Usage: a background timer thread cancels the stream after 3 seconds,
# simulating the user hitting "stop" mid-generation.
cs = CancellableStream()

def user_input():
    time.sleep(3)
    cs.cancel()

threading.Thread(target=user_input, daemon=True).start()
cs.stream('llama3.2', [{'role': 'user', 'content': '写一篇长文章'}])
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import ollama
import json
app = FastAPI()
@app.post("/chat/stream")
async def chat_stream(message: str):
    """SSE endpoint: relay model output as `data:` events, then [DONE].

    Each non-empty fragment is wrapped as a JSON object so the client
    can parse it uniformly; a literal [DONE] event marks completion.
    """
    def generate():
        response = ollama.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': message}],
            stream=True,
        )
        for part in response:
            text = part['message']['content']
            if text:
                payload = json.dumps({'content': text})
                yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
from flask import Flask, Response, request
import ollama
import json
app = Flask(__name__)
@app.route('/chat/stream', methods=['POST'])
def chat_stream():
    """SSE endpoint: stream model output for the posted message.

    Expects a JSON body like {"message": "..."}. A missing or
    non-JSON body falls back to an empty message instead of crashing.
    """
    # request.json returns None (or raises 415/400 in newer Flask) when
    # the body isn't valid JSON, which would break data.get below;
    # get_json(silent=True) degrades gracefully instead.
    data = request.get_json(silent=True) or {}
    message = data.get('message', '')

    def generate():
        stream = ollama.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': message}],
            stream=True
        )
        for chunk in stream:
            content = chunk['message']['content']
            if content:
                yield f"data: {json.dumps({'content': content})}\n\n"
        # Explicit terminator so clients can detect completion (the
        # FastAPI variant of this endpoint already emits it).
        yield "data: [DONE]\n\n"

    return Response(generate(), mimetype='text/event-stream')