LLM 接口层

本文档介绍 SimpleLLMFunc 的 LLM 接口层设计,包括接口抽象、密钥管理和流量控制等核心组件。

概述

SimpleLLMFunc 提供了完整的 LLM 接口框架,旨在统一不同 LLM 服务提供商的调用方式。该框架包含三个核心组件:

  1. LLM_Interface - 抽象基类,定义标准接口规范

  2. APIKeyPool - API 密钥池,实现负载均衡

  3. TokenBucket - 令牌桶算法,提供流量控制

快速开始

最简单的使用方式

from SimpleLLMFunc import OpenAICompatible

# 从配置文件加载 LLM 接口
llm = OpenAICompatible.load_from_json_file("provider.json")["openai"]["gpt-3.5-turbo"]

# 现在可以在装饰器中使用
from SimpleLLMFunc import llm_function

@llm_function(llm_interface=llm)
async def my_task(text: str) -> str:
    """处理文本任务"""
    pass

配置文件格式

创建 provider.json 文件:

{
  "openai": [
    {
      "model_name": "gpt-3.5-turbo",
      "api_keys": ["sk-key1", "sk-key2"],
      "base_url": "https://api.openai.com/v1",
      "max_retries": 5,
      "retry_delay": 1.0,
      "rate_limit_capacity": 20,
      "rate_limit_refill_rate": 3.0
    },
    {
      "model_name": "gpt-4",
      "api_keys": ["sk-key3"],
      "base_url": "https://api.openai.com/v1"
    }
  ],
  "deepseek": [
    {
      "model_name": "deepseek-chat",
      "api_keys": ["your-deepseek-key"],
      "base_url": "https://api.deepseek.com/v1"
    }
  ]
}

LLM_Interface 基类

设计理念

LLM_Interface 是所有 LLM 实现的抽象基类,定义了统一的接口规范。所有具体实现(如 OpenAICompatible)都继承自这个类。

核心特性

  • 标准化接口: 定义了 chat()chat_stream() 两个核心方法

  • 类型安全: 使用 Python 类型注解确保类型正确性

  • 异步原生: 完全基于异步编程,支持高并发

  • 可扩展性: 通过继承轻松添加新的 LLM 服务支持

接口定义

from abc import ABC, abstractmethod
from typing import Iterable, Dict, Optional, Literal, AsyncGenerator
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk

class LLM_Interface(ABC):
    """LLM 接口抽象基类"""

    @abstractmethod
    async def chat(
        self,
        trace_id: str = get_current_trace_id(),
        stream: Literal[False] = False,
        messages: Iterable[Dict[str, str]] = [{"role": "user", "content": ""}],
        timeout: Optional[int] = None,
        *args,
        **kwargs,
    ) -> ChatCompletion:
        """执行非流式对话请求"""
        pass

    @abstractmethod
    async def chat_stream(
        self,
        trace_id: str = get_current_trace_id(),
        stream: Literal[True] = True,
        messages: Iterable[Dict[str, str]] = [{"role": "user", "content": ""}],
        timeout: Optional[int] = None,
        *args,
        **kwargs,
    ) -> AsyncGenerator[ChatCompletionChunk, None]:
        """执行流式对话请求"""
        if False:
            yield ChatCompletionChunk(id="", created=0, model="", object="", choices=[])

OpenAICompatible 实现

什么是 OpenAICompatible

OpenAICompatibleLLM_Interface 的具体实现,支持任何兼容 OpenAI API 格式的服务,包括:

  • OpenAI (GPT-4, GPT-3.5)

  • Deepseek

  • Anthropic Claude

  • 火山引擎 Ark

  • 百度千帆

  • 本地 LLM (Ollama, vLLM)

  • 任何兼容 OpenAI API 的服务

创建接口

方式 1: 从配置文件加载(推荐)

from SimpleLLMFunc import OpenAICompatible

# 从 JSON 配置文件加载
all_models = OpenAICompatible.load_from_json_file("provider.json")

