# 合理控制并发和限流可以保护服务稳定性,提高资源利用率。
import ollama
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
def concurrent_chat(
    messages_list: List[List[Dict]],
    model: str = 'llama3.2',
    max_workers: int = 4
) -> List[str]:
    """Run several chat conversations in parallel, returning replies in input order.

    A request that raises does not abort the batch; its slot is filled with
    an error string instead.
    """
    def _ask(conversation):
        # One blocking chat call; parallelism comes from the thread pool.
        return ollama.chat(model=model, messages=conversation)['message']['content']

    answers = [None] * len(messages_list)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}
        for position, conversation in enumerate(messages_list):
            pending[pool.submit(_ask, conversation)] = position
        for task in as_completed(pending):
            slot = pending[task]
            try:
                answers[slot] = task.result()
            except Exception as e:
                answers[slot] = f"错误: {e}"
    return answers
# Usage
questions = ['问题1', '问题2', '问题3']
messages_list = [[{'role': 'user', 'content': q}] for q in questions]
results = concurrent_chat(messages_list, max_workers=2)
import ollama
from concurrent.futures import ThreadPoolExecutor
from threading import Semaphore
from typing import Callable, List, Any
import time
class ConcurrencyController:
    """Fan work out to a thread pool while capping in-flight calls with a semaphore."""

    def __init__(self, max_concurrent: int = 4, max_workers: int = 8):
        # max_concurrent gates how many calls run at once; max_workers sizes the pool.
        self.semaphore = Semaphore(max_concurrent)
        self.max_workers = max_workers

    def execute(
        self,
        func: Callable,
        items: List[Any],
        on_complete: Callable = None
    ) -> List[Any]:
        """Apply func to each item concurrently; results come back in input order.

        on_complete, when given, is invoked with each result as it is collected.
        """
        def gated(value):
            # Acquire a concurrency slot before doing the actual work.
            with self.semaphore:
                return func(value)

        collected = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            pending = [pool.submit(gated, value) for value in items]
            for task in pending:
                outcome = task.result()
                collected.append(outcome)
                if on_complete:
                    on_complete(outcome)
        return collected
# Usage
def process_question(question):
    """Send a single question to the model and return the reply text."""
    reply = ollama.chat(
        model='llama3.2',
        messages=[{'role': 'user', 'content': question}]
    )
    return reply['message']['content']

controller = ConcurrencyController(max_concurrent=3, max_workers=6)
results = controller.execute(process_question, ['问题1', '问题2', '问题3'])
import time
from threading import Lock
class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    Holds at most `capacity` tokens and refills at `rate` tokens per second.
    acquire() is non-blocking; wait_and_acquire() polls until tokens are
    available.
    """

    def __init__(self, rate: float, capacity: int):
        self.rate = rate            # refill rate, tokens per second
        self.capacity = capacity    # maximum tokens the bucket can hold
        self.tokens = capacity      # bucket starts full
        self.last_time = time.time()
        self.lock = Lock()

    def acquire(self, tokens: int = 1) -> bool:
        """Try to take `tokens` from the bucket; return True on success."""
        with self.lock:
            now = time.time()
            elapsed = now - self.last_time
            self.last_time = now
            # Credit tokens accrued since the last call, capped at capacity.
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_and_acquire(self, tokens: int = 1):
        """Block (polling every 0.1s) until `tokens` can be taken.

        Raises:
            ValueError: if `tokens` exceeds capacity — such a request can
                never be satisfied and would previously spin forever.
        """
        if tokens > self.capacity:
            raise ValueError(
                f"requested {tokens} tokens exceeds bucket capacity {self.capacity}"
            )
        while not self.acquire(tokens):
            time.sleep(0.1)
# Usage
bucket = TokenBucket(rate=2.0, capacity=10)

def rate_limited_request(prompt):
    """Generate a completion, first waiting for a rate-limit token."""
    bucket.wait_and_acquire()
    response = ollama.generate(model='llama3.2', prompt=prompt)
    return response['response']
import time
from collections import deque
from threading import Lock
class SlidingWindowLimiter:
    """Allow at most max_requests within any rolling window of window_seconds."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests = deque()   # timestamps of granted requests, oldest first
        self.lock = Lock()

    def acquire(self) -> bool:
        """Grant and record a request if the window has room, else return False."""
        with self.lock:
            now = time.time()
            cutoff = now - self.window
            # Evict timestamps that have aged out of the window.
            while self.requests and self.requests[0] <= cutoff:
                self.requests.popleft()
            if len(self.requests) >= self.max_requests:
                return False
            self.requests.append(now)
            return True

    def wait_and_acquire(self):
        """Poll (every 0.1s) until the limiter grants a slot."""
        while not self.acquire():
            time.sleep(0.1)
