You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
57 KiB
57 KiB
资源库服务
文档版本:v1.3
最后更新:2026-02-03
符合规范:jointo-tech-stack v1.0
目录
服务概述
资源库服务(ResourceLibraryService)负责为资源库面板提供统一的资源查询接口。该服务整合了剧本元素、标签和项目资源的数据,为前端提供按类型分组的资源列表。
职责
- 获取所有角色列表(从剧本提取,包含有资源和无资源的)
- 获取所有场景列表(从剧本提取,包含有资源和无资源的)
- 获取所有道具列表(从剧本提取,包含有资源和无资源的)
- 获取实拍资源列表(直接从项目资源查询)
- 获取元素的所有标签和资源(可能为空)
- 支持分页和搜索
设计原则
- 数据整合:整合剧本元素、标签和项目资源的数据
- 按类型分组:角色、场景、道具、实拍分别查询
- 应用层关联:通过 screenplay_element_tags 中间表关联剧本元素和项目资源
- UUID v7 主键:应用层生成 UUID v7,非数据库默认值
- 无物理外键:禁止数据库外键约束,应用层保证引用完整性
- 枚举使用 SMALLINT:使用 SMALLINT + Python IntEnum,不使用 PostgreSQL ENUM
- 异步优先:所有数据库操作使用 async/await
与其他服务的关系
| 服务 | 职责 | 关联 |
|---|---|---|
| ScreenplayService | 管理剧本元素(角色/场景/道具) | 提供剧本元素基础数据 |
| ProjectResourceService | 管理项目素材(实际文件) | 提供资源文件数据 |
| ScreenplayTagService | 管理剧本标签(变体) | 提供标签关联数据 |
| ResourceLibraryService | 整合数据,提供资源库面板查询 | 依赖上述三个服务 |
| StoryboardService | 管理分镜与元素的关联(storyboard_items) | 使用资源库服务提供的数据创建关联 |
核心功能
1. 获取所有角色列表
- 从剧本中提取角色(screenplay_characters)
- 关联标签(screenplay_element_tags)
- 关联资源(project_resources)
- 返回所有角色,标记每个角色是否有资源
- 支持分页和搜索
默认标签的作用:
default_tag_id:用户设置的默认标签(如"青年版")- 当用户从资源库拖拽角色到分镜时,自动使用默认标签的资源
- 如果未设置默认标签,使用
display_order=0的标签
2. 获取所有场景列表
- 从剧本中提取场景(screenplay_locations)
- 关联标签(screenplay_element_tags)
- 关联资源(project_resources)
- 返回所有场景,标记每个场景是否有资源
- 支持分页和搜索
3. 获取所有道具列表
- 从剧本中提取道具(screenplay_props)
- 关联标签(screenplay_element_tags)
- 关联资源(project_resources)
- 返回所有道具,标记每个道具是否有资源
- 支持分页和搜索
4. 获取实拍资源列表
- 直接从项目资源查询(project_resources)
- 过滤类型为 footage
- 不关联剧本元素
- 支持分页和搜索
5. 获取元素的所有标签和资源
- 根据元素 ID 查询所有标签
- 每个标签关联对应的资源
- 支持多视图展示(正视图、侧视图等)
服务实现
ResourceLibraryService 类
# app/services/resource_library_service.py
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload
from app.models.screenplay import (
ScreenplayCharacter,
ScreenplayLocation,
ScreenplayProp,
ScreenplayElementTag
)
from app.models.project_resource import ProjectResource
from app.models.screenplay import Screenplay
from app.repositories.screenplay_repository import ScreenplayRepository
from app.repositories.project_resource_repository import ProjectResourceRepository
from app.repositories.screenplay_tag_repository import ScreenplayTagRepository
from app.core.exceptions import NotFoundError, ValidationError
from app.core.logging import get_logger
if TYPE_CHECKING:
from app.services.project_service import ProjectService
logger = get_logger(__name__)
class ResourceLibraryService:
def __init__(
self,
db: AsyncSession,
screenplay_repo: ScreenplayRepository,
project_resource_repo: ProjectResourceRepository,
screenplay_tag_repo: ScreenplayTagRepository,
project_service: 'ProjectService'
):
self.db = db
self.screenplay_repo = screenplay_repo
self.project_resource_repo = project_resource_repo
self.screenplay_tag_repo = screenplay_tag_repo
self.project_service = project_service
# ==================== 角色资源 ====================
async def get_characters(
self,
user_id: str,
project_id: str,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
include_subprojects: bool = False
) -> Dict[str, Any]:
"""获取所有角色列表(包含有资源和无资源的)
Args:
user_id: 用户ID
project_id: 项目ID(可以是父项目或子项目)
search: 搜索关键词
page: 页码
page_size: 每页数量
include_subprojects: 是否包含子项目(仅当project_id为父项目时有效)
"""
logger.info(
"获取所有角色列表: user_id=%s, project_id=%s, search=%s, page=%d, page_size=%d, include_subprojects=%s",
user_id, project_id, search, page, page_size, include_subprojects
)
# 检查项目权限
await self._check_project_permission(user_id, project_id, 'viewer')
# 获取项目及其子项目的剧本ID列表
screenplay_ids = await self._get_screenplay_ids(
project_id,
include_subprojects
)
if not screenplay_ids:
return {
'items': [],
'total': 0,
'page': page,
'page_size': page_size,
'total_pages': 0
}
# 构建查询 - 获取所有角色,不限制 has_tags
query = (
select(ScreenplayCharacter)
.options(selectinload(ScreenplayCharacter.tags))
.where(ScreenplayCharacter.screenplay_id.in_(screenplay_ids))
)
# 搜索过滤
if search:
search_pattern = f"%{search}%"
query = query.where(
or_(
ScreenplayCharacter.name.ilike(search_pattern),
ScreenplayCharacter.description.ilike(search_pattern)
)
)
# 排序
query = query.order_by(ScreenplayCharacter.name)
# 分页
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size)
# 执行查询
result = await self.db.execute(query)
characters = result.scalars().all()
# 获取总数 - 不限制 has_tags
count_query = (
select(func.count())
.select_from(ScreenplayCharacter)
.where(ScreenplayCharacter.screenplay_id.in_(screenplay_ids))
)
if search:
count_query = count_query.where(
or_(
ScreenplayCharacter.name.ilike(search_pattern),
ScreenplayCharacter.description.ilike(search_pattern)
)
)
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 加载标签和资源
items = []
for character in characters:
item = await self._build_character_with_resources(character)
if item:
items.append(item)
total_pages = (total + page_size - 1) // page_size
logger.info(
"获取所有角色列表成功: count=%d, total=%d",
len(items), total
)
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size,
'total_pages': total_pages
}
async def _build_character_with_resources(
self,
character: ScreenplayCharacter
) -> Dict[str, Any]:
"""构建角色及其标签和资源(支持无资源的角色)"""
# 获取标签和资源
tags = []
default_thumbnail_url = None
if character.tags:
for tag in character.tags:
# 获取该标签的资源
resources = await self.project_resource_repo.get_by_element_tag_id(tag.tag_id)
if not resources:
continue
# 获取第一个资源的缩略图
thumbnail_url = None
if resources:
thumbnail_url = resources[0].thumbnail_url or resources[0].file_url
# 如果是第一个标签(display_order最小),记录缩略图作为默认
if not default_thumbnail_url and tag.display_order == 0:
default_thumbnail_url = thumbnail_url
tags.append({
'tag_id': str(tag.tag_id),
'tag_key': tag.tag_key,
'tag_label': tag.tag_label,
'description': tag.description,
'display_order': tag.display_order,
'resource_count': len(resources),
'thumbnail_url': thumbnail_url
})
return {
'character_id': str(character.character_id),
'screenplay_id': str(character.screenplay_id),
'name': character.name,
'description': character.description,
'role_type': character.role_type,
'has_tags': character.has_tags,
'default_tag_id': str(character.default_tag_id) if character.default_tag_id else None,
'resource_count': sum(tag['resource_count'] for tag in tags),
'default_thumbnail_url': default_thumbnail_url,
'tags': tags
}
# ==================== 场景资源 ====================
async def get_locations(
self,
user_id: str,
project_id: str,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
include_subprojects: bool = False
) -> Dict[str, Any]:
"""获取所有场景列表(包含有资源和无资源的)
Args:
user_id: 用户ID
project_id: 项目ID(可以是父项目或子项目)
search: 搜索关键词
page: 页码
page_size: 每页数量
include_subprojects: 是否包含子项目(仅当project_id为父项目时有效)
"""
logger.info(
"获取所有场景列表: user_id=%s, project_id=%s, search=%s, page=%d, page_size=%d, include_subprojects=%s",
user_id, project_id, search, page, page_size, include_subprojects
)
# 检查项目权限
await self._check_project_permission(user_id, project_id, 'viewer')
# 获取项目及其子项目的剧本ID列表
screenplay_ids = await self._get_screenplay_ids(
project_id,
include_subprojects
)
if not screenplay_ids:
return {
'items': [],
'total': 0,
'page': page,
'page_size': page_size,
'total_pages': 0
}
# 构建查询 - 获取所有场景,不限制 has_tags
query = (
select(ScreenplayLocation)
.options(selectinload(ScreenplayLocation.tags))
.where(ScreenplayLocation.screenplay_id.in_(screenplay_ids))
)
# 搜索过滤
if search:
search_pattern = f"%{search}%"
query = query.where(
or_(
ScreenplayLocation.name.ilike(search_pattern),
ScreenplayLocation.description.ilike(search_pattern)
)
)
# 排序
query = query.order_by(ScreenplayLocation.order_index)
# 分页
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size)
# 执行查询
result = await self.db.execute(query)
locations = result.scalars().all()
# 获取总数 - 不限制 has_tags
count_query = (
select(func.count())
.select_from(ScreenplayLocation)
.where(ScreenplayLocation.screenplay_id.in_(screenplay_ids))
)
if search:
count_query = count_query.where(
or_(
ScreenplayLocation.name.ilike(search_pattern),
ScreenplayLocation.description.ilike(search_pattern)
)
)
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 加载标签和资源
items = []
for location in locations:
item = await self._build_location_with_resources(location)
if item:
items.append(item)
total_pages = (total + page_size - 1) // page_size
logger.info(
"获取所有场景列表成功: count=%d, total=%d",
len(items), total
)
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size,
'total_pages': total_pages
}
async def _build_location_with_resources(
self,
location: ScreenplayLocation
) -> Dict[str, Any]:
"""构建场景及其标签和资源(支持无资源的场景)"""
# 获取标签和资源
tags = []
default_thumbnail_url = None
if location.tags:
for tag in location.tags:
# 获取该标签的资源
resources = await self.project_resource_repo.get_by_element_tag_id(tag.tag_id)
if not resources:
continue
# 获取第一个资源的缩略图
thumbnail_url = None
if resources:
thumbnail_url = resources[0].thumbnail_url or resources[0].file_url
# 如果是第一个标签(display_order最小),记录缩略图作为默认
if not default_thumbnail_url and tag.display_order == 0:
default_thumbnail_url = thumbnail_url
tags.append({
'tag_id': str(tag.tag_id),
'tag_key': tag.tag_key,
'tag_label': tag.tag_label,
'description': tag.description,
'display_order': tag.display_order,
'resource_count': len(resources),
'thumbnail_url': thumbnail_url
})
return {
'location_id': str(location.location_id),
'screenplay_id': str(location.screenplay_id),
'name': location.name,
'location': location.location,
'description': location.description,
'has_tags': location.has_tags,
'default_tag_id': str(location.default_tag_id) if location.default_tag_id else None,
'resource_count': sum(tag['resource_count'] for tag in tags),
'default_thumbnail_url': default_thumbnail_url,
'tags': tags
}
# ==================== 道具资源 ====================
async def get_props(
self,
user_id: str,
project_id: str,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
include_subprojects: bool = False
) -> Dict[str, Any]:
"""获取所有道具列表(包含有资源和无资源的)
Args:
user_id: 用户ID
project_id: 项目ID(可以是父项目或子项目)
search: 搜索关键词
page: 页码
page_size: 每页数量
include_subprojects: 是否包含子项目(仅当project_id为父项目时有效)
"""
logger.info(
"获取所有道具列表: user_id=%s, project_id=%s, search=%s, page=%d, page_size=%d, include_subprojects=%s",
user_id, project_id, search, page, page_size, include_subprojects
)
# 检查项目权限
await self._check_project_permission(user_id, project_id, 'viewer')
# 获取项目及其子项目的剧本ID列表
screenplay_ids = await self._get_screenplay_ids(
project_id,
include_subprojects
)
if not screenplay_ids:
return {
'items': [],
'total': 0,
'page': page,
'page_size': page_size,
'total_pages': 0
}
# 构建查询 - 获取所有道具,不限制 has_tags
query = (
select(ScreenplayProp)
.options(selectinload(ScreenplayProp.tags))
.where(ScreenplayProp.screenplay_id.in_(screenplay_ids))
)
# 搜索过滤
if search:
search_pattern = f"%{search}%"
query = query.where(
or_(
ScreenplayProp.name.ilike(search_pattern),
ScreenplayProp.description.ilike(search_pattern)
)
)
# 排序
query = query.order_by(ScreenplayProp.name)
# 分页
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size)
# 执行查询
result = await self.db.execute(query)
props = result.scalars().all()
# 获取总数 - 不限制 has_tags
count_query = (
select(func.count())
.select_from(ScreenplayProp)
.where(ScreenplayProp.screenplay_id.in_(screenplay_ids))
)
if search:
count_query = count_query.where(
or_(
ScreenplayProp.name.ilike(search_pattern),
ScreenplayProp.description.ilike(search_pattern)
)
)
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 加载标签和资源
items = []
for prop in props:
item = await self._build_prop_with_resources(prop)
if item:
items.append(item)
total_pages = (total + page_size - 1) // page_size
logger.info(
"获取所有道具列表成功: count=%d, total=%d",
len(items), total
)
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size,
'total_pages': total_pages
}
async def _build_prop_with_resources(
self,
prop: ScreenplayProp
) -> Dict[str, Any]:
"""构建道具及其标签和资源(支持无资源的道具)"""
# 获取标签和资源
tags = []
default_thumbnail_url = None
if prop.tags:
for tag in prop.tags:
# 获取该标签的资源
resources = await self.project_resource_repo.get_by_element_tag_id(tag.tag_id)
if not resources:
continue
# 获取第一个资源的缩略图
thumbnail_url = None
if resources:
thumbnail_url = resources[0].thumbnail_url or resources[0].file_url
# 如果是第一个标签(display_order最小),记录缩略图作为默认
if not default_thumbnail_url and tag.display_order == 0:
default_thumbnail_url = thumbnail_url
tags.append({
'tag_id': str(tag.tag_id),
'tag_key': tag.tag_key,
'tag_label': tag.tag_label,
'description': tag.description,
'display_order': tag.display_order,
'resource_count': len(resources),
'thumbnail_url': thumbnail_url
})
return {
'prop_id': str(prop.prop_id),
'screenplay_id': str(prop.screenplay_id),
'name': prop.name,
'description': prop.description,
'has_tags': prop.has_tags,
'default_tag_id': str(prop.default_tag_id) if prop.default_tag_id else None,
'resource_count': sum(tag['resource_count'] for tag in tags),
'default_thumbnail_url': default_thumbnail_url,
'tags': tags
}
# ==================== 实拍资源 ====================
async def get_footage_resources(
self,
user_id: str,
project_id: str,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
include_subprojects: bool = False
) -> Dict[str, Any]:
"""获取实拍资源列表
Args:
user_id: 用户ID
project_id: 项目ID(可以是父项目或子项目)
search: 搜索关键词
page: 页码
page_size: 每页数量
include_subprojects: 是否包含子项目(仅当project_id为父项目时有效)
"""
logger.info(
"获取实拍资源列表: user_id=%s, project_id=%s, search=%s, page=%d, page_size=%d, include_subprojects=%s",
user_id, project_id, search, page, page_size, include_subprojects
)
# 检查项目权限
await self._check_project_permission(user_id, project_id, 'viewer')
# 获取项目及其子项目的ID列表
project_ids = await self._get_project_ids(
project_id,
include_subprojects
)
if not project_ids:
return {
'items': [],
'total': 0,
'page': page,
'page_size': page_size,
'total_pages': 0
}
# 构建查询
from app.models.project_resource import ProjectResource
from app.enums.resource_type import ResourceType
query = (
select(ProjectResource)
.where(ProjectResource.project_id.in_(project_ids))
.where(ProjectResource.type == ResourceType.FOOTAGE.value)
.where(ProjectResource.deleted_at == None)
)
# 搜索过滤
if search:
search_pattern = f"%{search}%"
query = query.where(ProjectResource.name.ilike(search_pattern))
# 排序
query = query.order_by(ProjectResource.created_at.desc())
# 分页
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size)
# 执行查询
result = await self.db.execute(query)
resources = result.scalars().all()
# 获取总数
count_query = (
select(func.count())
.select_from(ProjectResource)
.where(ProjectResource.project_id.in_(project_ids))
.where(ProjectResource.type == ResourceType.FOOTAGE.value)
.where(ProjectResource.deleted_at == None)
)
if search:
count_query = count_query.where(ProjectResource.name.ilike(search_pattern))
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
# 构建响应
items = []
for resource in resources:
items.append({
'resource_id': str(resource.project_resource_id),
'project_id': str(resource.project_id),
'name': resource.name,
'type': 'footage',
'file_url': resource.file_url,
'thumbnail_url': resource.thumbnail_url,
'file_size': resource.file_size,
'mime_type': resource.mime_type,
'width': resource.width,
'height': resource.height,
'duration': resource.meta_data.get('duration') if resource.meta_data else None,
'meta_data': resource.meta_data,
'created_by': str(resource.created_by),
'created_at': resource.created_at.isoformat()
})
total_pages = (total + page_size - 1) // page_size
logger.info(
"获取实拍资源列表成功: count=%d, total=%d",
len(items), total
)
return {
'items': items,
'total': total,
'page': page,
'page_size': page_size,
'total_pages': total_pages
}
# ==================== 元素标签和资源 ====================
async def get_element_with_tags(
self,
user_id: str,
element_id: str,
element_type: str
) -> Dict[str, Any]:
"""获取元素的所有标签和资源"""
logger.info(
"获取元素的所有标签和资源: user_id=%s, element_id=%s, element_type=%s",
user_id, element_id, element_type
)
# 根据 element_type 获取元素
if element_type == 'character':
element = await self.screenplay_repo.get_character_by_id(element_id)
elif element_type == 'location':
element = await self.screenplay_repo.get_location_by_id(element_id)
elif element_type == 'prop':
element = await self.screenplay_repo.get_prop_by_id(element_id)
else:
logger.warning(
"无效的元素类型: element_type=%s",
element_type
)
raise ValidationError("无效的元素类型: %s" % element_type)
if not element:
logger.warning(
"元素不存在: element_id=%s, element_type=%s",
element_id, element_type
)
raise NotFoundError("元素不存在: %s" % element_id)
# 检查项目权限
await self._check_project_permission(user_id, element.screenplay.project_id, 'viewer')
# 获取标签
tags = await self.screenplay_tag_repo.get_by_element_id(
element_id,
element_type
)
# 构建标签和资源
tag_list = []
for tag in tags:
# 获取该标签的资源
resources = await self.project_resource_repo.get_by_element_tag_id(tag.tag_id)
tag_list.append({
'tag_id': str(tag.tag_id),
'tag_key': tag.tag_key,
'tag_label': tag.tag_label,
'description': tag.description,
'display_order': tag.display_order,
'resources': [
{
'resource_id': str(res.project_resource_id),
'element_tag_id': str(res.element_tag_id) if res.element_tag_id else None,
'name': res.name,
'file_url': res.file_url,
'thumbnail_url': res.thumbnail_url,
'width': res.width,
'height': res.height,
'file_size': res.file_size,
'mime_type': res.mime_type
}
for res in resources
]
})
# 获取元素名称
if element_type == 'character':
element_name = element.name
element_description = element.description
elif element_type == 'location':
element_name = element.name
element_description = element.description
elif element_type == 'prop':
element_name = element.name
element_description = element.description
else:
element_name = ''
element_description = None
return {
'element_id': str(element_id),
'element_name': element_name,
'element_type': element_type,
'element_description': element_description,
'default_tag_id': str(element.default_tag_id) if element.default_tag_id else None,
'tags': tag_list
}
# ==================== 辅助方法 ====================
async def _get_screenplay_ids(
self,
project_id: str,
include_subprojects: bool = False
) -> List[str]:
"""获取项目及其子项目的剧本ID列表
Args:
project_id: 项目ID
include_subprojects: 是否包含子项目
Returns:
剧本ID列表
"""
from app.models.project import Project
# 获取项目信息
project_query = select(Project).where(Project.project_id == project_id)
project_result = await self.db.execute(project_query)
project = project_result.scalar_one_or_none()
if not project:
return []
# 收集剧本ID
screenplay_ids = []
# 如果是子项目,获取关联的剧本
if project.parent_project_id:
if project.screenplay_id:
screenplay_ids.append(project.screenplay_id)
else:
# 如果是父项目
if project.screenplay_id:
screenplay_ids.append(project.screenplay_id)
# 如果需要包含子项目,获取所有子项目的剧本
if include_subprojects:
subprojects_query = (
select(Project.screenplay_id)
.where(Project.parent_project_id == project_id)
.where(Project.screenplay_id.isnot(None))
)
subprojects_result = await self.db.execute(subprojects_query)
subproject_screenplay_ids = subprojects_result.scalars().all()
screenplay_ids.extend(subproject_screenplay_ids)
return screenplay_ids
async def _get_project_ids(
self,
project_id: str,
include_subprojects: bool = False
) -> List[str]:
"""获取项目及其子项目的ID列表
Args:
project_id: 项目ID
include_subprojects: 是否包含子项目
Returns:
项目ID列表
"""
from app.models.project import Project
# 获取项目信息
project_query = select(Project).where(Project.project_id == project_id)
project_result = await self.db.execute(project_query)
project = project_result.scalar_one_or_none()
if not project:
return []
# 收集项目ID
project_ids = [project_id]
# 如果需要包含子项目,获取所有子项目的ID
if include_subprojects and not project.parent_project_id:
subprojects_query = (
select(Project.project_id)
.where(Project.parent_project_id == project_id)
)
subprojects_result = await self.db.execute(subprojects_query)
subproject_ids = subprojects_result.scalars().all()
project_ids.extend(subproject_ids)
return project_ids
async def _check_project_permission(
self,
user_id: str,
project_id: str,
required_permission: str
) -> None:
"""检查项目权限"""
has_permission = await self.project_service.check_user_permission(
user_id, project_id, required_permission
)
if not has_permission:
from app.core.exceptions import PermissionError
raise PermissionError("没有权限访问项目: %s" % project_id)
API 接口
1. 获取所有角色列表
GET /api/v1/projects/{project_id}/characters
查询参数:
search: 搜索关键词(可选)page: 页码(默认 1)page_size: 每页数量(默认 20)include_subprojects: 是否包含子项目资源(默认 false,仅当 project_id 为父项目时有效)
说明:
- 当
project_id为父项目时:include_subprojects=false:只返回父项目的资源include_subprojects=true:返回父项目及其所有子项目的资源
- 当
project_id为子项目时:只返回该子项目的资源,忽略include_subprojects参数
响应(符合统一响应格式):
{
"success": true,
"code": 200,
"message": "Success",
"data": {
"items": [
{
"character_id": "018e1234-5678-7abc-8def-300000000001",
"screenplay_id": "018e1234-5678-7abc-8def-100000000001",
"name": "孙悟空",
"description": "主角,齐天大圣",
"role_type": "main",
"has_tags": true,
"default_tag_id": "tag_char_002",
"resource_count": 8,
"default_thumbnail_url": "https://...",
"tags": [
{
"tag_id": "tag_char_002",
"tag_key": "qingnian",
"tag_label": "青年",
"description": "25-30岁的孙悟空",
"display_order": 1,
"resource_count": 5,
"thumbnail_url": "https://..."
},
{
"tag_id": "tag_char_001",
"tag_key": "shaonian",
"tag_label": "少年",
"description": "15-17岁的孙悟空",
"display_order": 0,
"resource_count": 3,
"thumbnail_url": "https://..."
}
]
},
{
"character_id": "018e1234-5678-7abc-8def-300000000002",
"screenplay_id": "018e1234-5678-7abc-8def-100000000001",
"name": "猪八戒",
"description": "二师兄",
"role_type": "supporting",
"has_tags": false,
"default_tag_id": null,
"resource_count": 0,
"default_thumbnail_url": null,
"tags": []
}
],
"total": 4,
"page": 1,
"page_size": 20,
"total_pages": 1
},
"timestamp": "2026-02-01T12:00:00+00:00"
}
2. 获取所有场景列表
GET /api/v1/projects/{project_id}/locations
查询参数:
search: 搜索关键词(可选)page: 页码(默认 1)page_size: 每页数量(默认 20)include_subprojects: 是否包含子项目资源(默认 false,仅当 project_id 为父项目时有效)
说明:
- 当
project_id为父项目时:include_subprojects=false:只返回父项目的资源include_subprojects=true:返回父项目及其所有子项目的资源
- 当
project_id为子项目时:只返回该子项目的资源,忽略include_subprojects参数
响应:
{
"success": true,
"code": 200,
"message": "Success",
"data": {
"items": [
{
"location_id": "018e1234-5678-7abc-8def-400000000001",
"screenplay_id": "018e1234-5678-7abc-8def-100000000001",
"name": "花果山水帘洞",
"location": "花果山",
"description": "孙悟空的老家",
"has_tags": true,
"default_tag_id": "tag_loc_001",
"resource_count": 8,
"default_thumbnail_url": "https://...",
"tags": [
{
"tag_id": "tag_loc_001",
"tag_key": "baitian",
"tag_label": "白天",
"description": "阳光明媚",
"display_order": 0,
"resource_count": 5,
"thumbnail_url": "https://..."
}
]
}
],
"total": 3,
"page": 1,
"page_size": 20,
"total_pages": 1
},
"timestamp": "2026-02-01T12:00:00+00:00"
}
3. 获取所有道具列表
GET /api/v1/projects/{project_id}/props
查询参数:
search: 搜索关键词(可选)page: 页码(默认 1)page_size: 每页数量(默认 20)include_subprojects: 是否包含子项目资源(默认 false,仅当 project_id 为父项目时有效)
说明:
- 当
project_id为父项目时:include_subprojects=false:只返回父项目的资源include_subprojects=true:返回父项目及其所有子项目的资源
- 当
project_id为子项目时:只返回该子项目的资源,忽略include_subprojects参数
响应:
{
"success": true,
"code": 200,
"message": "Success",
"data": {
"items": [
{
"prop_id": "018e1234-5678-7abc-8def-500000000001",
"screenplay_id": "018e1234-5678-7abc-8def-100000000001",
"name": "金箍棒",
"description": "孙悟空的武器",
"has_tags": true,
"default_tag_id": "tag_prop_001",
"resource_count": 4,
"default_thumbnail_url": "https://...",
"tags": [
{
"tag_id": "tag_prop_001",
"tag_key": "quanxin",
"tag_label": "全新",
"description": "崭新出厂",
"display_order": 0,
"resource_count": 3,
"thumbnail_url": "https://..."
}
]
},
{
"prop_id": "018e1234-5678-7abc-8def-500000000002",
"screenplay_id": "018e1234-5678-7abc-8def-100000000001",
"name": "紫金钵",
"description": "唐僧的钵盂",
"has_tags": false,
"default_tag_id": null,
"resource_count": 0,
"default_thumbnail_url": null,
"tags": []
}
],
"total": 4,
"page": 1,
"page_size": 20,
"total_pages": 1
},
"timestamp": "2026-02-01T12:00:00+00:00"
}
4. 获取实拍资源列表
GET /api/v1/projects/{project_id}/footage-resources
查询参数:
search: 搜索关键词(可选)page: 页码(默认 1)page_size: 每页数量(默认 20)include_subprojects: 是否包含子项目资源(默认 false,仅当 project_id 为父项目时有效)
说明:
- 当
project_id为父项目时:include_subprojects=false:只返回父项目的资源include_subprojects=true:返回父项目及其所有子项目的资源
- 当
project_id为子项目时:只返回该子项目的资源,忽略include_subprojects参数
响应:
{
"success": true,
"code": 200,
"message": "Success",
"data": {
"items": [
{
"resource_id": "res_001",
"project_id": "018e1234-5678-7abc-8def-100000000001",
"name": "航拍视频1",
"type": "footage",
"file_url": "https://...",
"thumbnail_url": "https://...",
"file_size": 1024000,
"mime_type": "video/mp4",
"width": 1920,
"height": 1080,
"duration": 30.5,
"meta_data": {
"resolution": "1920x1080",
"frame_rate": 30
},
"created_by": "user_001",
"created_at": "2026-01-27T10:00:00+00:00"
}
],
"total": 10,
"page": 1,
"page_size": 20,
"total_pages": 1
},
"timestamp": "2026-02-01T12:00:00+00:00"
}
5. 获取元素的所有标签和资源
GET /api/v1/elements/{element_id}/tags-with-resources
查询参数:
element_type: 元素类型(character|location|prop)
响应:
{
"success": true,
"data": {
"element_id": "018e1234-5678-7abc-8def-300000000001",
"element_name": "孙悟空",
"element_type": "character",
"element_description": "主角,齐天大圣",
"default_tag_id": "tag_char_002",
"tags": [
{
"tag_id": "tag_char_002",
"tag_key": "qingnian",
"tag_label": "青年",
"description": "25-30岁的孙悟空",
"display_order": 1,
"resources": [
{
"resource_id": "res_003",
"element_tag_id": "tag_char_002",
"name": "孙悟空-青年版1",
"file_url": "https://...",
"thumbnail_url": "https://...",
"width": 400,
"height": 600,
"file_size": 102400,
"mime_type": "image/jpeg"
}
]
},
{
"tag_id": "tag_char_001",
"tag_key": "shaonian",
"tag_label": "少年",
"description": "15-17岁的孙悟空",
"display_order": 0,
"resources": [
{
"resource_id": "res_001",
"element_tag_id": "tag_char_001",
"name": "孙悟空-少年版1",
"file_url": "https://...",
"thumbnail_url": "https://...",
"width": 400,
"height": 600,
"file_size": 102400,
"mime_type": "image/jpeg"
}
]
}
]
}
}
数据库设计
表关系
screenplay_characters (剧本角色)
↓ (character_id)
screenplay_element_tags (剧本标签,element_type=1)
↓ (tag_id)
project_resources (项目素材,element_tag_id)
screenplay_locations (剧本场景)
↓ (location_id)
screenplay_element_tags (剧本标签,element_type=2)
↓ (tag_id)
project_resources (项目素材,element_tag_id)
screenplay_props (剧本道具)
↓ (prop_id)
screenplay_element_tags (剧本标签,element_type=3)
↓ (tag_id)
project_resources (项目素材,element_tag_id)
索引优化
-- screenplay_element_tags 索引
CREATE INDEX idx_screenplay_element_tags_element_id ON screenplay_element_tags(element_id);
CREATE INDEX idx_screenplay_element_tags_element_type ON screenplay_element_tags(element_type);
CREATE INDEX idx_screenplay_element_tags_tag_id ON screenplay_element_tags(tag_id);
-- project_resources 索引
CREATE INDEX idx_project_resources_element_tag_id ON project_resources(element_tag_id)
WHERE element_tag_id IS NOT NULL;
CREATE INDEX idx_project_resources_type ON project_resources(type);
CREATE INDEX idx_project_resources_project_id ON project_resources(project_id);
数据模型
Pydantic 模型
# app/schemas/resource_library.py
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Generic, TypeVar
from datetime import datetime
T = TypeVar('T')
class TagWithCount(BaseModel):
tag_id: str
tag_key: str
tag_label: str
description: Optional[str] = None
display_order: int
resource_count: int
thumbnail_url: Optional[str] = None
class CharacterWithResources(BaseModel):
character_id: str
screenplay_id: str
name: str
description: Optional[str] = None
role_type: str
has_tags: bool
default_tag_id: Optional[str] = None
resource_count: int
default_thumbnail_url: Optional[str] = None
tags: List[TagWithCount] = []
class LocationWithResources(BaseModel):
location_id: str
screenplay_id: str
name: str
location: Optional[str] = None
description: Optional[str] = None
has_tags: bool
default_tag_id: Optional[str] = None
resource_count: int
default_thumbnail_url: Optional[str] = None
tags: List[TagWithCount] = []
class PropWithResources(BaseModel):
prop_id: str
screenplay_id: str
name: str
description: Optional[str] = None
has_tags: bool
default_tag_id: Optional[str] = None
resource_count: int
default_thumbnail_url: Optional[str] = None
tags: List[TagWithCount] = []
class ResourceInfo(BaseModel):
resource_id: str
element_tag_id: Optional[str] = None
name: str
file_url: str
thumbnail_url: Optional[str] = None
width: Optional[int] = None
height: Optional[int] = None
file_size: int
mime_type: str
class TagWithResources(BaseModel):
tag_id: str
tag_key: str
tag_label: str
description: Optional[str] = None
display_order: int
resources: List[ResourceInfo] = []
class ElementWithTags(BaseModel):
element_id: str
element_name: str
element_type: str
element_description: Optional[str] = None
default_tag_id: Optional[str] = None
tags: List[TagWithResources] = []
class FootageResource(BaseModel):
resource_id: str
project_id: str
name: str
type: str = 'footage'
file_url: str
thumbnail_url: Optional[str] = None
file_size: int
mime_type: str
width: Optional[int] = None
height: Optional[int] = None
duration: Optional[float] = None
meta_data: Optional[Dict[str, Any]] = None
created_by: str
created_at: datetime
class PaginatedResponse(BaseModel, Generic[T]):
items: List[T]
total: int
page: int
page_size: int
total_pages: int
错误处理
错误码
| 错误码 | 说明 | HTTP 状态码 |
|---|---|---|
PERMISSION_DENIED |
没有权限访问项目 | 403 |
NOT_FOUND |
元素或项目不存在 | 404 |
INVALID_ELEMENT_TYPE |
无效的元素类型 | 400 |
错误响应示例
{
"success": false,
"code": 403,
"message": "没有权限访问项目",
"data": null,
"timestamp": "2026-02-01T12:00:00+00:00"
}
性能优化
1. 查询优化
- 使用
selectinload预加载关联数据 - 使用索引优化 JOIN 查询
- 分页查询避免全表扫描
2. 缓存策略
- 缓存角色/场景/道具列表(5分钟)
- 缓存标签和资源(5分钟)
- 使用 Redis 缓存热门数据
3. 数据库连接池
- 使用 asyncpg 连接池
- 配置合理的连接池大小
- 避免连接泄漏
测试
单元测试
# 运行 ResourceLibraryService 单元测试
docker exec jointo-server-app pytest tests/unit/services/test_resource_library_service.py -v
# 运行特定测试
docker exec jointo-server-app pytest tests/unit/services/test_resource_library_service.py::test_get_characters -v
集成测试
# 运行 ResourceLibraryService 集成测试
docker exec jointo-server-app pytest tests/integration/services/test_resource_library_service.py -v
# 测试资源库查询
docker exec jointo-server-app pytest tests/integration/services/test_resource_library_service.py::test_get_all_resources -v
测试覆盖率
# 生成测试覆盖率报告
docker exec jointo-server-app pytest tests/ --cov=app.services.resource_library_service --cov-report=html
数据库迁移
相关迁移文件
资源库服务依赖以下表的迁移:
# 1. 剧本表(screenplay_characters, screenplay_locations, screenplay_props)
docker exec jointo-server-app alembic revision -m "create_screenplay_tables"
# 2. 剧本标签表(screenplay_element_tags)
docker exec jointo-server-app alembic revision -m "create_screenplay_element_tags_table"
# 3. 项目资源表(project_resources)
docker exec jointo-server-app alembic revision -m "create_project_resources_table"
# 4. 执行所有迁移
docker exec jointo-server-app python scripts/db_migrate.py upgrade
# 5. 验证迁移
docker exec jointo-server-app alembic history
索引优化迁移
# 创建资源库查询优化索引
docker exec jointo-server-app alembic revision -m "add_resource_library_indexes"
# 在迁移文件中添加以下索引:
# CREATE INDEX idx_screenplay_element_tags_element_id ON screenplay_element_tags(element_id);
# CREATE INDEX idx_screenplay_element_tags_element_type ON screenplay_element_tags(element_type);
# CREATE INDEX idx_project_resources_element_tag_id ON project_resources(element_tag_id)
# WHERE element_tag_id IS NOT NULL;
相关文档
与分镜服务的集成
工作流:从资源库添加到分镜
资源库服务与分镜服务的职责清晰分离:
- 资源库服务:提供素材选择界面(Read-Only),展示所有可用的角色/场景/道具/实拍素材
- 分镜服务:管理分镜与素材的关联(CRUD),使用
storyboard_items表存储关联关系 - 前端:协调两个服务完成完整的业务流程
完整流程
1. 用户在资源库面板选择素材
前端调用资源库服务获取角色列表:
GET /api/v1/projects/{project_id}/characters
响应示例:
{
"success": true,
"data": {
"items": [
{
"character_id": "018e1234-5678-7abc-8def-300000000001",
"name": "孙悟空",
"default_tag_id": "tag_char_002",
"tags": [
{
"tag_id": "tag_char_002",
"tag_key": "qingnian",
"tag_label": "青年",
"resource_count": 5,
"thumbnail_url": "https://..."
},
{
"tag_id": "tag_char_001",
"tag_key": "shaonian",
"tag_label": "少年",
"resource_count": 3,
"thumbnail_url": "https://..."
}
]
}
]
}
}
2. 前端调用分镜服务添加元素
用户选择"孙悟空-青年版"后,前端调用分镜服务:
POST /api/v1/storyboards/{storyboard_id}/items
{
"item_type": 1, // 1=Character
"target_id": "018e1234-5678-7abc-8def-300000000001", // 孙悟空的 character_id
"tag_id": "tag_char_002", // 青年标签
"action_description": "大笑", // 可选
"spatial_position": "center", // 可选
"is_visible": true
}
3. StoryboardService 创建关联记录
分镜服务在 storyboard_items 表创建记录:
INSERT INTO storyboard_items (
item_id,
storyboard_id,
item_type,
target_id,
target_name, -- 冗余存储:"孙悟空"
target_cover_url, -- 冗余存储:从资源获取
tag_id,
action_description,
spatial_position,
is_visible,
display_order
) VALUES (
'019d1234-5678-7abc-def0-111111111111',
'019d1234-5678-7abc-def0-999999999999',
1, -- Character
'018e1234-5678-7abc-8def-300000000001',
'孙悟空',
'https://...',
'tag_char_002',
'大笑',
'center',
true,
0
);
数据流向图
sequenceDiagram
participant User as 用户(前端)
participant ResourceLib as ResourceLibraryService<br/>资源库服务
participant DB1 as screenplay_characters<br/>screenplay_element_tags<br/>project_resources
participant StoryboardSvc as StoryboardService<br/>分镜服务
participant DB2 as storyboard_items
Note over ResourceLib: 职责:提供素材选择界面(Read-Only)
User->>ResourceLib: 1. 请求资源列表<br/>(搜索、分页、筛选)
ResourceLib->>DB1: 2. 查询可用素材
DB1-->>ResourceLib: 3. 返回素材数据<br/>(角色/场景/道具/实拍)
ResourceLib-->>User: 4. 展示资源库面板
Note over User: 用户选择素材
User->>StoryboardSvc: 5. 添加素材到分镜<br/>(character_id, tag_id)
Note over StoryboardSvc: 职责:管理分镜与素材的关联(CRUD)
StoryboardSvc->>DB2: 6. 创建 storyboard_items 记录
Note right of DB2: item_type: 1 (Character)<br/>target_id: character_id<br/>tag_id: tag_id<br/>target_name: "孙悟空"(冗余)<br/>target_cover_url: "https://..."(冗余)<br/>action_description: "大笑"<br/>spatial_position: "center"
DB2-->>StoryboardSvc: 7. 返回创建结果
StoryboardSvc-->>User: 8. 更新分镜预览
Note over ResourceLib: 不关心素材是否已被分镜使用
Note over StoryboardSvc: 支持添加/移除/更新分镜元素<br/>查询分镜的所有关联元素
关键设计点
1. 资源库服务不感知 storyboard_items
- 资源库只负责展示可用素材
- 不关心素材是否已被分镜使用
- 不查询
storyboard_items表
原因:
- 职责单一:资源库专注于"展示可用资源"
- 性能优化:避免复杂的 JOIN 查询
- 解耦设计:资源库与分镜服务独立演进
2. 分镜服务依赖资源库数据
创建关联时,分镜服务需要从资源库数据获取:
target_name:元素名称(如"孙悟空")target_cover_url:元素封面(从标签的第一个资源获取)
实现方式:
# StoryboardService.add_element_to_storyboard()
async def add_element_to_storyboard(self, ...):
# 获取目标元素信息(用于冗余字段)
if item_type == ItemType.CHARACTER:
character = await self.screenplay_repo.get_character_by_id(target_id)
target_name = character.name
# 获取封面:优先使用指定标签的资源,否则使用默认标签
if tag_id:
resources = await self.project_resource_repo.get_by_element_tag_id(tag_id)
else:
resources = await self.project_resource_repo.get_by_element_tag_id(
character.default_tag_id
)
target_cover_url = resources[0].thumbnail_url if resources else None
# 创建关联记录
item = StoryboardItem(
storyboard_id=storyboard_id,
item_type=item_type,
target_id=target_id,
target_name=target_name, # 冗余存储
target_cover_url=target_cover_url, # 冗余存储
tag_id=tag_id,
...
)
冗余存储的优势:
- ✅ 查询分镜时无需 JOIN,性能极佳
- ✅ 即使元素名称变更,历史分镜仍显示原名称
- ✅ 前端只需调用一个 API 获取完整数据
3. 前端负责协调两个服务
场景 1:用户从资源库添加元素到分镜
// 1. 获取资源库数据
const { data: characters } = await api.get(`/projects/${projectId}/characters`);
// 2. 用户选择"孙悟空-青年版"
const selectedCharacter = characters.items[0];
const selectedTag = selectedCharacter.tags.find(t => t.tag_id === 'tag_char_002');
// 3. 调用分镜服务添加
await api.post(`/storyboards/${storyboardId}/items`, {
item_type: 1,
target_id: selectedCharacter.character_id,
tag_id: selectedTag.tag_id,
action_description: '大笑',
spatial_position: 'center'
});
场景 2:用户查看分镜的关联元素
// 直接调用分镜服务,无需资源库服务
const { data: items } = await api.get(`/storyboards/${storyboardId}/items`);
// 响应已包含完整信息(冗余字段)
items.forEach(item => {
console.log(item.target_name); // "孙悟空"
console.log(item.target_cover_url); // "https://..."
console.log(item.action_description); // "大笑"
});
默认标签的处理
当用户从资源库拖拽元素到分镜时,前端应自动使用默认标签:
// 获取默认标签
const defaultTag = character.tags.find(
t => t.tag_id === character.default_tag_id
) || character.tags[0]; // 如果未设置默认标签,使用第一个
// 添加到分镜
await api.post(`/storyboards/${storyboardId}/items`, {
item_type: 1,
target_id: character.character_id,
tag_id: defaultTag.tag_id // 使用默认标签
});
数据一致性保证
1. 冗余字段同步
当元素名称或封面变更时,需要异步更新 storyboard_items 表:
# ScreenplayService.update_character()
async def update_character(self, character_id, update_data):
# 更新角色
character = await self.repo.update(character_id, update_data)
# 如果名称变更,异步更新 storyboard_items
if 'name' in update_data:
await self._sync_storyboard_items_name(
item_type=ItemType.CHARACTER,
target_id=character_id,
new_name=update_data['name']
)
return character
2. 级联删除
删除元素时,自动删除所有关联的 storyboard_items 记录:
# ScreenplayService.delete_character()
async def delete_character(self, character_id):
# 删除角色
await self.repo.delete(character_id)
# 删除所有关联的 storyboard_items
await self.storyboard_repo.delete_items_by_target(
item_type=ItemType.CHARACTER,
target_id=character_id
)
总结
| 维度 | 资源库服务 | 分镜服务 |
|---|---|---|
| 数据源 | screenplay_element_tags + project_resources | storyboard_items |
| 职责 | 展示可用素材(Read-Only) | 管理分镜关联(CRUD) |
| 查询性能 | 需要 JOIN 多表 | 单表查询(冗余字段) |
| 使用场景 | 资源库面板、素材选择 | 分镜编辑、时间轴展示 |
| 前端调用 | 获取素材列表 | 添加/移除/查询分镜元素 |
文档版本:v1.3
最后更新:2026-02-03
变更记录
v1.3 (2026-02-03)
- ✅ 修复技术栈合规性:
- 修复日志格式化:统一使用 %-formatting 替代 f-string
- 添加测试指南:单元测试、集成测试、覆盖率测试
- 添加数据库迁移说明:相关迁移文件、索引优化迁移
- ✅ 符合 jointo-tech-stack v1.0 规范:
- 日志系统规范(logging.md)
- 测试规范(testing.md)
- 迁移规范(migration.md)
v1.2 (2026-02-03)
- ✅ 文档规范统一:
- 统一合规状态标识
- 更新版本号和日期
v1.1 (2026-02-01)
- 新增"与分镜服务的集成"章节
- 补充"从资源库添加到分镜"的完整工作流
- 补充数据流向图和关键设计点
- 明确资源库服务与分镜服务的职责边界
- 补充默认标签的处理逻辑
- 补充数据一致性保证方案
v1.0 (2026-02-01)
- 初始版本