52 KiB
分镜看板管理服务
文档版本:v3.1
最后更新:2026-02-02
技术栈符合度:✅ 已符合 jointo-tech-stack 规范
目录
服务概述
分镜看板管理服务是分镜数据的可视化看板,提供多轨道视图展示分镜及其关联的所有资源(角色、场景、视频、音效、对白、配音等)。
核心定位
- 数据来源:所有数据实时从分镜及其关联表计算,无独立数据存储
- 主要功能:可视化展示 + 分镜排序调整
- 数据同步:分镜看板调整分镜顺序 = 直接更新分镜表的
display_order
职责
- 实时计算并返回分镜看板数据(六种轨道)
- 支持拖拽调整分镜顺序(同步更新分镜表)
- 按轨道类型筛选数据
- 按时间范围查询数据
- 计算项目总时长
架构设计
设计原则
无独立数据存储:分镜看板不创建独立的数据表,所有数据实时从分镜及其关联表计算。
为什么不需要分镜看板数据库?
- 数据来源单一:所有内容都源自分镜
- 避免冗余:不需要在分镜看板表中重复存储分镜数据
- 自动同步:分镜变化立即反映到分镜看板,无需维护同步逻辑
- 简化维护:减少数据一致性问题
数据流
分镜表 (storyboards)
├─ display_order (排序)
├─ duration (时长)
└─ 关联数据(通过 Relationship 配置,无物理外键)
├─ resources (角色、场景、道具)
├─ generated_videos (生成的视频)
├─ dialogues (对白台词)
├─ voiceovers (配音音频)
└─ sound_effects (音效)
↓
StoryboardBoardService 实时计算
↓
返回六种轨道的分镜看板数据
技术栈约束
- 异步编程: 所有数据库操作使用
async/await - 日志系统: 使用标准库
logging,禁止loguru,必须使用 %-formatting 格式化 - UUID v7: 应用层生成(
generate_uuid()),禁止数据库默认值 - 无物理外键: 应用层保证引用完整性
- 时间戳: 使用
TIMESTAMPTZ(ADR 006) - 统一响应: 所有 API 使用
ApiResponse格式(RFC-135) - 事务处理: 批量操作使用
async with self.session.begin() - 异常日志: 捕获异常时必须使用
exc_info=True记录完整堆栈
时间计算逻辑
# 按 display_order 排序分镜
storyboards = sorted(storyboards, key=lambda x: x.display_order)
# 累积计算每个分镜的时间位置
current_time = 0
for storyboard in storyboards:
start_time = current_time
end_time = current_time + storyboard.duration
current_time = end_time
数据来源
分镜看板服务不创建独立数据表,所有数据实时从分镜(storyboards)及其关联资源计算。
分镜基础数据
从 storyboards 表获取:
| 字段 | 类型 | 说明 | 用途 |
|---|---|---|---|
id |
UUID | 分镜唯一标识 | 分镜看板项 ID |
title |
VARCHAR(255) | 分镜标题 | 显示在分镜轨道 |
description |
TEXT | 分镜描述 | 悬浮提示 |
display_order |
INTEGER | 显示顺序 | 分镜看板排序依据 |
duration |
NUMERIC(10,2) | 分镜时长(秒) | 时间计算依据 |
shot_size |
SMALLINT | 镜头景别 | 分镜元数据 |
camera_movement |
SMALLINT | 镜头运动 | 分镜元数据 |
thumbnail_url |
TEXT | 缩略图 URL | 分镜预览 |
created_at |
TIMESTAMPTZ | 创建时间(UTC) | 审计字段 |
updated_at |
TIMESTAMPTZ | 更新时间(UTC) | 审计字段 |
关联资源数据
通过 SQLModel Relationship 获取(无物理外键):
1. 资源轨道(Resource Track)
数据来源:storyboard_items 表(统一关联表)
# Storyboard Model 中的 Relationship
items: List["StoryboardItem"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "Storyboard.id == StoryboardItem.storyboard_id",
"foreign_keys": "[StoryboardItem.storyboard_id]",
}
)
获取字段:
item_id:关联记录 IDitem_type:元素类型(1=ElementTag 剧本元素标签, 2=Resource 项目素材)element_tag_id:标签 ID(item_type=1 时)resource_id:素材 ID(item_type=2 时)element_name:元素名称(如"孙悟空"、"花果山")tag_label:标签名称(如"少年"、"夜晚")cover_url:封面 URLaction_description:动作描述(如"大笑"、"奔跑")spatial_position:画面位置(left/center/right/background/foreground)is_visible:是否在画面内(画外音设为 false)display_order:显示顺序z_index:视觉层级
用途:显示分镜使用的剧本元素(角色/场景/道具的具体标签)和项目素材
2. 视频轨道(Video Track)
数据来源:generated_videos 表
# Storyboard Model 中的 Relationship
generated_video: Optional["GeneratedVideo"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "Storyboard.id == GeneratedVideo.storyboard_id",
"foreign_keys": "[GeneratedVideo.storyboard_id]",
}
)
获取字段:
video_id:视频 IDurl:视频文件 URLstatus:生成状态(pending/processing/completed/failed)duration:视频实际时长
用途:显示分镜生成的视频内容
3. 对白轨道(Subtitle Track)
数据来源:storyboard_dialogues 表
# Storyboard Model 中的 Relationship
dialogues: List["StoryboardDialogue"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "Storyboard.id == StoryboardDialogue.storyboard_id",
"foreign_keys": "[StoryboardDialogue.storyboard_id]",
}
)
获取字段:
dialogue_id:对白 IDstoryboard_id:所属分镜 IDcharacter_id:关联的剧本角色 ID(可选)character_name:角色名称(冗余存储)content:对白文本内容dialogue_type:对白类型(1=normal 普通对白, 2=inner_monologue 内心OS, 3=narration 旁白)sequence_order:在分镜中的顺序start_time:在分镜中的开始时间(秒)duration:对白时长(秒)emotion:情绪标记(用于 TTS 生成)
用途:显示分镜的台词文本,区分普通对白、内心独白、旁白
4. 配音轨道(Voice Track)
数据来源:storyboard_voiceovers 表
# Storyboard Model 中的 Relationship
voiceovers: List["StoryboardVoiceover"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "Storyboard.id == StoryboardVoiceover.storyboard_id",
"foreign_keys": "[StoryboardVoiceover.storyboard_id]",
}
)
获取字段:
voiceover_id:配音 ID(主键)dialogue_id:所属对白 IDstoryboard_id:所属分镜 ID(冗余,便于查询)audio_url:音频访问 URLstatus:生成状态(0=pending, 1=processing, 2=completed, 3=failed)is_active:是否为当前激活版本voice_id:TTS 服务的音色 IDvoice_name:音色名称(如"晓晓(温柔女声)")speed:语速(0.5-2.0)volume:音量(0.0-1.0)pitch:音调(0.5-2.0)duration:音频时长(秒)file_size:文件大小(字节)format:音频格式(mp3/wav)checksum:SHA256 校验和(用于去重)storage_provider:存储提供商(minio/s3/oss)storage_path:对象存储中的文件路径
用途:显示对白生成的配音音频,支持多版本管理
5. 音效轨道(Sound Track)
数据来源:sound_effects 表
# Storyboard Model 中的 Relationship
sound_effects: List["SoundEffect"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "Storyboard.id == SoundEffect.storyboard_id",
"foreign_keys": "[SoundEffect.storyboard_id]",
}
)
获取字段:
sound_effect_id:音效 ID(主键)project_id:所属项目 IDname:音效名称audio_url:音频文件 URLduration:音效时长(秒)file_size:文件大小(字节)start_time:在分镜看板上的开始时间(秒)end_time:在分镜看板上的结束时间(秒)volume:音量(0-100)fade_in:淡入时长(秒)fade_out:淡出时长(秒)metadata:AI 生成参数等(JSONB)status:生成状态(pending/processing/completed/failed)ai_job_id:AI 任务 ID
用途:显示分镜搭配的背景音效,支持音量和淡入淡出控制
数据查询策略
方式一:预加载关联(推荐)
from sqlalchemy.orm import selectinload
# 一次性加载所有关联数据
storyboards = await session.execute(
select(Storyboard)
.where(Storyboard.project_id == project_id)
.options(
selectinload(Storyboard.resources),
selectinload(Storyboard.generated_video),
selectinload(Storyboard.dialogues),
selectinload(Storyboard.voiceovers),
selectinload(Storyboard.sound_effects)
)
.order_by(Storyboard.display_order)
)
优点:
- 减少数据库查询次数(N+1 问题)
- 适合小型项目(< 100 分镜)
方式二:按需加载
# 先获取分镜基础数据
storyboards = await session.execute(
select(Storyboard)
.where(Storyboard.project_id == project_id)
.order_by(Storyboard.display_order)
)
# 根据 track_types 参数按需加载关联
if 'video' in track_types:
# 批量加载视频数据
videos = await session.execute(
select(GeneratedVideo)
.where(GeneratedVideo.storyboard_id.in_(storyboard_ids))
)
优点:
- 减少不必要的数据传输
- 适合大型项目(> 100 分镜)
核心功能
1. 六种轨道类型
| 轨道类型 | 数据来源 | 说明 |
|---|---|---|
| storyboard | storyboards 表 |
分镜元素块 |
| resource | storyboard_resources 关联表 |
角色、场景、道具、实拍素材 |
| video | generated_videos 表 |
分镜生成的视频 |
| sound | sound_effects 表 |
分镜搭配的音效 |
| subtitle | dialogues 表 |
对白台词文本 |
| voice | voiceovers 表 |
对白生成的配音音频 |
2. 核心操作
2.1 获取分镜看板数据
- 实时计算所有轨道的数据
- 按
display_order排序分镜 - 计算每个项的
start_time和end_time
2.2 调整分镜顺序
- 接收新的分镜排序
- 更新分镜表的
display_order字段 - 返回更新后的分镜看板数据
2.3 按轨道筛选
- 只返回指定轨道类型的数据
- 减少前端数据传输量
2.4 时间范围查询
- 查询指定时间段内的所有项
- 用于局部渲染优化
服务实现
StoryboardBoardService 类
# app/services/storyboard_board_service.py
import logging
from typing import List, Optional, Dict, Any
from uuid import UUID
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from sqlalchemy import select
from app.models.storyboard import Storyboard
from app.repositories.storyboard_repository import StoryboardRepository
from app.repositories.project_repository import ProjectRepository
from app.schemas.storyboard_board import (
StoryboardBoardData,
StoryboardBoardTrack,
StoryboardBoardItem,
StoryboardReorder
)
from app.core.exceptions import NotFoundError, ValidationError, PermissionError
# 获取模块级 logger
logger = logging.getLogger(__name__)
class StoryboardBoardService:
"""
分镜看板服务 - 无独立数据存储,实时计算分镜数据
技术栈约束:
- 使用 logging 记录关键操作
- 应用层验证引用完整性(无物理外键)
- 批量操作使用事务
"""
def __init__(self, db: Session):
self.db = db
self.storyboard_repo = StoryboardRepository(db)
self.project_repo = ProjectRepository(db)
# ==================== 核心功能 ====================
async def get_storyboard_board_data(
self,
user_id: str,
project_id: str,
track_types: Optional[List[str]] = None
) -> StoryboardBoardData:
"""
获取项目的完整分镜看板数据
Args:
user_id: 用户 ID
project_id: 项目 ID
track_types: 可选,筛选特定轨道类型 ['storyboard', 'video', ...]
Returns:
StoryboardBoardData: 包含所有轨道的分镜看板数据
"""
logger.info(
"获取分镜看板数据 | 用户: %s | 项目: %s | 轨道类型: %s",
user_id,
project_id,
track_types or "全部"
)
try:
# 1. 检查项目权限
has_permission = await self.project_repo.check_user_permission(
UUID(user_id), UUID(project_id), 'viewer'
)
if not has_permission:
logger.warning(
"权限不足 | 用户: %s | 项目: %s",
user_id,
project_id
)
raise PermissionError("没有权限访问此项目")
# 2. 获取项目的所有分镜(按 display_order 排序)
storyboards = await self.storyboard_repo.get_by_project(
project_id=UUID(project_id),
order_by='display_order'
)
logger.debug(
"查询到分镜数量: %d | 项目: %s",
len(storyboards),
project_id
)
# 3. 实时计算分镜看板数据
tracks = self._calculate_storyboard_board_tracks(storyboards, track_types)
# 4. 计算总时长
total_duration = sum(sb.duration for sb in storyboards)
logger.info(
"分镜看板数据计算完成 | 项目: %s | 分镜数: %d | 总时长: %.2fs",
project_id,
len(storyboards),
total_duration
)
return StoryboardBoardData(
project_id=project_id,
tracks=tracks,
total_duration=total_duration,
storyboard_count=len(storyboards)
)
except (PermissionError, NotFoundError):
raise
except Exception as e:
logger.error(
"获取分镜看板数据失败 | 项目: %s | 错误: %s",
project_id,
str(e),
exc_info=True
)
raise
async def reorder_storyboards(
self,
user_id: str,
project_id: str,
reorder_data: StoryboardReorder
) -> StoryboardBoardData:
"""
调整分镜顺序(直接更新分镜表的 display_order)
Args:
user_id: 用户 ID
project_id: 项目 ID
reorder_data: 新的分镜排序 {storyboard_id: new_order}
Returns:
StoryboardBoardData: 更新后的分镜看板数据
"""
logger.info(
"调整分镜顺序 | 用户: %s | 项目: %s | 分镜数: %d",
user_id,
project_id,
len(reorder_data.storyboard_orders)
)
try:
# 1. 检查权限
has_permission = await self.project_repo.check_user_permission(
UUID(user_id), UUID(project_id), 'editor'
)
if not has_permission:
logger.warning(
"权限不足 | 用户: %s | 项目: %s",
user_id,
project_id
)
raise PermissionError("没有权限编辑此项目")
# 2. 验证所有分镜是否存在(应用层引用完整性检查)
storyboard_ids = [UUID(sid) for sid in reorder_data.storyboard_orders.keys()]
storyboards_exist = await self.storyboard_repo.batch_exists(storyboard_ids)
missing_ids = [
str(sid) for sid, exists in storyboards_exist.items() if not exists
]
if missing_ids:
logger.error(
"分镜不存在 | 项目: %s | 缺失ID: %s",
project_id,
missing_ids
)
raise NotFoundError("分镜不存在: %s" % missing_ids)
# 3. 使用事务批量更新分镜的 display_order
async with self.db.begin():
for storyboard_id, new_order in reorder_data.storyboard_orders.items():
await self.storyboard_repo.update(
storyboard_id=UUID(storyboard_id),
update_data={
'display_order': new_order,
'updated_at': datetime.now(timezone.utc)
}
)
logger.info(
"分镜顺序更新完成 | 项目: %s | 更新数量: %d",
project_id,
len(reorder_data.storyboard_orders)
)
# 4. 返回更新后的分镜看板数据
return await self.get_storyboard_board_data(user_id, project_id)
except (PermissionError, NotFoundError, ValidationError):
raise
except Exception as e:
logger.error(
"调整分镜顺序失败 | 项目: %s | 错误: %s",
project_id,
str(e),
exc_info=True
)
raise
async def get_track_items(
self,
user_id: str,
project_id: str,
track_type: str
) -> List[StoryboardBoardItem]:
"""
获取指定轨道的所有项
Args:
track_type: 轨道类型 (storyboard/resource/video/sound/subtitle/voice)
"""
logger.debug(
"获取轨道数据 | 用户: %s | 项目: %s | 轨道: %s",
user_id,
project_id,
track_type
)
try:
# 检查权限
has_permission = await self.project_repo.check_user_permission(
UUID(user_id), UUID(project_id), 'viewer'
)
if not has_permission:
raise PermissionError("没有权限访问此项目")
# 验证轨道类型
valid_types = ['storyboard', 'resource', 'video', 'sound', 'subtitle', 'voice']
if track_type not in valid_types:
logger.warning("无效的轨道类型: %s", track_type)
raise ValidationError("无效的轨道类型: %s" % track_type)
# 获取分镜数据
storyboards = await self.storyboard_repo.get_by_project(
project_id=UUID(project_id),
order_by='display_order'
)
# 计算指定轨道的数据
tracks = self._calculate_storyboard_board_tracks(storyboards, [track_type])
items = tracks[0].items if tracks else []
logger.debug(
"轨道数据获取完成 | 轨道: %s | 项数: %d",
track_type,
len(items)
)
return items
except (PermissionError, ValidationError):
raise
except Exception as e:
logger.error(
"获取轨道数据失败 | 项目: %s | 轨道: %s | 错误: %s",
project_id,
track_type,
str(e),
exc_info=True
)
raise
async def get_items_by_time_range(
self,
user_id: str,
project_id: str,
start_time: float,
end_time: float,
track_types: Optional[List[str]] = None
) -> List[StoryboardBoardItem]:
"""
查询时间范围内的所有项
Args:
start_time: 开始时间(秒)
end_time: 结束时间(秒)
track_types: 可选,筛选特定轨道
"""
logger.debug(
"查询时间范围 | 项目: %s | 时间: %.2f-%.2fs",
project_id,
start_time,
end_time
)
try:
# 检查权限
has_permission = await self.project_repo.check_user_permission(
UUID(user_id), UUID(project_id), 'viewer'
)
if not has_permission:
raise PermissionError("没有权限访问此项目")
# 获取完整分镜看板数据
storyboard_board_data = await self.get_storyboard_board_data(user_id, project_id, track_types)
# 筛选时间范围内的项
items_in_range = []
for track in storyboard_board_data.tracks:
for item in track.items:
# 检查时间重叠
if not (item.end_time <= start_time or item.start_time >= end_time):
items_in_range.append(item)
logger.debug(
"时间范围查询完成 | 项目: %s | 匹配项数: %d",
project_id,
len(items_in_range)
)
return items_in_range
except PermissionError:
raise
except Exception as e:
logger.error(
"时间范围查询失败 | 项目: %s | 错误: %s",
project_id,
str(e),
exc_info=True
)
raise
# ==================== 私有方法 ====================
def _calculate_storyboard_board_tracks(
self,
storyboards: List[Storyboard],
track_types: Optional[List[str]] = None
) -> List[StoryboardBoardTrack]:
"""
实时计算分镜看板轨道数据
Args:
storyboards: 分镜列表(已排序)
track_types: 可选,筛选特定轨道类型
"""
# 默认返回所有轨道
if track_types is None:
track_types = ['storyboard', 'resource', 'video', 'sound', 'subtitle', 'voice']
# 初始化轨道
tracks = {
track_type: StoryboardBoardTrack(
type=track_type,
name=self._get_track_name(track_type),
items=[]
)
for track_type in track_types
}
# 累积计算时间位置
current_time = 0.0
for storyboard in storyboards:
start_time = current_time
end_time = current_time + storyboard.duration
# 分镜轨道
if 'storyboard' in tracks:
tracks['storyboard'].items.append(StoryboardBoardItem(
id=str(storyboard.id),
type='storyboard',
start_time=start_time,
end_time=end_time,
data={
'id': str(storyboard.id),
'title': storyboard.title,
'description': storyboard.description,
'shot_size': storyboard.shot_size,
'camera_movement': storyboard.camera_movement,
'thumbnail_url': getattr(storyboard, 'thumbnail_url', None)
}
))
# 资源轨道(剧本元素标签 + 项目素材)
if 'resource' in tracks and storyboard.items:
for item in storyboard.items:
item_data = {
'item_id': str(item.item_id),
'item_type': item.item_type, # 1=ElementTag, 2=Resource
'element_name': item.element_name,
'tag_label': item.tag_label,
'cover_url': item.cover_url,
'action_description': item.action_description,
'spatial_position': item.spatial_position,
'is_visible': item.is_visible,
'display_order': item.display_order,
'z_index': item.z_index,
'storyboard_id': str(storyboard.id)
}
# 根据 item_type 添加额外字段
if item.item_type == 1: # ElementTag
item_data['element_tag_id'] = str(item.element_tag_id)
elif item.item_type == 2: # Resource
item_data['resource_id'] = str(item.resource_id)
tracks['resource'].items.append(StoryboardBoardItem(
id=f"{storyboard.id}-item-{item.item_id}",
type='resource',
start_time=start_time,
end_time=end_time,
data=item_data
))
# 视频轨道
if 'video' in tracks and storyboard.generated_video:
video = storyboard.generated_video
tracks['video'].items.append(StoryboardBoardItem(
id=f"{storyboard.id}-video-{video.id}",
type='video',
start_time=start_time,
end_time=end_time,
data={
'video_id': str(video.id),
'url': video.url,
'status': video.status,
'storyboard_id': str(storyboard.id)
}
))
# 对白轨道
if 'subtitle' in tracks and storyboard.dialogues:
for dialogue in storyboard.dialogues:
tracks['subtitle'].items.append(StoryboardBoardItem(
id=f"{storyboard.id}-dialogue-{dialogue.dialogue_id}",
type='subtitle',
start_time=start_time,
end_time=end_time,
data={
'dialogue_id': str(dialogue.dialogue_id),
'character_id': str(dialogue.character_id) if dialogue.character_id else None,
'character_name': dialogue.character_name,
'content': dialogue.content,
'dialogue_type': dialogue.dialogue_type, # 1=normal, 2=inner_monologue, 3=narration
'sequence_order': dialogue.sequence_order,
'start_time': float(dialogue.start_time) if dialogue.start_time else None,
'duration': float(dialogue.duration) if dialogue.duration else None,
'emotion': dialogue.emotion,
'storyboard_id': str(storyboard.id)
}
))
# 配音轨道
if 'voice' in tracks and storyboard.voiceovers:
for voiceover in storyboard.voiceovers:
tracks['voice'].items.append(StoryboardBoardItem(
id=f"{storyboard.id}-voiceover-{voiceover.voiceover_id}",
type='voice',
start_time=start_time,
end_time=end_time,
data={
'voiceover_id': str(voiceover.voiceover_id),
'dialogue_id': str(voiceover.dialogue_id),
'audio_url': voiceover.audio_url,
'status': voiceover.status, # 0=pending, 1=processing, 2=completed, 3=failed
'is_active': voiceover.is_active,
'voice_id': voiceover.voice_id,
'voice_name': voiceover.voice_name,
'speed': float(voiceover.speed) if voiceover.speed else 1.0,
'volume': float(voiceover.volume) if voiceover.volume else 1.0,
'pitch': float(voiceover.pitch) if voiceover.pitch else 1.0,
'duration': float(voiceover.duration) if voiceover.duration else None,
'file_size': voiceover.file_size,
'format': voiceover.format,
'storyboard_id': str(storyboard.id)
}
))
# 音效轨道
if 'sound' in tracks and storyboard.sound_effects:
for sound in storyboard.sound_effects:
tracks['sound'].items.append(StoryboardBoardItem(
id=f"{storyboard.id}-sound-{sound.sound_effect_id}",
type='sound',
start_time=start_time,
end_time=end_time,
data={
'sound_effect_id': str(sound.sound_effect_id),
'name': sound.name,
'audio_url': sound.audio_url,
'duration': float(sound.duration) if sound.duration else None,
'file_size': sound.file_size,
'start_time': float(sound.start_time) if sound.start_time else 0,
'end_time': float(sound.end_time) if sound.end_time else None,
'volume': sound.volume, # 0-100
'fade_in': float(sound.fade_in) if sound.fade_in else 0,
'fade_out': float(sound.fade_out) if sound.fade_out else 0,
'status': sound.status, # pending/processing/completed/failed
'ai_job_id': str(sound.ai_job_id) if sound.ai_job_id else None,
'storyboard_id': str(storyboard.id)
}
))
# 累加时间
current_time = end_time
return list(tracks.values())
def _get_track_name(self, track_type: str) -> str:
"""获取轨道显示名称"""
track_names = {
'storyboard': '分镜轨道',
'resource': '资源轨道',
'video': '视频轨道',
'sound': '音效轨道',
'subtitle': '对白轨道',
'voice': '配音轨道'
}
return track_names.get(track_type, track_type)
API 接口
统一响应格式(RFC-135)
所有 API 响应必须使用 ApiResponse 统一格式:
# app/schemas/common.py
from pydantic import BaseModel
from typing import TypeVar, Generic, Optional
from datetime import datetime, timezone
T = TypeVar('T')
class ApiResponse(BaseModel, Generic[T]):
"""统一 API 响应格式"""
success: bool # 是否成功
code: int # HTTP 状态码
message: str # 提示消息
data: Optional[T] # 业务数据
timestamp: str # ISO8601 时间戳
@classmethod
def success_response(
cls,
data: T,
message: str = "Success",
code: int = 200
) -> "ApiResponse[T]":
"""成功响应"""
return cls(
success=True,
code=code,
message=message,
data=data,
timestamp=datetime.now(timezone.utc).isoformat()
)
1. 获取完整分镜看板数据
GET /api/v1/projects/{project_id}/storyboard-board
查询参数:
track_types(可选):筛选轨道类型,逗号分隔,如storyboard,video
API 实现:
# app/api/v1/storyboard-board.py
import logging
from fastapi import APIRouter, Depends, Query
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.api.deps import get_current_user
from app.services.storyboard_board_service import StoryboardBoardService
from app.schemas.storyboard_board import StoryboardBoardData
from app.schemas.common import ApiResponse
from app.models.user import User
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get(
"/projects/{project_id}/storyboard-board",
response_model=ApiResponse[StoryboardBoardData]
)
async def get_storyboard_board(
project_id: str,
track_types: Optional[str] = Query(None, description="轨道类型,逗号分隔"),
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session)
):
"""获取项目分镜看板数据"""
logger.info(
"API: 获取分镜看板 | 用户: %s | 项目: %s",
current_user.user_id,
project_id
)
service = StoryboardBoardService(session)
# 解析轨道类型
track_type_list = None
if track_types:
track_type_list = [t.strip() for t in track_types.split(',')]
storyboard_board_data = await service.get_storyboard_board_data(
user_id=str(current_user.user_id),
project_id=project_id,
track_types=track_type_list
)
return ApiResponse.success_response(
data=storyboard_board_data,
message="分镜看板数据获取成功"
)
响应示例:
{
"success": true,
"code": 200,
"message": "分镜看板数据获取成功",
"data": {
"project_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb6",
"total_duration": 120.5,
"storyboard_count": 15,
"tracks": [
{
"type": "storyboard",
"name": "分镜轨道",
"items": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7",
"type": "storyboard",
"start_time": 0,
"end_time": 5.5,
"data": {
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7",
"title": "开场镜头",
"shot_size": 3,
"camera_movement": 1,
"thumbnail_url": "https://..."
}
}
]
},
{
"type": "resource",
"name": "资源轨道",
"items": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7-item-019c0361-ea8e-7d43-bbcc-edd8eea06bb8",
"type": "resource",
"start_time": 0,
"end_time": 5.5,
"data": {
"item_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb8",
"item_type": 1,
"element_tag_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb9",
"element_name": "孙悟空",
"tag_label": "少年",
"cover_url": "https://...",
"action_description": "大笑",
"spatial_position": "center",
"is_visible": true,
"display_order": 0,
"z_index": 0,
"storyboard_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7"
}
}
]
},
{
"type": "video",
"name": "视频轨道",
"items": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7-video-019c0361-ea8e-7d43-bbcc-edd8eea06bba",
"type": "video",
"start_time": 0,
"end_time": 5.5,
"data": {
"video_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bba",
"url": "https://...",
"status": "completed",
"storyboard_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7"
}
}
]
},
{
"type": "subtitle",
"name": "对白轨道",
"items": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7-dialogue-019c0361-ea8e-7d43-bbcc-edd8eea06bbb",
"type": "subtitle",
"start_time": 0,
"end_time": 5.5,
"data": {
"dialogue_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bbb",
"character_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bbc",
"character_name": "孙悟空",
"content": "你好,世界!",
"dialogue_type": 1,
"sequence_order": 1,
"start_time": 0.5,
"duration": 2.5,
"emotion": "happy",
"storyboard_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7"
}
}
]
},
{
"type": "voice",
"name": "配音轨道",
"items": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7-voiceover-019c0361-ea8e-7d43-bbcc-edd8eea06bbd",
"type": "voice",
"start_time": 0,
"end_time": 5.5,
"data": {
"voiceover_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bbd",
"dialogue_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bbb",
"audio_url": "https://storage.example.com/audio/voice123.mp3",
"status": 2,
"is_active": true,
"voice_id": "zh-CN-XiaoxiaoNeural",
"voice_name": "晓晓(温柔女声)",
"speed": 1.0,
"volume": 1.0,
"pitch": 1.0,
"duration": 2.5,
"file_size": 40960,
"format": "mp3",
"storyboard_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7"
}
}
]
},
{
"type": "sound",
"name": "音效轨道",
"items": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7-sound-019c0361-ea8e-7d43-bbcc-edd8eea06bbe",
"type": "sound",
"start_time": 0,
"end_time": 5.5,
"data": {
"sound_effect_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bbe",
"name": "背景音乐",
"audio_url": "https://storage.example.com/audio/bgm123.mp3",
"duration": 5.5,
"file_size": 102400,
"start_time": 0,
"end_time": 5.5,
"volume": 80,
"fade_in": 0.5,
"fade_out": 0.5,
"status": "completed",
"ai_job_id": null,
"storyboard_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7"
}
}
]
}
]
},
"timestamp": "2026-01-29T08:00:00Z"
}
2. 调整分镜顺序
POST /api/v1/projects/{project_id}/storyboard-board/reorder
请求体:
{
"storyboard_orders": {
"019c0361-ea8e-7d43-bbcc-edd8eea06bb7": 0,
"019c0361-ea8e-7d43-bbcc-edd8eea06bb8": 1,
"019c0361-ea8e-7d43-bbcc-edd8eea06bb9": 2
}
}
API 实现:
@router.post(
"/projects/{project_id}/storyboard-board/reorder",
response_model=ApiResponse[StoryboardBoardData]
)
async def reorder_storyboards(
project_id: str,
reorder_data: StoryboardReorder,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session)
):
"""调整分镜顺序"""
logger.info(
"API: 调整分镜顺序 | 用户: %s | 项目: %s | 分镜数: %d",
current_user.user_id,
project_id,
len(reorder_data.storyboard_orders)
)
service = StoryboardBoardService(session)
storyboard_board_data = await service.reorder_storyboards(
user_id=str(current_user.user_id),
project_id=project_id,
reorder_data=reorder_data
)
return ApiResponse.success_response(
data=storyboard_board_data,
message="分镜顺序调整成功"
)
说明:
- 使用事务批量更新分镜表的
display_order字段 - 应用层验证所有分镜是否存在(无物理外键)
- 返回更新后的完整分镜看板数据
响应示例:
{
"success": true,
"code": 200,
"message": "分镜顺序调整成功",
"data": {
"project_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb6",
"total_duration": 120.5,
"storyboard_count": 15,
"tracks": [...]
},
"timestamp": "2026-01-29T08:00:00Z"
}
3. 获取指定轨道数据
GET /api/v1/projects/{project_id}/storyboard-board/tracks/{track_type}
路径参数:
track_type:轨道类型(storyboard/resource/video/sound/subtitle/voice)
API 实现:
@router.get(
"/projects/{project_id}/storyboard-board/tracks/{track_type}",
response_model=ApiResponse[List[StoryboardBoardItem]]
)
async def get_track_items(
project_id: str,
track_type: str,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session)
):
"""获取指定轨道的所有项"""
logger.info(
"API: 获取轨道数据 | 用户: %s | 项目: %s | 轨道: %s",
current_user.user_id,
project_id,
track_type
)
service = StoryboardBoardService(session)
items = await service.get_track_items(
user_id=str(current_user.user_id),
project_id=project_id,
track_type=track_type
)
return ApiResponse.success_response(
data=items,
message=f"{track_type} 轨道数据获取成功"
)
响应示例:
{
"success": true,
"code": 200,
"message": "storyboard 轨道数据获取成功",
"data": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7",
"type": "storyboard",
"start_time": 0,
"end_time": 5.5,
"data": {...}
}
],
"timestamp": "2026-01-29T08:00:00Z"
}
4. 查询时间范围内的项
GET /api/v1/projects/{project_id}/storyboard-board/items/time-range
查询参数:
start_time:开始时间(秒)end_time:结束时间(秒)track_types(可选):筛选轨道类型
API 实现:
@router.get(
"/projects/{project_id}/storyboard-board/items/time-range",
response_model=ApiResponse[List[StoryboardBoardItem]]
)
async def get_items_by_time_range(
project_id: str,
start_time: float = Query(..., ge=0, description="开始时间(秒)"),
end_time: float = Query(..., gt=0, description="结束时间(秒)"),
track_types: Optional[str] = Query(None, description="轨道类型,逗号分隔"),
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session)
):
"""查询时间范围内的所有项"""
logger.info(
"API: 查询时间范围 | 用户: %s | 项目: %s | 时间: %.2f-%.2fs",
current_user.user_id,
project_id,
start_time,
end_time
)
service = StoryboardBoardService(session)
# 解析轨道类型
track_type_list = None
if track_types:
track_type_list = [t.strip() for t in track_types.split(',')]
items = await service.get_items_by_time_range(
user_id=str(current_user.user_id),
project_id=project_id,
start_time=start_time,
end_time=end_time,
track_types=track_type_list
)
return ApiResponse.success_response(
data=items,
message="时间范围查询成功"
)
响应示例:
{
"success": true,
"code": 200,
"message": "时间范围查询成功",
"data": [
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb8",
"type": "storyboard",
"start_time": 5.5,
"end_time": 12.0,
"data": {...}
},
{
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb8-video-019c0361-ea8e-7d43-bbcc-edd8eea06bb9",
"type": "video",
"start_time": 5.5,
"end_time": 12.0,
"data": {...}
}
],
"timestamp": "2026-01-29T08:00:00Z"
}
Schema 定义
# app/schemas/storyboard_board.py
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
class StoryboardBoardItem(BaseModel):
"""分镜看板项"""
id: str = Field(..., description="项唯一标识")
type: str = Field(..., description="项类型:storyboard/resource/video/sound/subtitle/voice")
start_time: float = Field(..., ge=0, description="开始时间(秒)")
end_time: float = Field(..., gt=0, description="结束时间(秒)")
data: Dict[str, Any] = Field(..., description="项的详细数据")
class Config:
json_schema_extra = {
"example": {
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7",
"type": "storyboard",
"start_time": 0,
"end_time": 5.5,
"data": {
"id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb7",
"title": "开场镜头",
"shot_size": 3,
"camera_movement": 1
}
}
}
class StoryboardBoardTrack(BaseModel):
"""分镜看板轨道"""
type: str = Field(..., description="轨道类型")
name: str = Field(..., description="轨道显示名称")
items: List[StoryboardBoardItem] = Field(default_factory=list, description="轨道项列表")
class Config:
json_schema_extra = {
"example": {
"type": "storyboard",
"name": "分镜轨道",
"items": []
}
}
class StoryboardBoardData(BaseModel):
"""完整分镜看板数据"""
project_id: str = Field(..., description="项目 ID")
tracks: List[StoryboardBoardTrack] = Field(..., description="所有轨道数据")
total_duration: float = Field(..., ge=0, description="项目总时长(秒)")
storyboard_count: int = Field(..., ge=0, description="分镜总数")
class Config:
json_schema_extra = {
"example": {
"project_id": "019c0361-ea8e-7d43-bbcc-edd8eea06bb6",
"tracks": [],
"total_duration": 120.5,
"storyboard_count": 15
}
}
class StoryboardReorder(BaseModel):
"""分镜重新排序"""
storyboard_orders: Dict[str, int] = Field(
...,
description="分镜 ID 到新排序的映射"
)
class Config:
json_schema_extra = {
"example": {
"storyboard_orders": {
"019c0361-ea8e-7d43-bbcc-edd8eea06bb7": 0,
"019c0361-ea8e-7d43-bbcc-edd8eea06bb8": 1,
"019c0361-ea8e-7d43-bbcc-edd8eea06bb9": 2
}
}
}
性能优化
1. 缓存策略
import logging
from functools import lru_cache
from app.core.cache import redis_client
logger = logging.getLogger(__name__)
async def get_storyboard_board_data_cached(
self,
user_id: str,
project_id: str
) -> StoryboardBoardData:
"""带缓存的分镜看板数据获取"""
cache_key = "storyboard-board:%s" % project_id
try:
# 尝试从 Redis 获取
cached_data = await redis_client.get(cache_key)
if cached_data:
logger.debug("分镜看板缓存命中 | 项目: %s", project_id)
return StoryboardBoardData.parse_raw(cached_data)
logger.debug("分镜看板缓存未命中 | 项目: %s", project_id)
# 计算数据
storyboard_board_data = await self.get_storyboard_board_data(user_id, project_id)
# 缓存 5 分钟
await redis_client.setex(
cache_key,
300,
storyboard_board_data.json()
)
logger.debug("分镜看板数据已缓存 | 项目: %s", project_id)
return storyboard_board_data
except Exception as e:
logger.error(
"缓存操作失败 | 项目: %s | 错误: %s",
project_id,
str(e),
exc_info=True
)
# 缓存失败时直接返回数据
return await self.get_storyboard_board_data(user_id, project_id)
2. 分页加载
对于大型项目(100+ 分镜),建议实现分页:
async def get_storyboard_board_data_paginated(
self,
user_id: str,
project_id: str,
page: int = 1,
page_size: int = 50
) -> StoryboardBoardData:
"""分页获取分镜看板数据"""
logger.info(
"分页获取分镜看板 | 项目: %s | 页码: %d | 每页: %d",
project_id,
page,
page_size
)
offset = (page - 1) * page_size
storyboards = await self.storyboard_repo.get_by_project(
project_id=UUID(project_id),
order_by='display_order',
limit=page_size,
offset=offset
)
# 计算分镜看板数据
tracks = self._calculate_storyboard_board_tracks(storyboards)
# 计算总时长(仅当前页)
total_duration = sum(sb.duration for sb in storyboards)
logger.debug(
"分页数据计算完成 | 项目: %s | 分镜数: %d",
project_id,
len(storyboards)
)
return StoryboardBoardData(
project_id=project_id,
tracks=tracks,
total_duration=total_duration,
storyboard_count=len(storyboards)
)
3. 前端虚拟滚动
建议前端使用虚拟滚动库(如 react-window)只渲染可见区域的轨道项。
4. 批量操作优化
# app/repositories/storyboard_repository.py
import logging
from typing import List, Dict
from uuid import UUID
logger = logging.getLogger(__name__)
async def batch_exists(self, storyboard_ids: List[UUID]) -> Dict[UUID, bool]:
"""批量检查分镜是否存在(单次查询)"""
logger.debug("批量检查分镜存在性 | 数量: %d", len(storyboard_ids))
query = select(Storyboard.id).where(
Storyboard.id.in_(storyboard_ids),
Storyboard.deleted_at.is_(None)
)
result = await self.session.execute(query)
existing_ids = {row[0] for row in result.all()}
exists_map = {id: id in existing_ids for id in storyboard_ids}
logger.debug(
"批量检查完成 | 存在: %d | 缺失: %d",
sum(exists_map.values()),
len(storyboard_ids) - sum(exists_map.values())
)
return exists_map
相关文档
文档版本:v3.1
最后更新:2026-02-02
技术栈符合度:✅ 已符合 jointo-tech-stack 规范
变更记录
v3.1 (2026-02-02)
- ✅ 更新资源轨道数据来源:
storyboard_resources→storyboard_items - ✅ 更新对白轨道数据来源:
dialogues→storyboard_dialogues - ✅ 更新配音轨道数据来源:
voiceovers→storyboard_voiceovers - ✅ 资源轨道新增字段:
item_type:元素类型(1=ElementTag, 2=Resource)element_name:元素名称(如"孙悟空")tag_label:标签名称(如"少年")action_description:动作描述spatial_position:画面位置is_visible:是否在画面内display_order、z_index:排序和层级
- ✅ 对白轨道新增字段:
dialogue_type:对白类型(1=normal, 2=inner_monologue, 3=narration)character_name:角色名称(冗余存储)sequence_order:对白顺序start_time、duration:时间信息emotion:情绪标记
- ✅ 配音轨道新增字段:
dialogue_id:所属对白 IDstatus:生成状态(0=pending, 1=processing, 2=completed, 3=failed)is_active:是否为当前激活版本voice_id:TTS 服务的音色 IDspeed、volume、pitch:TTS 参数file_size、format:文件信息
- ✅ 音效轨道新增字段:
file_size:文件大小start_time、end_time:在分镜看板上的时间位置fade_in、fade_out:淡入淡出时长status:生成状态ai_job_id:AI 任务 ID
- ✅ 更新 Service 实现中的数据计算逻辑(配音轨道和音效轨道)
- ✅ 更新 API 响应示例,添加配音轨道和音效轨道的完整字段示例
- ✅ 符合 storyboard-service.md v3.3、storyboard-resource-service.md v1.2 的设计
v3.0 (2026-01-29)