You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
11 KiB
11 KiB
AI 生成结果自动写入业务表功能实现
日期: 2026-02-13
类型: 功能完善
影响范围: AI 对话系统、Celery 任务
📋 背景
现状问题
之前的 AI 对话生成流程中,虽然任务执行完成并生成了文件,但缺少将生成结果写入业务表的逻辑:
- ✅ 创建对话会话 →
ai_conversations表 - ✅ 发送消息触发生成 →
ai_conversation_messages表 - ✅ Celery 任务生成文件 → 更新
ai_jobs.output_data - ❌ 【缺失】 写入业务表(
storyboard_images,project_resources等)
结果:生成的图片/视频/音频文件只存在于 ai_jobs 表的 output_data 字段中,前端无法从业务表查询到资源。
🎯 实现目标
根据 conversation.target_type 和 conversation.media_type,自动将生成结果写入对应的业务表:
| target_type | media_type | 目标表 | 说明 |
|---|---|---|---|
| 1 (分镜) | 1 (图片) | storyboard_images |
分镜图片 |
| 1 (分镜) | 2 (视频) | storyboard_videos |
分镜视频 |
| 2 (角色) | 1 (图片) | project_resources |
角色形象 (type=1) |
| 3 (场景) | 1 (图片) | project_resources |
场景图片 (type=2) |
| 4 (道具) | 1 (图片) | project_resources |
道具图片 (type=3) |
🔧 实现方案
1. 新增服务:AIGenerationResultService
文件: server/app/services/ai_generation_result_service.py
核心方法:
async def save_generation_result(
job_id: UUID,
output_data: Dict[str, Any],
user_id: UUID
) -> Optional[Dict[str, Any]]:
"""保存 AI 生成结果到业务表"""
实现逻辑:
- 通过
job_id查找关联的ai_conversation_messages - 获取对话上下文
ai_conversations - 根据
target_type+media_type路由到对应的保存方法 - 写入业务表,返回资源元数据
支持的保存方法:
_save_storyboard_image()→storyboard_images_save_storyboard_video()→storyboard_videos_save_project_resource()→project_resources
2. 修改 Repository:添加 get_by_job_id
文件: server/app/repositories/ai_conversation_message_repository.py
新增方法:
async def get_by_job_id(
ai_job_id: UUID
) -> Optional[AIConversationMessage]:
"""根据 AI 任务 ID 获取消息(单条)"""
3. 修改 Celery 任务:集成自动保存
文件: server/app/tasks/ai_tasks.py
修改任务:
generate_image_taskgenerate_video_taskgenerate_sound_taskgenerate_voice_task
集成逻辑:
# 任务完成后
await _update_job_status(job_id, AIJobStatus.COMPLETED, output_data=result)
# 🆕 自动保存到业务表
try:
result_service = AIGenerationResultService(session)
await result_service.save_generation_result(
job_id=UUID(job_id),
output_data=result,
user_id=UUID(user_id)
)
logger.info("生成结果已写入业务表")
except Exception as save_error:
logger.error("写入业务表失败(不影响任务完成)", exc_info=True)
⚠️ 容错处理:
- 写入业务表失败不影响任务状态
- 记录错误日志,方便排查
- 保证
ai_jobs表状态正确更新
📊 数据流图
┌──────────────────────────────────────────────────────┐
│ 1. 前端创建会话 │
│ POST /api/v1/ai/conversations │
│ ↓ │
│ ai_conversations (记录 target_type, target_id) │
└──────────────────────────────────────────────────────┘
↓
┌──────────────────────────────────────────────────────┐
│ 2. 前端发送消息 + 触发生成 │
│ POST /api/v1/ai/conversations/{id}/messages │
│ { "content": "...", "generate": {...} } │
│ ↓ │
│ ai_conversation_messages (记录 content, ai_job_id) │
│ ↓ │
│ trigger_ai_generation() → ai_jobs (PENDING) │
│ ↓ │
│ Celery 任务入队 │
└──────────────────────────────────────────────────────┘
↓
┌──────────────────────────────────────────────────────┐
│ 3. Celery Worker 执行任务 │
│ generate_image_task / generate_video_task │
│ ↓ │
│ 调用 AI Provider (OpenAI, FLUX, etc.) │
│ ↓ │
│ 下载文件 → 上传到自有 OSS (MinIO) │
│ ↓ │
│ 更新 ai_jobs (status=COMPLETED, output_data) │
└──────────────────────────────────────────────────────┘
↓
┌──────────────────────────────────────────────────────┐
│ 4. 🆕 自动写入业务表 │
│ AIGenerationResultService.save_generation_result() │
│ ↓ │
│ 根据 conversation.target_type 路由: │
│ ├─ target_type=1, media_type=1 → storyboard_images│
│ ├─ target_type=1, media_type=2 → storyboard_videos│
│ ├─ target_type=6 → storyboard_sound_effects │
│ ├─ target_type=7 → storyboard_voiceovers │
│ └─ target_type=2/3/4 → project_resources │
│ ↓ │
│ ✅ 前端可直接查询业务表获取资源 │
└──────────────────────────────────────────────────────┘
🔍 关键设计决策
1. 通过 ai_job_id 反查对话上下文
ai_jobs表本身没有conversation_id- 通过
ai_conversation_messages.ai_job_id关联 - 获取
conversation后,读取target_type,target_id,tag_id等上下文
2. 容错设计:不影响任务状态
- 写入业务表失败时,仅记录日志
ai_jobs仍然标记为COMPLETED- 管理员可通过日志排查问题,手动补录
3. 资源默认状态
status = ResourceStatus.COMPLETED(已完成)is_active = True(默认激活)version = 1(版本号)
4. 元数据记录
- 保存 AI 模型名称 (
ai_model) - 保存用户提示词 (
ai_prompt) - 保存生成参数 (
ai_params) - 方便后续审计和复现
⚠️ 限制与待完善
1. 分镜音频 (target_type=1, media_type=3) 预留扩展
现状: 仅记录警告日志,不保存
说明: 音效和配音不通过对话生成,使用专用 API 接口
2. 缺少资源去重检查
现状: 每次生成都创建新记录,即使文件相同
建议:
- 根据
checksum去重 - 或使用
version字段管理版本
📝 相关代码文件
新增文件
server/app/services/ai_generation_result_service.py(370 行)
修改文件
-
server/app/tasks/ai_tasks.pygenerate_image_task(增加自动保存逻辑)generate_video_task(增加自动保存逻辑)generate_sound_task(增加自动保存逻辑)generate_voice_task(增加自动保存逻辑)
-
server/app/repositories/ai_conversation_message_repository.py- 新增
get_by_job_id()方法
- 新增
-
server/app/schemas/ai_conversation.py- 移除
GenerateConfig.enabled字段(简化 API)
- 移除
-
server/app/api/v1/ai_conversations.py- 更新文档示例
-
server/app/services/ai_conversation_service.py- 简化
if generate_config判断逻辑
- 简化
🧪 测试建议
单元测试
async def test_save_storyboard_image():
"""测试保存分镜图片"""
service = AIGenerationResultService(db)
result = await service.save_generation_result(
job_id=UUID("..."),
output_data={
'file_url': 'https://...',
'file_size': 1024,
'checksum': 'abc123',
...
},
user_id=UUID("...")
)
assert result['resource_type'] == 'storyboard_image'
集成测试
- 创建对话会话(分镜生图)
- 发送消息触发生成
- 等待 Celery 任务完成
- 查询
storyboard_images表,验证记录已创建
📌 后续优化方向
- 支持批量生成: 一次生成多张图片
- 资源版本管理: 支持生成同一对象的多个版本
- 智能去重: 基于
checksum避免重复存储 - 配音关联: 完善
dialogue_id的传递逻辑 - 错误重试: 业务表写入失败时的自动重试机制
✅ 验证清单
- 新增
AIGenerationResultService服务 - 实现分镜图片保存逻辑
- 实现分镜视频保存逻辑
- 实现角色/场景/道具保存逻辑
- 集成到 Celery 图片生成任务
- 集成到 Celery 视频生成任务
- 集成到 Celery 音效生成任务(兼容保留)
- 集成到 Celery 配音生成任务(兼容保留)
- 新增
get_by_job_idRepository 方法 - 移除
generate.enabled字段 - 移除 target_type 6 (音效) 和 7 (配音)
- 添加单元测试
- 添加集成测试
🎉 总结
本次实现补齐了 AI 对话生成流程的最后一环,使得生成的资源可以被前端正常查询和使用。
核心价值:
- ✅ 完整闭环:从对话创建 → 生成触发 → 文件生成 → 业务表写入
- ✅ 容错设计:写入失败不影响任务状态
- ✅ 易扩展:新增资源类型只需添加新的
_save_*方法
技术亮点:
- 通过
ai_job_id反查对话上下文,解耦任务与对话 - 使用 Service 层统一处理,避免在 Celery 任务中硬编码业务逻辑
- 容错设计确保系统稳定性