You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

9.5 KiB

Client 层与 Provider 层职责分离重构

日期: 2026-01-30
类型: 架构重构
影响范围: AI Provider 层、HTTP Client 层

背景

在之前的实现中,stability_provider.pyrunway_provider.py 直接使用 httpx 进行 HTTP 调用,而 openai_provider.py 已经使用了统一的 AIHubMixClient。这导致了架构不一致和职责混乱。

问题

  1. 职责混乱:Provider 层既负责业务逻辑适配,又负责 HTTP 通信
  2. 代码重复:每个 Provider 都要实现相同的 HTTP 错误处理、重试逻辑
  3. 测试困难:无法独立测试 HTTP 层和业务逻辑层
  4. 配置分散:API Key、Base URL、Timeout 等配置散落在各处

解决方案

四层架构设计

API 层 (ai_api.py)
    ↓
Service 层 (ai_service.py)
    ├─ 积分扣除
    ├─ Celery 任务队列
    └─ MinIO 文件存储
        ↓
Provider 层 (openai_provider.py / stability_provider.py / runway_provider.py)
    ├─ 业务逻辑适配
    ├─ 参数转换
    └─ 统一接口
        ↓
Client 层 (aihubmix_client.py / stability_client.py / runway_client.py)
    ├─ 纯 HTTP 调用
    ├─ 错误处理
    └─ 重试逻辑

职责划分

Client 层职责

  • 纯 HTTP 通信(GET、POST、PUT、DELETE)
  • 错误处理和重试逻辑
  • 超时控制
  • 请求头管理
  • 响应解析(JSON / 二进制)

Provider 层职责

  • 业务逻辑适配(参数转换、数据格式化)
  • 统一接口实现(BaseAIProvider)
  • 调用 Client 层完成 HTTP 请求
  • 返回标准化的业务数据

Service 层职责

  • 业务编排(积分扣除、权限校验)
  • Celery 异步任务管理
  • MinIO 文件存储
  • 任务状态追踪

实施内容

1. 创建 BaseHTTPClient

文件: server/app/clients/base_client.py

功能:

  • 统一的 HTTP 请求封装
  • 自动重试机制(指数退避)
  • 错误处理(4xx 不重试,5xx 重试)
  • 日志记录
  • 超时控制

核心方法:

async def _request(method, endpoint, json, data, files, params, headers, timeout, follow_redirects)
async def get(endpoint, params, **kwargs)
async def post(endpoint, json, data, files, **kwargs)
async def put(endpoint, json, **kwargs)
async def delete(endpoint, **kwargs)

2. 创建 StabilityClient

文件: server/app/clients/stability_client.py

功能:

  • 封装 Stability AI API 的所有 HTTP 调用
  • 文本生成图片(Text-to-Image)
  • 图片生成视频(Image-to-Video)
  • 轮询视频生成状态

核心方法:

async def text_to_image(engine_id, prompt, width, height, cfg_scale, steps, samples, style_preset)
async def image_to_video(image_data, cfg_scale, motion_bucket_id, seed)
async def get_video_result(generation_id)
async def wait_for_video_completion(generation_id, max_wait_seconds, poll_interval)

3. 创建 RunwayClient

文件: server/app/clients/runway_client.py

功能:

  • 封装 Runway API 的所有 HTTP 调用
  • 创建视频生成任务
  • 查询任务状态
  • 等待任务完成
  • 取消任务

核心方法:

async def create_generation(model, duration, text_prompt, init_image, seed, interpolate)
async def get_generation_status(task_id)
async def wait_for_completion(task_id, max_wait_seconds, poll_interval)
async def cancel_generation(task_id)

4. 重构 StabilityProvider

文件: server/app/services/ai_providers/stability_provider.py

变更:

  • 移除直接使用 httpx 的代码
  • 使用 StabilityClient 进行 HTTP 调用
  • 专注于业务逻辑适配(参数转换、数据格式化)
  • 统一初始化方式(从 settings 读取配置)

示例:

# 旧代码
async with httpx.AsyncClient(timeout=self.timeout) as client:
    response = await client.post(...)
    response.raise_for_status()
    data = response.json()

# 新代码
data = await self.client.text_to_image(
    engine_id=engine_id,
    prompt=prompt,
    width=width,
    height=height,
    ...
)

5. 重构 RunwayProvider

文件: server/app/services/ai_providers/runway_provider.py

变更:

  • 移除直接使用 httpx 的代码
  • 使用 RunwayClient 进行 HTTP 调用
  • 专注于业务逻辑适配
  • 统一初始化方式

示例:

# 旧代码
async with httpx.AsyncClient(timeout=self.timeout) as client:
    response = await client.post(f"{self.base_url}/generations", ...)
    response.raise_for_status()
    data = response.json()

# 新代码
task_id = await self.client.create_generation(
    model=model,
    duration=duration,
    text_prompt=prompt,
    ...
)

6. 重命名 OpenAIProvider 为 AIHubMixProvider

文件: server/app/services/ai_providers/openai_provider.pyaihubmix_provider.py

