LLM 接口层
本文档介绍 SimpleLLMFunc 的 LLM 接口层设计,包括接口抽象、密钥管理和流量控制等核心组件。
概述
SimpleLLMFunc 提供了完整的 LLM 接口框架,旨在统一不同 LLM 服务提供商的调用方式。该框架包含三个核心组件:
LLM_Interface - 抽象基类,定义标准接口规范
APIKeyPool - API 密钥池,实现负载均衡
TokenBucket - 令牌桶算法,提供流量控制
快速开始
最简单的使用方式
from SimpleLLMFunc import OpenAICompatible
# 从配置文件加载 LLM 接口
llm = OpenAICompatible.load_from_json_file("provider.json")["openai"]["gpt-3.5-turbo"]
# 现在可以在装饰器中使用
from SimpleLLMFunc import llm_function
@llm_function(llm_interface=llm)
async def my_task(text: str) -> str:
"""处理文本任务"""
pass
配置文件格式
创建 provider.json 文件:
{
"openai": [
{
"model_name": "gpt-3.5-turbo",
"api_keys": ["sk-key1", "sk-key2"],
"base_url": "https://api.openai.com/v1",
"max_retries": 5,
"retry_delay": 1.0,
"rate_limit_capacity": 20,
"rate_limit_refill_rate": 3.0
},
{
"model_name": "gpt-4",
"api_keys": ["sk-key3"],
"base_url": "https://api.openai.com/v1"
}
],
"deepseek": [
{
"model_name": "deepseek-chat",
"api_keys": ["your-deepseek-key"],
"base_url": "https://api.deepseek.com/v1"
}
]
}
LLM_Interface 基类
设计理念
LLM_Interface 是所有 LLM 实现的抽象基类,定义了统一的接口规范。所有具体实现(如 OpenAICompatible)都继承自这个类。
核心特性
标准化接口: 定义了
chat()和chat_stream()两个核心方法类型安全: 使用 Python 类型注解确保类型正确性
异步原生: 完全基于异步编程,支持高并发
可扩展性: 通过继承轻松添加新的 LLM 服务支持
接口定义
from abc import ABC, abstractmethod
from typing import Iterable, Dict, Optional, Literal, AsyncGenerator
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
class LLM_Interface(ABC):
"""LLM 接口抽象基类"""
@abstractmethod
async def chat(
self,
trace_id: str = get_current_trace_id(),
stream: Literal[False] = False,
messages: Iterable[Dict[str, str]] = [{"role": "user", "content": ""}],
timeout: Optional[int] = None,
*args,
**kwargs,
) -> ChatCompletion:
"""执行非流式对话请求"""
pass
@abstractmethod
async def chat_stream(
self,
trace_id: str = get_current_trace_id(),
stream: Literal[True] = True,
messages: Iterable[Dict[str, str]] = [{"role": "user", "content": ""}],
timeout: Optional[int] = None,
*args,
**kwargs,
) -> AsyncGenerator[ChatCompletionChunk, None]:
"""执行流式对话请求"""
if False:
yield ChatCompletionChunk(id="", created=0, model="", object="", choices=[])
OpenAICompatible 实现
什么是 OpenAICompatible
OpenAICompatible 是 LLM_Interface 的具体实现,支持任何兼容 OpenAI API 格式的服务,包括:
OpenAI (GPT-4, GPT-3.5)
Deepseek
Anthropic Claude
火山引擎 Ark
百度千帆
本地 LLM (Ollama, vLLM)
任何兼容 OpenAI API 的服务
创建接口
方式 1: 从配置文件加载(推荐)
from SimpleLLMFunc import OpenAICompatible
# 从 JSON 配置文件加载
all_models = OpenAICompatible.load_from_json_file("provider.json")
# 访问具体的模型
gpt35 = all_models["openai"]["gpt-3.5-turbo"]
gpt4 = all_models["openai"]["gpt-4"]
deepseek = all_models["deepseek"]["deepseek-chat"]
方式 2: 直接创建
from SimpleLLMFunc import OpenAICompatible, APIKeyPool
# 创建 API 密钥池
key_pool = APIKeyPool(
api_keys=["sk-key1", "sk-key2", "sk-key3"],
provider_id="openai-gpt35"
)
# 创建 LLM 接口
llm = OpenAICompatible(
api_key_pool=key_pool,
model_name="gpt-3.5-turbo",
base_url="https://api.openai.com/v1",
max_retries=5,
retry_delay=1.0,
rate_limit_capacity=20,
rate_limit_refill_rate=3.0
)
核心特性
自动重试: 请求失败时自动重试(可配置次数)
令牌统计: 自动统计 input 和 output token 数量
流量控制: 使用令牌桶算法防止速率限制
密钥轮换: 自动在多个密钥间负载均衡
APIKeyPool - 密钥管理
设计理念
APIKeyPool 使用小根堆数据结构实现 API 密钥的负载均衡,确保请求均匀分布到不同的密钥上。
核心特性
小根堆轮换: 自动选择负载最低的密钥
负载均衡: 实时跟踪每个密钥的任务数量
线程安全: 使用锁保护并发访问
单例模式: 相同 provider_id 的密钥池共享状态
工作原理
堆结构:[(任务数, API密钥), ...]
堆顶永远是任务数最少的密钥
示例:
heap = [(0, "key1"), (1, "key2"), (2, "key3")]
获取最低负载密钥 → "key1"
key1 任务完成后重新整理堆...
使用示例
from SimpleLLMFunc.interface import APIKeyPool
# 创建密钥池
key_pool = APIKeyPool(
api_keys=["sk-key1", "sk-key2", "sk-key3"],
provider_id="my-provider"
)
# 获取最低负载的密钥
key = key_pool.get_least_loaded_key()
# 任务开始时增加计数
key_pool.increment_task_count(key)
# 任务完成后减少计数
key_pool.decrement_task_count(key)
TokenBucket - 流量控制
设计理念
TokenBucket 实现经典的令牌桶算法,用于 API 请求的流量控制和速率限制。
算法原理
令牌桶工作原理:
令牌生成: 以固定速率向桶中添加令牌
令牌消费: 每次请求消费一个或多个令牌
容量限制: 桶有最大容量,多余令牌会被丢弃
突发支持: 当桶中有足够令牌时,允许突发请求
示例:
容量=5,补充速率=2 tokens/秒
时间: 0s 1s 2s 3s
桶: [●●●●●] [●●●●●] [●●●●●] [●●●]
请求: 0个 0个 2个 2个
核心特性
平滑流量: 避免突发请求冲击后端 API
可配置参数: 支持自定义容量和补充速率
异步支持: 非阻塞的令牌获取,支持超时
线程安全: 统一的锁保护所有操作
配置参数
参数 |
类型 |
说明 |
推荐值 |
|---|---|---|---|
|
int |
令牌桶容量(最大令牌数) |
10-50 |
|
float |
令牌补充速率(tokens/秒) |
0.5-5.0 |
使用场景
# 高频 API(如 OpenAI)
capacity=20, refill_rate=3.0
# 标准 API
capacity=10, refill_rate=1.0
# 受限 API(如百度千帆)
capacity=5, refill_rate=0.5
完整的生产级示例
1. 多模型配置
from SimpleLLMFunc import OpenAICompatible, llm_function
# 加载多个模型
models = OpenAICompatible.load_from_json_file("provider.json")
# 定义不同的任务用不同的模型
fast_llm = models["openai"]["gpt-3.5-turbo"] # 快速、便宜
powerful_llm = models["openai"]["gpt-4"] # 强大、昂贵
deepseek_llm = models["deepseek"]["deepseek-chat"] # 国内服务
@llm_function(llm_interface=fast_llm)
async def simple_task(text: str) -> str:
"""简单任务使用快速模型"""
pass
@llm_function(llm_interface=powerful_llm)
async def complex_task(text: str) -> str:
"""复杂任务使用强大模型"""
pass
2. 错误处理和重试
import asyncio
from SimpleLLMFunc import llm_function
@llm_function(llm_interface=llm)
async def robust_call(text: str) -> str:
"""可靠的 LLM 调用"""
pass
async def call_with_fallback():
"""带备选方案的调用"""
try:
result = await robust_call("test")
return result
except Exception as e:
print(f"主模型失败: {e}")
# 使用备选模型
return await backup_call("test")
3. 监控和调试
# 使用事件流获取 token 用量(推荐)
from SimpleLLMFunc.hooks.events import LLMCallEndEvent
from SimpleLLMFunc.hooks.stream import is_event_yield
async for output in summarize_text("..."):
if is_event_yield(output) and isinstance(output.event, LLMCallEndEvent):
print(output.event.usage)
# 检查密钥池状态
least_loaded = llm.key_pool.get_least_loaded_key()
print(f"最低负载密钥: {least_loaded}")
# 检查令牌桶状态
print(llm.get_rate_limit_status())
最佳实践
1. 密钥管理
# ✅ 推荐:为每个项目/环境配置独立的密钥
{
"openai": {
"prod": {
"api_keys": ["prod-key1", "prod-key2"],
"max_retries": 5
},
"dev": {
"api_keys": ["dev-key1"],
"max_retries": 2
}
}
}
# ✅ 推荐:为不同模型配置不同的速率限制
{
"openai": {
"gpt-4": {
"rate_limit_capacity": 10, # 小容量,保守
"rate_limit_refill_rate": 1.0
},
"gpt-3.5-turbo": {
"rate_limit_capacity": 50, # 大容量,激进
"rate_limit_refill_rate": 5.0
}
}
}
2. 流量控制
# ✅ 推荐:根据 API 限制调整参数
# OpenAI:rate_limit_capacity=10-20, refill_rate=1.0-3.0
# Deepseek:rate_limit_capacity=10-15, refill_rate=2.0-3.0
# 本地 LLM:capacity=100, refill_rate=10.0(几乎无限)
3. 错误处理
import asyncio
from typing import Optional
async def call_with_exponential_backoff(
llm_call,
max_retries: int = 3,
base_delay: float = 1.0
) -> Optional[str]:
"""带指数退避的重试"""
for attempt in range(max_retries):
try:
return await llm_call()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f"第 {attempt + 1} 次失败,等待 {delay} 秒后重试...")
await asyncio.sleep(delay)
故障排除
常见问题
“Rate limit exceeded” 错误
增加
rate_limit_capacity或rate_limit_refill_rate检查配置中的速率限制是否与 API 限制匹配
密钥持续失败
检查 API 密钥是否有效且有足够配额
验证
base_url是否正确
Token 统计不准确
某些 API 可能不返回 token 统计信息
框架会尽力估算,但可能不完全准确
调试技巧
import logging
# 启用详细日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("SimpleLLMFunc")
logger.setLevel(logging.DEBUG)
# 查看请求/响应详情
总结
SimpleLLMFunc 的接口层通过以下设计提供强大的 LLM 服务:
LLM_Interface: 统一的抽象基类
OpenAICompatible: 开箱即用的 OpenAI 兼容实现
APIKeyPool: 智能的密钥负载均衡
TokenBucket: 可靠的流量控制
这种设计既保证了易用性,又提供了企业级的功能和可靠性。