You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

8.6 KiB

RFC 136: Celery 异步任务系统集成

状态: 实施中
创建日期: 2026-01-27
作者: System
类型: 功能增强

概述

为 Jointo 项目集成 Celery 异步任务系统,支持 AI 生成、视频导出等长时间运行的任务。

动机

当前项目所有任务都在 FastAPI 主进程中同步执行,存在以下问题:

  1. 阻塞请求:AI 生成、视频渲染等耗时任务会阻塞 HTTP 请求
  2. 超时风险:长时间任务可能导致请求超时
  3. 资源浪费:无法有效利用多核 CPU 和分布式资源
  4. 用户体验差:用户需要等待任务完成才能继续操作
  5. 无法重试:任务失败后无法自动重试
  6. 缺少定时任务:无法执行定期维护任务

设计方案

架构概览

┌─────────────┐
│  FastAPI    │  提交任务
│  应用       ├──────────┐
└─────────────┘          │
                         ▼
                  ┌──────────────┐
                  │  RabbitMQ    │  消息队列
                  │  (Broker)    │
                  └──────┬───────┘
                         │
        ┌────────────────┼────────────────┐
        │                │                │
┌───────▼────────┐ ┌────▼─────────┐ ┌────▼─────────┐
│ Celery Worker  │ │ Celery Worker│ │ Celery Beat  │
│ (AI 队列)      │ │ (导出队列)   │ │ (定时任务)   │
└────────────────┘ └──────────────┘ └──────────────┘
        │                │                │
        └────────────────┼────────────────┘
                         ▼
                  ┌──────────────┐
                  │    Redis     │  结果存储
                  │  (Backend)   │
                  └──────────────┘

组件说明

1. RabbitMQ (消息队列)

  • 作用: 任务消息的中间件
  • 端口: 6182 (AMQP), 6186 (管理界面)
  • 优势:
    • 高可靠性,支持消息持久化
    • 支持复杂的路由规则
    • 提供管理界面

2. Celery Worker (任务执行器)

AI 任务队列 (celery-worker-ai):

  • 处理 AI 生成任务(图片、视频、配音)
  • 并发数: 2
  • 限流: 图片 10/分钟,视频 5/分钟

导出任务队列 (celery-worker-export):

  • 处理项目导出、视频渲染
  • 并发数: 1 (避免资源竞争)
  • 限流: 3/分钟

3. Celery Beat (定时任务)

  • 清理过期任务结果 (每天凌晨 2 点)
  • 收集任务统计 (每小时)
  • 清理临时文件 (每天)

4. Redis (结果存储)

  • 存储任务结果
  • 结果保留时间: 1 小时
  • 数据库: Redis DB 1

任务类型

AI 任务 (app.tasks.ai_tasks)

# 图片生成
generate_image(prompt, model, **kwargs)

# 视频生成
generate_video(prompt, duration, **kwargs)

# 配音生成
generate_voiceover(text, voice, **kwargs)

# 批量图片生成
batch_generate_images(prompts, **kwargs)

导出任务 (app.tasks.export_tasks)

# 项目导出
export_project(project_id, format, **kwargs)

# 时间轴渲染
render_timeline(timeline_id, **kwargs)

# 缩略图生成
generate_thumbnail(video_url, timestamp)

# 批量导出
batch_export_projects(project_ids, **kwargs)

维护任务 (app.tasks.maintenance_tasks)

# 清理过期结果
cleanup_expired_results()

# 收集统计
collect_task_statistics()

# 清理临时文件
cleanup_temp_files()

任务配置

重试策略

class BaseTask(celery_app.Task):
    autoretry_for = (Exception,)
    retry_kwargs = {"max_retries": 3}
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
  • 自动重试最多 3 次
  • 指数退避策略
  • 最大退避时间 600 秒
  • 添加随机抖动避免雷鸣群效应

任务路由

task_routes = {
    "app.tasks.ai_tasks.*": {"queue": "ai"},
    "app.tasks.export_tasks.*": {"queue": "export"},
}

限流配置

