You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

86 KiB

剧本管理服务

文档版本:v2.4
最后更新:2026-01-31


目录

  1. 服务概述
  2. 核心功能
  3. 数据库设计
  4. 服务实现
  5. API 接口
  6. 数据模型

服务概述

剧本管理服务负责处理剧本的创建、编辑、版本管理、审批等业务逻辑。

职责

  • 剧本 CRUD 操作
  • 剧本版本管理
  • 剧本审批流程
  • 角色和场景管理
  • 支持文本输入和文件上传两种方式
  • 文件去重(基于 SHA256 校验和)

核心功能

1. 剧本创建

  • 文本剧本:直接在系统中编写剧本内容
  • 文件剧本:上传 TXT、DOC、DOCX、PDF、RTF、Markdown 等文件
  • 自动计算字数、场景数、角色数
  • 创建初始版本
  • 文件去重(相同文件只存储一次)
  • 自动创建子项目:上传剧本时自动创建关联的子项目(默认启用)

2. 剧本编辑

  • 更新剧本内容
  • 自动创建新版本(内容变化时)
  • 记录变更摘要
  • 保留历史版本

3. 版本管理

  • 查看版本历史
  • 对比版本差异
  • 回滚到历史版本
  • 版本快照

4. 剧本审批

  • 提交审批
  • 审批通过/拒绝
  • 审批记录
  • 状态流转:draftpendingapproved / rejected

5. 角色和场景

  • 添加剧本角色
  • 添加剧本场景
  • 添加剧本道具
  • 角色关系管理
  • 场景顺序管理
  • 道具重要性分类

与 AI Service 集成

Screenplay Service 与 AI Service 集成,实现剧本的智能解析功能。

集成架构

Screenplay Service → AI Service → Celery Worker → AI Model
                  ↓
            Credit Service

剧本解析流程

1. 触发 AI 解析

用户在剧本详情页点击"AI 解析剧本"按钮,前端调用:

POST /api/v1/screenplays/{screenplay_id}/parse

2. Screenplay Service 处理

async def parse_screenplay(
    self,
    user_id: UUID,
    screenplay_id: UUID,
    auto_create_elements: bool = True,
    auto_create_variants: bool = True,
    auto_create_storyboards: bool = True,
    model: str = "gpt-4"
) -> Dict[str, Any]:
    """触发 AI 解析剧本"""
    # 1. 获取剧本内容
    screenplay = await self.repository.get_by_id(screenplay_id)
    if not screenplay:
        raise NotFoundError("剧本不存在")
    
    # 2. 检查权限
    await self._check_project_permission(user_id, screenplay.project_id, 'editor')
    
    # 3. 检查解析状态(防止重复解析)
    if screenplay.parsing_status == 'parsing':
        raise ValidationError("该剧本正在解析中,请稍后再试")
    
    # 4. 更新解析状态
    await self.repository.update(screenplay_id, {'parsing_status': 'parsing'})
    
    # 5. 调用 AI Service
    from app.services.ai_service import AIService
    ai_service = AIService(self.db)
    
    try:
        result = await ai_service.parse_screenplay(
            user_id=user_id,
            screenplay_id=screenplay_id,
            screenplay_content=screenplay.content,
            project_id=screenplay.project_id,
            auto_create_elements=auto_create_elements,
            auto_create_variants=auto_create_variants,
            auto_create_storyboards=auto_create_storyboards,
            model=model
        )
        
        return result
        
    except Exception as e:
        # 解析失败,恢复状态
        await self.repository.update(screenplay_id, {'parsing_status': 'idle'})
        raise

3. AI Service 处理

AI Service 负责:

  1. 检查用户积分
  2. 预扣积分
  3. 创建 AI 任务
  4. 提交 Celery 异步任务
  5. 返回任务 ID

详见 AI Service 文档

4. Celery Worker 处理

Celery Worker 在后台执行:

  1. 调用 AI 模型(GPT-4/Claude 等)
  2. 解析剧本,返回结构化 JSON
  3. 调用 Screenplay Service 存储数据
  4. 调用 Storyboard Service 创建分镜
  5. 更新任务状态
  6. 确认积分消耗

5. 数据自动存储

Screenplay Service 提供数据存储方法供 Celery Worker 调用:

async def store_parsed_elements(
    self,
    screenplay_id: UUID,
    parsed_data: Dict[str, Any]
) -> Dict[str, Any]:
    """存储 AI 解析的剧本元素"""
    async with self.db.begin():  # 使用事务
        try:
            # 1. 存储角色
            character_id_map = {}
            for char_data in parsed_data.get('characters', []):
                character = await self.repository.add_character(
                    screenplay_id, char_data
                )
                character_id_map[char_data['name']] = character.character_id
            
            # 2. 存储场景
            location_id_map = {}
            for location_data in parsed_data.get('locations', []):
                location = await self.repository.add_location(
                    screenplay_id, location_data
                )
                location_id_map[location_data['name']] = location.location_id
            
            # 3. 存储道具
            prop_id_map = {}
            for prop_data in parsed_data.get('props', []):
                prop = await self.repository.add_prop(
                    screenplay_id, prop_data
                )
                prop_id_map[prop_data['name']] = prop.prop_id
            
            # 4. 存储标签
            from app.services.screenplay_tag_service import ScreenplayTagService
            tag_service = ScreenplayTagService(self.db)
            
            tag_id_maps = await tag_service.store_tags(
                screenplay_id,
                parsed_data,
                character_id_map,
                location_id_map,
                prop_id_map
            )
            
            # 5. 更新剧本统计
            await self.repository.update(screenplay_id, {
                'character_count': len(character_id_map),
                'location_count': len(location_id_map),
                'parsing_status': 'completed'
            })
            
            await self.db.commit()
            
            return {
                'character_id_map': character_id_map,
                'location_id_map': location_id_map,
                'prop_id_map': prop_id_map,
                'tag_id_maps': tag_id_maps
            }
            
        except Exception as e:
            await self.db.rollback()
            await self.repository.update(screenplay_id, {'parsing_status': 'failed'})
            raise Exception(f'数据存储失败: {str(e)}')

查询解析结果

用户可以查询剧本的解析结果:

GET /api/v1/screenplays/{screenplay_id}/elements

响应

{
  "screenplay_id": "019d1234-5678-7abc-def0-111111111111",
  "parsing_status": "completed",
  "characters": [
    {
      "character_id": "019d1234-5678-7abc-def0-333333333333",
      "name": "张三",
      "description": "男主角,30岁,程序员",
      "role_type": "main",
      "has_tags": true,
      "tags": [
        {
          "tag_id": "019d1234-5678-7abc-def0-444444444444",
          "tag_key": "youth",
          "tag_label": "少年",
          "description": "15岁的张三"
        }
      ]
    }
  ],
  "scenes": [...],
  "props": [...]
}

错误处理

  1. 积分不足:AI Service 抛出 InsufficientCreditsError,返回 HTTP 402
  2. AI 调用失败:退还积分,更新 parsing_status = 'failed'
  3. 数据存储失败:回滚事务,退还积分,更新 parsing_status = 'failed'
  4. 并发冲突:检查 parsing_status,防止重复解析

相关文档

标签系统集成说明

has_tags 字段维护

剧本元素表(characters/locations/props)包含 has_tags 字段,用于标识元素是否有标签:

-- screenplay_characters 表
has_tags BOOLEAN NOT NULL DEFAULT false

-- screenplay_locations 表
has_tags BOOLEAN NOT NULL DEFAULT false

-- screenplay_props 表
has_tags BOOLEAN NOT NULL DEFAULT false

维护逻辑

  1. 创建标签时ScreenplayTagService.create_tag() 自动设置 has_tags = true
  2. 删除标签时ScreenplayTagService.delete_tag() 检查是否还有其他标签,如果没有则设置 has_tags = false
  3. AI 解析时ScreenplayTagService.store_tags() 批量设置 has_tags = true

查询示例

-- 查询有标签的角色
SELECT character_id, name, description
FROM screenplay_characters
WHERE screenplay_id = '019d1234-5678-7abc-def0-111111111111'
  AND has_tags = true;

-- 查询角色及其标签
SELECT 
    c.character_id,
    c.name,
    c.has_tags,
    t.tag_id,
    t.tag_label,
    t.description
FROM screenplay_characters c
LEFT JOIN screenplay_element_tags t 
    ON t.element_id = c.character_id 
    AND t.element_type = 1
WHERE c.screenplay_id = '019d1234-5678-7abc-def0-111111111111'
ORDER BY c.name, t.display_order;

标签数据结构

AI 解析返回的标签数据结构:

{
  "character_tags": {
    "张三": [
      {
        "tag_key": "youth",
        "tag_label": "少年",
        "description": "15岁的张三,穿着校服",
        "meta_data": {
          "age": 15,
          "clothing": "校服"
        }
      },
      {
        "tag_key": "adult",
        "tag_label": "成年",
        "description": "30岁的张三,身穿西装",
        "meta_data": {
          "age": 30,
          "clothing": "西装"
        }
      }
    ]
  },
  "scene_tags": {
    "花果山": [
      {
        "tag_key": "daytime",
        "tag_label": "白天",
        "description": "阳光明媚的花果山"
      },
      {
        "tag_key": "night",
        "tag_label": "夜晚",
        "description": "月光下的花果山"
      }
    ]
  },
  "prop_tags": {
    "金箍棒": [
      {
        "tag_key": "new",
        "tag_label": "崭新",
        "description": "刚打造的金箍棒"
      }
    ]
  }
}

标签 ID 映射返回

store_parsed_elements() 返回的标签 ID 映射用于后续分镜关联:

{
    'character_id_map': {
        '张三': UUID('019d1234-5678-7abc-def0-111111111111')
    },
    'location_id_map': {
        '花果山': UUID('019d1234-5678-7abc-def0-222222222222')
    },
    'prop_id_map': {
        '金箍棒': UUID('019d1234-5678-7abc-def0-333333333333')
    },
    'tag_id_maps': {
        'character_tags': {
            '张三-youth': UUID('019d1234-5678-7abc-def0-444444444444'),
            '张三-adult': UUID('019d1234-5678-7abc-def0-555555555555')
        },
        'scene_tags': {
            '花果山-daytime': UUID('019d1234-5678-7abc-def0-666666666666'),
            '花果山-night': UUID('019d1234-5678-7abc-def0-777777777777')
        },
        'prop_tags': {
            '金箍棒-new': UUID('019d1234-5678-7abc-def0-888888888888')
        }
    }
}