# Usage: with 10 requests per 60s window, requests 11-20 will block.
limiter = SlidingWindowLimiter(max_requests=10, window_seconds=60)
for n in range(1, 21):
    limiter.wait_and_acquire()
    print(f"请求 {n}")
import ollama
import time
from threading import Semaphore
from typing import List, Any, Callable
class RateLimitedExecutor:
    """Thread-pool executor combining a concurrency cap with a request-rate cap.

    A semaphore bounds in-flight calls; a minimum interval between request
    starts enforces the requests-per-second limit.
    """

    def __init__(
        self,
        max_concurrent: int = 4,
        requests_per_second: float = 2.0,
        max_workers: int = 8
    ):
        self.semaphore = Semaphore(max_concurrent)
        self.rps = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.max_workers = max_workers
        self.last_request_time = 0
        # Use a real Lock (the original abused Semaphore(1) as a mutex) to
        # protect last_request_time; Lock is imported at module level.
        self._lock = Lock()

    def _rate_limit(self):
        """Sleep just long enough to keep request starts min_interval apart."""
        with self._lock:
            now = time.time()
            elapsed = now - self.last_request_time
            if elapsed < self.min_interval:
                # Sleeping while holding the lock intentionally serializes
                # waiting callers so starts stay evenly spaced.
                time.sleep(self.min_interval - elapsed)
            self.last_request_time = time.time()

    def execute(
        self,
        func: Callable,
        items: List[Any],
        on_progress: Callable = None
    ) -> List[Any]:
        """Apply func to every item, rate-limited; results are in input order.

        on_progress, when given, receives (completed_count, total, result)
        as each item finishes. ThreadPoolExecutor is already imported at
        module level, so no function-scope import is needed.
        """
        def wrapped(item):
            with self.semaphore:
                self._rate_limit()
                return func(item)

        results = []
        total = len(items)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(wrapped, item) for item in items]
            for i, future in enumerate(futures):
                result = future.result()
                results.append(result)
                if on_progress:
                    on_progress(i + 1, total, result)
        return results
# Usage
def generate_text(prompt):
    """Generate a completion for a single prompt."""
    response = ollama.generate(model='llama3.2', prompt=prompt)
    return response['response']

executor = RateLimitedExecutor(
    max_concurrent=3,
    requests_per_second=1.5
)

def on_progress(current, total, result):
    """Print a simple progress counter as results arrive."""
    print(f"进度: {current}/{total}")

prompts = ['问题1', '问题2', '问题3', '问题4', '问题5']
results = executor.execute(generate_text, prompts, on_progress)
import time
from threading import Lock
from typing import List
class AdaptiveLimiter:
    """Rate limiter that speeds up after sustained success and halves on failure."""

    def __init__(
        self,
        initial_rate: float = 2.0,
        min_rate: float = 0.5,
        max_rate: float = 10.0
    ):
        self.rate = initial_rate        # current requests-per-second target
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.success_count = 0
        self.failure_count = 0
        self.lock = Lock()
        self.last_time = time.time()

    def record_success(self):
        """Count a success; every 10th one raises the rate by 20% (capped)."""
        with self.lock:
            self.success_count += 1
            if self.success_count < 10:
                return
            self.rate = min(self.max_rate, self.rate * 1.2)
            self.success_count = 0

    def record_failure(self):
        """Immediately halve the rate (floored at min_rate)."""
        with self.lock:
            self.failure_count += 1
            self.rate = max(self.min_rate, self.rate * 0.5)
            self.failure_count = 0

    def acquire(self):
        """Block until the current rate allows another request to start."""
        with self.lock:
            waited = time.time() - self.last_time
            interval = 1.0 / self.rate
            if waited < interval:
                # Sleeping under the lock serializes concurrent acquirers.
                time.sleep(interval - waited)
            self.last_time = time.time()
# Usage
limiter = AdaptiveLimiter(initial_rate=2.0)

def safe_request(prompt):
    """Make one rate-limited generate call, feeding the outcome back to the limiter."""
    limiter.acquire()
    try:
        result = ollama.generate(model='llama3.2', prompt=prompt)
        limiter.record_success()
        # NOTE: indexing inside try mirrors the original — a missing
        # 'response' key still counts as a failure.
        return result['response']
    except Exception:
        limiter.record_failure()
        raise