Python 异步编程

异步编程可以同时处理多个请求,提高程序效率。Python SDK 提供了完整的异步支持。

异步客户端

from ollama import AsyncClient
import asyncio

async def main():
    client = AsyncClient()
    
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': '你好'}]
    )
    
    print(response['message']['content'])

asyncio.run(main())

异步聊天

from ollama import AsyncClient

async def chat(message):
    client = AsyncClient()
    
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': message}]
    )
    
    return response['message']['content']

async def main():
    reply = await chat('什么是 Python?')
    print(reply)

asyncio.run(main())

异步流式

from ollama import AsyncClient

async def stream_chat(message):
    client = AsyncClient()
    
    stream = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': message}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk['message']['content']:
            print(chunk['message']['content'], end='', flush=True)

asyncio.run(stream_chat('写一首诗'))

并发请求

from ollama import AsyncClient
import asyncio

async def process_question(client, question):
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': question}]
    )
    return question, response['message']['content']

async def main():
    client = AsyncClient()
    
    questions = [
        '什么是 Python?',
        '什么是 JavaScript?',
        '什么是 Go?'
    ]
    
    tasks = [process_question(client, q) for q in questions]
    results = await asyncio.gather(*tasks)
    
    for question, answer in results:
        print(f"问题: {question}")
        print(f"回答: {answer[:100]}...")
        print()

asyncio.run(main())

异步上下文管理

from ollama import AsyncClient
import asyncio

class AsyncOllamaSession:
    def __init__(self, model='llama3.2', system=None):
        self.model = model
        self.client = AsyncClient()
        self.messages = []
        if system:
            self.messages.append({'role': 'system', 'content': system})
    
    async def send(self, content):
        self.messages.append({'role': 'user', 'content': content})
        
        response = await self.client.chat(
            model=self.model,
            messages=self.messages
        )
        
        reply = response['message']['content']
        self.messages.append({'role': 'assistant', 'content': reply})
        
        return reply
    
    async def send_stream(self, content):
        self.messages.append({'role': 'user', 'content': content})
        
        stream = await self.client.chat(
            model=self.model,
            messages=self.messages,
            stream=True
        )
        
        full_response = ''
        async for chunk in stream:
            text = chunk['message']['content']
            if text:
                print(text, end='', flush=True)
                full_response += text
        
        print()
        self.messages.append({'role': 'assistant', 'content': full_response})

async def main():
    session = AsyncOllamaSession(system='你是一个友好的助手')
    
    await session.send_stream('你好')
    await session.send_stream('写一首诗')

asyncio.run(main())

异步批量处理

from ollama import AsyncClient
import asyncio

async def get_embeddings(texts):
    client = AsyncClient()
    
    async def embed(text):
        response = await client.embeddings(
            model='nomic-embed-text',
            prompt=text
        )
        return text, response['embedding']
    
    tasks = [embed(text) for text in texts]
    results = await asyncio.gather(*tasks)
    
    return dict(results)

async def main():
    texts = [
        'Python 是一种编程语言',
        'JavaScript 用于网页开发',
        'Go 是 Google 开发的语言'
    ]
    
    embeddings = await get_embeddings(texts)
    
    for text, emb in embeddings.items():
        print(f"{text}: {len(emb)} 维向量")

asyncio.run(main())

超时控制

from ollama import AsyncClient
import asyncio

async def chat_with_timeout(message, timeout=30):
    client = AsyncClient()
    
    try:
        response = await asyncio.wait_for(
            client.chat(
                model='llama3.2',
                messages=[{'role': 'user', 'content': message}]
            ),
            timeout=timeout
        )
        return response['message']['content']
    except asyncio.TimeoutError:
        return "请求超时"

async def main():
    result = await chat_with_timeout('写一首诗', timeout=10)
    print(result)

asyncio.run(main())

异步重试

from ollama import AsyncClient
import asyncio

async def chat_with_retry(message, max_retries=3):
    client = AsyncClient()
    
    for attempt in range(max_retries):
        try:
            response = await client.chat(
                model='llama3.2',
                messages=[{'role': 'user', 'content': message}]
            )
            return response['message']['content']
        except Exception as e:
            print(f"第 {attempt + 1} 次尝试失败: {e}")
            await asyncio.sleep(1)
    
    return None

async def main():
    result = await chat_with_retry('你好')
    if result:
        print(result)
    else:
        print("请求失败")

asyncio.run(main())

异步 Web 服务示例

from fastapi import FastAPI
from ollama import AsyncClient
import asyncio

app = FastAPI()
client = AsyncClient()

@app.post("/chat")
async def chat(message: str):
    response = await client.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': message}]
    )
    return {"response": response['message']['content']}

@app.post("/batch")
async def batch_chat(messages: list[str]):
    async def process(msg):
        response = await client.chat(
            model='llama3.2',
            messages=[{'role': 'user', 'content': msg}]
        )
        return response['message']['content']
    
    results = await asyncio.gather(*[process(m) for m in messages])
    return {"responses": results}

同步 vs 异步

场景推荐
脚本工具同步
Web 服务异步
批量处理异步
简单对话同步
高并发场景异步