连接池管理

连接池可以复用 HTTP 连接,减少连接建立的开销,提高性能。

为什么需要连接池

如果不复用连接,每次 HTTP 请求都需要重新完成:

  1. DNS 解析
  2. TCP 握手
  3. TLS 握手(HTTPS)

连接池复用已有连接,避免重复建立。

基本连接池

requests 连接池

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OllamaClient:
    """Small Ollama HTTP client that reuses connections through one session.

    A single ``requests.Session`` is mounted with an ``HTTPAdapter`` for both
    ``http://`` and ``https://`` so that repeated calls share pooled
    connections. The client can be used as a context manager; leaving the
    ``with`` block closes the session.
    """

    def __init__(self, base_url='http://localhost:11434', pool_connections=10,
                 pool_maxsize=10, timeout=120):
        """Create the session and mount the pooled, retrying adapter.

        Args:
            base_url: Root URL of the Ollama server.
            pool_connections: Number of per-host pools the adapter caches.
            pool_maxsize: Maximum connections kept in each pool.
            timeout: Timeout in seconds passed to every request.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()

        retry_strategy = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504],
            # urllib3 does not retry POST on status codes by default, and
            # every call this client makes is a POST, so opt in explicitly
            # while keeping the default methods retryable.
            allowed_methods=Retry.DEFAULT_ALLOWED_METHODS | {'POST'},
            # Return the last response once retries are exhausted so that
            # raise_for_status() below keeps raising HTTPError.
            raise_on_status=False,
        )

        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize
        )

        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def _post(self, path, payload):
        """POST ``payload`` as JSON to ``path`` and return the decoded body."""
        response = self.session.post(
            # Tolerate a base URL that was configured with a trailing slash.
            f"{self.base_url.rstrip('/')}{path}",
            json=payload,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def chat(self, model, messages, **options):
        """Send a chat request and return the decoded JSON response.

        Args:
            model: Name of the model to query.
            messages: List of chat message dicts.
            **options: Extra top-level fields merged into the request body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # The result is decoded with response.json(), which needs a single
        # JSON document, so default to a non-streaming reply. An explicit
        # ``stream`` in ``options`` still wins.
        payload = {'model': model, 'messages': messages, 'stream': False, **options}
        return self._post('/api/chat', payload)

    def generate(self, model, prompt, **options):
        """Send a completion request and return the decoded JSON response.

        Args:
            model: Name of the model to query.
            prompt: Prompt text.
            **options: Extra top-level fields merged into the request body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        payload = {'model': model, 'prompt': prompt, 'stream': False, **options}
        return self._post('/api/generate', payload)

    def close(self):
        """Close the session and release its pooled connections."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

# Usage
client = OllamaClient(pool_connections=5, pool_maxsize=10)

try:
    # chat() decodes the reply with response.json(), so request a single
    # JSON object instead of the server's default streamed output.
    response = client.chat(
        'llama3.2',
        [{'role': 'user', 'content': '你好'}],
        stream=False,
    )
    print(response['message']['content'])
finally:
    # Release the pooled connections even when the request fails.
    client.close()

连接池管理器

import requests
from requests.adapters import HTTPAdapter
from contextlib import contextmanager
from threading import Lock

