# AI 服务流程图 (Mermaid) **日期**: 2026-01-30 **版本**: 1.0 ## 目录 1. [整体架构](#整体架构) 2. [图片生成完整流程](#图片生成完整流程) 3. [任务状态流转](#任务状态流转) 4. [积分系统集成](#积分系统集成) 5. [Event Loop 隔离机制](#event-loop-隔离机制) 6. [错误处理流程](#错误处理流程) --- ## 整体架构 ```mermaid graph TB subgraph "客户端层" Client[Web/Mobile Client] end subgraph "API 层" FastAPI[FastAPI Server
端口: 6170] end subgraph "任务队列" RabbitMQ[RabbitMQ
消息队列] CeleryWorker[Celery Worker
AI 任务处理] end subgraph "AI 服务" AIProvider[AI Provider
OpenAI/DALL-E] end subgraph "存储层" PostgreSQL[(PostgreSQL 17
任务状态/积分)] MinIO[MinIO
文件存储] end Client -->|HTTP Request| FastAPI FastAPI -->|提交任务| RabbitMQ RabbitMQ -->|分发任务| CeleryWorker CeleryWorker -->|调用 AI| AIProvider CeleryWorker -->|上传文件| MinIO FastAPI -->|读写数据| PostgreSQL CeleryWorker -->|更新状态| PostgreSQL style Client fill:#e1f5ff style FastAPI fill:#fff3e0 style CeleryWorker fill:#f3e5f5 style AIProvider fill:#e8f5e9 style PostgreSQL fill:#fce4ec style MinIO fill:#fff9c4 ``` --- ## 图片生成完整流程 ```mermaid sequenceDiagram participant C as 客户端 participant A as FastAPI API participant CS as Credit Service participant DB as PostgreSQL participant Q as RabbitMQ participant W as Celery Worker participant AI as AI Provider
(DALL-E 3) participant M as MinIO Note over C,M: 1. 创建任务阶段 C->>A: POST /ai/generate-image
{prompt, width, height} A->>A: 验证请求 & 认证 A->>CS: 预扣积分 CS->>DB: 检查余额 CS->>DB: 创建 consumption_log
(status=PENDING) CS-->>A: 返回 consumption_log_id A->>DB: 创建 ai_jobs
(status=PENDING) A->>Q: 提交 Celery 任务 A-->>C: 返回 Job ID & Task ID Note over C,M: 2. 异步执行阶段 Q->>W: 分发任务 W->>W: 创建独立 Event Loop W->>DB: 更新状态
(PROCESSING, 10%) W->>DB: 获取 Job 信息
(consumption_log_id) W->>W: 创建 OpenAI Provider W->>DB: 更新状态
(PROCESSING, 30%) W->>AI: 调用 DALL-E 3
generate_image() AI-->>W: 返回图片 URL W->>DB: 更新状态
(PROCESSING, 70%) W->>AI: 下载图片 (3MB+) AI-->>W: 图片数据 W->>M: 上传到 MinIO M-->>W: 返回文件元数据
(url, checksum, size) W->>DB: 更新状态
(COMPLETED, 100%)
+ output_data W->>CS: 确认积分消耗 CS->>DB: 更新 consumption_log
(status=CONFIRMED) Note over C,M: 3. 查询结果阶段 C->>A: GET /ai/jobs/{job_id} A->>DB: 查询任务状态 DB-->>A: 返回完整结果 A-->>C: 返回 output_data
(file_url, metadata) ``` --- ## 任务状态流转 ```mermaid stateDiagram-v2 [*] --> PENDING: 任务创建 PENDING --> PROCESSING: Worker 接收任务 PROCESSING --> COMPLETED: 执行成功 PROCESSING --> FAILED: 执行失败 PROCESSING --> TIMEOUT: 执行超时 PROCESSING --> CANCELLED: 用户取消 COMPLETED --> [*]: 确认积分 FAILED --> [*]: 退还积分 TIMEOUT --> [*]: 退还积分 CANCELLED --> [*]: 退还积分 note right of PENDING 状态码: 1 进度: 0% end note note right of PROCESSING 状态码: 2 进度: 0% → 100% 步骤: - 10%: 开始处理 - 30%: AI 调用中 - 70%: 文件处理中 - 100%: 完成 end note note right of COMPLETED 状态码: 3 进度: 100% 包含 output_data end note note right of FAILED 状态码: 4 包含 error_message end note ``` --- ## 积分系统集成 ```mermaid sequenceDiagram participant U as 用户 participant A as AI Service participant CS as Credit Service participant DB as Database Note over U,DB: 任务创建时 U->>A: 请求生成图片 A->>CS: 预扣积分
deduct_credits() CS->>DB: 检查用户余额 alt 余额充足 CS->>DB: 创建消费记录
(status=PENDING) CS->>DB: 扣减用户积分 CS-->>A: 返回 consumption_log_id A->>DB: 创建任务
(关联 consumption_log_id) else 余额不足 CS-->>A: 抛出异常 A-->>U: 返回错误: 积分不足 end Note over U,DB: 任务完成时 A->>CS: 确认积分消耗
confirm_consumption() CS->>DB: 更新消费记录
(status=CONFIRMED) CS->>DB: 关联资源 ID Note over U,DB: 任务失败时 A->>CS: 退还积分
refund_credits() CS->>DB: 更新消费记录
(status=REFUNDED) CS->>DB: 返还用户积分 ``` ### 积分状态流转 ```mermaid stateDiagram-v2 [*] --> PENDING: 预扣积分 PENDING --> CONFIRMED: 任务成功 PENDING --> REFUNDED: 任务失败/取消/超时 CONFIRMED --> [*] REFUNDED --> [*] note right of PENDING 积分已扣除 等待任务完成 end note note right of CONFIRMED 积分消耗确认 关联资源 ID end note note right of REFUNDED 积分已退还 记录失败原因 end note ``` --- ## Event Loop 隔离机制 ```mermaid graph TB subgraph "Celery ForkPoolWorker" subgraph "任务 1" T1[generate_image_task] L1[Event Loop 1] E1[Engine 1] S1[Session 1] T1 -->|创建| L1 L1 -->|创建| E1 E1 -->|创建| S1 S1 -.->|dispose| E1 E1 -.->|close| L1 end subgraph "任务 2" T2[generate_video_task] L2[Event Loop 2] E2[Engine 2] S2[Session 2] T2 -->|创建| L2 L2 -->|创建| E2 E2 -->|创建| S2 S2 -.->|dispose| E2 E2 -.->|close| L2 end end DB[(PostgreSQL)] S1 -->|独立连接| DB S2 -->|独立连接| DB style T1 fill:#e3f2fd style T2 fill:#f3e5f5 style L1 fill:#fff3e0 style L2 fill:#fff3e0 style E1 fill:#e8f5e9 style E2 fill:#e8f5e9 style S1 fill:#fce4ec style S2 fill:#fce4ec ``` ### Event Loop 创建流程 ```mermaid sequenceDiagram participant CT as Celery Task participant RA as run_async_task() participant EL as Event Loop participant GA as get_async_session() participant E as Engine participant S as Session participant DB as Database CT->>RA: 调用异步任务 RA->>EL: 创建新 Event Loop
asyncio.new_event_loop() RA->>EL: 设置为当前 Loop
set_event_loop() loop 每次数据库操作 EL->>GA: 获取独立连接 GA->>E: 创建 Engine
(poolclass=None) GA->>S: 创建 Session S->>DB: 执行数据库操作 S->>E: dispose() end RA->>EL: 关闭 Loop
loop.close() RA-->>CT: 返回结果 ``` --- ## 错误处理流程 ```mermaid graph TB Start[任务开始] --> Try{执行任务} Try -->|成功| UpdateSuccess[更新状态: COMPLETED
progress: 100%] Try -->|失败| CatchError[捕获异常] UpdateSuccess --> ConfirmCredit[确认积分消耗] ConfirmCredit --> End[任务结束] CatchError --> UpdateFailed[更新状态: FAILED
记录错误信息] UpdateFailed --> RefundCredit[退还积分] RefundCredit --> CheckRetry{检查重试次数} CheckRetry -->|< 3 次| Retry[延迟重试
countdown = 60 * retries] CheckRetry -->|>= 3 次| RaiseError[抛出异常] Retry --> Start RaiseError --> End style Start fill:#e3f2fd style End fill:#e3f2fd style UpdateSuccess fill:#c8e6c9 style UpdateFailed fill:#ffcdd2 style ConfirmCredit fill:#fff9c4 style RefundCredit fill:#ffe0b2 ``` ### 错误类型处理 ```mermaid graph LR subgraph "错误类型" E1[AI Provider 错误] E2[网络错误] E3[文件存储错误] E4[数据库错误] E5[积分不足] E6[超时错误] end subgraph "处理策略" H1[重试 3 次] H2[立即失败] H3[退还积分] end E1 --> H1 E2 --> H1 E3 --> H1 E4 --> H1 E5 --> H2 E6 --> H2 H1 --> H3 H2 --> H3 style E1 fill:#ffcdd2 style E2 fill:#ffcdd2 style E3 fill:#ffcdd2 style E4 fill:#ffcdd2 style E5 fill:#ffcdd2 style E6 fill:#ffcdd2 style H1 fill:#fff9c4 style H2 fill:#ffe0b2 style H3 fill:#c8e6c9 ``` --- ## 文件存储流程 ```mermaid sequenceDiagram participant W as Celery Worker participant AI as AI Provider participant FS as FileStorageService participant M as MinIO participant DB as Database W->>AI: 生成内容 AI-->>W: 返回临时 URL W->>W: 下载文件
httpx.AsyncClient.get() W->>FS: upload_file()
(content, filename, type) FS->>FS: 计算 checksum
(SHA-256) FS->>FS: 生成存储路径
(category/user_id/checksum.ext) FS->>M: 上传文件
put_object() M-->>FS: 上传成功 FS->>DB: 创建 file_metadata 记录 DB-->>FS: 记录创建成功 FS-->>W: 返回文件元数据
(url, size, checksum) W->>W: 更新 output_data
包含文件信息 ``` --- ## 关键技术点 ### 1. Event Loop 隔离 ```python def run_async_task(coro): """为每个任务创建独立的 Event Loop""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() ``` ### 2. 独立数据库连接 ```python def get_async_session(): """为每个操作创建独立的数据库连接""" engine = create_async_engine( settings.DATABASE_URL, poolclass=None # 禁用连接池 ) async_session_maker = sessionmaker( engine, class_=AsyncSession ) return async_session_maker, engine ``` ### 3. 资源清理模式 ```python async_session_maker, engine = get_async_session() try: async with async_session_maker() as session: # 数据库操作 pass finally: await engine.dispose() # 确保资源释放 ``` --- ## 性能指标 ```mermaid pie title 任务执行时间分布 (总计 ~60秒) "AI 生成" : 10 "文件下载" : 1 "文件上传" : 0.1 "数据库操作" : 0.05 "网络延迟" : 48.85 ``` --- ## 相关文档 - [OpenAI Provider 集成](./changelogs/2026-01-30-openai-provider-integration.md) - [Event Loop 修复完成](./changelogs/2026-01-30-ai-tasks-event-loop-fix-complete.md) - [文件存储集成](./changelogs/2026-01-30-ai-service-file-storage-integration.md) - [OpenAI 集成最终总结](./changelogs/2026-01-30-openai-integration-final-summary.md)