这些映射会传递给 StoryboardService.create_storyboards_from_ai() 用于建立分镜与标签的关联。


数据库设计

screenplays 表结构

CREATE TABLE screenplays (
    screenplay_id UUID PRIMARY KEY,
    project_id UUID NOT NULL,
    name TEXT NOT NULL,
    type SMALLINT NOT NULL,

    -- 文本剧本字段
    content TEXT, -- 剧本文本内容(Markdown/纯文本)

    -- 文件剧本字段
    file_url TEXT,
    file_size BIGINT,
    mime_type TEXT,
    checksum TEXT, -- SHA256 校验和,配合 file_checksums 表去重
    storage_path TEXT,

    -- 元数据
    version INTEGER NOT NULL DEFAULT 1, -- 版本号
    word_count INTEGER DEFAULT 0, -- 字数统计
    scene_count INTEGER DEFAULT 0, -- 场景数量
    character_count INTEGER DEFAULT 0, -- 角色数量

    -- AI 生成相关
    ai_job_id UUID,
    ai_prompt_id UUID, -- 关联的 AI 提示词模板 ID(应用层保证引用完整性,可选)

    -- 状态管理
    status SMALLINT NOT NULL DEFAULT 1,

    -- 协作字段
    created_by UUID NOT NULL,
    updated_by UUID,
    approved_by UUID,
    approved_at TIMESTAMPTZ,

    -- 审计字段
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at TIMESTAMPTZ,

    -- 约束:content 和 file_url 二选一
    CONSTRAINT screenplays_content_check CHECK (
        (type = 1 AND content IS NOT NULL AND file_url IS NULL) OR
        (type = 2 AND file_url IS NOT NULL AND content IS NULL)
    )
);