class ConnectionPoolManager:
    """Singleton wrapper around one pooled ``requests.Session``.

    Every construction returns the same instance. Only the first call
    configures it; arguments passed to later calls are ignored.
    """

    _instance = None
    _lock = Lock()
    # Used when the caller passes no ``timeout``; without one, requests
    # waits for the server indefinitely.
    DEFAULT_TIMEOUT = 120

    def __new__(cls, *args, **kwargs):
        # Check again after taking the lock so only one instance is created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, base_url='http://localhost:11434', max_pool_size=20):
        """Configure the shared instance on first construction only.

        Args:
            base_url: Root URL prepended to every endpoint.
            max_pool_size: Value used for both ``pool_connections`` and
                ``pool_maxsize`` of the mounted adapter.
        """
        # __init__ runs on every ConnectionPoolManager() call; hold the lock
        # so concurrent first calls cannot each build a session.
        with self._lock:
            if getattr(self, 'initialized', False):
                return
            self.base_url = base_url
            self.max_pool_size = max_pool_size
            self.session = self._create_session()
            self.initialized = True

    def _create_session(self):
        """Build a session with a pooled adapter mounted for http and https."""
        session = requests.Session()

        adapter = HTTPAdapter(
            pool_connections=self.max_pool_size,
            pool_maxsize=self.max_pool_size,
            max_retries=3
        )

        session.mount('http://', adapter)
        session.mount('https://', adapter)

        return session

    def request(self, method, endpoint, **kwargs):
        """Send a request to ``base_url + endpoint`` and return decoded JSON.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``base_url``.
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults
                to ``DEFAULT_TIMEOUT`` when omitted.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        kwargs.setdefault('timeout', self.DEFAULT_TIMEOUT)
        url = f'{self.base_url}{endpoint}'
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response.json()

    def post(self, endpoint, **kwargs):
        """Shortcut for ``request('POST', endpoint, **kwargs)``."""
        return self.request('POST', endpoint, **kwargs)

    def get(self, endpoint, **kwargs):
        """Shortcut for ``request('GET', endpoint, **kwargs)``."""
        return self.request('GET', endpoint, **kwargs)

    def close(self):
        """Close the shared session and release its pooled connections."""
        self.session.close()

# Usage
pool = ConnectionPoolManager(max_pool_size=15)

response = pool.post('/api/chat', json={
    'model': 'llama3.2',
    'messages': [{'role': 'user', 'content': '你好'}],
    # post() decodes the body with response.json(), so ask for one JSON
    # object rather than the server's default streamed output.
    'stream': False,
})

异步连接池

import aiohttp
import asyncio
from typing import Optional

class AsyncConnectionPool:
    """Async context manager exposing a pooled ``aiohttp.ClientSession``.

    The connector and session are created on ``async with`` entry and closed
    on exit. ``post`` and ``get`` may only be called inside that block.
    """

    def __init__(self, base_url='http://localhost:11434', max_connections=20):
        """Store the configuration; no network resources are created yet.

        Args:
            base_url: Root URL prepended to every endpoint.
            max_connections: Used for both the total and the per-host
                connection limit of the connector.
        """
        self.base_url = base_url
        self.max_connections = max_connections
        self._connector: Optional[aiohttp.TCPConnector] = None
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self._connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_connections
        )
        self._session = aiohttp.ClientSession(connector=self._connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Detach the handles first so the instance never keeps pointing at
        # closed resources, then close the connector even if closing the
        # session raises.
        session, self._session = self._session, None
        connector, self._connector = self._connector, None
        try:
            if session is not None:
                await session.close()
        finally:
            if connector is not None:
                await connector.close()

    def _require_session(self):
        """Return the live session or raise if the pool has not been entered."""
        if self._session is None:
            raise RuntimeError(
                "AsyncConnectionPool must be used inside 'async with' "
                "before sending requests"
            )
        return self._session

    async def post(self, endpoint, json_data=None):
        """POST ``json_data`` as JSON to ``endpoint`` and return decoded JSON.

        Raises:
            RuntimeError: If called outside the ``async with`` block.
        """
        session = self._require_session()
        url = f'{self.base_url}{endpoint}'
        async with session.post(url, json=json_data) as response:
            response.raise_for_status()
            return await response.json()

    async def get(self, endpoint):
        """GET ``endpoint`` and return decoded JSON.

        Raises:
            RuntimeError: If called outside the ``async with`` block.
        """
        session = self._require_session()
        url = f'{self.base_url}{endpoint}'
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def main():
    """Send one chat request through the async pool and print the reply."""
    async with AsyncConnectionPool(max_connections=10) as pool:
        response = await pool.post('/api/chat', json_data={
            'model': 'llama3.2',
            'messages': [{'role': 'user', 'content': '你好'}],
            # post() decodes the body as one JSON document, so disable the
            # server's default streamed output.
            'stream': False,
        })
        print(response['message']['content'])

asyncio.run(main())

连接池监控

import requests
from requests.adapters import HTTPAdapter
import time

class MonitoredConnectionPool:
    """Pooled ``requests.Session`` wrapper that keeps simple request counters.

    ``stats`` tracks the number of requests, how many succeeded or failed,
    and the accumulated time spent in ``request``.
    """

    def __init__(self, base_url='http://localhost:11434', pool_size=10):
        """Create the pooled session and zeroed counters.

        Args:
            base_url: Root URL prepended to every endpoint.
            pool_size: Value used for both ``pool_connections`` and
                ``pool_maxsize`` of the mounted adapter.
        """
        self.base_url = base_url
        self.pool_size = pool_size
        self.session = self._create_session()
        self.stats = self._empty_stats()

    @staticmethod
    def _empty_stats():
        """Return a fresh counter dict with every value at zero."""
        return {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_time': 0
        }

    def _create_session(self):
        """Build a session with a pooled adapter mounted for http and https."""
        session = requests.Session()
        adapter = HTTPAdapter(
            pool_connections=self.pool_size,
            pool_maxsize=self.pool_size
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def request(self, method, endpoint, **kwargs):
        """Send a request, update the counters, and return the decoded JSON.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``base_url``.
            **kwargs: Forwarded to ``Session.request``.

        Raises:
            Exception: Whatever the request, the status check, or the JSON
                decoding raised; it is re-raised after being counted.
        """
        self.stats['total_requests'] += 1
        # perf_counter is monotonic, so the duration cannot be skewed by
        # system clock adjustments.
        started = time.perf_counter()

        try:
            url = f'{self.base_url}{endpoint}'
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            # Decode before counting a success: a body that is not valid
            # JSON must be recorded as a failure only.
            payload = response.json()
        except Exception:
            self.stats['failed_requests'] += 1
            raise
        else:
            self.stats['successful_requests'] += 1
            return payload
        finally:
            self.stats['total_time'] += time.perf_counter() - started

    def get_stats(self):
        """Return the counters plus ``average_time`` and ``success_rate``.

        ``success_rate`` is a percentage. Both derived values are 0 when no
        request has been made.
        """
        total = self.stats['total_requests']
        if total > 0:
            average_time = self.stats['total_time'] / total
            success_rate = self.stats['successful_requests'] / total * 100
        else:
            average_time = 0
            success_rate = 0

        return {
            **self.stats,
            'average_time': average_time,
            'success_rate': success_rate
        }

    def reset_stats(self):
        """Replace the counters with a fresh zeroed dict."""
        self.stats = self._empty_stats()

# Usage
pool = MonitoredConnectionPool(pool_size=10)

for i in range(10):
    pool.request('POST', '/api/generate', json={
        'model': 'llama3.2',
        'prompt': f'问题 {i}',
        # request() decodes the body with response.json(), so ask for one
        # JSON object rather than the server's default streamed output.
        'stream': False,
    })

stats = pool.get_stats()
print(f"成功率: {stats['success_rate']:.1f}%")
print(f"平均时间: {stats['average_time']:.2f}s")

连接池最佳实践

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedClient:
    """Ollama chat client with a configurable pooled, retrying session.

    Usable as a context manager; leaving the ``with`` block closes the
    session. ``close()`` does the same for callers that manage the lifetime
    themselves.
    """

    def __init__(
        self,
        base_url='http://localhost:11434',
        pool_connections=10,
        pool_maxsize=20,
        max_retries=3,
        timeout=120
    ):
        """Create the session and mount the pooled, retrying adapter.

        Args:
            base_url: Root URL of the Ollama server.
            pool_connections: Number of per-host pools the adapter caches.
            pool_maxsize: Maximum connections kept in each pool.
            max_retries: Total retry budget handed to ``Retry``.
            timeout: Timeout in seconds passed to every request.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()

        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            # POST is listed explicitly so the status list applies to chat().
            allowed_methods=['POST', 'GET']
        )

        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            pool_block=False
        )

        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def chat(self, model, messages, stream=False, **options):
        """POST a chat request and return the raw ``requests.Response``.

        The status is not checked here and the body is not decoded, so the
        caller decides how to handle errors and streamed output.

        Args:
            model: Name of the model to query.
            messages: List of chat message dicts.
            stream: Sent in the request body and also used as the
                ``stream`` flag of the HTTP call.
            **options: Extra top-level fields merged into the request body.
        """
        return self.session.post(
            # Tolerate a base URL that was configured with a trailing slash.
            f"{self.base_url.rstrip('/')}/api/chat",
            json={
                'model': model,
                'messages': messages,
                'stream': stream,
                **options
            },
            timeout=self.timeout,
            stream=stream
        )

    def close(self):
        """Close the session and release its pooled connections."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

# Usage
with OptimizedClient(pool_connections=5, pool_maxsize=15) as client:
    response = client.chat('llama3.2', [{'role': 'user', 'content': '你好'}])
    # chat() returns the raw response without checking it; fail here on an
    # error status instead of on a missing 'message' key below.
    response.raise_for_status()
    print(response.json()['message']['content'])