# Client 层与 Provider 层职责分离重构 **日期**: 2026-01-30 **类型**: 架构重构 **影响范围**: AI Provider 层、HTTP Client 层 ## 背景 在之前的实现中,`stability_provider.py` 和 `runway_provider.py` 直接使用 `httpx` 进行 HTTP 调用,而 `openai_provider.py` 已经使用了统一的 `AIHubMixClient`。这导致了架构不一致和职责混乱。 ## 问题 1. **职责混乱**:Provider 层既负责业务逻辑适配,又负责 HTTP 通信 2. **代码重复**:每个 Provider 都要实现相同的 HTTP 错误处理、重试逻辑 3. **测试困难**:无法独立测试 HTTP 层和业务逻辑层 4. **配置分散**:API Key、Base URL、Timeout 等配置散落在各处 ## 解决方案 ### 四层架构设计 ``` API 层 (ai_api.py) ↓ Service 层 (ai_service.py) ├─ 积分扣除 ├─ Celery 任务队列 └─ MinIO 文件存储 ↓ Provider 层 (openai_provider.py / stability_provider.py / runway_provider.py) ├─ 业务逻辑适配 ├─ 参数转换 └─ 统一接口 ↓ Client 层 (aihubmix_client.py / stability_client.py / runway_client.py) ├─ 纯 HTTP 调用 ├─ 错误处理 └─ 重试逻辑 ``` ### 职责划分 #### Client 层职责 - 纯 HTTP 通信(GET、POST、PUT、DELETE) - 错误处理和重试逻辑 - 超时控制 - 请求头管理 - 响应解析(JSON / 二进制) #### Provider 层职责 - 业务逻辑适配(参数转换、数据格式化) - 统一接口实现(BaseAIProvider) - 调用 Client 层完成 HTTP 请求 - 返回标准化的业务数据 #### Service 层职责 - 业务编排(积分扣除、权限校验) - Celery 异步任务管理 - MinIO 文件存储 - 任务状态追踪 ## 实施内容 ### 1. 创建 BaseHTTPClient **文件**: `server/app/clients/base_client.py` **功能**: - 统一的 HTTP 请求封装 - 自动重试机制(指数退避) - 错误处理(4xx 不重试,5xx 重试) - 日志记录 - 超时控制 **核心方法**: ```python async def _request(method, endpoint, json, data, files, params, headers, timeout, follow_redirects) async def get(endpoint, params, **kwargs) async def post(endpoint, json, data, files, **kwargs) async def put(endpoint, json, **kwargs) async def delete(endpoint, **kwargs) ``` ### 2. 创建 StabilityClient **文件**: `server/app/clients/stability_client.py` **功能**: - 封装 Stability AI API 的所有 HTTP 调用 - 文本生成图片(Text-to-Image) - 图片生成视频(Image-to-Video) - 轮询视频生成状态 **核心方法**: ```python async def text_to_image(engine_id, prompt, width, height, cfg_scale, steps, samples, style_preset) async def image_to_video(image_data, cfg_scale, motion_bucket_id, seed) async def get_video_result(generation_id) async def wait_for_video_completion(generation_id, max_wait_seconds, poll_interval) ``` ### 3. 创建 RunwayClient **文件**: `server/app/clients/runway_client.py` **功能**: - 封装 Runway API 的所有 HTTP 调用 - 创建视频生成任务 - 查询任务状态 - 等待任务完成 - 取消任务 **核心方法**: ```python async def create_generation(model, duration, text_prompt, init_image, seed, interpolate) async def get_generation_status(task_id) async def wait_for_completion(task_id, max_wait_seconds, poll_interval) async def cancel_generation(task_id) ``` ### 4. 重构 StabilityProvider **文件**: `server/app/services/ai_providers/stability_provider.py` **变更**: - 移除直接使用 `httpx` 的代码 - 使用 `StabilityClient` 进行 HTTP 调用 - 专注于业务逻辑适配(参数转换、数据格式化) - 统一初始化方式(从 `settings` 读取配置) **示例**: ```python # 旧代码 async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post(...) response.raise_for_status() data = response.json() # 新代码 data = await self.client.text_to_image( engine_id=engine_id, prompt=prompt, width=width, height=height, ... ) ``` ### 5. 重构 RunwayProvider **文件**: `server/app/services/ai_providers/runway_provider.py` **变更**: - 移除直接使用 `httpx` 的代码 - 使用 `RunwayClient` 进行 HTTP 调用 - 专注于业务逻辑适配 - 统一初始化方式 **示例**: ```python # 旧代码 async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post(f"{self.base_url}/generations", ...) response.raise_for_status() data = response.json() # 新代码 task_id = await self.client.create_generation( model=model, duration=duration, text_prompt=prompt, ... ) ``` ### 6. 重命名 OpenAIProvider 为 AIHubMixProvider **文件**: `server/app/services/ai_providers/openai_provider.py` → `aihubmix_provider.py` **原因**: - `OpenAIProvider` 命名不准确(实际通过 AIHubMix 代理调用多个厂商) - 与 `AIHubMixClient` 命名不一致 - 容易误导新开发者 **变更**: ```python # 旧代码 class OpenAIProvider(BaseAIProvider): """OpenAI 提供商(通过 AIHubMix 代理)""" # 新代码 class AIHubMixProvider(BaseAIProvider): """AIHubMix 提供商(通过 AIHubMix 代理服务调用多个 AI 厂商)""" ``` **支持的模型**: - OpenAI 模型(DALL-E, GPT, TTS, Whisper) - Stability AI 模型(通过代理) - 视频生成模型(Veo, Sora, Wan 系列) ### 7. 更新 clients/__init__.py **文件**: `server/app/clients/__init__.py` **变更**: ```python from .aihubmix_client import AIHubMixClient from .base_client import BaseHTTPClient from .stability_client import StabilityClient from .runway_client import RunwayClient __all__ = [ 'AIHubMixClient', 'BaseHTTPClient', 'StabilityClient', 'RunwayClient' ] ``` ## 技术细节 ### 重试策略 - **4xx 错误**:不重试(客户端错误) - **5xx 错误**:重试 3 次(服务器错误) - **超时错误**:重试 3 次 - **退避策略**:指数退避(2^attempt 秒) ### 日志记录 所有 HTTP 请求都记录以下信息: - 请求方法、URL - 是否包含 JSON / 文件 - 响应状态码 - 响应大小(字节) - 错误信息(如果失败) ### 配置管理 所有 API 配置统一在 `server/app/core/config.py` 中管理: ```python STABILITY_API_KEY: str = "" STABILITY_BASE_URL: str = "https://api.stability.ai/v1" RUNWAY_API_KEY: str = "" RUNWAY_BASE_URL: str = "https://api.runwayml.com/v1" ``` ## 优势 ### 1. 职责清晰 - Client 层:只负责 HTTP 通信 - Provider 层:只负责业务逻辑 - Service 层:只负责业务编排 ### 2. 代码复用 - 多个 Provider 可以共享同一个 Client - HTTP 通用逻辑(错误处理、重试)只需实现一次 ### 3. 易于测试 - Client 层可以独立测试 HTTP 调用 - Provider 层可以 Mock Client 进行单元测试 - Service 层可以 Mock Provider 进行集成测试 ### 4. 易于维护 - HTTP 逻辑变更只需修改 Client 层 - 业务逻辑变更只需修改 Provider 层 - 配置变更只需修改 config.py ### 5. 易于扩展 - 新增 AI 厂商只需: 1. 创建新的 Client(继承 BaseHTTPClient) 2. 创建新的 Provider(继承 BaseAIProvider) 3. 在 config.py 添加配置 ## 影响范围 ### 修改的文件 - `server/app/clients/base_client.py` (新建) - `server/app/clients/stability_client.py` (新建) - `server/app/clients/runway_client.py` (新建) - `server/app/clients/__init__.py` (更新) - `server/app/services/ai_providers/stability_provider.py` (重构) - `server/app/services/ai_providers/runway_provider.py` (重构) ### 不受影响的文件 - `server/app/services/ai_service.py` (Service 层接口不变) - `server/app/api/v1/ai.py` (API 层接口不变) - `server/app/tasks/ai_tasks.py` (Celery 任务接口不变) ## 测试建议 ### 1. Client 层测试 ```python # 测试 HTTP 调用 async def test_stability_client_text_to_image(): client = StabilityClient(api_key="test_key") result = await client.text_to_image( engine_id="stable-diffusion-xl-1024-v1-0", prompt="A beautiful sunset", width=1024, height=1024 ) assert 'artifacts' in result ``` ### 2. Provider 层测试 ```python # Mock Client 测试业务逻辑 async def test_stability_provider_generate_image(mocker): mock_client = mocker.Mock() mock_client.text_to_image.return_value = {...} provider = StabilityProvider(model_name="stable-diffusion-xl") provider.client = mock_client result = await provider.generate_image(prompt="test") assert 'image_data' in result ``` ### 3. 集成测试 ```python # 端到端测试 async def test_ai_service_generate_image_e2e(): result = await ai_service.generate_image( user_id=user_id, model_name="stable-diffusion-xl", prompt="A beautiful sunset" ) assert result['status'] == 'completed' ``` ## 后续优化 1. **缓存机制**:在 Client 层添加响应缓存 2. **限流控制**:在 Client 层添加 Rate Limiting 3. **监控指标**:记录 HTTP 请求耗时、成功率等指标 4. **断路器**:在 Client 层添加 Circuit Breaker 模式 5. **连接池**:优化 httpx 连接池配置 ## 参考资料 - [FastAPI 最佳实践](https://fastapi.tiangolo.com/tutorial/) - [httpx 文档](https://www.python-httpx.org/) - [Stability AI API 文档](https://platform.stability.ai/docs/api-reference) - [Runway API 文档](https://docs.runwayml.com/) ## 总结 本次重构通过引入 Client 层,实现了 HTTP 通信与业务逻辑的完全分离,提升了代码的可维护性、可测试性和可扩展性。所有 AI Provider 现在都遵循统一的架构模式,为后续新增 AI 厂商提供了清晰的模板。