-- 索引
CREATE INDEX idx_screenplays_project_id ON screenplays (project_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_screenplays_type ON screenplays (type) WHERE deleted_at IS NULL;
CREATE INDEX idx_screenplays_status ON screenplays (status) WHERE deleted_at IS NULL;
CREATE INDEX idx_screenplays_created_by ON screenplays (created_by) WHERE deleted_at IS NULL;
CREATE INDEX idx_screenplays_ai_job_id ON screenplays (ai_job_id) WHERE ai_job_id IS NOT NULL;
CREATE INDEX idx_screenplays_ai_prompt_id ON screenplays (ai_prompt_id) WHERE ai_prompt_id IS NOT NULL;
CREATE INDEX idx_screenplays_checksum ON screenplays (checksum) WHERE checksum IS NOT NULL;
-- 全文搜索索引(剧本名称和内容)
CREATE INDEX idx_screenplays_name_trgm ON screenplays USING GIN (name gin_trgm_ops) WHERE deleted_at IS NULL;
CREATE INDEX idx_screenplays_content_trgm ON screenplays USING GIN (content gin_trgm_ops) WHERE deleted_at IS NULL AND content IS NOT NULL;

-- 触发器
CREATE TRIGGER update_screenplays_updated_at
    BEFORE UPDATE ON screenplays
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- 列注释
COMMENT ON COLUMN screenplays.screenplay_id IS '剧本唯一标识符(UUID v7)';
COMMENT ON COLUMN screenplays.project_id IS '所属项目ID';
COMMENT ON COLUMN screenplays.name IS '剧本名称';
COMMENT ON COLUMN screenplays.type IS '剧本类型:1=text(文本), 2=file(文件)';
COMMENT ON COLUMN screenplays.content IS '剧本文本内容(Markdown/纯文本),仅文本剧本使用';
COMMENT ON COLUMN screenplays.file_url IS '文件访问URL,仅文件剧本使用';
COMMENT ON COLUMN screenplays.file_size IS '文件大小(字节)';
COMMENT ON COLUMN screenplays.mime_type IS '文件MIME类型';
COMMENT ON COLUMN screenplays.checksum IS '文件SHA256校验和,用于去重';
COMMENT ON COLUMN screenplays.storage_path IS '文件存储路径';
COMMENT ON COLUMN screenplays.version IS '当前版本号';
COMMENT ON COLUMN screenplays.word_count IS '字数统计';
COMMENT ON COLUMN screenplays.scene_count IS '场景数量';
COMMENT ON COLUMN screenplays.character_count IS '角色数量';
COMMENT ON COLUMN screenplays.ai_job_id IS '关联的AI任务ID';
COMMENT ON COLUMN screenplays.ai_prompt_id IS '关联的AI提示词模板ID(应用层保证引用完整性,可选)';
COMMENT ON COLUMN screenplays.status IS '剧本状态:1=draft(草稿), 2=review(审核中), 3=approved(已批准), 4=archived(已归档)';
COMMENT ON COLUMN screenplays.created_by IS '创建者用户ID';
COMMENT ON COLUMN screenplays.updated_by IS '最后更新者用户ID';
COMMENT ON COLUMN screenplays.approved_by IS '审批者用户ID';
COMMENT ON COLUMN screenplays.approved_at IS '审批时间';
COMMENT ON COLUMN screenplays.created_at IS '创建时间';
COMMENT ON COLUMN screenplays.updated_at IS '最后更新时间';
COMMENT ON COLUMN screenplays.deleted_at IS '软删除时间';

screenplay_versions 表结构

CREATE TABLE screenplay_versions (
    version_id UUID PRIMARY KEY,
    screenplay_id UUID NOT NULL,
    version_number INTEGER NOT NULL,
    content_snapshot TEXT, -- 内容快照
    change_summary TEXT, -- 变更摘要
    word_count INTEGER DEFAULT 0,
    scene_count INTEGER DEFAULT 0,
    character_count INTEGER DEFAULT 0,
    created_by UUID NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT screenplay_versions_unique UNIQUE (screenplay_id, version_number)
);

-- 索引
CREATE INDEX idx_screenplay_versions_screenplay_id ON screenplay_versions (screenplay_id);
CREATE INDEX idx_screenplay_versions_created_at ON screenplay_versions (created_at);
CREATE INDEX idx_screenplay_versions_created_by ON screenplay_versions (created_by);

-- 列注释
COMMENT ON COLUMN screenplay_versions.version_id IS '版本唯一标识符(UUID v7)';
COMMENT ON COLUMN screenplay_versions.screenplay_id IS '所属剧本ID';
COMMENT ON COLUMN screenplay_versions.version_number IS '版本号';
COMMENT ON COLUMN screenplay_versions.content_snapshot IS '内容快照';
COMMENT ON COLUMN screenplay_versions.change_summary IS '变更摘要';
COMMENT ON COLUMN screenplay_versions.word_count IS '字数统计';
COMMENT ON COLUMN screenplay_versions.scene_count IS '场景数量';
COMMENT ON COLUMN screenplay_versions.character_count IS '角色数量';
COMMENT ON COLUMN screenplay_versions.created_by IS '创建者用户ID';
COMMENT ON COLUMN screenplay_versions.created_at IS '创建时间';

screenplay_characters 表结构

CREATE TABLE screenplay_characters (
    character_id UUID PRIMARY KEY,
    screenplay_id UUID NOT NULL,
    name TEXT NOT NULL, -- 角色名
    description TEXT, -- 角色描述
    character_image_url TEXT, -- 角色形象图片地址
    role_type SMALLINT NOT NULL DEFAULT 2,
    is_offscreen BOOLEAN NOT NULL DEFAULT false, -- 是否为画外音角色(不出现在画面中)
    line_count INTEGER DEFAULT 0, -- 台词数量
    appearance_count INTEGER DEFAULT 0, -- 出场次数
    order_index INTEGER NOT NULL, -- 排序
    has_tags BOOLEAN NOT NULL DEFAULT false, -- 是否有标签
    default_tag_id UUID, -- 默认标签ID(应用层保证引用完整性)
    meta_data JSONB NOT NULL DEFAULT '{}', -- 额外元数据(性格、背景等)
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT screenplay_characters_name_unique UNIQUE (screenplay_id, name) NULLS NOT DISTINCT,
    CONSTRAINT screenplay_characters_order_unique UNIQUE (screenplay_id, order_index) DEFERRABLE INITIALLY DEFERRED
);

-- 索引
CREATE INDEX idx_screenplay_characters_screenplay_id ON screenplay_characters (screenplay_id);
CREATE INDEX idx_screenplay_characters_role_type ON screenplay_characters (role_type);
CREATE INDEX idx_screenplay_characters_order ON screenplay_characters (screenplay_id, order_index);
CREATE INDEX idx_screenplay_characters_name_trgm ON screenplay_characters USING GIN (name gin_trgm_ops);
CREATE INDEX idx_screenplay_characters_default_tag ON screenplay_characters (default_tag_id) WHERE default_tag_id IS NOT NULL;
CREATE INDEX idx_screenplay_characters_offscreen ON screenplay_characters (is_offscreen) WHERE is_offscreen = true;

-- 触发器
CREATE TRIGGER update_screenplay_characters_updated_at
    BEFORE UPDATE ON screenplay_characters
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- 列注释
COMMENT ON COLUMN screenplay_characters.character_id IS '角色唯一标识符(UUID v7)';
COMMENT ON COLUMN screenplay_characters.screenplay_id IS '所属剧本ID';
COMMENT ON COLUMN screenplay_characters.name IS '角色名称';
COMMENT ON COLUMN screenplay_characters.description IS '角色描述';
COMMENT ON COLUMN screenplay_characters.character_image_url IS '角色形象图片地址';
COMMENT ON COLUMN screenplay_characters.role_type IS '角色类型:1=main(主角), 2=supporting(配角), 3=extra(群演)';
COMMENT ON COLUMN screenplay_characters.is_offscreen IS '是否为画外音角色(不出现在画面中,如纪录片解说员)';
COMMENT ON COLUMN screenplay_characters.line_count IS '台词数量';
COMMENT ON COLUMN screenplay_characters.appearance_count IS '出场次数';
COMMENT ON COLUMN screenplay_characters.order_index IS '排序索引';
COMMENT ON COLUMN screenplay_characters.has_tags IS '是否有标签(年龄段、状态等)';
COMMENT ON COLUMN screenplay_characters.default_tag_id IS '默认标签ID(用于快速预览和时间轴拖拽)';
COMMENT ON COLUMN screenplay_characters.meta_data IS '额外元数据(性格、背景、外貌等)';
COMMENT ON COLUMN screenplay_characters.created_at IS '创建时间';
COMMENT ON COLUMN screenplay_characters.updated_at IS '最后更新时间';

screenplay_locations 表结构

CREATE TABLE screenplay_locations (
    location_id UUID PRIMARY KEY,
    screenplay_id UUID NOT NULL,
    name TEXT NOT NULL, -- 场景名称
    location TEXT, -- 场景地点
    description TEXT, -- 场景描述
    order_index INTEGER NOT NULL, -- 排序
    has_tags BOOLEAN NOT NULL DEFAULT false, -- 是否有标签
    default_tag_id UUID, -- 默认标签ID(应用层保证引用完整性)
    meta_data JSONB NOT NULL DEFAULT '{}', -- 额外元数据
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT screenplay_locations_order_unique UNIQUE (screenplay_id, order_index) DEFERRABLE INITIALLY DEFERRED
);

-- 索引
CREATE INDEX idx_screenplay_locations_screenplay_id ON screenplay_locations (screenplay_id);
CREATE INDEX idx_screenplay_locations_order ON screenplay_locations (screenplay_id, order_index);
CREATE INDEX idx_screenplay_locations_default_tag ON screenplay_locations (default_tag_id) WHERE default_tag_id IS NOT NULL;

-- 触发器
CREATE TRIGGER update_screenplay_locations_updated_at
    BEFORE UPDATE ON screenplay_locations
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- 列注释
COMMENT ON COLUMN screenplay_locations.location_id IS '场景唯一标识符(UUID v7)';
COMMENT ON COLUMN screenplay_locations.screenplay_id IS '所属剧本ID';
COMMENT ON COLUMN screenplay_locations.name IS '场景名称';
COMMENT ON COLUMN screenplay_locations.location IS '场景地点';
COMMENT ON COLUMN screenplay_locations.description IS '场景描述';
COMMENT ON COLUMN screenplay_locations.order_index IS '排序索引';
COMMENT ON COLUMN screenplay_locations.has_tags IS '是否有标签(时代、氛围等)';
COMMENT ON COLUMN screenplay_locations.default_tag_id IS '默认标签ID(用于快速预览和时间轴拖拽)';
COMMENT ON COLUMN screenplay_locations.meta_data IS '额外元数据';
COMMENT ON COLUMN screenplay_locations.created_at IS '创建时间';
COMMENT ON COLUMN screenplay_locations.updated_at IS '最后更新时间';

screenplay_props 表结构

CREATE TABLE screenplay_props (
    prop_id UUID PRIMARY KEY,
    screenplay_id UUID NOT NULL,
    name TEXT NOT NULL, -- 道具名称
    description TEXT, -- 道具描述
    order_index INTEGER NOT NULL DEFAULT 0, -- 排序索引
    has_tags BOOLEAN NOT NULL DEFAULT false, -- 是否有标签
    default_tag_id UUID, -- 默认标签ID(应用层保证引用完整性)
    meta_data JSONB NOT NULL DEFAULT '{}', -- 额外元数据
    -- meta_data 结构示例: {"material": "金属", "size": "小型", "color": "银色", "function": "开锁", "importance": 2, "appearance_count": 5}
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT screenplay_props_name_unique UNIQUE (screenplay_id, name) NULLS NOT DISTINCT
);

-- 索引
CREATE INDEX idx_screenplay_props_screenplay_id ON screenplay_props (screenplay_id);
CREATE INDEX idx_screenplay_props_order ON screenplay_props (screenplay_id, order_index);
CREATE INDEX idx_screenplay_props_name_trgm ON screenplay_props USING GIN (name gin_trgm_ops);
CREATE INDEX idx_screenplay_props_default_tag ON screenplay_props (default_tag_id) WHERE default_tag_id IS NOT NULL;

-- 触发器
CREATE TRIGGER update_screenplay_props_updated_at
    BEFORE UPDATE ON screenplay_props
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- 列注释
COMMENT ON COLUMN screenplay_props.prop_id IS '道具唯一标识符(UUID v7)';
COMMENT ON COLUMN screenplay_props.screenplay_id IS '所属剧本ID';
COMMENT ON COLUMN screenplay_props.name IS '道具名称';
COMMENT ON COLUMN screenplay_props.description IS '道具描述';
COMMENT ON COLUMN screenplay_props.order_index IS '排序索引';
COMMENT ON COLUMN screenplay_props.has_tags IS '是否有标签(状态、版本等)';
COMMENT ON COLUMN screenplay_props.default_tag_id IS '默认标签ID(用于快速预览和时间轴拖拽)';
COMMENT ON COLUMN screenplay_props.meta_data IS '额外元数据(材质、尺寸、颜色、功能、重要性、出现次数等)';
COMMENT ON COLUMN screenplay_props.created_at IS '创建时间';
COMMENT ON COLUMN screenplay_props.updated_at IS '最后更新时间';

设计说明

  1. 剧本类型:使用 SMALLINT 存储枚举值(1=text, 2=file),对应 Python IntEnum
  2. 文件存储:文件剧本直接在 screenplays 表存储文件信息(file_url, file_size, checksum 等)
  3. 去重机制:checksum 字段配合 file_checksums 表实现全局去重
  4. 约束检查:使用 CHECK 约束确保 content 和 file_url 二选一
  5. 版本管理:screenplay_versions 表记录每个版本的快照
  6. 角色管理:screenplay_characters 表管理剧本角色,使用 SMALLINT 存储角色类型(1=main, 2=supporting, 3=extra)
  7. 场景管理:screenplay_locations 表管理剧本场景(拍摄地点/环境)
  8. 道具管理:screenplay_props 表管理剧本道具,使用 SMALLINT 存储重要性(1=key, 2=normal, 3=background)
  9. 标签系统has_tags 字段标识元素是否有标签(如角色的年龄段、场景的时代等),详见 剧本标签管理服务
  10. 枚举实现:所有枚举字段使用 SMALLINT 存储,Python 层使用 IntEnum 提供类型安全和转换方法
  11. 全文搜索:使用 pg_trgm 扩展支持剧本名称和内容的模糊搜索
  12. 软删除:使用 deleted_at 字段,部分索引排除已删除记录
  13. 审计追踪:created_by、updated_by、approved_by 记录操作者
  14. 元数据扩展:角色、场景、道具的 meta_data 字段支持存储额外的结构化信息(性别、国籍、物种、人物小传等)

枚举值映射表

枚举类型 数值 字符串值 说明
screenplay_type 1 text 文本剧本
2 file 文件剧本
screenplay_status 1 draft 草稿
2 review 审核中
3 approved 已批准
4 archived 已归档
parsing_status 0 idle 空闲
1 pending 待解析
2 parsing 解析中
3 completed 已完成
4 failed 失败
character_role_type 1 main 主角
2 supporting 配角
3 extra 群演
dialogue_type 1 normal 普通对白
2 inner_monologue 内心OS
3 narration 旁白

API 接口

1. 获取剧本角色列表

GET /api/v1/screenplays/{screenplay_id}/characters

查询参数

  • page: 页码(默认 1)
  • page_size: 每页数量(默认 20)
  • role_type: 角色类型过滤(main | supporting | extra,可选)
  • has_tags: 是否有标签(true | false,可选)

响应

{
  "items": [
    {
      "character_id": "019d1234-5678-7abc-def0-333333333333",
      "name": "张三",
      "description": "男主角,30岁,程序员",
      "character_image_url": "https://...",
      "role_type": "main",
      "line_count": 150,
      "appearance_count": 45,
      "has_tags": true,
      "tag_count": 3,
      "meta_data": {
        "gender": "male",
        "age": 30
      },
      "created_at": "2025-01-27T10:00:00Z"
    }
  ],
  "total": 15,
  "page": 1,
  "page_size": 20,
  "total_pages": 1
}

2. 获取剧本场景列表

GET /api/v1/screenplays/{screenplay_id}/locations

查询参数

  • page: 页码(默认 1)
  • page_size: 每页数量(默认 20)
  • has_tags: 是否有标签(true | false,可选)

响应

{
  "items": [
    {
      "location_id": "019d1234-5678-7abc-def0-444444444444",
      "name": "咖啡厅-白天",
      "location": "市中心咖啡厅",
      "description": "温馨的咖啡厅,阳光透过落地窗洒进来",
      "order_index": 1,
      "has_tags": true,
      "tag_count": 2,
      "meta_data": {},
      "created_at": "2025-01-27T10:00:00Z"
    }
  ],
  "total": 25,
  "page": 1,
  "page_size": 20,
  "total_pages": 2
}

3. 获取剧本道具列表

GET /api/v1/screenplays/{screenplay_id}/props

查询参数

  • page: 页码(默认 1)
  • page_size: 每页数量(默认 20)
  • has_tags: 是否有标签(true | false,可选)

响应

{
  "items": [
    {
      "prop_id": "019d1234-5678-7abc-def0-555555555555",
      "name": "古剑",
      "description": "传说中的神剑",
      "order_index": 1,
      "has_tags": true,
      "tag_count": 3,
      "meta_data": {
        "material": "金属",
        "color": "银色",
        "importance": 1,
        "appearance_count": 20
      },
      "created_at": "2025-01-27T10:00:00Z"
    }
  ],
  "total": 30,
  "page": 1,
  "page_size": 20,
  "total_pages": 2
}

4. 获取角色详情

GET /api/v1/characters/{character_id}

响应

{
  "character_id": "019d1234-5678-7abc-def0-333333333333",
  "screenplay_id": "019d1234-5678-7abc-def0-222222222222",
  "name": "张三",
  "description": "男主角,30岁,程序员",
  "character_image_url": "https://...",
  "role_type": "main",
  "line_count": 150,
  "appearance_count": 45,
  "has_tags": true,
  "meta_data": {
    "gender": "male",
    "age": 30,
    "personality": "内向、聪明"
  },
  "created_at": "2025-01-27T10:00:00Z",
  "updated_at": "2025-01-27T10:00:00Z"
}

5. 获取场景详情

GET /api/v1/locations/{location_id}

6. 获取道具详情

GET /api/v1/props/{prop_id}

7. 创建角色

POST /api/v1/screenplays/{screenplay_id}/characters

请求体

{
  "name": "张三",
  "description": "男主角,30岁,程序员",
  "role_type": "main",
  "meta_data": {
    "gender": "male",
    "age": 30
  }
}

8. 更新角色

PATCH /api/v1/characters/{character_id}

9. 删除角色

DELETE /api/v1/characters/{character_id}

服务实现

ScreenplayService 类

# app/services/screenplay_service.py
from typing import List, Optional, Dict, Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import UploadFile
from app.models.screenplay import (
    Screenplay, 
    ScreenplayVersion, 
    ScreenplayCharacter, 
    ScreenplayScene,
    ScreenplayProp,
    ScreenplayType,
    ScreenplayStatus,
    CharacterRoleType,
    TimeOfDay
)
from app.repositories.screenplay_repository import ScreenplayRepository
from app.services.file_storage_service import FileStorageService
from app.schemas.screenplay import ScreenplayCreate, ScreenplayUpdate
from app.core.exceptions import NotFoundError, ValidationError, PermissionError
from app.core.logging import get_logger

logger = get_logger(__name__)

class ScreenplayService:
    def __init__(self, db: AsyncSession):
        self.repository = ScreenplayRepository(db)
        self.file_storage = FileStorageService(db)
        self.db = db

    # 允许的文件类型
    ALLOWED_FILE_TYPES = {
        'text/plain',                           # TXT
        'application/msword',                   # DOC
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',  # DOCX
        'application/pdf',                      # PDF
        'application/rtf',                      # RTF
        'text/rtf',                             # RTF (alternative MIME type)
        'text/markdown'                         # Markdown (额外支持)
    }

    async def get_screenplays(
        self,
        project_id: UUID,
        user_id: UUID,
        status: Optional[str] = None,
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """获取剧本列表"""
        logger.info(
            "获取剧本列表: 项目=%s, 用户=%s, 状态=%s, 页码=%d",
            project_id, user_id, status, page
        )
        
        try:
            # 检查项目权限
            await self._check_project_permission(user_id, project_id, 'viewer')

            screenplays = await self.repository.get_by_project(
                project_id, status, page, page_size
            )

            total = await self.repository.count_by_project(project_id, status)

            logger.info(
                "剧本列表获取成功: 项目=%s, 总数=%d, 当前页=%d",
                project_id, total, page
            )

            return {
                'items': screenplays,
                'total': total,
                'page': page,
                'page_size': page_size,
                'total_pages': (total + page_size - 1) // page_size
            }
        except PermissionError:
            raise
        except Exception as e:
            logger.error(
                "获取剧本列表失败: 项目=%s, 用户=%s, 错误=%s",
                project_id, user_id, str(e),
                exc_info=True
            )
            raise

    async def create_screenplay(
        self,
        user_id: UUID,
        screenplay_data: ScreenplayCreate
    ) -> Screenplay:
        """创建文本剧本"""
        logger.info(
            "创建文本剧本: 用户=%s, 名称=%s, 项目=%s",
            user_id, screenplay_data.name, screenplay_data.project_id
        )
        
        try:
            # 检查项目权限
            await self._check_project_permission(
                user_id, screenplay_data.project_id, 'editor'
            )

            # 验证剧本类型和内容
            if not screenplay_data.content:
                logger.warning(
                    "创建文本剧本失败: 内容为空 (用户=%s, 项目=%s)",
                    user_id, screenplay_data.project_id
                )
                raise ValidationError("文本剧本必须提供内容")

            screenplay = Screenplay(
                project_id=screenplay_data.project_id,
                name=screenplay_data.name,
                type=ScreenplayType.TEXT,
                content=screenplay_data.content,
                status=ScreenplayStatus.DRAFT,
                created_by=user_id,
                updated_by=user_id
            )

            # 计算字数
            screenplay.word_count = len(screenplay.content)

            created_screenplay = await self.repository.create(screenplay)

            # 创建初始版本
            await self._create_version(created_screenplay, user_id, "初始版本")

            logger.info(
                "文本剧本创建成功: ID=%s, 名称=%s, 字数=%d",
                created_screenplay.screenplay_id, created_screenplay.name, created_screenplay.word_count
            )

            return created_screenplay
            
        except (ValidationError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "创建文本剧本失败: 用户=%s, 项目=%s, 错误=%s",
                user_id, screenplay_data.project_id, str(e),
                exc_info=True
            )
            raise

    async def create_screenplay_from_file(
        self,
        user_id: UUID,
        project_id: UUID,  # 父项目ID
        name: str,
        file: UploadFile,
        auto_create_subproject: bool = True  # 默认自动创建子项目
    ) -> Dict[str, Any]:
        """创建文件剧本(自动创建子项目)"""
        logger.info(
            "创建文件剧本: 用户=%s, 项目=%s, 文件名=%s, 类型=%s, 自动创建子项目=%s",
            user_id, project_id, file.filename, file.content_type, auto_create_subproject
        )
        
        try:
            # 检查父项目权限
            await self._check_project_permission(user_id, project_id, 'editor')

            # 验证文件类型
            if file.content_type not in self.ALLOWED_FILE_TYPES:
                logger.warning(
                    "文件类型不支持: %s (用户=%s, 项目=%s)",
                    file.content_type, user_id, project_id
                )
                raise ValidationError(f"不支持的文件类型: {file.content_type}")

            # 读取文件内容
            content = await file.read()
            file_size = len(content)

            # 验证文件大小
            if file_size > self.MAX_FILE_SIZE:
                logger.warning(
                    "文件大小超限: %.2fMB > %.2fMB (用户=%s, 项目=%s)",
                    file_size / 1024 / 1024, self.MAX_FILE_SIZE / 1024 / 1024,
                    user_id, project_id
                )
                raise ValidationError(
                    f"文件大小超过限制: {file_size / 1024 / 1024:.2f}MB > {self.MAX_FILE_SIZE / 1024 / 1024:.2f}MB"
                )

            # 使用 FileStorageService 上传(带去重)
            logger.debug("上传文件到存储服务: %s", file.filename)
            file_meta = await self.file_storage.upload_file(
                file_content=content,
                filename=file.filename,
                content_type=file.content_type,
                category='screenplay',
                user_id=user_id
            )

            # 创建剧本记录(暂时关联父项目)
            screenplay = Screenplay(
                project_id=project_id,  # 先关联父项目,后续会更新为子项目ID
                name=name,
                type=ScreenplayType.FILE,
                file_url=file_meta.file_url,
                file_size=file_meta.file_size,
                mime_type=file.content_type,
                checksum=file_meta.checksum,
                storage_path=file_meta.storage_path,
                status=ScreenplayStatus.DRAFT,
                created_by=user_id,
                updated_by=user_id
            )

            created_screenplay = await self.repository.create(screenplay)

            # 创建初始版本
            await self._create_version(created_screenplay, user_id, "初始版本")

            # 自动创建子项目(如果启用)
            subproject = None
            if auto_create_subproject:
                logger.info(
                    "自动创建子项目: 剧本=%s, 父项目=%s",
                    created_screenplay.screenplay_id, project_id
                )
                
                from app.services.project_service import ProjectService
                project_service = ProjectService(self.db)
                
                subproject = await project_service.create_subproject(
                    user_id=user_id,
                    parent_project_id=project_id,
                    screenplay_id=created_screenplay.screenplay_id,
                    name=name,  # 子项目名称与剧本同名
                    description=f"基于剧本《{name}》的制作项目"
                )
                
                # 更新剧本的 project_id 为子项目ID
                await self.repository.update(
                    created_screenplay.screenplay_id,
                    {'project_id': subproject.project_id}
                )
                
                # 刷新剧本对象
                created_screenplay = await self.repository.get_by_id(
                    created_screenplay.screenplay_id
                )
                
                logger.info(
                    "子项目创建成功: ID=%s, 名称=%s, 剧本=%s",
                    subproject.project_id, subproject.name, created_screenplay.screenplay_id
                )

            logger.info(
                "文件剧本创建成功: ID=%s, 名称=%s, 文件大小=%.2fMB, 校验和=%s, 子项目=%s",
                created_screenplay.screenplay_id, created_screenplay.name,
                file_size / 1024 / 1024, file_meta.checksum[:8],
                subproject.project_id if subproject else None
            )

            return {
                'screenplay': created_screenplay,
                'subproject': subproject  # 可能为 None
            }
            
        except (ValidationError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "创建文件剧本失败: 用户=%s, 项目=%s, 文件=%s, 错误=%s",
                user_id, project_id, file.filename, str(e),
                exc_info=True
            )
            raise

    async def update_screenplay(
        self,
        user_id: UUID,
        screenplay_id: UUID,
        screenplay_data: ScreenplayUpdate
    ) -> Screenplay:
        """更新剧本"""
        logger.info(
            "更新剧本: ID=%s, 用户=%s",
            screenplay_id, user_id
        )
        
        try:
            screenplay = await self.repository.get_by_id(screenplay_id)
            if not screenplay:
                logger.warning("剧本不存在: %s (用户=%s)", screenplay_id, user_id)
                raise NotFoundError("剧本不存在")

            # 检查权限
            await self._check_project_permission(user_id, screenplay.project_id, 'editor')

            # 如果内容有变化,创建新版本
            content_changed = False
            if screenplay_data.content and screenplay_data.content != screenplay.content:
                content_changed = True
                logger.debug("剧本内容已变更: %s", screenplay_id)

            # 更新剧本
            update_data = screenplay_data.dict(exclude_unset=True)
            update_data['updated_by'] = user_id

            # 重新计算字数
            if 'content' in update_data and update_data['content']:
                update_data['word_count'] = len(update_data['content'])

            updated_screenplay = await self.repository.update(screenplay_id, update_data)

            # 如果内容变化,创建新版本
            if content_changed:
                updated_screenplay.version += 1
                await self._create_version(
                    updated_screenplay,
                    user_id,
                    screenplay_data.change_summary or "内容更新"
                )
                logger.info(
                    "剧本版本更新: ID=%s, 版本=%d",
                    screenplay_id, updated_screenplay.version
                )

            logger.info(
                "剧本更新成功: ID=%s, 名称=%s",
                updated_screenplay.screenplay_id, updated_screenplay.name
            )

            return updated_screenplay
            
        except (NotFoundError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "更新剧本失败: ID=%s, 用户=%s, 错误=%s",
                screenplay_id, user_id, str(e),
                exc_info=True
            )
            raise

    async def approve_screenplay(
        self,
        user_id: UUID,
        screenplay_id: UUID
    ) -> Screenplay:
        """审批剧本"""
        logger.info("审批剧本: ID=%s, 审批人=%s", screenplay_id, user_id)
        
        try:
            screenplay = await self.repository.get_by_id(screenplay_id)
            if not screenplay:
                logger.warning("剧本不存在: %s (审批人=%s)", screenplay_id, user_id)
                raise NotFoundError("剧本不存在")

            # 检查权限(需要 owner 权限)
            await self._check_project_permission(user_id, screenplay.project_id, 'owner')

            result = await self.repository.update(screenplay_id, {
                'status': ScreenplayStatus.APPROVED,
                'approved_by': user_id,
                'approved_at': 'now()',
                'updated_by': user_id
            })
            
            logger.info(
                "剧本审批成功: ID=%s, 名称=%s, 审批人=%s",
                screenplay_id, screenplay.name, user_id
            )
            
            return result
            
        except (NotFoundError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "审批剧本失败: ID=%s, 审批人=%s, 错误=%s",
                screenplay_id, user_id, str(e),
                exc_info=True
            )
            raise

    async def get_screenplay_versions(
        self,
        user_id: UUID,
        screenplay_id: UUID
    ) -> List[ScreenplayVersion]:
        """获取剧本版本历史"""
        logger.info("获取剧本版本历史: ID=%s, 用户=%s", screenplay_id, user_id)
        
        try:
            screenplay = await self.repository.get_by_id(screenplay_id)
            if not screenplay:
                logger.warning("剧本不存在: %s (用户=%s)", screenplay_id, user_id)
                raise NotFoundError("剧本不存在")

            await self._check_project_permission(user_id, screenplay.project_id, 'viewer')

            versions = await self.repository.get_versions(screenplay_id)
            
            logger.info(
                "版本历史获取成功: 剧本=%s, 版本数=%d",
                screenplay_id, len(versions)
            )
            
            return versions
            
        except (NotFoundError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "获取版本历史失败: 剧本=%s, 用户=%s, 错误=%s",
                screenplay_id, user_id, str(e),
                exc_info=True
            )
            raise

    async def add_character(
        self,
        user_id: UUID,
        screenplay_id: UUID,
        character_data: Dict[str, Any]
    ) -> ScreenplayCharacter:
        """添加剧本角色"""
        logger.info(
            "添加剧本角色: 剧本=%s, 角色名=%s, 用户=%s",
            screenplay_id, character_data.get('name'), user_id
        )
        
        try:
            screenplay = await self.repository.get_by_id(screenplay_id)
            if not screenplay:
                logger.warning("剧本不存在: %s (用户=%s)", screenplay_id, user_id)
                raise NotFoundError("剧本不存在")

            await self._check_project_permission(user_id, screenplay.project_id, 'editor')

            character = await self.repository.add_character(screenplay_id, character_data)
            
            logger.info(
                "角色添加成功: ID=%s, 名称=%s, 剧本=%s",
                character.character_id, character.name, screenplay_id
            )
            
            return character
            
        except (NotFoundError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "添加角色失败: 剧本=%s, 角色名=%s, 错误=%s",
                screenplay_id, character_data.get('name'), str(e),
                exc_info=True
            )
            raise

    async def add_scene(
        self,
        user_id: UUID,
        screenplay_id: UUID,
        scene_data: Dict[str, Any]
    ) -> ScreenplayScene:
        """添加剧本场景"""
        logger.info(
            "添加剧本场景: 剧本=%s, 场景名称=%s, 用户=%s",
            screenplay_id, scene_data.get('name'), user_id
        )
        
        try:
            screenplay = await self.repository.get_by_id(screenplay_id)
            if not screenplay:
                logger.warning("剧本不存在: %s (用户=%s)", screenplay_id, user_id)
                raise NotFoundError("剧本不存在")

            await self._check_project_permission(user_id, screenplay.project_id, 'editor')

            scene = await self.repository.add_scene(screenplay_id, scene_data)
            
            logger.info(
                "场景添加成功: ID=%s, 名称=%s, 剧本=%s",
                location.location_id, location.name, screenplay_id
            )
            
            return scene
            
        except (NotFoundError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "添加场景失败: 剧本=%s, 场景名称=%s, 错误=%s",
                screenplay_id, scene_data.get('name'), str(e),
                exc_info=True
            )
            raise

    async def add_prop(
        self,
        user_id: UUID,
        screenplay_id: UUID,
        prop_data: Dict[str, Any]
    ) -> ScreenplayProp:
        """添加剧本道具"""
        logger.info(
            "添加剧本道具: 剧本=%s, 道具名=%s, 用户=%s",
            screenplay_id, prop_data.get('name'), user_id
        )
        
        try:
            screenplay = await self.repository.get_by_id(screenplay_id)
            if not screenplay:
                logger.warning("剧本不存在: %s (用户=%s)", screenplay_id, user_id)
                raise NotFoundError("剧本不存在")

            await self._check_project_permission(user_id, screenplay.project_id, 'editor')

            prop = await self.repository.add_prop(screenplay_id, prop_data)
            
            logger.info(
                "道具添加成功: ID=%s, 名称=%s, 剧本=%s",
                prop.prop_id, prop.name, screenplay_id
            )
            
            return prop
            
        except (NotFoundError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "添加道具失败: 剧本=%s, 道具名=%s, 错误=%s",
                screenplay_id, prop_data.get('name'), str(e),
                exc_info=True
            )
            raise

    async def set_character_default_tag(
        self,
        user_id: UUID,
        character_id: UUID,
        tag_id: Optional[UUID]
    ) -> Dict[str, Any]:
        """设置角色默认标签
        
        Args:
            user_id: 用户ID
            character_id: 角色ID
            tag_id: 标签ID(None 表示清除默认标签)
        
        Returns:
            包含角色信息和默认缩略图的字典
        """
        logger.info(
            "设置角色默认标签: 角色=%s, 标签=%s, 用户=%s",
            character_id, tag_id, user_id
        )
        
        try:
            # 1. 获取角色
            character = await self.repository.get_character_by_id(character_id)
            if not character:
                logger.warning("角色不存在: %s (用户=%s)", character_id, user_id)
                raise NotFoundError("角色不存在")
            
            # 2. 检查权限
            screenplay = await self.repository.get_by_id(character.screenplay_id)
            await self._check_project_permission(user_id, screenplay.project_id, 'editor')
            
            # 3. 验证标签(如果不是清除操作)
            if tag_id is not None:
                from app.repositories.screenplay_tag_repository import ScreenplayTagRepository
                tag_repo = ScreenplayTagRepository(self.db)
                
                tag = await tag_repo.get_by_id(tag_id)
                if not tag:
                    logger.warning("标签不存在: %s", tag_id)
                    raise NotFoundError("标签不存在")
                
                # 验证标签属于该角色
                if tag.element_id != character_id or tag.element_type != 1:
                    logger.warning(
                        "标签不属于该角色: 标签=%s, 角色=%s",
                        tag_id, character_id
                    )
                    raise ValidationError("标签不属于该角色")
            
            # 4. 更新 default_tag_id
            await self.repository.update_character(
                character_id,
                {'default_tag_id': tag_id}
            )
            
            # 5. 计算默认缩略图
            default_thumbnail_url = None
            if tag_id is not None:
                default_thumbnail_url = await self._get_default_thumbnail(tag_id)
            
            logger.info(
                "角色默认标签设置成功: 角色=%s, 标签=%s, 缩略图=%s",
                character_id, tag_id, default_thumbnail_url
            )
            
            return {
                'character_id': str(character_id),
                'name': character.name,
                'default_tag_id': str(tag_id) if tag_id else None,
                'default_thumbnail_url': default_thumbnail_url
            }
            
        except (NotFoundError, ValidationError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "设置角色默认标签失败: 角色=%s, 标签=%s, 错误=%s",
                character_id, tag_id, str(e),
                exc_info=True
            )
            raise

    async def set_location_default_tag(
        self,
        user_id: UUID,
        location_id: UUID,
        tag_id: Optional[UUID]
    ) -> Dict[str, Any]:
        """设置场景默认标签"""
        logger.info(
            "设置场景默认标签: 场景=%s, 标签=%s, 用户=%s",
            location_id, tag_id, user_id
        )
        
        try:
            # 1. 获取场景
            location = await self.repository.get_location_by_id(location_id)
            if not location:
                logger.warning("场景不存在: %s (用户=%s)", location_id, user_id)
                raise NotFoundError("场景不存在")
            
            # 2. 检查权限
            screenplay = await self.repository.get_by_id(location.screenplay_id)
            await self._check_project_permission(user_id, screenplay.project_id, 'editor')
            
            # 3. 验证标签(如果不是清除操作)
            if tag_id is not None:
                from app.repositories.screenplay_tag_repository import ScreenplayTagRepository
                tag_repo = ScreenplayTagRepository(self.db)
                
                tag = await tag_repo.get_by_id(tag_id)
                if not tag:
                    logger.warning("标签不存在: %s", tag_id)
                    raise NotFoundError("标签不存在")
                
                # 验证标签属于该场景(element_type = 2)
                if tag.element_id != location_id or tag.element_type != 2:
                    logger.warning(
                        "标签不属于该场景: 标签=%s, 场景=%s",
                        tag_id, location_id
                    )
                    raise ValidationError("标签不属于该场景")
            
            # 4. 更新 default_tag_id
            await self.repository.update_location(
                location_id,
                {'default_tag_id': tag_id}
            )
            
            # 5. 计算默认缩略图
            default_thumbnail_url = None
            if tag_id is not None:
                default_thumbnail_url = await self._get_default_thumbnail(tag_id)
            
            logger.info(
                "场景默认标签设置成功: 场景=%s, 标签=%s, 缩略图=%s",
                location_id, tag_id, default_thumbnail_url
            )
            
            return {
                'location_id': str(location_id),
                'name': location.name,
                'default_tag_id': str(tag_id) if tag_id else None,
                'default_thumbnail_url': default_thumbnail_url
            }
            
        except (NotFoundError, ValidationError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "设置场景默认标签失败: 场景=%s, 标签=%s, 错误=%s",
                location_id, tag_id, str(e),
                exc_info=True
            )
            raise

    async def set_prop_default_tag(
        self,
        user_id: UUID,
        prop_id: UUID,
        tag_id: Optional[UUID]
    ) -> Dict[str, Any]:
        """设置道具默认标签"""
        logger.info(
            "设置道具默认标签: 道具=%s, 标签=%s, 用户=%s",
            prop_id, tag_id, user_id
        )
        
        try:
            # 1. 获取道具
            prop = await self.repository.get_prop_by_id(prop_id)
            if not prop:
                logger.warning("道具不存在: %s (用户=%s)", prop_id, user_id)
                raise NotFoundError("道具不存在")
            
            # 2. 检查权限
            screenplay = await self.repository.get_by_id(prop.screenplay_id)
            await self._check_project_permission(user_id, screenplay.project_id, 'editor')
            
            # 3. 验证标签(如果不是清除操作)
            if tag_id is not None:
                from app.repositories.screenplay_tag_repository import ScreenplayTagRepository
                tag_repo = ScreenplayTagRepository(self.db)
                
                tag = await tag_repo.get_by_id(tag_id)
                if not tag:
                    logger.warning("标签不存在: %s", tag_id)
                    raise NotFoundError("标签不存在")
                
                # 验证标签属于该道具(element_type = 3)
                if tag.element_id != prop_id or tag.element_type != 3:
                    logger.warning(
                        "标签不属于该道具: 标签=%s, 道具=%s",
                        tag_id, prop_id
                    )
                    raise ValidationError("标签不属于该道具")
            
            # 4. 更新 default_tag_id
            await self.repository.update_prop(
                prop_id,
                {'default_tag_id': tag_id}
            )
            
            # 5. 计算默认缩略图
            default_thumbnail_url = None
            if tag_id is not None:
                default_thumbnail_url = await self._get_default_thumbnail(tag_id)
            
            logger.info(
                "道具默认标签设置成功: 道具=%s, 标签=%s, 缩略图=%s",
                prop_id, tag_id, default_thumbnail_url
            )
            
            return {
                'prop_id': str(prop_id),
                'name': prop.name,
                'default_tag_id': str(tag_id) if tag_id else None,
                'default_thumbnail_url': default_thumbnail_url
            }
            
        except (NotFoundError, ValidationError, PermissionError):
            raise
        except Exception as e:
            logger.error(
                "设置道具默认标签失败: 道具=%s, 标签=%s, 错误=%s",
                prop_id, tag_id, str(e),
                exc_info=True
            )
            raise

    async def _get_default_thumbnail(self, tag_id: UUID) -> Optional[str]:
        """获取标签的默认缩略图
        
        查询该标签下的第一个资源,返回其缩略图 URL
        
        Args:
            tag_id: 标签ID
        
        Returns:
            缩略图 URL,若无资源则返回 None
        """
        from app.repositories.project_resource_repository import ProjectResourceRepository
        resource_repo = ProjectResourceRepository(self.db)
        
        # 查询该标签下的第一个资源(按创建时间排序)
        resources = await resource_repo.get_by_element_tag_id(tag_id, limit=1)
        
        if not resources:
            logger.debug("标签 %s 下无资源", tag_id)
            return None
        
        resource = resources[0]
        # 优先使用 thumbnail_url,若为空则使用 file_url
        thumbnail_url = resource.thumbnail_url or resource.file_url
        
        logger.debug(
            "获取默认缩略图: 标签=%s, 资源=%s, URL=%s",
            tag_id, resource.project_resource_id, thumbnail_url
        )
        
        return thumbnail_url

    async def _create_version(
        self,
        screenplay: Screenplay,
        user_id: UUID,
        change_summary: str
    ) -> ScreenplayVersion:
        """创建剧本版本"""
        logger.debug(
            "创建剧本版本: 剧本=%s, 版本=%d, 摘要=%s",
            screenplay.screenplay_id, screenplay.version, change_summary
        )
        
        version = ScreenplayVersion(
            screenplay_id=screenplay.screenplay_id,
            version_number=screenplay.version,
            content_snapshot=screenplay.content,
            change_summary=change_summary,
            word_count=screenplay.word_count,
            scene_count=screenplay.scene_count,
            character_count=screenplay.character_count,
            created_by=user_id
        )
        return await self.repository.create_version(version)

    async def _check_project_permission(
        self,
        user_id: UUID,
        project_id: UUID,
        required_role: str = 'viewer'
    ) -> None:
        """检查项目权限"""
        from app.repositories.project_repository import ProjectRepository
        project_repo = ProjectRepository(self.db)

        has_permission = await project_repo.check_user_permission(
            user_id, project_id, required_role
        )
        if not has_permission:
            logger.warning(
                "权限不足: 用户=%s, 项目=%s, 需要角色=%s",
                user_id, project_id, required_role
            )
            raise PermissionError("没有权限访问此项目")

API 接口

统一响应格式

所有 API 响应遵循统一格式:

成功响应

{
  "success": true,
  "message": "操作成功",
  "data": {
    // 实际数据
  }
}

错误响应

{
  "success": false,
  "message": "错误描述",
  "error_code": "ERROR_CODE",
  "details": {}  // 可选的详细信息
}

1. 获取剧本列表

GET /api/v1/screenplays?project_id={project_id}

查询参数

  • project_id: 项目 ID(必填)
  • status: 剧本状态(draft | approved
  • page: 页码
  • page_size: 每页数量

2. 创建文本剧本

POST /api/v1/screenplays

请求体

{
  "project_id": 1,
  "name": "第一集剧本",
  "content": "剧本内容..."
}

响应

{
  "success": true,
  "message": "获取成功",
  "data": {
    "screenplay_id": 1,
    "project_id": 1,
    "name": "第一集剧本",
    "type": "text",
    "content": "剧本内容...",
    "version": 1,
    "word_count": 5000,
    "status": "draft",
    "created_at": "2025-01-27T10:00:00Z"
  }
}

2.1 创建剧本

POST /api/v1/screenplays

支持两种创建模式:文件上传文本粘贴。无论哪种方式,后端都会将内容转换为 Markdown 格式存储,并自动创建子项目。


模式 1:文件上传

请求Content-Type: multipart/form-data):

  • project_id: 父项目 ID(必填)
  • name: 剧本名称(必填)
  • file: 剧本文件(必填,支持 TXT、DOC、DOCX、PDF、RTF、Markdown)

示例

curl -X POST "https://api.jointo.ai/api/v1/screenplays" \
  -H "Authorization: Bearer {token}" \
  -F "project_id=019d1234-5678-7abc-def0-111111111111" \
  -F "name=第一集剧本" \
  -F "file=@screenplay.pdf"

模式 2:文本粘贴

请求Content-Type: application/json):

{
  "project_id": "019d1234-5678-7abc-def0-111111111111",
  "name": "第一集剧本",
  "content": "场景1:室内 - 咖啡厅 - 白天\n\n张三坐在窗边..."
}

参数说明

  • project_id: 父项目 ID(必填)
  • name: 剧本名称(必填)
  • content: 剧本文本内容(必填)

统一响应

{
  "success": true,
  "message": "剧本创建成功",
  "data": {
    "screenplay": {
      "screenplay_id": "019d1234-5678-7abc-def0-222222222222",
      "project_id": "019d1234-5678-7abc-def0-333333333333",
      "name": "第一集剧本",
      "type": "file",
      "file_url": "https://storage.jointo.ai/screenplays/abc123.md",
      "file_size": 1024000,
      "mime_type": "text/markdown",
      "checksum": "abc123...",
      "version": 1,
      "status": "draft",
      "created_at": "2025-01-27T10:00:00Z"
    },
    "subproject": {
      "project_id": "019d1234-5678-7abc-def0-333333333333",
      "name": "第一集剧本",
      "parent_project_id": "019d1234-5678-7abc-def0-111111111111",
      "screenplay_id": "019d1234-5678-7abc-def0-222222222222",
      "type": "mine",
      "description": "基于剧本《第一集剧本》的制作项目",
      "created_at": "2025-01-27T10:00:00Z"
    }
  }
}

业务规则

  1. 格式转换

    • 文件上传:TXT/DOC/DOCX/PDF/RTF/Markdown → 统一转换为 Markdown
    • 文本粘贴:直接将文本内容转换为 Markdown 格式
    • 转换后的 Markdown 文件存储到对象存储(MinIO/OSS)
  2. 子项目创建

    • 转换成功后强制创建子项目(无可选参数)
    • 子项目名称与剧本同名
    • 子项目继承父项目的类型(mine/collab)和权限设置
    • 剧本的 project_id 指向子项目
    • 后续分镜也属于该子项目
  3. 文件存储

    • 原始文件(如有)保留备份
    • 返回的 file_url 指向转换后的 .md 文件
    • mime_type 统一为 text/markdown
  4. 错误处理

    • 文件格式不支持 → 400 错误
    • 文件转换失败 → 500 错误(保留原始文件)
    • 父项目不存在 → 404 错误
    • 用户无权限 → 403 错误

3. 更新剧本

PUT /api/v1/screenplays/{screenplay_id}

请求体

{
  "content": "更新后的剧本内容...",
  "change_summary": "修改了第三场戏"
}

4. 审批剧本

POST /api/v1/screenplays/{screenplay_id}/approve

5. 获取版本历史

GET /api/v1/screenplays/{screenplay_id}/versions

响应

{
  "items": [
    {
      "version_number": 2,
      "change_summary": "修改了第三场戏",
      "word_count": 5000,
      "created_at": "2025-01-27T10:00:00Z",
      "created_by": 1
    },
    {
      "version_number": 1,
      "change_summary": "初始版本",
      "word_count": 4800,
      "created_at": "2025-01-26T10:00:00Z",
      "created_by": 1
    }
  ]
}

6. 添加角色

POST /api/v1/screenplays/{screenplay_id}/characters

请求体

{
  "name": "张三",
  "description": "男主角",
  "age": 30,
  "gender": "male"
}

7. 添加场景

POST /api/v1/screenplays/{screenplay_id}/locations

请求体

{
  "location": "咖啡厅",
  "time": "白天",
  "description": "张三和李四在咖啡厅见面"
}

8. 添加道具

POST /api/v1/screenplays/{screenplay_id}/props

请求体

{
  "name": "古剑",
  "description": "一把传说中的宝剑",
  "order_index": 1,
  "meta_data": {
    "material": "玄铁",
    "length": "90cm",
    "weight": "2kg",
    "special_ability": "可以斩断任何物体",
    "importance": 1,
    "appearance_count": 0
  }
}

响应

{
  "prop_id": "019d1234-5678-7abc-def0-123456789abc",
  "screenplay_id": "019d1234-5678-7abc-def0-987654321fed",
  "name": "古剑",
  "description": "一把传说中的宝剑",
  "order_index": 1,
  "has_tags": false,
  "meta_data": {
    "material": "玄铁",
    "length": "90cm",
    "weight": "2kg",
    "special_ability": "可以斩断任何物体",
    "importance": 1,
    "appearance_count": 0
  },
  "created_at": "2025-01-27T10:00:00Z"
}

9. 设置角色默认标签

PUT /api/v1/characters/{character_id}/default-tag

业务场景

  • 用户在中间预览区域查看角色的不同标签(少年/青年/老年)
  • 用户点击"设为默认标签"按钮
  • 左侧资源库面板的角色缩略图自动更新为新默认标签的第一张图片

业务流程时序图

sequenceDiagram
    actor User as 用户
    participant UI as 前端UI
    participant API as API路由层
    participant Service as ScreenplayService
    participant TagRepo as ScreenplayTagRepository
    participant CharRepo as ScreenplayRepository
    participant ResRepo as ProjectResourceRepository
    participant DB as PostgreSQL

    User->>UI: 点击"设为默认标签"
    UI->>API: PUT /api/v1/characters/{id}/default-tag
    API->>Service: set_character_default_tag(user_id, character_id, tag_id)
    
    Service->>CharRepo: get_character_by_id(character_id)
    CharRepo->>DB: SELECT * FROM screenplay_characters
    DB-->>Service: 返回角色数据
    
    Service->>Service: _check_project_permission(user_id, project_id, 'editor')
    
    Service->>TagRepo: get_by_id(tag_id)
    TagRepo->>DB: SELECT * FROM screenplay_element_tags
    DB-->>Service: 返回标签数据
    
    Service->>Service: 验证标签属于该角色
    
    Service->>CharRepo: update_character(character_id, {default_tag_id})
    CharRepo->>DB: UPDATE screenplay_characters SET default_tag_id = ?
    DB-->>Service: 更新成功
    
    Service->>ResRepo: get_by_element_tag_id(tag_id, limit=1)
    ResRepo->>DB: SELECT * FROM project_resources LIMIT 1
    DB-->>Service: 返回第一个资源
    
    Service->>Service: 计算缩略图 URL
    Service-->>API: 返回结果
    API-->>UI: HTTP 200 OK
    UI->>UI: 更新左侧面板缩略图

请求体

{
  "tag_id": "019d1234-5678-7abc-def0-444444444444"
}

响应

{
  "success": true,
  "code": 200,
  "message": "Success",
  "data": {
    "character_id": "019d1234-5678-7abc-def0-333333333333",
    "name": "孙悟空",
    "default_tag_id": "019d1234-5678-7abc-def0-444444444444",
    "default_thumbnail_url": "https://storage.jointo.ai/resources/abc123_thumb.jpg"
  },
  "timestamp": "2026-02-01T12:00:00+00:00"
}

业务规则

  1. 权限要求:用户必须对角色所属项目拥有 editor 权限
  2. 标签验证
    • 标签必须存在
    • 标签必须属于该角色(element_id 匹配且 element_type = 1
  3. 自动计算缩略图
    • 查询该标签下的第一个资源(按 created_at 排序)
    • 优先使用 thumbnail_url,若为空则使用 file_url
    • 若标签下无资源,返回 null
  4. 清除默认标签:传递 {"tag_id": null} 可清除默认标签

10. 设置场景默认标签

PUT /api/v1/locations/{location_id}/default-tag

请求体

{
  "tag_id": "019d1234-5678-7abc-def0-555555555555"
}

响应

{
  "success": true,
  "code": 200,
  "message": "Success",
  "data": {
    "location_id": "019d1234-5678-7abc-def0-444444444444",
    "name": "花果山",
    "default_tag_id": "019d1234-5678-7abc-def0-555555555555",
    "default_thumbnail_url": "https://storage.jointo.ai/resources/def456_thumb.jpg"
  },
  "timestamp": "2026-02-01T12:00:00+00:00"
}

11. 设置道具默认标签

PUT /api/v1/props/{prop_id}/default-tag

请求体

{
  "tag_id": "019d1234-5678-7abc-def0-666666666666"
}

响应

{
  "success": true,
  "code": 200,
  "message": "Success",
  "data": {
    "prop_id": "019d1234-5678-7abc-def0-555555555555",
    "name": "金箍棒",
    "default_tag_id": "019d1234-5678-7abc-def0-666666666666",
    "default_thumbnail_url": "https://storage.jointo.ai/resources/ghi789_thumb.jpg"
  },
  "timestamp": "2026-02-01T12:00:00+00:00"
}

数据模型

Screenplay 模型

# app/models/screenplay.py
from uuid import UUID
from enum import IntEnum
from datetime import datetime, UTC
from sqlalchemy import Column, String, Text, DateTime, ForeignKey, BigInteger, Integer, SmallInteger, Numeric, Boolean
from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB
from sqlalchemy.orm import relationship
from sqlmodel import Field
from app.core.database import Base, generate_uuid_v7

class ScreenplayType(IntEnum):
    """剧本类型枚举"""
    TEXT = 1
    FILE = 2

    @classmethod
    def from_string(cls, value: str) -> int:
        """字符串转数字"""
        mapping = {'text': cls.TEXT, 'file': cls.FILE}
        return mapping.get(value.lower(), cls.TEXT)

    @classmethod
    def to_string(cls, value: int) -> str:
        """数字转字符串"""
        mapping = {cls.TEXT: 'text', cls.FILE: 'file'}
        return mapping.get(value, 'text')


class ScreenplayStatus(IntEnum):
    """剧本状态枚举"""
    DRAFT = 1
    REVIEW = 2
    APPROVED = 3
    ARCHIVED = 4

    @classmethod
    def from_string(cls, value: str) -> int:
        """字符串转数字"""
        mapping = {
            'draft': cls.DRAFT,
            'review': cls.REVIEW,
            'approved': cls.APPROVED,
            'archived': cls.ARCHIVED
        }
        return mapping.get(value.lower(), cls.DRAFT)

    @classmethod
    def to_string(cls, value: int) -> str:
        """数字转字符串"""
        mapping = {
            cls.DRAFT: 'draft',
            cls.REVIEW: 'review',
            cls.APPROVED: 'approved',
            cls.ARCHIVED: 'archived'
        }
        return mapping.get(value, 'draft')


class Screenplay(Base):
    __tablename__ = "screenplays"

    screenplay_id: UUID = Field(
        sa_column=Column(
            PG_UUID(as_uuid=True),
            primary_key=True,
            default=generate_uuid_v7
        )
    )
    project_id: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('projects.project_id'), nullable=False))
    name = Column(String(255), nullable=False)
    type = Column(SmallInteger, nullable=False)

    # 文本剧本字段
    content = Column(Text)

    # 文件剧本字段
    file_url = Column(String(500))
    file_size = Column(BigInteger)
    mime_type = Column(String(100))
    checksum = Column(String(64))  # SHA256
    storage_path = Column(String(500))

    # 元数据
    version = Column(Integer, default=1)
    word_count = Column(Integer, default=0)
    scene_count = Column(Integer, default=0)
    character_count = Column(Integer, default=0)

    # AI 生成相关
    ai_job_id: UUID | None = Field(default=None, sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('ai_jobs.ai_job_id')))
    ai_prompt = Column(Text)

    # 状态管理
    status = Column(SmallInteger, nullable=False, default=ScreenplayStatus.DRAFT)

    # 协作字段
    created_by: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('users.user_id'), nullable=False))
    updated_by: UUID | None = Field(default=None, sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('users.user_id')))
    approved_by: UUID | None = Field(default=None, sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('users.user_id')))
    approved_at = Column(DateTime)

    # 审计字段
    created_at = Column(DateTime, default=lambda: datetime.now(UTC))
    updated_at = Column(DateTime, default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC))
    deleted_at = Column(DateTime)

    # 关系
    project = relationship("Project", back_populates="screenplays")
    versions = relationship("ScreenplayVersion", back_populates="screenplay")
    characters = relationship("ScreenplayCharacter", back_populates="screenplay")
    scenes = relationship("ScreenplayScene", back_populates="screenplay")
    props = relationship("ScreenplayProp", back_populates="screenplay")
    ai_job = relationship("AIJob")
    
    @property
    def type_str(self) -> str:
        """获取剧本类型字符串"""
        return ScreenplayType.to_string(self.type)
    
    @property
    def status_str(self) -> str:
        """获取状态字符串"""
        return ScreenplayStatus.to_string(self.status)