task_annotations = {
    "app.tasks.ai_tasks.generate_image": {"rate_limit": "10/m"},
    "app.tasks.ai_tasks.generate_video": {"rate_limit": "5/m"},
    "app.tasks.export_tasks.export_project": {"rate_limit": "3/m"},
}

使用示例

提交任务

from app.tasks.ai_tasks import generate_image

# 异步提交任务
task = generate_image.delay(
    prompt="一只可爱的猫咪",
    model="stable-diffusion"
)

# 获取任务 ID
task_id = task.id

# 返回给前端
return {"task_id": task_id, "status": "submitted"}

查询任务状态

from celery.result import AsyncResult

# 通过任务 ID 查询
result = AsyncResult(task_id)

# 获取状态
status = result.state  # PENDING, STARTED, SUCCESS, FAILURE, RETRY

# 获取结果
if result.ready():
    data = result.get()

前端轮询

async function pollTaskStatus(taskId: string) {
  const response = await fetch(`/api/v1/tasks/${taskId}/status`);
  const data = await response.json();
  
  if (data.status === 'SUCCESS') {
    return data.result;
  } else if (data.status === 'FAILURE') {
    throw new Error(data.error);
  } else {
    // 继续轮询
    await new Promise(resolve => setTimeout(resolve, 2000));
    return pollTaskStatus(taskId);
  }
}

部署配置

Docker Compose

新增服务:

rabbitmq:
  image: rabbitmq:3.13-management-alpine
  container_name: jointo-server-rabbitmq
  ports:
    - "6182:5672"   # AMQP
    - "6186:15672"  # 管理界面

celery-worker-ai:
  container_name: jointo-server-celery-ai
  command: celery -A app.core.celery_app worker -Q ai --loglevel=info --concurrency=2

celery-worker-export:
  container_name: jointo-server-celery-export
  command: celery -A app.core.celery_app worker -Q export --loglevel=info --concurrency=1

celery-beat:
  container_name: jointo-server-celery-beat
  command: celery -A app.core.celery_app beat --loglevel=info

环境变量

CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
CELERY_RESULT_BACKEND=redis://redis:6379/1

启动服务

# 启动所有服务
docker compose up -d

# 查看 Celery Worker 日志
docker logs -f jointo-server-celery-ai
docker logs -f jointo-server-celery-export

# 查看 RabbitMQ 管理界面
open http://localhost:6186
# 用户名: guest, 密码: guest

监控和管理

RabbitMQ 管理界面

Celery Flower (可选)

# 安装 Flower
pip install flower

# 启动监控
celery -A app.core.celery_app flower --port=5555

日志监控

# 查看任务日志
docker logs -f jointo-server-celery-ai

# 查看 RabbitMQ 日志
docker logs -f jointo-server-rabbitmq

性能优化

Worker 配置

# 预取数量
worker_prefetch_multiplier = 1  # 每次只预取一个任务

# Worker 重启
worker_max_tasks_per_child = 100  # 处理 100 个任务后重启

任务优先级

# 高优先级任务
task.apply_async(args=[...], priority=9)

# 低优先级任务
task.apply_async(args=[...], priority=0)

资源限制

celery-worker-ai:
  deploy:
    resources:
      limits:
        cpus: '2'
        memory: 4G

安全考虑

  1. 消息加密: 生产环境使用 SSL/TLS
  2. 认证: 修改 RabbitMQ 默认密码
  3. 网络隔离: 仅内部网络访问
  4. 任务验证: 验证任务参数防止注入

测试策略

单元测试

@pytest.mark.asyncio
async def test_generate_image_task():
    result = generate_image.delay("test prompt")
    assert result.id is not None

集成测试

def test_task_execution():
    result = generate_image.apply(args=["test"])
    assert result.status == "SUCCESS"

迁移计划

  1. 添加 Celery 配置和任务定义
  2. 更新 API 端点支持异步任务
  3. 前端添加任务状态轮询
  4. 迁移现有同步任务到 Celery
  5. 生产环境部署和测试

相关文档

变更日志

  • 2026-01-27: 初始版本,添加 Celery 集成