连接池可以复用 HTTP 连接,减少连接建立的开销,提高性能。
如果不复用连接,每次 HTTP 请求都需要先建立 TCP 连接(三次握手),HTTPS 还要额外进行 TLS 握手,请求结束后再关闭连接;请求越频繁,这部分开销越明显。
连接池复用已有连接,避免重复建立。
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class OllamaClient:
    """Synchronous Ollama HTTP client backed by a pooled ``requests.Session``.

    All calls share one session whose ``HTTPAdapter`` caches connections, so
    consecutive requests reuse TCP connections instead of opening new ones.

    Args:
        base_url: Root URL of the Ollama server.
        pool_connections: Number of per-host connection pools the adapter caches.
        pool_maxsize: Maximum number of connections kept in each pool.
        timeout: Per-request timeout in seconds passed to ``requests``.
    """

    def __init__(self, base_url='http://localhost:11434', pool_connections=10,
                 pool_maxsize=10, timeout=120):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504],
            # urllib3 only retries status codes / read errors for methods listed
            # here, and its default set excludes POST -- the method every call
            # below uses -- so the 5xx retries would otherwise never happen.
            allowed_methods=['GET', 'POST'],
            # Hand back the final 5xx response once retries are exhausted so
            # raise_for_status() still raises HTTPError (instead of RetryError).
            raise_on_status=False,
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def _post(self, path, payload):
        """POST ``payload`` as JSON to ``base_url + path`` and return the decoded body.

        Raises:
            requests.HTTPError: if the server answers with a 4xx/5xx status.
        """
        response = self.session.post(
            f'{self.base_url}{path}',
            json=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def chat(self, model, messages, **options):
        """Send a non-streaming chat request and return the parsed JSON reply.

        Extra keyword arguments are merged into the top level of the request
        body. Passing ``stream=True`` is not supported because the reply is
        decoded as a single JSON document.
        """
        # Ollama streams NDJSON unless stream is False; response.json() needs
        # a single object.
        return self._post(
            '/api/chat',
            {'model': model, 'messages': messages, 'stream': False, **options},
        )

    def generate(self, model, prompt, **options):
        """Send a non-streaming completion request and return the parsed JSON reply.

        Extra keyword arguments are merged into the top level of the request
        body; ``stream=True`` is not supported for the same reason as ``chat``.
        """
        return self._post(
            '/api/generate',
            {'model': model, 'prompt': prompt, 'stream': False, **options},
        )

    def close(self):
        """Close the session and release its pooled connections."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None lets exceptions from the with-block propagate.
        self.close()
# Usage
client = OllamaClient(pool_connections=5, pool_maxsize=10)
try:
    # Request a single JSON reply; Ollama streams NDJSON unless stream is False.
    response = client.chat(
        'llama3.2',
        [{'role': 'user', 'content': '你好'}],
        stream=False,
    )
    print(response['message']['content'])
finally:
    # Release pooled connections even if the request or print fails.
    client.close()
import requests
from requests.adapters import HTTPAdapter
from contextlib import contextmanager
from threading import Lock
class ConnectionPoolManager:
    """Process-wide singleton that owns one pooled ``requests.Session``.

    Every ``ConnectionPoolManager(...)`` call returns the same instance. Only
    the first construction's arguments take effect; later arguments are
    ignored because the instance is already initialized.

    Args:
        base_url: Root URL prefixed to every ``endpoint``.
        max_pool_size: Used as both ``pool_connections`` and ``pool_maxsize``
            of the mounted ``HTTPAdapter``.
        timeout: Default per-request timeout in seconds, applied when the
            caller does not pass ``timeout`` explicitly.
    """

    _instance = None
    _lock = Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking: the lock is only taken while no instance exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, base_url='http://localhost:11434', max_pool_size=20, timeout=120):
        # __init__ runs on every ConnectionPoolManager(...) call. Checking and
        # setting the flag under the same lock as __new__ ensures only one
        # thread builds the session; without it two racing threads could each
        # create a session and one of them would be silently discarded.
        with self._lock:
            if getattr(self, 'initialized', False):
                return
            self.base_url = base_url
            self.max_pool_size = max_pool_size
            self.timeout = timeout
            self.session = self._create_session()
            self.initialized = True

    def _create_session(self):
        """Build a session with one pooled adapter mounted for http and https."""
        session = requests.Session()
        adapter = HTTPAdapter(
            pool_connections=self.max_pool_size,
            pool_maxsize=self.max_pool_size,
            max_retries=3,
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def request(self, method, endpoint, **kwargs):
        """Send ``method`` to ``base_url + endpoint`` and return the decoded JSON body.

        Extra keyword arguments are forwarded to ``requests.Session.request``.

        Raises:
            requests.HTTPError: on a 4xx/5xx response.
        """
        # requests waits indefinitely without a timeout; apply the default
        # unless the caller chose one (an explicit None is respected).
        kwargs.setdefault('timeout', self.timeout)
        url = f'{self.base_url}{endpoint}'
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response.json()

    def post(self, endpoint, **kwargs):
        """Shorthand for ``request('POST', endpoint, **kwargs)``."""
        return self.request('POST', endpoint, **kwargs)

    def get(self, endpoint, **kwargs):
        """Shorthand for ``request('GET', endpoint, **kwargs)``."""
        return self.request('GET', endpoint, **kwargs)

    def close(self):
        """Close the shared session and its pooled connections."""
        self.session.close()
# Usage
pool = ConnectionPoolManager(max_pool_size=15)
response = pool.post('/api/chat', json={
    'model': 'llama3.2',
    'messages': [{'role': 'user', 'content': '你好'}],
    # Without this Ollama streams NDJSON and response.json() cannot decode it.
    'stream': False,
})
import aiohttp
import asyncio
from typing import Optional
class AsyncConnectionPool:
    """Async context manager owning an ``aiohttp`` session with a bounded connector.

    The connector and session are created on ``async with`` entry and closed
    on exit; ``post``/``get`` may only be used inside that block.

    Args:
        base_url: Root URL prefixed to every ``endpoint``.
        max_connections: Connection cap applied both in total (``limit``)
            and per host (``limit_per_host``).
    """

    def __init__(self, base_url='http://localhost:11434', max_connections=20):
        self.base_url = base_url
        self.max_connections = max_connections
        self._connector: Optional[aiohttp.TCPConnector] = None
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self._connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_connections,
        )
        self._session = aiohttp.ClientSession(connector=self._connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if self._session is not None:
                await self._session.close()
            if self._connector is not None:
                await self._connector.close()
        finally:
            # Drop the closed objects so later calls fail fast with a clear
            # error and a new ``async with`` builds fresh ones.
            self._session = None
            self._connector = None

    def _require_session(self):
        """Return the live session, or raise if used outside ``async with``."""
        if self._session is None:
            raise RuntimeError(
                'AsyncConnectionPool must be entered with "async with" before sending requests'
            )
        return self._session

    async def post(self, endpoint, json_data=None):
        """POST ``json_data`` to ``base_url + endpoint`` and return the decoded JSON body.

        Raises:
            RuntimeError: if called outside the ``async with`` block.
        """
        session = self._require_session()
        url = f'{self.base_url}{endpoint}'
        async with session.post(url, json=json_data) as response:
            response.raise_for_status()
            return await response.json()

    async def get(self, endpoint):
        """GET ``base_url + endpoint`` and return the decoded JSON body.

        Raises:
            RuntimeError: if called outside the ``async with`` block.
        """
        session = self._require_session()
        url = f'{self.base_url}{endpoint}'
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()
# Usage
async def main():
    """Send one chat request through a pool capped at 10 connections and print the reply."""
    async with AsyncConnectionPool(max_connections=10) as pool:
        response = await pool.post('/api/chat', json_data={
            'model': 'llama3.2',
            'messages': [{'role': 'user', 'content': '你好'}],
            # Ollama streams NDJSON by default, which response.json() cannot
            # decode as a single document; ask for one JSON object instead.
            'stream': False,
        })
        print(response['message']['content'])


asyncio.run(main())
import requests
from requests.adapters import HTTPAdapter
import time
class MonitoredConnectionPool:
    """Pooled ``requests`` client that records simple request statistics.

    Counts total, successful and failed requests and accumulates elapsed
    time, so ``get_stats`` can report an average duration and success rate.

    Args:
        base_url: Root URL prefixed to every ``endpoint``.
        pool_size: Used as both ``pool_connections`` and ``pool_maxsize``.
        timeout: Default per-request timeout in seconds, applied when the
            caller does not pass ``timeout`` explicitly.
    """

    def __init__(self, base_url='http://localhost:11434', pool_size=10, timeout=120):
        self.base_url = base_url
        self.pool_size = pool_size
        self.timeout = timeout
        self.session = self._create_session()
        self.stats = self._empty_stats()

    @staticmethod
    def _empty_stats():
        """Return a fresh, zeroed statistics dict."""
        return {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_time': 0,
        }

    def _create_session(self):
        """Build a session with one pooled adapter mounted for http and https."""
        session = requests.Session()
        adapter = HTTPAdapter(
            pool_connections=self.pool_size,
            pool_maxsize=self.pool_size,
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def request(self, method, endpoint, **kwargs):
        """Send a request, update the counters, and return the decoded JSON body.

        Any exception (network error, HTTP error status, undecodable body) is
        counted as a failure and re-raised. Elapsed time is recorded either way.
        """
        # requests waits indefinitely without a timeout; apply the default
        # unless the caller chose one (an explicit None is respected).
        kwargs.setdefault('timeout', self.timeout)
        self.stats['total_requests'] += 1
        # perf_counter is monotonic; time.time() can jump with clock changes
        # and yield wrong or negative durations.
        start_time = time.perf_counter()
        try:
            url = f'{self.base_url}{endpoint}'
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            payload = response.json()
        except Exception:
            self.stats['failed_requests'] += 1
            raise
        else:
            # Counted only after the body decodes, so a request is never
            # recorded as both successful and failed.
            self.stats['successful_requests'] += 1
            return payload
        finally:
            self.stats['total_time'] += time.perf_counter() - start_time

    def get_stats(self):
        """Return the counters plus ``average_time`` (seconds per request) and
        ``success_rate`` (percent); both are 0 before any request was made."""
        total = self.stats['total_requests']
        average_time = self.stats['total_time'] / total if total > 0 else 0
        success_rate = (
            self.stats['successful_requests'] / total * 100 if total > 0 else 0
        )
        return {
            **self.stats,
            'average_time': average_time,
            'success_rate': success_rate,
        }

    def reset_stats(self):
        """Zero all counters."""
        self.stats = self._empty_stats()
# Usage
pool = MonitoredConnectionPool(pool_size=10)
for i in range(10):
    pool.request('POST', '/api/generate', json={
        'model': 'llama3.2',
        'prompt': f'问题 {i}',
        # Ollama streams NDJSON by default; response.json() needs one object.
        'stream': False,
    })
stats = pool.get_stats()
print(f"成功率: {stats['success_rate']:.1f}%")
print(f"平均时间: {stats['average_time']:.2f}s")
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class OptimizedClient:
    """Ollama chat client with a tuned connection pool and POST-aware retries.

    Meant to be used as a context manager; leaving the ``with`` block closes
    the underlying session.

    Args:
        base_url: Root URL of the Ollama server.
        pool_connections: Number of per-host pools cached by the adapter.
        pool_maxsize: Maximum number of connections kept per pool.
        max_retries: Total retry budget handed to urllib3's ``Retry``.
        timeout: Per-request timeout in seconds.
    """

    def __init__(
        self,
        base_url='http://localhost:11434',
        pool_connections=10,
        pool_maxsize=20,
        max_retries=3,
        timeout=120
    ):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        # Retry transient 5xx answers for both GET and POST (urllib3 leaves
        # POST out of its default retryable methods).
        policy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=['POST', 'GET'],
        )
        pooled_adapter = HTTPAdapter(
            max_retries=policy,
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            # When the pool is exhausted, open extra (unsaved) connections
            # instead of blocking until one is returned.
            pool_block=False,
        )
        for scheme in ('http://', 'https://'):
            self.session.mount(scheme, pooled_adapter)

    def chat(self, model, messages, stream=False, **options):
        """Send a chat request and return the raw ``requests.Response``.

        ``stream`` goes both into the JSON body and to ``requests`` so a
        streaming reply can be consumed incrementally. The HTTP status is not
        checked here; that is left to the caller.
        """
        body = {'model': model, 'messages': messages, 'stream': stream}
        body.update(options)
        endpoint_url = f'{self.base_url}/api/chat'
        return self.session.post(
            endpoint_url,
            json=body,
            timeout=self.timeout,
            stream=stream,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Implicit None return: exceptions raised in the with-block propagate.
        self.session.close()
# Usage
with OptimizedClient(pool_connections=5, pool_maxsize=15) as client:
    response = client.chat('llama3.2', [{'role': 'user', 'content': '你好'}])
    # chat() returns the raw response without checking the status; fail with
    # a clear HTTPError (e.g. unknown model) instead of a KeyError below.
    response.raise_for_status()
    print(response.json()['message']['content'])