# 异步任务处理

> **文档版本**:v1.0
> **最后更新**:2025-01-27

---

## 目录

1. [异步任务概述](#异步任务概述)
2. [Celery 配置](#celery-配置)
3. [AI 生成任务](#ai-生成任务)
4. [视频导出任务](#视频导出任务)
5. [任务监控](#任务监控)

---

## 异步任务概述

### 为什么需要异步任务

Jointo 平台中有大量耗时操作:

- **AI 生成**:图片生成(10-30秒)、视频生成(1-5分钟)
- **视频导出**:视频合成、转码(5-30分钟)
- **文件处理**:大文件上传、转换
- **批量操作**:批量导入、批量处理

这些操作如果同步执行会:

- 阻塞 API 响应
- 占用服务器资源
- 影响用户体验

### 异步任务架构

```
┌─────────────┐
│  API 服务   │
│             │
│  创建任务   │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  RabbitMQ   │ 消息队列
│             │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Celery    │ 任务调度
│   Worker    │
│             │
│  执行任务   │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│    Redis    │ 结果存储
│             │
└─────────────┘
```

---

## Celery 配置

### 安装依赖

```bash
pip install celery[redis]
pip install celery[amqp]  # 如果使用 RabbitMQ
```

### Celery 应用配置

```python
# app/tasks/celery_app.py
from celery import Celery
from app.config import settings

celery_app = Celery(
    "jointo",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=['app.tasks.ai_tasks', 'app.tasks.export_tasks']
)

# Celery 配置
celery_app.conf.update(
    # 序列化
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',

    # 时区
    timezone='UTC',
    enable_utc=True,

    # 任务配置
    task_track_started=True,
    task_time_limit=3600,       # 1 小时
    task_soft_time_limit=3300,  # 55 分钟

    # 结果过期时间
    result_expires=3600,

    # 任务路由
    task_routes={
        'app.tasks.ai_tasks.*': {'queue': 'ai'},
        'app.tasks.export_tasks.*': {'queue': 'export'},
    },

    # 并发配置
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
)
```

### 环境配置

```python
# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Celery 配置
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"

    # 或使用 RabbitMQ
    # CELERY_BROKER_URL: str = "amqp://guest:guest@localhost:5672//"

    class Config:
        env_file = ".env"

settings = Settings()
```

---

## AI 生成任务

> **注意**:Celery 任务是同步函数,函数体内不能直接使用 `await`。
> 下面的示例使用同步的 `httpx.Client` 发起 HTTP 请求;对于本身是异步接口的
> `StorageService`,在任务中通过 `asyncio.run(...)` 执行。

### 图片生成任务

```python
# app/tasks/ai_tasks.py
import asyncio

import httpx

from app.config import settings
from app.tasks.celery_app import celery_app
from app.services.ai_service import AIService
from app.repositories.ai_job_repository import AIJobRepository
from app.core.database import SessionLocal
from app.core.storage import StorageService


@celery_app.task(bind=True, max_retries=3)
def generate_image_task(self, prompt: str, model: str, **kwargs):
    """生成图片任务"""
    db = SessionLocal()
    job_id = self.request.id
    # 在 try 之外创建,保证 except 分支中 job_repo 一定已定义
    job_repo = AIJobRepository(db)
    try:
        # 更新任务状态
        job_repo.update_status(job_id, 'processing', progress=10)

        # 调用 AI API(同步客户端)
        with httpx.Client() as client:
            response = client.post(
                f"{settings.AI_API_URL}/generate-image",
                json={"prompt": prompt, "model": model, **kwargs},
                headers={"Authorization": f"Bearer {settings.AI_API_KEY}"},
                timeout=300
            )
            result = response.json()

        # 更新进度
        job_repo.update_status(job_id, 'processing', progress=50)

        # 下载并上传到对象存储
        image_url = result['image_url']
        storage = StorageService()

        # 下载图片
        with httpx.Client() as client:
            image_response = client.get(image_url)
            image_data = image_response.content

        # 上传到对象存储(异步接口,在同步任务中用 asyncio.run 执行)
        object_name = f"ai-generated/images/{job_id}.png"
        final_url = asyncio.run(storage.upload_bytes(
            image_data,
            object_name,
            content_type="image/png"
        ))

        # 更新任务完成
        job_repo.update_status(
            job_id,
            'completed',
            progress=100,
            output_data={'image_url': final_url}
        )

        return {'image_url': final_url}

    except Exception as exc:
        job_repo.update_status(
            job_id,
            'failed',
            error_message=str(exc)
        )
        # 重试:60秒后重试
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()
```

### 视频生成任务

```python
from typing import Optional


@celery_app.task(bind=True, max_retries=3)
def generate_video_task(
    self,
    video_type: str,
    prompt: Optional[str] = None,
    image_url: Optional[str] = None,
    **kwargs
):
    """生成视频任务"""
    db = SessionLocal()
    job_id = self.request.id
    # 在 try 之外创建,保证 except 分支中 job_repo 一定已定义
    job_repo = AIJobRepository(db)
    try:
        # 更新任务状态
        job_repo.update_status(job_id, 'processing', progress=10)

        # 根据视频类型调用不同的 AI 服务(同步调用)
        if video_type == 'text2video':
            result = _generate_text2video(prompt, **kwargs)
        elif video_type == 'img2video':
            result = _generate_img2video(image_url, **kwargs)
        else:
            raise \
                ValueError(f"Unsupported video type: {video_type}")

        # 更新进度
        job_repo.update_status(job_id, 'processing', progress=50)

        # 下载并上传视频
        video_url = result['video_url']
        storage = StorageService()

        with httpx.Client() as client:
            video_response = client.get(video_url)
            video_data = video_response.content

        # 上传到对象存储(异步接口,在同步任务中用 asyncio.run 执行)
        object_name = f"ai-generated/videos/{job_id}.mp4"
        final_url = asyncio.run(storage.upload_bytes(
            video_data,
            object_name,
            content_type="video/mp4"
        ))

        # 更新任务完成
        job_repo.update_status(
            job_id,
            'completed',
            progress=100,
            output_data={'video_url': final_url}
        )

        return {'video_url': final_url}

    except Exception as exc:
        job_repo.update_status(job_id, 'failed', error_message=str(exc))
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()
```

---

## 视频导出任务

### 视频合成任务

```python
# app/tasks/export_tasks.py
import asyncio
import os

import ffmpeg

from app.tasks.celery_app import celery_app
from app.services.export_service import ExportService
from app.repositories.export_job_repository import ExportJobRepository
from app.core.database import SessionLocal
from app.core.storage import StorageService


@celery_app.task(bind=True, max_retries=2)
def export_video_task(
    self,
    project_id: str,
    format: str,
    quality: str,
    **kwargs
):
    """导出视频任务"""
    db = SessionLocal()
    job_id = self.request.id
    # 在 try 之外创建,保证 except 分支中 export_repo 一定已定义
    export_repo = ExportJobRepository(db)
    output_path = f"/tmp/{job_id}.{format}"
    try:
        # 更新任务状态
        export_repo.update_status(job_id, 'processing', progress=0)

        # 获取项目所有素材(异步接口,在同步任务中用 asyncio.run 执行)
        export_service = ExportService()
        materials = asyncio.run(export_service.get_project_materials(project_id))

        # 更新进度
        export_repo.update_status(job_id, 'processing', progress=20)

        # 构建 FFmpeg 命令
        inputs = [ffmpeg.input(material['url']) for material in materials]

        # 合成视频
        stream = ffmpeg.concat(*inputs, v=1, a=1)
        stream = ffmpeg.output(
            stream,
            output_path,
            vcodec='libx264',
            acodec='aac',
            **_get_quality_params(quality)
        )

        # 执行 FFmpeg
        ffmpeg.run(stream, capture_stdout=True, capture_stderr=True)

        # 更新进度
        export_repo.update_status(job_id, 'processing', progress=80)

        # 上传到对象存储
        storage = StorageService()
        object_name = f"exports/{project_id}/{job_id}.{format}"
        output_url = asyncio.run(storage.upload_file(
            output_path,
            object_name,
            content_type=f"video/{format}"
        ))

        # 更新任务完成
        export_repo.update_status(
            job_id,
            'completed',
            progress=100,
            output_url=output_url
        )

        return {'output_url': output_url}

    except Exception as exc:
        export_repo.update_status(job_id, 'failed', error_message=str(exc))
        # 5 分钟后重试
        raise self.retry(exc=exc, countdown=300)
    finally:
        # 无论成功失败都清理临时文件,避免失败时泄漏
        if os.path.exists(output_path):
            os.remove(output_path)
        db.close()


def _get_quality_params(quality: str) -> dict:
    """获取质量参数"""
    quality_map = {
        'low': {'crf': 28, 'preset': 'fast'},
        'medium': {'crf': 23, 'preset': 'medium'},
        'high': {'crf': 18, 'preset': 'slow'}
    }
    return quality_map.get(quality, quality_map['medium'])
```

---

## 任务监控

### 启动 Celery Worker

```bash
# 启动 AI 任务队列
celery -A app.tasks.celery_app worker -Q ai --loglevel=info

# 启动导出任务队列
celery -A app.tasks.celery_app worker -Q export --loglevel=info

# 启动所有队列
celery -A app.tasks.celery_app worker --loglevel=info
```

### 启动 Celery Beat(定时任务)

```bash
celery -A app.tasks.celery_app beat --loglevel=info
```

### 使用 Flower 监控

```bash
# 安装 Flower
pip install flower

# 启动 Flower
celery -A app.tasks.celery_app flower --port=5555
```

访问 `http://localhost:5555` 查看任务监控面板。

### 查询任务状态

```python
# app/services/ai_service.py
from typing import Any, Dict


async def get_job_status(self, job_id: str) -> Dict[str, Any]:
    """查询任务状态"""
    from app.tasks.celery_app import celery_app

    task = celery_app.AsyncResult(job_id)

    # 任务失败时 task.info 是异常对象而不是 dict,不能直接调用 .get
    progress = task.info.get('progress', 0) if isinstance(task.info, dict) else 0

    return {
        'job_id': job_id,
        'status': task.state,
        'progress': progress,
        'result': task.result if task.ready() else None,
        'error': str(task.info) if task.failed() else None
    }
```

---

## 相关文档

- [AI 服务集成](./09-ai-integration.md)
- [文件存储方案](./08-storage.md)
- [部署架构](./10-deployment.md)

---

**文档版本**:v1.0
**最后更新**:2025-01-27