ScreenplayVersion 模型

class ScreenplayVersion(Base):
    __tablename__ = "screenplay_versions"

    version_id: UUID = Field(
        sa_column=Column(
            PG_UUID(as_uuid=True),
            primary_key=True,
            default=generate_uuid_v7
        )
    )
    screenplay_id: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('screenplays.screenplay_id'), nullable=False))
    version_number = Column(Integer, nullable=False)
    content_snapshot = Column(Text)
    change_summary = Column(String(500))
    word_count = Column(Integer, default=0)
    scene_count = Column(Integer, default=0)
    character_count = Column(Integer, default=0)
    created_by: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('users.user_id'), nullable=False))
    created_at = Column(DateTime, default=lambda: datetime.now(UTC))

    # 关系
    screenplay = relationship("Screenplay", back_populates="versions")

ScreenplayCharacter 模型

class CharacterRoleType(IntEnum):
    """角色类型枚举"""
    MAIN = 1
    SUPPORTING = 2
    EXTRA = 3

    @classmethod
    def from_string(cls, value: str) -> int:
        """字符串转数字"""
        mapping = {
            'main': cls.MAIN,
            'supporting': cls.SUPPORTING,
            'extra': cls.EXTRA
        }
        return mapping.get(value.lower(), cls.SUPPORTING)

    @classmethod
    def to_string(cls, value: int) -> str:
        """数字转字符串"""
        mapping = {
            cls.MAIN: 'main',
            cls.SUPPORTING: 'supporting',
            cls.EXTRA: 'extra'
        }
        return mapping.get(value, 'supporting')


