Concurrency and Rate Limiting

Sensible concurrency control and rate limiting protect service stability and improve resource utilization.

Concurrency Control

Basic Concurrency

import ollama
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

def concurrent_chat(
    messages_list: List[List[Dict]],
    model: str = 'llama3.2',
    max_workers: int = 4
) -> List[str]:
    def chat(messages):
        return ollama.chat(model=model, messages=messages)['message']['content']

    # Pre-size the result list so answers keep the input order.
    results = [None] * len(messages_list)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the index of its input.
        futures = {
            executor.submit(chat, messages): i
            for i, messages in enumerate(messages_list)
        }

        for future in as_completed(futures):
            index = futures[future]
            try:
                results[index] = future.result()
            except Exception as e:
                results[index] = f"Error: {e}"

    return results

# Usage
messages_list = [
    [{'role': 'user', 'content': 'Question 1'}],
    [{'role': 'user', 'content': 'Question 2'}],
    [{'role': 'user', 'content': 'Question 3'}]
]

results = concurrent_chat(messages_list, max_workers=2)
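
If the surrounding application is already asynchronous, the same fan-out pattern can be expressed with asyncio and the ollama AsyncClient, using an asyncio.Semaphore instead of a thread pool to bound concurrency. A minimal sketch; the function name and defaults are illustrative:

import asyncio
from ollama import AsyncClient

async def concurrent_chat_async(messages_list, model='llama3.2', max_concurrent=4):
    client = AsyncClient()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def worker(messages):
        # The semaphore bounds how many requests are in flight at once.
        async with semaphore:
            response = await client.chat(model=model, messages=messages)
            return response['message']['content']

    # return_exceptions=True keeps one failed request from cancelling the rest.
    return await asyncio.gather(
        *(worker(m) for m in messages_list),
        return_exceptions=True
    )

# results = asyncio.run(concurrent_chat_async(messages_list, max_concurrent=2))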

Advanced Concurrency Controller

import ollama
from concurrent.futures import ThreadPoolExecutor
from threading import Semaphore
from typing import Any, Callable, List, Optional

class ConcurrencyController:
    def __init__(self, max_concurrent: int = 4, max_workers: int = 8):
        # The semaphore caps in-flight calls; max_workers caps pool threads.
        self.semaphore = Semaphore(max_concurrent)
        self.max_workers = max_workers

    def execute(
        self,
        func: Callable,
        items: List[Any],
        on_complete: Optional[Callable] = None
    ) -> List[Any]:
        def wrapped(item):
            with self.semaphore:
                return func(item)
        
        results = []
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(wrapped, item) for item in items]
            
            # Iterate in submission order so results match the input order.
            for future in futures:
                result = future.result()
                results.append(result)
                
                if on_complete:
                    on_complete(result)
        
        return results

# Usage
def process_question(question):
    return ollama.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': question}]
    )['message']['content']

controller = ConcurrencyController(max_concurrent=3, max_workers=6)
results = controller.execute(process_question, ['Question 1', 'Question 2', 'Question 3'])
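
The optional on_complete hook fires after each future resolves, in submission order, which is useful for logging or incremental persistence. For example (the callback below is illustrative):

def log_result(result):
    # Called once per completed item, in submission order.
    print(f"Completed: {result[:40]}")

controller.execute(process_question, ['Question 4', 'Question 5'], on_complete=log_result)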

Rate Limiting Strategies

Token Bucket Rate Limiting

import time
import ollama
from threading import Lock

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # start full so an initial burst is allowed
        self.last_time = time.time()
        self.lock = Lock()
    
    def acquire(self, tokens: int = 1) -> bool:
        with self.lock:
            now = time.time()
            elapsed = now - self.last_time
            self.last_time = now
            
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False
    
    def wait_and_acquire(self, tokens: int = 1):
        # Poll every 100ms until enough tokens have accumulated.
        while not self.acquire(tokens):
            time.sleep(0.1)

# Usage
bucket = TokenBucket(rate=2.0, capacity=10)  # sustains 2 req/s, bursts up to 10

def rate_limited_request(prompt):
    bucket.wait_and_acquire()
    return ollama.generate(model='llama3.2', prompt=prompt)['response']
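
With rate=2.0 and capacity=10, the bucket allows an immediate burst of 10 requests and then sustains 2 per second. A quick timing loop makes this visible (purely illustrative):

import time

start = time.time()
for i in range(15):
    bucket.wait_and_acquire()
    print(f"Acquired {i + 1} at t={time.time() - start:.1f}s")
# The first 10 acquisitions return immediately; the last 5 arrive roughly 0.5s apart.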

Sliding Window Rate Limiting

import time
from collections import deque
from threading import Lock

class SlidingWindowLimiter:
    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests = deque()  # timestamps of recently accepted requests
        self.lock = Lock()
    
    def acquire(self) -> bool:
        with self.lock:
            now = time.time()
            
            # Evict timestamps that have slid out of the window.
            while self.requests and self.requests[0] <= now - self.window:
                self.requests.popleft()
            
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            
            return False
    
    def wait_and_acquire(self):
        while not self.acquire():
            time.sleep(0.1)

# Usage
limiter = SlidingWindowLimiter(max_requests=10, window_seconds=60)

for i in range(20):
    limiter.wait_and_acquire()  # blocks once 10 requests have landed in the last 60s
    print(f"Request {i+1}")

Combined Concurrency and Rate Limiter

import ollama
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore
from typing import Any, Callable, List, Optional

class RateLimitedExecutor:
    def __init__(
        self,
        max_concurrent: int = 4,
        requests_per_second: float = 2.0,
        max_workers: int = 8
    ):
        self.semaphore = Semaphore(max_concurrent)
        self.min_interval = 1.0 / requests_per_second
        self.max_workers = max_workers
        self.last_request_time = 0.0
        self._lock = Lock()  # serializes the pacing bookkeeping
    
    def _rate_limit(self):
        # Space out request starts so they never exceed requests_per_second.
        with self._lock:
            now = time.time()
            elapsed = now - self.last_request_time

            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)

            self.last_request_time = time.time()
    
    def execute(
        self,
        func: Callable,
        items: List[Any],
        on_progress: Optional[Callable] = None
    ) -> List[Any]:
        def wrapped(item):
            with self.semaphore:
                self._rate_limit()
                return func(item)
        
        results = []
        total = len(items)
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(wrapped, item) for item in items]
            
            for i, future in enumerate(futures):
                result = future.result()
                results.append(result)
                
                if on_progress:
                    on_progress(i + 1, total, result)
        
        return results

# Usage
def generate_text(prompt):
    return ollama.generate(model='llama3.2', prompt=prompt)['response']

executor = RateLimitedExecutor(
    max_concurrent=3,
    requests_per_second=1.5
)

def on_progress(current, total, result):
    print(f"Progress: {current}/{total}")

prompts = ['Question 1', 'Question 2', 'Question 3', 'Question 4', 'Question 5']
results = executor.execute(generate_text, prompts, on_progress)
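
The two mechanisms compose: the semaphore bounds how many requests are in flight, while the pacing lock spaces out request starts, so the start rate never exceeds requests_per_second regardless of max_concurrent. That yields a rough lower bound on batch wall-clock time (illustrative arithmetic):

n = len(prompts)              # 5 prompts
rps = 1.5                     # requests_per_second
pacing_floor = (n - 1) / rps  # ≈ 2.67s spent just spacing out request starts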

Adaptive Rate Limiting

import ollama
import time
from threading import Lock

class AdaptiveLimiter:
    def __init__(
        self,
        initial_rate: float = 2.0,
        min_rate: float = 0.5,
        max_rate: float = 10.0
    ):
        self.rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.success_count = 0
        self.lock = Lock()
        self.last_time = time.time()
    
    def record_success(self):
        with self.lock:
            self.success_count += 1

            # After 10 consecutive successes, raise the rate by 20%.
            if self.success_count >= 10:
                self.rate = min(self.max_rate, self.rate * 1.2)
                self.success_count = 0

    def record_failure(self):
        with self.lock:
            # Back off immediately: halve the rate and reset the success streak.
            self.rate = max(self.min_rate, self.rate * 0.5)
            self.success_count = 0
    
    def acquire(self):
        # Note: sleeping under the lock serializes callers, so requests are
        # paced globally at the current rate.
        with self.lock:
            now = time.time()
            elapsed = now - self.last_time
            min_interval = 1.0 / self.rate

            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)

            self.last_time = time.time()

# Usage
limiter = AdaptiveLimiter(initial_rate=2.0)

def safe_request(prompt):
    limiter.acquire()
    try:
        result = ollama.generate(model='llama3.2', prompt=prompt)
        limiter.record_success()
        return result['response']
    except Exception as e:
        limiter.record_failure()
        raise
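
Because acquire sleeps while holding the lock, concurrent callers are paced one at a time, so safe_request can be dropped straight into a thread pool. A minimal batch sketch; the worker count is illustrative:

from concurrent.futures import ThreadPoolExecutor

prompts = ['Question 1', 'Question 2', 'Question 3']
with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map re-raises the first failure; wrap safe_request if partial results are needed.
    answers = list(pool.map(safe_request, prompts))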