Asynchronous programming lets a single process handle many requests concurrently, which improves throughput. The Ollama Python SDK ships full async support through `AsyncClient`.
A minimal example: create an `AsyncClient`, `await` the chat call, and run the coroutine with `asyncio.run()`:

```python
from ollama import AsyncClient
import asyncio

async def main():
    client = AsyncClient()
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': 'Hello'}]
    )
    print(response['message']['content'])

asyncio.run(main())
```
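By default the client talks to a local Ollama server. If yours runs elsewhere, `AsyncClient` accepts a `host` argument; the address below is a placeholder:

```python
from ollama import AsyncClient

# Point the client at a non-default server (placeholder URL).
client = AsyncClient(host='http://192.168.1.10:11434')
```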
Wrapping the call in a reusable coroutine keeps the request logic in one place:

```python
import asyncio  # needed for asyncio.run below
from ollama import AsyncClient

async def chat(message):
    client = AsyncClient()
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': message}]
    )
    return response['message']['content']

async def main():
    reply = await chat('What is Python?')
    print(reply)

asyncio.run(main())
```
With `stream=True`, the awaited call returns an async iterator; consume it with `async for`:

```python
import asyncio
from ollama import AsyncClient

async def stream_chat(message):
    client = AsyncClient()
    stream = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': message}],
        stream=True
    )
    # Print each token as soon as it arrives.
    async for chunk in stream:
        if chunk['message']['content']:
            print(chunk['message']['content'], end='', flush=True)

asyncio.run(stream_chat('Write a poem'))
```
The real payoff of async is concurrency: `asyncio.gather()` sends several requests at once instead of one after another:

```python
from ollama import AsyncClient
import asyncio

async def process_question(client, question):
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': question}]
    )
    return question, response['message']['content']

async def main():
    client = AsyncClient()
    questions = [
        'What is Python?',
        'What is JavaScript?',
        'What is Go?'
    ]
    # Launch all requests concurrently and wait for every result.
    tasks = [process_question(client, q) for q in questions]
    results = await asyncio.gather(*tasks)
    for question, answer in results:
        print(f"Question: {question}")
        print(f"Answer: {answer[:100]}...")
        print()

asyncio.run(main())
```
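Firing every request at once can overload the server. One standard way to cap concurrency is an `asyncio.Semaphore`; this is a minimal sketch, with the limit of 2 chosen arbitrarily:

```python
import asyncio
from ollama import AsyncClient

semaphore = asyncio.Semaphore(2)  # at most 2 in-flight requests (arbitrary limit)

async def bounded_question(client, question):
    # Each task waits here until a semaphore slot is free.
    async with semaphore:
        response = await client.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': question}]
        )
        return question, response['message']['content']

async def main():
    client = AsyncClient()
    questions = ['What is Python?', 'What is JavaScript?', 'What is Go?']
    results = await asyncio.gather(*(bounded_question(client, q) for q in questions))
    for question, answer in results:
        print(question, '->', answer[:60])

asyncio.run(main())
```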
An async conversation session keeps the message history so the model sees prior turns; `send_stream` also prints tokens as they arrive:

```python
from ollama import AsyncClient
import asyncio

class AsyncOllamaSession:
    def __init__(self, model='llama3.2', system=None):
        self.model = model
        self.client = AsyncClient()
        self.messages = []
        if system:
            self.messages.append({'role': 'system', 'content': system})

    async def send(self, content):
        self.messages.append({'role': 'user', 'content': content})
        response = await self.client.chat(
            model=self.model,
            messages=self.messages
        )
        reply = response['message']['content']
        self.messages.append({'role': 'assistant', 'content': reply})
        return reply

    async def send_stream(self, content):
        self.messages.append({'role': 'user', 'content': content})
        stream = await self.client.chat(
            model=self.model,
            messages=self.messages,
            stream=True
        )
        full_response = ''
        async for chunk in stream:
            text = chunk['message']['content']
            if text:
                print(text, end='', flush=True)
                full_response += text
        print()
        # Record the assistant's reply so later turns have context.
        self.messages.append({'role': 'assistant', 'content': full_response})
        return full_response

async def main():
    session = AsyncOllamaSession(system='You are a friendly assistant')
    await session.send_stream('Hello')
    await session.send_stream('Write a poem')

asyncio.run(main())
```
Concurrency also pays off when computing embeddings for many texts:

```python
from ollama import AsyncClient
import asyncio

async def get_embeddings(texts):
    client = AsyncClient()

    async def embed(text):
        response = await client.embeddings(
            model='nomic-embed-text',
            prompt=text
        )
        return text, response['embedding']

    # Embed all texts concurrently.
    tasks = [embed(text) for text in texts]
    results = await asyncio.gather(*tasks)
    return dict(results)

async def main():
    texts = [
        'Python is a programming language',
        'JavaScript is used for web development',
        'Go is a language developed by Google'
    ]
    embeddings = await get_embeddings(texts)
    for text, emb in embeddings.items():
        print(f"{text}: {len(emb)}-dimensional vector")

asyncio.run(main())
```
`asyncio.wait_for()` cancels a request that takes too long:

```python
from ollama import AsyncClient
import asyncio

async def chat_with_timeout(message, timeout=30):
    client = AsyncClient()
    try:
        response = await asyncio.wait_for(
            client.chat(
                model='llama3.2',
                messages=[{'role': 'user', 'content': message}]
            ),
            timeout=timeout
        )
        return response['message']['content']
    except asyncio.TimeoutError:
        return "Request timed out"

async def main():
    result = await chat_with_timeout('Write a poem', timeout=10)
    print(result)

asyncio.run(main())
```
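On Python 3.11+, the `asyncio.timeout()` context manager does the same job; a minimal sketch:

```python
import asyncio
from ollama import AsyncClient

async def chat_with_timeout_311(message, timeout=10):
    client = AsyncClient()
    try:
        # asyncio.timeout() cancels the enclosed await when the deadline passes.
        async with asyncio.timeout(timeout):
            response = await client.chat(
                model='llama3.2',
                messages=[{'role': 'user', 'content': message}]
            )
        return response['message']['content']
    except TimeoutError:
        return "Request timed out"
```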
Transient failures (for example, the server restarting) can be handled with a simple retry loop:

```python
from ollama import AsyncClient
import asyncio

async def chat_with_retry(message, max_retries=3):
    client = AsyncClient()
    for attempt in range(max_retries):
        try:
            response = await client.chat(
                model='llama3.2',
                messages=[{'role': 'user', 'content': message}]
            )
            return response['message']['content']
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            await asyncio.sleep(1)
    return None  # all retries exhausted

async def main():
    result = await chat_with_retry('Hello')
    if result:
        print(result)
    else:
        print("Request failed")

asyncio.run(main())
```
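The fixed 1-second pause is fine for a demo; production code usually backs off exponentially so a struggling server gets breathing room. A sketch of the same loop with doubling delays (the base delay is an arbitrary choice):

```python
import asyncio
from ollama import AsyncClient

async def chat_with_backoff(message, max_retries=3, base_delay=1.0):
    client = AsyncClient()
    for attempt in range(max_retries):
        try:
            response = await client.chat(
                model='llama3.2',
                messages=[{'role': 'user', 'content': message}]
            )
            return response['message']['content']
        except Exception as e:
            # Wait 1s, 2s, 4s, ... between attempts.
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}; retrying in {delay:.0f}s")
            await asyncio.sleep(delay)
    return None
```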
Async clients pair naturally with async web frameworks. A FastAPI app can share one `AsyncClient` across requests:

```python
from fastapi import FastAPI
from ollama import AsyncClient
import asyncio

app = FastAPI()
client = AsyncClient()  # shared across requests

@app.post("/chat")
async def chat(message: str):
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': message}]
    )
    return {"response": response['message']['content']}

@app.post("/batch")
async def batch_chat(messages: list[str]):
    async def process(msg):
        response = await client.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': msg}]
        )
        return response['message']['content']

    # Answer all prompts concurrently within one HTTP request.
    results = await asyncio.gather(*[process(m) for m in messages])
    return {"responses": results}
```
When to choose sync versus async:

| Scenario | Recommendation |
|---|---|
| Script tools | Sync |
| Web services | Async |
| Batch processing | Async |
| Simple conversations | Sync |
| High-concurrency workloads | Async |
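For the synchronous rows above, the SDK's blocking `Client` has the same call shape with no event loop; a minimal sketch:

```python
from ollama import Client

# The synchronous Client mirrors AsyncClient's API without asyncio.
client = Client()
response = client.chat(
    model='llama3.2',
    messages=[{'role': 'user', 'content': 'Hello'}]
)
print(response['message']['content'])
```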