class ScreenplayCharacter(Base):
    __tablename__ = "screenplay_characters"

    character_id: UUID = Field(
        sa_column=Column(
            PG_UUID(as_uuid=True),
            primary_key=True,
            default=generate_uuid_v7
        )
    )
    screenplay_id: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('screenplays.screenplay_id'), nullable=False))
    name = Column(String(255), nullable=False)
    description = Column(Text)
    character_image_url = Column(String(500))
    role_type = Column(SmallInteger, nullable=False, default=CharacterRoleType.SUPPORTING)
    is_offscreen = Column(Boolean, nullable=False, default=False)
    line_count = Column(Integer, default=0)
    appearance_count = Column(Integer, default=0)
    order_index = Column(Integer, nullable=False)
    has_tags = Column(Boolean, nullable=False, default=False)
    meta_data = Column(JSONB, default={})
    # meta_data 结构示例:
    # {
    #   "gender": "male",
    #   "age": 30,
    #   "nationality": "中国",
    #   "species": "人类",
    #   "biography": "张三是一名30岁的侦探...",
    #   "personality": "冷静、理性、善于推理",
    #   "appearance": "身高180cm,短发,戴眼镜"
    # }
    created_at = Column(DateTime, default=lambda: datetime.now(UTC))
    updated_at = Column(DateTime, default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC))

    # 关系
    screenplay = relationship("Screenplay", back_populates="characters")
    
    @property
    def role_type_str(self) -> str:
        """获取角色类型字符串"""
        return CharacterRoleType.to_string(self.role_type)