原因:

  • OpenAIProvider 命名不准确(实际通过 AIHubMix 代理调用多个厂商)
  • AIHubMixClient 命名不一致
  • 容易误导新开发者

变更:

# 旧代码
class OpenAIProvider(BaseAIProvider):
    """OpenAI 提供商(通过 AIHubMix 代理)"""

# 新代码
class AIHubMixProvider(BaseAIProvider):
    """AIHubMix 提供商(通过 AIHubMix 代理服务调用多个 AI 厂商)"""

支持的模型:

  • OpenAI 模型(DALL-E, GPT, TTS, Whisper)
  • Stability AI 模型(通过代理)
  • 视频生成模型(Veo, Sora, Wan 系列)

7. 更新 clients/init.py

文件: server/app/clients/__init__.py

变更:

from .aihubmix_client import AIHubMixClient
from .base_client import BaseHTTPClient
from .stability_client import StabilityClient
from .runway_client import RunwayClient

__all__ = [
    'AIHubMixClient',
    'BaseHTTPClient',
    'StabilityClient',
    'RunwayClient'
]

技术细节

重试策略

  • 4xx 错误:不重试(客户端错误)
  • 5xx 错误:重试 3 次(服务器错误)
  • 超时错误:重试 3 次
  • 退避策略:指数退避(2^attempt 秒)

日志记录

所有 HTTP 请求都记录以下信息:

  • 请求方法、URL
  • 是否包含 JSON / 文件
  • 响应状态码
  • 响应大小(字节)
  • 错误信息(如果失败)

配置管理

所有 API 配置统一在 server/app/core/config.py 中管理:

STABILITY_API_KEY: str = ""
STABILITY_BASE_URL: str = "https://api.stability.ai/v1"
RUNWAY_API_KEY: str = ""
RUNWAY_BASE_URL: str = "https://api.runwayml.com/v1"

优势

1. 职责清晰

  • Client 层:只负责 HTTP 通信
  • Provider 层:只负责业务逻辑
  • Service 层:只负责业务编排

2. 代码复用

  • 多个 Provider 可以共享同一个 Client
  • HTTP 通用逻辑(错误处理、重试)只需实现一次

3. 易于测试

  • Client 层可以独立测试 HTTP 调用
  • Provider 层可以 Mock Client 进行单元测试
  • Service 层可以 Mock Provider 进行集成测试

4. 易于维护

  • HTTP 逻辑变更只需修改 Client 层
  • 业务逻辑变更只需修改 Provider 层
  • 配置变更只需修改 config.py

5. 易于扩展

  • 新增 AI 厂商只需:
    1. 创建新的 Client(继承 BaseHTTPClient)
    2. 创建新的 Provider(继承 BaseAIProvider)
    3. 在 config.py 添加配置

影响范围

修改的文件

  • server/app/clients/base_client.py (新建)
  • server/app/clients/stability_client.py (新建)
  • server/app/clients/runway_client.py (新建)
  • server/app/clients/__init__.py (更新)
  • server/app/services/ai_providers/stability_provider.py (重构)
  • server/app/services/ai_providers/runway_provider.py (重构)

不受影响的文件

  • server/app/services/ai_service.py (Service 层接口不变)
  • server/app/api/v1/ai.py (API 层接口不变)
  • server/app/tasks/ai_tasks.py (Celery 任务接口不变)

测试建议

1. Client 层测试

# 测试 HTTP 调用
async def test_stability_client_text_to_image():
    client = StabilityClient(api_key="test_key")
    result = await client.text_to_image(
        engine_id="stable-diffusion-xl-1024-v1-0",
        prompt="A beautiful sunset",
        width=1024,
        height=1024
    )
    assert 'artifacts' in result

2. Provider 层测试

# Mock Client 测试业务逻辑
async def test_stability_provider_generate_image(mocker):
    mock_client = mocker.Mock()
    mock_client.text_to_image.return_value = {...}
    
    provider = StabilityProvider(model_name="stable-diffusion-xl")
    provider.client = mock_client
    
    result = await provider.generate_image(prompt="test")
    assert 'image_data' in result

3. 集成测试

# 端到端测试
async def test_ai_service_generate_image_e2e():
    result = await ai_service.generate_image(
        user_id=user_id,
        model_name="stable-diffusion-xl",
        prompt="A beautiful sunset"
    )
    assert result['status'] == 'completed'

后续优化

  1. 缓存机制:在 Client 层添加响应缓存
  2. 限流控制:在 Client 层添加 Rate Limiting
  3. 监控指标:记录 HTTP 请求耗时、成功率等指标
  4. 断路器:在 Client 层添加 Circuit Breaker 模式
  5. 连接池:优化 httpx 连接池配置

参考资料

总结

本次重构通过引入 Client 层,实现了 HTTP 通信与业务逻辑的完全分离,提升了代码的可维护性、可测试性和可扩展性。所有 AI Provider 现在都遵循统一的架构模式,为后续新增 AI 厂商提供了清晰的模板。