# 访问具体的模型
gpt35 = all_models["openai"]["gpt-3.5-turbo"]
gpt4 = all_models["openai"]["gpt-4"]
deepseek = all_models["deepseek"]["deepseek-chat"]

方式 2: 直接创建

from SimpleLLMFunc import OpenAICompatible, APIKeyPool

# 创建 API 密钥池
key_pool = APIKeyPool(
    api_keys=["sk-key1", "sk-key2", "sk-key3"],
    provider_id="openai-gpt35"
)

# 创建 LLM 接口
llm = OpenAICompatible(
    api_key_pool=key_pool,
    model_name="gpt-3.5-turbo",
    base_url="https://api.openai.com/v1",
    max_retries=5,
    retry_delay=1.0,
    rate_limit_capacity=20,
    rate_limit_refill_rate=3.0
)

核心特性

  • 自动重试: 请求失败时自动重试(可配置次数)

  • 令牌统计: 自动统计 input 和 output token 数量

  • 流量控制: 使用令牌桶算法防止速率限制

  • 密钥轮换: 自动在多个密钥间负载均衡

APIKeyPool - 密钥管理

设计理念

APIKeyPool 使用小根堆数据结构实现 API 密钥的负载均衡,确保请求均匀分布到不同的密钥上。

核心特性

  • 小根堆轮换: 自动选择负载最低的密钥

  • 负载均衡: 实时跟踪每个密钥的任务数量

  • 线程安全: 使用锁保护并发访问

  • 单例模式: 相同 provider_id 的密钥池共享状态

工作原理

堆结构:[(任务数, API密钥), ...]
堆顶永远是任务数最少的密钥

示例:
heap = [(0, "key1"), (1, "key2"), (2, "key3")]
获取最低负载密钥 → "key1"
key1 任务完成后重新整理堆...

使用示例

from SimpleLLMFunc.interface import APIKeyPool

# 创建密钥池
key_pool = APIKeyPool(
    api_keys=["sk-key1", "sk-key2", "sk-key3"],
    provider_id="my-provider"
)

# 获取最低负载的密钥
key = key_pool.get_least_loaded_key()

# 任务开始时增加计数
key_pool.increment_task_count(key)

# 任务完成后减少计数
key_pool.decrement_task_count(key)

TokenBucket - 流量控制

设计理念

TokenBucket 实现经典的令牌桶算法,用于 API 请求的流量控制和速率限制。

算法原理

令牌桶工作原理:

  1. 令牌生成: 以固定速率向桶中添加令牌

  2. 令牌消费: 每次请求消费一个或多个令牌

  3. 容量限制: 桶有最大容量,多余令牌会被丢弃

  4. 突发支持: 当桶中有足够令牌时,允许突发请求

示例:

容量=5,补充速率=2 tokens/秒

时间: 0s      1s      2s      3s
桶:  [●●●●●] [●●●●●] [●●●●●] [●●●]
请求:  0个    0个    2个    2个

核心特性

  • 平滑流量: 避免突发请求冲击后端 API

  • 可配置参数: 支持自定义容量和补充速率

  • 异步支持: 非阻塞的令牌获取,支持超时

  • 线程安全: 统一的锁保护所有操作

配置参数

参数

类型

说明

推荐值

capacity

int

令牌桶容量(最大令牌数)

10-50

refill_rate

float

令牌补充速率(tokens/秒)

0.5-5.0

使用场景

# 高频 API(如 OpenAI)
capacity=20, refill_rate=3.0

# 标准 API
capacity=10, refill_rate=1.0

# 受限 API(如百度千帆)
capacity=5, refill_rate=0.5

完整的生产级示例

1. 多模型配置

from SimpleLLMFunc import OpenAICompatible, llm_function

# 加载多个模型
models = OpenAICompatible.load_from_json_file("provider.json")