ScreenplayScene 模型

class TimeOfDay(IntEnum):
    """时间段枚举"""
    DAWN = 1
    MORNING = 2
    NOON = 3
    AFTERNOON = 4
    DUSK = 5
    NIGHT = 6

    @classmethod
    def from_string(cls, value: str) -> int:
        """字符串转数字"""
        mapping = {
            'dawn': cls.DAWN,
            'morning': cls.MORNING,
            'noon': cls.NOON,
            'afternoon': cls.AFTERNOON,
            'dusk': cls.DUSK,
            'night': cls.NIGHT
        }
        return mapping.get(value.lower(), cls.MORNING)

    @classmethod
    def to_string(cls, value: int) -> str:
        """数字转字符串"""
        mapping = {
            cls.DAWN: 'dawn',
            cls.MORNING: 'morning',
            cls.NOON: 'noon',
            cls.AFTERNOON: 'afternoon',
            cls.DUSK: 'dusk',
            cls.NIGHT: 'night'
        }
        return mapping.get(value, 'morning')


class ScreenplayLocation(Base):
    __tablename__ = "screenplay_locations"

    location_id: UUID = Field(
        sa_column=Column(
            PG_UUID(as_uuid=True),
            primary_key=True,
            default=generate_uuid_v7
        )
    )
    screenplay_id: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('screenplays.screenplay_id'), nullable=False))
    name = Column(String(255), nullable=False)
    location = Column(String(255))
    description = Column(Text)
    order_index = Column(Integer, nullable=False)
    has_tags = Column(Boolean, nullable=False, default=False)
    meta_data = Column(JSONB, default={})
    created_at = Column(DateTime, default=lambda: datetime.now(UTC))
    updated_at = Column(DateTime, default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC))

    # 关系
    screenplay = relationship("Screenplay", back_populates="scenes")

ScreenplayProp 模型

class ScreenplayProp(Base):
    __tablename__ = "screenplay_props"

    prop_id: UUID = Field(
        sa_column=Column(
            PG_UUID(as_uuid=True),
            primary_key=True,
            default=generate_uuid_v7
        )
    )
    screenplay_id: UUID = Field(sa_column=Column(PG_UUID(as_uuid=True), ForeignKey('screenplays.screenplay_id'), nullable=False))
    name = Column(String(255), nullable=False)
    description = Column(Text)
    order_index = Column(Integer, nullable=False, default=0)
    has_tags = Column(Boolean, nullable=False, default=False)
    meta_data = Column(JSONB, default={})
    # meta_data 结构示例:
    # {
    #   "material": "金属",
    #   "size": "小型",
    #   "color": "银色",
    #   "function": "开锁",
    #   "owner": "张三",
    #   "importance": 1,  # 可选:1=关键, 2=普通, 3=背景
    #   "appearance_count": 5  # 可选:出现次数
    # }
    created_at = Column(DateTime, default=lambda: datetime.now(UTC))
    updated_at = Column(DateTime, default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC))

    # 关系
    screenplay = relationship("Screenplay", back_populates="props")

---

## 测试规范

### 测试文件结构

tests/ ├── unit/ │ ├── test_screenplay_service.py # Service 层单元测试 │ └── test_screenplay_repository.py # Repository 层单元测试 └── integration/ └── test_screenplay_api.py # API 集成测试