# 定义不同的任务用不同的模型
fast_llm = models["openai"]["gpt-3.5-turbo"]  # 快速、便宜
powerful_llm = models["openai"]["gpt-4"]      # 强大、昂贵
deepseek_llm = models["deepseek"]["deepseek-chat"] # 国内服务

@llm_function(llm_interface=fast_llm)
async def simple_task(text: str) -> str:
    """简单任务使用快速模型"""
    pass

@llm_function(llm_interface=powerful_llm)
async def complex_task(text: str) -> str:
    """复杂任务使用强大模型"""
    pass

2. 错误处理和重试

import asyncio
from SimpleLLMFunc import llm_function

@llm_function(llm_interface=llm)
async def robust_call(text: str) -> str:
    """可靠的 LLM 调用"""
    pass

async def call_with_fallback():
    """带备选方案的调用"""
    try:
        result = await robust_call("test")
        return result
    except Exception as e:
        print(f"主模型失败: {e}")
        # 使用备选模型
        return await backup_call("test")

3. 监控和调试

# 使用事件流获取 token 用量(推荐)
from SimpleLLMFunc.hooks.events import LLMCallEndEvent
from SimpleLLMFunc.hooks.stream import is_event_yield

async for output in summarize_text("..."):
    if is_event_yield(output) and isinstance(output.event, LLMCallEndEvent):
        print(output.event.usage)

# 检查密钥池状态
least_loaded = llm.key_pool.get_least_loaded_key()
print(f"最低负载密钥: {least_loaded}")

# 检查令牌桶状态
print(llm.get_rate_limit_status())

最佳实践

1. 密钥管理

# ✅ 推荐:为每个项目/环境配置独立的密钥
{
  "openai": {
    "prod": {
      "api_keys": ["prod-key1", "prod-key2"],
      "max_retries": 5
    },
    "dev": {
      "api_keys": ["dev-key1"],
      "max_retries": 2
    }
  }
}

# ✅ 推荐:为不同模型配置不同的速率限制
{
  "openai": {
    "gpt-4": {
      "rate_limit_capacity": 10,  # 小容量,保守
      "rate_limit_refill_rate": 1.0
    },
    "gpt-3.5-turbo": {
      "rate_limit_capacity": 50,  # 大容量,激进
      "rate_limit_refill_rate": 5.0
    }
  }
}

2. 流量控制

# ✅ 推荐:根据 API 限制调整参数
# OpenAI:rate_limit_capacity=10-20, refill_rate=1.0-3.0
# Deepseek:rate_limit_capacity=10-15, refill_rate=2.0-3.0
# 本地 LLM:capacity=100, refill_rate=10.0(几乎无限)

3. 错误处理

import asyncio
from typing import Optional

async def call_with_exponential_backoff(
    llm_call,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> Optional[str]:
    """带指数退避的重试"""
    for attempt in range(max_retries):
        try:
            return await llm_call()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"第 {attempt + 1} 次失败,等待 {delay} 秒后重试...")
            await asyncio.sleep(delay)

故障排除

常见问题

  1. “Rate limit exceeded” 错误

    • 增加 rate_limit_capacityrate_limit_refill_rate

    • 检查配置中的速率限制是否与 API 限制匹配

  2. 密钥持续失败

    • 检查 API 密钥是否有效且有足够配额

    • 验证 base_url 是否正确

  3. Token 统计不准确

    • 某些 API 可能不返回 token 统计信息

    • 框架会尽力估算,但可能不完全准确

调试技巧

import logging

# 启用详细日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("SimpleLLMFunc")
logger.setLevel(logging.DEBUG)

# 查看请求/响应详情

总结

SimpleLLMFunc 的接口层通过以下设计提供强大的 LLM 服务:

  1. LLM_Interface: 统一的抽象基类

  2. OpenAICompatible: 开箱即用的 OpenAI 兼容实现

  3. APIKeyPool: 智能的密钥负载均衡

  4. TokenBucket: 可靠的流量控制

这种设计既保证了易用性,又提供了企业级的功能和可靠性。