### 单元测试示例

**测试 Service 层**:
```python
# tests/unit/test_screenplay_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
from app.services.screenplay_service import ScreenplayService
from app.models.screenplay import Screenplay, ScreenplayType, ScreenplayStatus

@pytest.mark.asyncio
async def test_create_screenplay_success():
    """测试创建文本剧本成功"""
    # Arrange
    mock_db = AsyncMock()
    service = ScreenplayService(mock_db)
    service.repository = AsyncMock()
    
    user_id = uuid4()
    project_id = uuid4()
    screenplay_data = MagicMock(
        project_id=project_id,
        name="测试剧本",
        content="剧本内容"
    )
    
    # Act
    result = await service.create_screenplay(user_id, screenplay_data)
    
    # Assert
    assert result is not None
    service.repository.create.assert_called_once()

集成测试示例

测试 API 端点

# tests/integration/test_screenplay_api.py
import pytest
from httpx import AsyncClient
from app.main import app

@pytest.mark.asyncio
async def test_create_screenplay_api(async_client: AsyncClient, auth_headers):
    """测试创建剧本 API"""
    response = await async_client.post(
        "/api/v1/screenplays",
        json={
            "project_id": "019d1234-5678-7abc-def0-111111111111",
            "name": "测试剧本",
            "content": "剧本内容"
        },
        headers=auth_headers
    )
    
    assert response.status_code == 200
    data = response.json()
    assert data["success"] is True
    assert data["data"]["name"] == "测试剧本"

运行测试

# 运行所有测试
docker exec jointo-server-app pytest

# 运行单元测试
docker exec jointo-server-app pytest tests/unit/

# 运行集成测试
docker exec jointo-server-app pytest tests/integration/

# 运行特定测试文件
docker exec jointo-server-app pytest tests/unit/test_screenplay_service.py

# 生成覆盖率报告
docker exec jointo-server-app pytest --cov=app/services/screenplay_service --cov-report=html

相关文档


变更记录

v2.4 (2026-01-31)

  • 集成子项目功能:
    • 修改 create_screenplay_from_file() 方法(自动创建子项目)
    • 新增 auto_create_subproject 参数(默认 true
    • 剧本的 project_id 指向子项目(而非父项目)
    • 更新 API 响应格式(包含 subproject 字段)
    • 更新核心功能说明(新增"自动创建子项目")

v2.3 (2026-01-30)

  • 道具表结构优化:
    • 移除 category 字段(改用标签系统统一管理分类)
    • 移除 importance 字段(移至 meta_data 可选字段)
    • 移除 appearance_count 字段(移至 meta_data 可选字段)
    • 新增 order_index 字段(支持自定义排序)
    • 删除 PropImportance 枚举类
    • 删除相关索引(idx_screenplay_props_importance, idx_screenplay_props_category
    • 新增 idx_screenplay_props_order 索引
    • 更新 API 接口(移除 importancecategory 查询参数)
    • 更新数据模型和响应示例

v2.2 (2026-01-29)

  • 技术栈合规性修复:
    • 添加日志系统(所有关键操作使用 logger)
    • 修改 UUID 生成方式(应用层 generate_uuid_v7)
    • 修改依赖注入(AsyncSession 替代 Session)
    • 统一时区处理(datetime.now(UTC))
    • 添加错误日志(exc_info=True 记录堆栈)
    • 补充 API 统一响应格式说明
    • 添加测试规范章节
    • 完善类型注解(UUID 类型)
    • 添加 has_tags 字段到角色、场景、道具模型

v2.1 (2026-01-22)

  • 枚举类型重构:
    • 所有 PostgreSQL ENUM 类型改为 SMALLINT + Python IntEnum
    • 数据库层:使用 SMALLINT 存储枚举值
    • Python 模型层:使用 IntEnum 提供类型安全和转换方法
    • API 层:保持使用字符串,向后兼容
    • 添加列注释说明数值映射
    • 添加枚举值映射表
    • 涉及枚举:ScreenplayType, ScreenplayStatus, CharacterRoleType, TimeOfDay

v2.0 (2025-01-27)

  • 重构剧本文件存储架构:
    • 剧本文件直接在 screenplays 表存储(file_url, file_size, checksum 等)
    • 移除 attachment_id 关联
    • 集成 FileStorageService 实现文件上传和去重
    • 新增 create_screenplay_from_file 方法
    • 更新 API 接口(新增 POST /api/v1/screenplays/upload)
    • 更新数据模型(Screenplay 模型)
  • 新增数据库设计章节
  • 扩展剧本状态(draft, review, approved, archived)
  • 统一术语:script → screenplay

v1.0 (2025-01-27)

  • 初始版本

文档版本:v2.4
最后更新:2026-01-31