You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

107 KiB

ADR 07: LangGraph 剧本智能抽取系统

状态: 已接受
日期: 2026-02-24
决策者: 开发团队


目录

  1. 背景与问题
  2. 决策
  3. 系统架构
  4. 数据库表关系
  5. Mermaid 流程图
  6. 模块设计
  7. LangGraph 状态与节点
  8. 各节点提示词模板
  9. API 接口设计
  10. 数据流说明
  11. 依赖与部署
  12. 迁移策略
  13. 优化建议
  14. 风险与对策
  15. 实施计划

背景与问题

现有方案瓶颈

当前剧本抽取方案(AIService.parse_screenplay)存在以下结构性问题:

问题维度 现状 影响
单次巨型 Prompt 全量剧本内容 + 7 类抽取任务塞入同一 LLM 请求 长剧本(>1 万字)精度显著下降
无依赖链 道具、分镜抽取时无法复用已抽的角色/场景结果 角色名不一致、道具归属错误
无自校验 LLM 输出错误格式或引用不一致时无补偿机制 频繁触发 JSON 解析失败
无断点恢复 Celery 任务失败后只能整体重试 长剧本失败成本高
上下文丢失 场景之间缺乏信息传递 分镜叙事连贯性差

新方案动机

引入 LangGraph 的多节点智能抽取管道:

  • 依赖链管理:角色 → 场景 → 道具 → 分镜 → 对白,每步复用已抽结果
  • 场景级上下文传递:每个场景独立处理,完整文本直接传递,无信息丢失
  • 自校验闭环:分镜/对白抽取后逻辑校验,失败自动重抽(最多 2 次)
  • 节点级断点恢复:LangGraph 状态持久化,支持单节点重跑
  • 分阶段迁移:新方案作为 mode=langgraph 可选模式,不破坏现有接口

决策

采用 LangGraph 作为高质量抽取模式,与现有 Celery 单次 LLM 方案并行运行。

技术选型理由

组件 选型 理由
流程控制 LangGraph 原生支持有状态图、条件边、循环、节点级重试,专为 LLM 工作流设计
文本切片 自实现(正则 + 字符串处理) 轻量级 MD 解析,零额外依赖,完全可控
上下文传递 LangGraph State(内存) 场景文本和抽取结果直接在 State 中传递,无需外部存储
LLM 调用 OpenAI SDK(现有) 复用项目现有 AIProvider,无需额外封装
结构化存储 PostgreSQL(现有) 复用现有 DB,最终 JSON 结果存入现有 screenplay 相关表
任务调度 Celery(现有) LangGraph pipeline 封装为 Celery 任务,复用现有调度基础设施

放弃的方案

  • Qdrant / Chroma / pgvector:剧本已按场景切分,每场景 500-2000 字可直接传 LLM,无需向量检索;State 已缓存抽取结果,无需向量库复用
  • LangChain Agents:相比 LangGraph 缺乏显式状态管理,循环控制较复杂

系统架构

前置说明:MD 转换与 OSS 存储已就位

⚠️ 经代码分析,ScreenplayFileParserServiceupload-and-parse 接口)已完整实现 MD 转换与 OSS 存储链路,LangGraph 方案无需重新实现此模块,直接复用即可。

已实现的上传 & 转换链路(POST /v1/screenplays/upload-and-parse

用户上传文件(MD / TXT / PDF / DOCX / RTF / DOC / PPTX)
    │
    ▼ ScreenplayFileParserService
    ├── TXT / MD       → 直接读取(同步)
    ├── DOCX           → python-docx 解析,Heading 转 MD 标题
    ├── PDF            → pdfplumber + PyMuPDF 去水印,表格转 MD 表格
    ├── RTF            → striprtf 解析
    └── DOC / PPTX     → textract / python-pptx 解析
    │
    ▼ _format_as_markdown()
    标准化为 Markdown 格式(全大写行 → ## 标题,数字行 → ### 子标题)
    │
    ▼ _upload_markdown_file()
    上传至 OSS:screenplays/parsed/{year}/{month}/{day}/{uuid}.md
    │
    ▼ screenplay.file_url = OSS MD URL  ← 唯一事实来源
      screenplay.content = 原始文本     ← 冗余备份
      screenplay.parsing_status = COMPLETED

结论:调用 POST /{screenplay_id}/parse?mode=langgraph 时,screenplay.file_url 已指向 OSS 上的标准 MD 文件。LangGraph Init Node 直接使用此内容切片,无需任何格式转换。


总体架构图

flowchart TB
    subgraph Stage1["阶段 1(已实现):上传 & MD 标准化"]
        Upload["POST /v1/screenplays/upload-and-parse"]
        Parser["ScreenplayFileParserService"]
        Process["任意格式 → 标准 MD → OSS 存储"]
        Upload --> Parser --> Process
    end

    subgraph Stage2["阶段 2(新增):LangGraph 智能抽取"]
        API["POST /{screenplay_id}/parse?mode=langgraph"]
        Download["① 从 screenplay.file_url 下载 MD 内容"]
        CeleryTask["② 提交 Celery Task: parse_screenplay_langgraph_task"]
        API --> Download --> CeleryTask
    end

    subgraph LangGraphWorkflow["ScreenplayExtractionService (LangGraph 工作流)"]
        InitNode["Init Node<br/>正则切片<br/>→ scenes_list (State)"]
        RoleNode["Role Node<br/>全局角色抽取<br/>写 PG + State"]
        SceneLoop["Scene Loop Node<br/>场景迭代调度"]
        
        subgraph SceneIteration["每场景循环"]
            SceneNode["Scene Node<br/>场景+标签抽取<br/>写 PG + State"]
            PropNode["Prop Node<br/>道具抽取<br/>写 PG + State"]
            ShotNode["Shot Node<br/>分镜抽取(含对白)<br/>仅写 State"]
            ValidateNode["Validate Node<br/>全量 Shot 校验<br/>Pass/Fail 路由"]
        end
        
        AggregateNode["Aggregate Node<br/>写 refs/分镜/对白<br/>写 project_resources"]
        
        InitNode --> RoleNode --> SceneLoop
        SceneLoop -->|"有场景"| SceneNode
        SceneNode --> PropNode --> ShotNode --> ValidateNode
        ValidateNode -->|"Pass / 超限"| SceneLoop
        ValidateNode -->|"Fail (retry<2)"| ShotNode
        SceneLoop -->|"全部完成"| AggregateNode
    end

    Stage1 -->|"screenplay.file_url<br/>(OSS MD URL)"| Stage2
    Stage2 -->|"MD 纯文本内容"| LangGraphWorkflow

    style Stage1 fill:#e1f5ff
    style Stage2 fill:#fff4e1
    style LangGraphWorkflow fill:#f0f0f0
    style SceneIteration fill:#e8f5e9

关键数据流向

原始 MD 内容
    │
    ▼ (LlamaIndex 切片,chunk_size=512, overlap=50)
MD Chunks (List[str])
    │
    ▼ (OpenAI text-embedding-3-small / 本地模型)
Raw VectorDB (Chroma Collection: raw_{task_id})
    │
    ├── Role Node 检索全文语义 → LLM 提取角色字典
    │   └──► Result VectorDB 存入角色向量
    │
    ├── Prop Node 检索 Raw + 复用角色向量 → LLM 提取道具
    │   └──► Result VectorDB 追加道具向量
    │
    ├── Shot Node 检索 Raw + 复用角色+场景+道具向量 → LLM 生成分镜
    │   └──► Result VectorDB 追加分镜向量
    │
    └── Shot Node 检索 Raw + 复用角色+场景+道具向量 → LLM 生成分镜(含内嵌对白)
        └──► 场景结果写入 LangGraph State
                │
                ▼ (所有场景完成后)
         Aggregate Node
                │
                ▼
         PostgreSQL (现有 screenplay 表结构)

数据库表关系

涉及数据表总览

剧本解析分两个阶段,分别写入不同的表。

阶段一:上传 & 文件解析(POST /v1/screenplays/upload-and-parse

表名 操作 说明
projects READ / UPDATE 读取父项目信息;创建子项目后回填 screenplay_id
screenplays INSERT 创建剧本主记录,parsing_status=PENDINGfile_url 初始为空
attachments INSERT 存储原始上传文件(related_type=SCREENPLAY, purpose=SOURCE

阶段二:AI 解析结果写入(store_parsed_elements,Celery Worker 调用)

表名 操作 写入内容 归属项目
project_characters UPSERT AI 抽取的角色(按名称去重) 父级项目
project_locations UPSERT AI 抽取的场景(按名称去重) 父级项目
project_props UPSERT AI 抽取的道具(按名称去重) 父级项目
screenplay_element_refs INSERT 剧本与项目元素的逻辑引用关系(多态)
project_element_tags INSERT 元素变体标签(如:孙悟空-青年、花果山-日景) 父级项目
storyboards INSERT 分镜主记录(含 meta_data 冗余存储元素信息) 子项目
storyboard_items INSERT 分镜↔元素标签关联明细(element_tag_id
storyboard_dialogues INSERT 分镜对白记录(可选关联 character_id
project_resources INSERT(可选) 为每个元素标签创建占位符资源,等待 AI 生图 父级项目
screenplays UPDATE 回填 parsing_status=COMPLETEDcharacter_countscene_count

项目层级与数据归属

剧本解析存在两层项目的数据归属,需特别注意:

父级项目 (parent_project_id IS NULL)
├── project_characters   ← 角色(跨剧本共享)
├── project_locations    ← 场景(跨剧本共享)
├── project_props        ← 道具(跨剧本共享)
├── project_element_tags ← 元素变体标签(随元素归属父级)
├── project_resources    ← 素材资源(随元素归属父级)
│
└── 子项目 (parent_project_id = 父级项目.id)
    ├── screenplays          ← 剧本(一对一绑定子项目)
    ├── storyboards          ← 分镜(属于具体剧本的子项目)
    ├── storyboard_items     ← 分镜元素关联
    └── storyboard_dialogues ← 分镜对白

关键设计screenplay_element_refs 是连接剧本(子项目)与项目元素(父级项目)的桥梁表,通过 screenplay_id + element_type + element_id(多态)建立引用。


表关系 ER 图

erDiagram
    projects {
        UUID id PK
        UUID parent_project_id FK
        UUID screenplay_id FK
    }
    screenplays {
        UUID screenplay_id PK
        UUID project_id FK
        int parsing_status
        int character_count
        int scene_count
    }
    attachments {
        UUID attachment_id PK
        UUID related_id
        int related_type
        int attachment_purpose
    }
    project_characters {
        UUID character_id PK
        UUID project_id FK
        UUID default_tag_id FK
        string name
    }
    project_locations {
        UUID location_id PK
        UUID project_id FK
        UUID default_tag_id FK
        string name
    }
    project_props {
        UUID prop_id PK
        UUID project_id FK
        UUID default_tag_id FK
        string name
    }
    screenplay_element_refs {
        UUID ref_id PK
        UUID screenplay_id FK
        int element_type
        UUID element_id
        int order_index
    }
    project_element_tags {
        UUID tag_id PK
        UUID project_id FK
        int element_type
        UUID element_id
        string tag_label
    }
    storyboards {
        UUID storyboard_id PK
        UUID project_id FK
        jsonb meta_data
        int order_index
    }
    storyboard_items {
        UUID item_id PK
        UUID storyboard_id FK
        UUID element_tag_id FK
        string element_name
        string tag_label
    }
    storyboard_dialogues {
        UUID dialogue_id PK
        UUID storyboard_id FK
        UUID character_id FK
        int dialogue_type
        string content
    }
    project_resources {
        UUID project_resource_id PK
        UUID project_id FK
        UUID element_tag_id FK
        string file_url
    }

    projects ||--o{ projects : "parent_project_id(子项目)"
    projects ||--o| screenplays : "screenplay_id(子项目绑定)"
    projects ||--o{ project_characters : "父级项目 project_id"
    projects ||--o{ project_locations : "父级项目 project_id"
    projects ||--o{ project_props : "父级项目 project_id"
    projects ||--o{ project_element_tags : "父级项目 project_id"
    projects ||--o{ storyboards : "子项目 project_id"

    screenplays ||--o{ attachments : "related_id(多态)"
    screenplays ||--o{ screenplay_element_refs : "screenplay_id"

    screenplay_element_refs }o--|| project_characters : "element_id(element_type=1)"
    screenplay_element_refs }o--|| project_locations : "element_id(element_type=2)"
    screenplay_element_refs }o--|| project_props : "element_id(element_type=3)"

    project_characters ||--o{ project_element_tags : "element_id(element_type=1)"
    project_locations ||--o{ project_element_tags : "element_id(element_type=2)"
    project_props ||--o{ project_element_tags : "element_id(element_type=3)"

    storyboards ||--o{ storyboard_items : "storyboard_id"
    storyboards ||--o{ storyboard_dialogues : "storyboard_id"

    storyboard_items }o--|| project_element_tags : "element_tag_id"
    storyboard_dialogues }o--o| project_characters : "character_id(可选)"

    project_element_tags ||--o{ project_resources : "element_tag_id"

写入顺序与依赖链

上传阶段:
① projects (子项目 CREATE)
② screenplays (INSERT) ← project_id = 子项目.id
③ attachments (INSERT) ← related_id = screenplay_id
④ projects (UPDATE)   ← screenplay_id = screenplay.screenplay_id

AI 解析阶段(Aggregate Node 顺序写入):
① project_characters / project_locations / project_props
         │  (UPSERT,返回 element_id)
         ▼
② screenplay_element_refs
         │  (INSERT,screenplay_id → element_id,多态)
         ▼
③ project_element_tags
         │  (UPSERT,element_id → tag_id;无标签则自动创建"常规"标签)
         ▼
④ storyboards
         │  (INSERT,project_id = 子项目;meta_data 冗余存储元素信息)
      ┌──┴──────────────────┐
      ▼                     ▼
⑤ storyboard_items    storyboard_dialogues
   (INSERT,              (INSERT,
    element_tag_id)        character_id 可选)
      │
      ▼
⑥ project_resources(可选)
      │  (INSERT 占位符,等待 AI 生图)
      ▼
⑦ screenplays
      (UPDATE:parsing_status=COMPLETED,
               character_count, scene_count)

Mermaid 流程图

1. 总体系统流程

flowchart TD
    UserUpload["用户上传剧本\nMD / TXT / PDF / DOC"] --> APILayer["FastAPI\nPOST /{screenplay_id}/parse\nmode=langgraph"]
    APILayer --> CeleryTask["Celery Task\nparse_screenplay_langgraph_task"]
    CeleryTask --> ExtractionService["ScreenplayExtractionService\nLangGraph 入口"]
    ExtractionService --> LangGraph["LangGraph 工作流\n有状态图编排"]

    subgraph VectorDB ["向量数据库层(Chroma embedded)"]
        RawVDB["Raw VectorDB\nraw_{task_id}\n原文 MD Chunks"]
        ResultVDB["Result VectorDB\nresult_{task_id}\n已抽取元素向量"]
    end

    LangGraph <-->|"检索/写入"| VectorDB
    LangGraph --> PostgreSQL["PostgreSQL\n结构化最终结果"]
    LangGraph --> AIJobStatus["AI Job 状态回写\n进度跟踪"]

2. LangGraph 节点完整流程

flowchart TD
    START([开始]) --> InitNode

    subgraph InitNode ["Init Node|初始化(无 LLM 调用)"]
        I1["Level 1: MarkdownNodeParser\n按 ## 标题切场景级节点"]
        I2["从 header_path 提取\nscenes_list(零 LLM)"]
        I3["Level 2: SentenceSplitter\n超长场景切段落级 chunks"]
        I4["每个 chunk 附加\nscene_name / chunk_type metadata"]
        I5["批量 Embedding 向量化\n并发控制 + 指数退避重试"]
        I6["写入 Raw VectorDB\n含 scene_name 过滤字段"]
        I1 --> I2
        I1 --> I3 --> I4 --> I5 --> I6
    end

    InitNode --> RoleNode

    subgraph RoleNode ["Role Node|全局角色抽取(一次性)"]
        R1["检索 Raw VectorDB\n全文语义 Top-20\n角色相关 chunks"]
        R2["LLM 抽取角色字典\ncharacters + character_tags"]
        R3["写入 Result VectorDB\ntype=character"]
        R1 --> R2 --> R3
    end

    RoleNode --> SceneLoop

    subgraph SceneLoop ["Scene Loop Node|场景调度"]
        SL1{"current_scene_idx\n< len(scenes_list)?"}
    end

    SL1 -->|"是,处理场景 N"| PropNode
    SL1 -->|"否,全部完成"| AggNode

    subgraph PropNode ["Prop Node|道具抽取(每场景)"]
        P1["检索 Raw VectorDB\nwhere={scene_name: 场景N}\nTop-10 道具相关 chunks"]
        P2["检索 Result VectorDB\ntype=character(复用角色)"]
        P3["LLM 抽取道具列表\nprops + prop_tags"]
        P4["写入 Result VectorDB\ntype=prop"]
        P1 --> P3
        P2 --> P3
        P3 --> P4
    end

    PropNode --> ShotNode

    subgraph ShotNode ["Shot Node|分镜抽取(每场景)"]
        SH1["检索 Raw VectorDB\nwhere={scene_name: 场景N}\nTop-10 拍摄相关 chunks"]
        SH2["检索 Result VectorDB\ntype=character + type=prop"]
        SH3["LLM 生成分镜列表\nshot_size / camera_movement\nestimate_duration"]
        SH4["写入 Result VectorDB\ntype=shot"]
        SH1 --> SH3
        SH2 --> SH3
        SH3 --> SH4
    end

    ShotNode --> ValidateNode

    subgraph ValidateNode ["Validate Node|全量自校验"]
        V1["遍历场景内所有 shots\n聚合引用集合"]
        V2["对比 Result VectorDB\n已知 character/prop/location 集合"]
        V3{"存在未知引用?"}
        V1 --> V2 --> V3
    end

    V3 -->|"Pass ✅\n全部已知"| UpdateIdx
    V3 -->|"Fail ❌\nretry_count < max_retries\n注入 unknown_refs 提示"| ShotNode
    V3 -->|"Fail ❌\n超出最大重试\n接受结果并记录报告"| UpdateIdx

    UpdateIdx["更新 scene_results\ncurrent_scene_idx += 1"]
    UpdateIdx --> SceneLoop

    subgraph AggNode ["Aggregate Node|汇总"]
        AG1["合并所有场景结果"]
        AG2["生成最终 JSON 结构\ncharacters / locations\nprops / storyboards"]
        AG3["调用 ScreenplayService\n存入 PostgreSQL"]
        AG4["清理 Chroma 临时数据\ncleanup()"]
        AG1 --> AG2 --> AG3 --> AG4
    end

    AggNode --> END_NODE([完成])

3. 双向量库协作数据流

flowchart LR
    subgraph Input ["输入层"]
        MD["原始 MD 内容"]
    end

    subgraph RawVDB ["Raw VectorDB\nChroma: raw_{task_id}"]
        RC1["Chunk 1\n剧本段落 1"]
        RC2["Chunk 2\n剧本段落 2"]
        RC3["Chunk N\n剧本段落 N"]
    end

    subgraph ResultVDB ["Result VectorDB\nChroma: result_{task_id}"]
        RES1["角色向量\ntype=character"]
        RES2["道具向量\ntype=prop"]
        RES3["分镜向量\ntype=shot(含内嵌对白)"]
    end

    subgraph Nodes ["LangGraph 节点"]
        N_Init["Init Node"]
        N_Role["Role Node"]
        N_Prop["Prop Node"]
        N_Shot["Shot Node"]
    end

    MD -->|"正则切片 + 滑动窗口"| N_Init
    N_Init -->|"写入 chunks"| RawVDB

    RawVDB -->|"检索 Top-20"| N_Role
    N_Role -->|"写入角色向量"| RES1

    RawVDB -->|"检索 Top-10"| N_Prop
    RES1 -->|"复用角色"| N_Prop
    N_Prop -->|"写入道具向量"| RES2

    RawVDB -->|"检索 Top-10"| N_Shot
    RES1 -->|"复用角色"| N_Shot
    RES2 -->|"复用道具"| N_Shot
    N_Shot -->|"写入分镜向量"| RES3

4. API 调用时序图

sequenceDiagram
    participant Client as 前端客户端
    participant API as FastAPI
    participant Celery as Celery Worker
    participant LG as LangGraph
    participant Chroma as Chroma VectorDB
    participant LLM as LLM Provider
    participant PG as PostgreSQL

    Client->>API: POST /{screenplay_id}/parse\n{mode: "langgraph"}
    API->>API: 权限校验 + 内容读取
    API->>Celery: parse_screenplay_langgraph_task.delay()
    API-->>Client: 202 Accepted\n{task_id, job_id}

    Note over Celery,PG: 异步执行阶段

    Celery->>LG: graph.ainvoke(initial_state)
    LG->>Chroma: 创建 raw_{task_id} Collection
    LG->>LLM: Embedding 请求(MD chunks)
    LLM-->>LG: 向量数据
    LG->>Chroma: 写入 Raw VectorDB

    loop 角色抽取(一次)
        LG->>Chroma: 检索 Raw VectorDB Top-20
        LG->>LLM: 角色抽取请求
        LLM-->>LG: 角色 JSON
        LG->>Chroma: 写入 Result VectorDB
    end

    loop 每场景循环
        LG->>Chroma: 检索 Raw + Result VectorDB
        LG->>LLM: 道具/分镜/对白抽取
        LLM-->>LG: 结构化 JSON
        LG->>Chroma: 更新 Result VectorDB
        LG->>LG: 自校验(Validate Node)
    end

    LG->>PG: 写入最终结构化结果
    LG->>Chroma: cleanup() 清理临时数据
    LG-->>Celery: 完成状态

    Client->>API: GET /{screenplay_id}/parse-status
    API->>PG: 查询 AI Job 状态
    API-->>Client: {status: "completed", progress: 100}

5. 自校验闭环详细流程

flowchart TD
    ShotDone["Shot Node 完成\n生成分镜列表"] --> ExtractRefs

    subgraph ValidateLoop ["自校验闭环(max_retries=2)"]
        ExtractRefs["提取分镜引用\ncharacters / locations / props"]
        QueryKnown["查询 Result VectorDB\n已知角色 + 道具名称集合"]
        CheckDiff{"存在未知引用?"}
        CheckRetry{"retry_count\n< max_retries?"}
        AcceptResult["接受当前结果\n继续执行"]
        IncrRetry["retry_count += 1\n记录未知引用日志"]

        ExtractRefs --> QueryKnown --> CheckDiff
        CheckDiff -->|"否,全部已知 ✅"| AcceptResult
        CheckDiff -->|"是,存在未知 ❌"| CheckRetry
        CheckRetry -->|"是,还可重试"| IncrRetry
        CheckRetry -->|"否,超限"| AcceptResult
        IncrRetry -->|"返回 Shot Node 重抽"| ShotRetry["Shot Node\n重新抽取(带已知引用提示)"]
        ShotRetry --> ExtractRefs
    end

    AcceptResult --> AggregateNode["进入 Aggregate Node"]

模块设计

目录结构

server/app/services/screenplay_extraction/
├── __init__.py
├── extraction_service.py          # 对外唯一入口
├── graph_state.py                 # LangGraph 状态定义(TypedDict)
├── graph_builder.py               # 图构建:节点注册 + 边定义
├── nodes/
│   ├── __init__.py
│   ├── init_node.py               # MD 切片 + 构建 Raw VectorDB
│   ├── role_node.py               # 全局角色抽取(加载 role_prompt.md)
│   ├── scene_loop_node.py         # 场景循环控制
│   ├── scene_node.py              # 场景抽取(加载 scene_prompt.md)
│   ├── prop_node.py               # 道具抽取(加载 prop_prompt.md)
│   ├── shot_node.py               # 分镜抽取(加载 shot_prompt.md + format_*_context())
│   ├── validate_node.py           # 自校验(引用一致性)
│   └── aggregate_node.py          # 汇总 + 写入 PostgreSQL
├── prompts/                       # 提示词模板文件(可独立修改,重启容器生效)
│   ├── role_prompt.md             # 角色抽取提示词模板
│   ├── scene_prompt.md            # 场景抽取提示词模板
│   ├── prop_prompt.md             # 道具抽取提示词模板
│   └── shot_prompt.md             # 分镜抽取提示词模板(含内嵌对白规则)
└── vector_store/
    ├── __init__.py
    ├── chroma_store.py            # Chroma 封装(embedded 模式)
    ├── raw_vector_builder.py      # LlamaIndex MD 切片 + 向量化
    └── result_vector_store.py     # 抽取结果的存取操作

server/app/tasks/
└── screenplay_langgraph_task.py   # 新 Celery 任务(参照现有模式)

为什么单独建 prompts/ 目录?

维度 说明
提示词文案与逻辑解耦 调整提示词措辞只改 .md 文件,节点代码不动
重启容器即生效 模板文件在容器内,docker restart jointo-server-app 即可热更新提示词
与项目现有风格一致 screenplay_parsing.md 也是文件形式,团队已有编辑习惯
可读性好 纯 Markdown 格式,不受 Python 字符串缩进/转义干扰

节点代码只负责:① 启动时加载模板(进程内缓存)→ ② 运行时注入变量 → ③ 调用 LLM。提示词文案调优无需走代码审查流程。

各模块职责

文件转换模块复用说明

不需要新建此模块ScreenplayFileParserService 已完整实现以下能力,LangGraph 方案直接复用。

功能 现有实现 位置
TXT / MD 读取 _parse_txt / _parse_markdown screenplay_file_parser_service.py
DOCX → MD _parse_docx_sync(Heading 转标题,Bold/Italic 转 MD 格式) 同上
PDF → MD _parse_pdf_sync(pdfplumber + PyMuPDF 去水印,表格转 MD) 同上
RTF 解析 _parse_rtf(striprtf) 同上
DOC 解析 _parse_doc(textract) 同上
MD 上传 OSS _upload_markdown_file(路径:screenplays/parsed/{Y}/{M}/{D}/{uuid}.md 同上
字数统计 _count_words(中英文混合) 同上

结论:LangGraph 新增模块(screenplay_extraction/)的起点是 screenplay.file_url 指向的 OSS MD 文件,无需关心文件格式转换


extraction_service.py

class ScreenplayExtractionService:
    """LangGraph 剧本抽取对外唯一入口"""
    
    async def run(
        self,
        screenplay_id: str,
        task_id: str,
        content: str,
        custom_requirements: str | None = None
    ) -> dict:
        """
        执行完整抽取管道
        
        Returns:
            {"status": "completed", "result": {...}}
        """
        graph = build_extraction_graph()
        initial_state = ScreenplayExtractionState(
            task_id=task_id,
            screenplay_id=screenplay_id,
            content=content,
            custom_requirements=custom_requirements or "",
            scenes_list=[],
            current_scene_idx=0,
            role_result={},
            scene_results=[],
            retry_count=0,
            max_retries=2,
            status="running",
            error=None
        )
        final_state = await graph.ainvoke(initial_state)
        return final_state

graph_builder.py

def build_extraction_graph() -> CompiledGraph:
    workflow = StateGraph(ScreenplayExtractionState)
    
    # 节点注册
    workflow.add_node("init", init_node)
    workflow.add_node("role", role_node)
    workflow.add_node("scene_loop", scene_loop_node)
    workflow.add_node("prop", prop_node)
    workflow.add_node("shot", shot_node)
    workflow.add_node("validate", validate_node)
    workflow.add_node("aggregate", aggregate_node)
    
    # 顺序边
    workflow.set_entry_point("init")
    workflow.add_edge("init", "role")
    workflow.add_edge("role", "scene_loop")
    
    # 场景循环条件边
    workflow.add_conditional_edges(
        "scene_loop",
        route_scene_loop,
        {"prop": "prop", "aggregate": "aggregate"}
    )
    
    workflow.add_edge("prop", "shot")
    workflow.add_edge("shot", "validate")
    
    # 自校验条件边:Pass/超限 → scene_loop(下一场景),Fail → shot(重抽)
    workflow.add_conditional_edges(
        "validate",
        route_validation,
        {"scene_loop": "scene_loop", "shot": "shot"}
    )
    
    workflow.add_edge("aggregate", END)
    
    return workflow.compile()


def route_scene_loop(state: ScreenplayExtractionState) -> str:
    if state["current_scene_idx"] < len(state["scenes_list"]):
        return "prop"
    return "aggregate"


def route_validation(state: ScreenplayExtractionState) -> str:
    if state.get("validation_passed", False):
        return "scene_loop"
    if state["retry_count"] >= state["max_retries"]:
        return "scene_loop"  # 超过最大重试次数,接受结果继续
    return "shot"

向量库设计

两个 Chroma Collection

Collection 命名规则 写入时机 数据内容 查询场景
Raw VectorDB raw_{task_id} Init Node 一次性写入 原文 MD chunk 向量 每个抽取节点检索相关原文段落
Result VectorDB result_{task_id} 每个抽取节点写入后 已抽取的角色/场景/道具/分镜向量 下游节点复用上游抽取结果

Chroma Store 封装

# server/app/services/screenplay_extraction/vector_store/chroma_store.py

import chromadb
from chromadb.config import Settings

class ChromaStore:
    """Chroma 向量库封装(embedded 持久化模式)"""
    
    def __init__(self, task_id: str, persist_dir: str = "/tmp/chroma"):
        self.task_id = task_id
        self.client = chromadb.PersistentClient(
            path=f"{persist_dir}/{task_id}",
            settings=Settings(anonymized_telemetry=False)
        )
        self._raw_collection = None
        self._result_collection = None
    
    def get_raw_collection(self):
        if not self._raw_collection:
            self._raw_collection = self.client.get_or_create_collection(
                name=f"raw_{self.task_id}",
                metadata={"hnsw:space": "cosine"}
            )
        return self._raw_collection
    
    def get_result_collection(self):
        if not self._result_collection:
            self._result_collection = self.client.get_or_create_collection(
                name=f"result_{self.task_id}",
                metadata={"hnsw:space": "cosine"}
            )
        return self._result_collection
    
    def cleanup(self):
        """任务完成后清理临时向量数据"""
        self.client.delete_collection(f"raw_{self.task_id}")
        self.client.delete_collection(f"result_{self.task_id}")

Chroma Store 封装

# server/app/services/screenplay_extraction/vector_store/raw_vector_builder.py

from llama_index.core import Document, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter

class RawVectorBuilder:
    """MD 内容切片 + 向量化 → Raw VectorDB"""
    
    CHUNK_SIZE = 512      # 每个 chunk 约 512 tokens
    CHUNK_OVERLAP = 50    # chunk 间重叠 50 tokens,避免上下文断裂
    
    async def build(self, content: str, chroma_store: ChromaStore) -> list[str]:
        """
        将 MD 内容切片并写入 Raw VectorDB
        
        Returns:
            chunk_ids: 所有 chunk 的 ID 列表
        """
        splitter = SentenceSplitter(
            chunk_size=self.CHUNK_SIZE,
            chunk_overlap=self.CHUNK_OVERLAP
        )
        nodes = splitter.get_nodes_from_documents([Document(text=content)])
        
        collection = chroma_store.get_raw_collection()
        chunk_ids = []
        
        for i, node in enumerate(nodes):
            chunk_id = f"chunk_{i}"
            # 使用 OpenAI embedding(复用项目已有配置)
            embedding = await get_embedding(node.text)
            collection.add(
                ids=[chunk_id],
                documents=[node.text],
                embeddings=[embedding],
                metadatas=[{"chunk_index": i, "char_count": len(node.text)}]
            )
            chunk_ids.append(chunk_id)
        
        return chunk_ids

Result VectorDB 写入规范

写入节点 向量内容 metadata 字段
Role Node 角色名 + 描述文本 type=character, name, role_type
Prop Node 道具名 + 描述 type=prop, name, scene_name, owner_character
Shot Node 分镜描述 + 拍摄说明(含内嵌对白) type=shot, scene_name, shot_index, characters, locations, props

LangGraph 状态与节点

全局状态定义

# server/app/services/screenplay_extraction/graph_state.py

from typing import TypedDict, Optional

class SceneResult(TypedDict):
    scene_name: str
    scene_description: str
    props: list[dict]
    shots: list[dict]
    dialogs: list[dict]

class ScreenplayExtractionState(TypedDict):
    # 任务标识
    task_id: str
    screenplay_id: str
    
    # 输入
    content: str                      # 原始 MD 内容
    custom_requirements: str
    
    # Init Node 产出
    scenes_list: list[str]            # 场景名称列表(全局)
    chunk_ids: list[str]              # Raw VectorDB chunk IDs
    
    # 场景循环控制
    current_scene_idx: int            # 当前处理的场景索引
    
    # 角色全局抽取结果
    role_result: dict                 # {"characters": [...], "character_tags": {...}}
    
    # 每场景抽取结果(累积)
    scene_results: list[SceneResult]
    
    # 自校验
    validation_passed: bool
    retry_count: int
    max_retries: int
    
    # 任务状态
    status: str                       # running / completed / failed
    error: Optional[str]
    
    # 最终汇总
    final_result: Optional[dict]

节点实现规范

每个节点必须:

  1. 接受 ScreenplayExtractionState,返回 dict(部分状态更新)
  2. 捕获异常时更新 status=failederror,不 raise(避免终止图)
  3. 使用 chroma_store 查询 Raw + Result VectorDB,而非直接传递全文

Init Node(示例)

重要content 在进入 LangGraph 之前已由 ScreenplayFileParserService 转换为标准 Markdown 并存储于 OSS(screenplay.file_url)。解析接口下载后直接传入 State,Init Node 不再负责任何格式转换,职责仅为:切片 → 向量化 → 场景分割。

# server/app/services/screenplay_extraction/nodes/init_node.py

@node_monitor("init_node")
async def init_node(state: ScreenplayExtractionState) -> dict:
    """
    职责(仅切片与索引,无 MD 转换):
    1. 三级场景分割(正则 → 段落切片 → LLM fallback)
    2. LlamaIndex 切片 + Embedding → Raw VectorDB
    
    前置条件:
    - state["content"] 已是标准 MD(由 ScreenplayFileParserService 保证)
    - MD 来源:OSS screenplay.file_url,由 parse 接口下载后注入
    """
    content = state["content"]
    task_id = state["task_id"]
    
    # 1. 三级场景分割(优化建议 #2)
    scenes_list = await extract_scene_list(content)
    if not scenes_list:
        return node_error("init_node", ExtractionErrorCode.SCENE_SPLIT_FAILED, "场景列表为空")
    
    # 2. LlamaIndex 批量切片 + 向量化(优化建议 #3:批量 embedding)
    chroma_store = ChromaStore(task_id=task_id)
    builder = RawVectorBuilder()
    chunk_ids = await builder.build(content, chroma_store)  # 内部已做批量 + 重试
    
    return {
        "scenes_list": scenes_list,
        "chunk_ids": chunk_ids,
        "total_chunks": len(chunk_ids),  # 供动态检索策略使用(优化建议 #8)
        "current_scene_idx": 0
    }

content 的来源链路(parse 接口中)

# screenplays.py - parse_screenplay endpoint
# 已有代码(无需修改),复用 StorageService 下载逻辑
if screenplay.type == ScreenplayType.FILE:
    full_file_url = build_file_url(screenplay.file_url)   # OSS MD URL
    screenplay_content = await storage_service.download_text_file(full_file_url)
    # ↑ screenplay_content 已是标准 MD,直接传入 LangGraph

Role Node(示例)

async def role_node(state: ScreenplayExtractionState) -> dict:
    """
    职责:全局角色抽取(一次性处理整个剧本)
    检索策略:语义检索 Top-20 Raw chunks
    复用 ResultVectorDB:无(首个节点)
    """
    try:
        task_id = state["task_id"]
        chroma_store = ChromaStore(task_id=task_id)
        
        # 检索与"角色、人物"相关的原文段落
        raw_chunks = chroma_store.get_raw_collection().query(
            query_texts=["角色 人物 出场"],
            n_results=20
        )
        context = "\n\n".join(raw_chunks["documents"][0])
        
        # 调用 LLM 抽取角色(复用现有 AI Provider)
        role_result = await extract_roles_with_llm(context, state["custom_requirements"])
        
        # 将角色结果写入 Result VectorDB
        result_store = chroma_store.get_result_collection()
        for char in role_result.get("characters", []):
            embedding = await get_embedding(f"{char['name']} {char['description']}")
            result_store.add(
                ids=[f"char_{char['name']}"],
                documents=[f"{char['name']}: {char['description']}"],
                embeddings=[embedding],
                metadatas=[{"type": "character", "name": char["name"]}]
            )
        
        return {"role_result": role_result}
    except Exception as e:
        return {"status": "failed", "error": f"Role Node 失败: {str(e)}"}

Validate Node(自校验逻辑)

async def validate_node(state: ScreenplayExtractionState) -> dict:
    """
    自校验逻辑:
    检查当前场景最新一条 shot 引用的 characters/locations/props
    是否均存在于 Result VectorDB 的已抽取集合中
    """
    task_id = state["task_id"]
    current_idx = state["current_scene_idx"]
    scene_results = state["scene_results"]
    
    if not scene_results or current_idx >= len(scene_results):
        return {"validation_passed": True}
    
    current_scene = scene_results[current_idx]
    shots = current_scene.get("shots", [])
    
    if not shots:
        return {"validation_passed": True}
    
    # 获取 Result VectorDB 中已存在的角色和道具名称集合
    chroma_store = ChromaStore(task_id=task_id)
    result_col = chroma_store.get_result_collection()
    existing = result_col.get(where={"type": {"$in": ["character", "prop"]}})
    known_names = {m["name"] for m in existing.get("metadatas", []) if "name" in m}
    
    # 校验最后一个分镜的引用
    last_shot = shots[-1]
    referenced_chars = set(last_shot.get("characters", []))
    referenced_props = set(last_shot.get("props", []))
    
    unknown_refs = (referenced_chars | referenced_props) - known_names
    
    if unknown_refs and state["retry_count"] < state["max_retries"]:
        return {
            "validation_passed": False,
            "retry_count": state["retry_count"] + 1
        }
    
    return {"validation_passed": True, "retry_count": 0}

各节点提示词模板

设计原则

参考基准docs/prompts/screenplay-ai-parse-prompt.md(v2.0,两阶段原始设计文档)
该文档是本节提示词的起点:Stage 1 拆分为 Role/Scene/Prop 三节点,Stage 2 拆分为 Shot/Dialog 两节点。

关键决策:提示词模板放 prompts/*.md 文件,节点代码只做变量注入

决策 原因
不存入 ai_skills_registry LangGraph 节点需要动态注入运行时变量(角色名、tag_id_map 等),放进 DB 无法解决此问题
提示词文案放 prompts/*.md screenplay_parsing.md 风格一致,改文案只改文件,重启容器即生效,无需走代码审查
节点代码只负责变量注入 build_xxx_prompt() 函数加载模板 + str.format() 注入变量,逻辑与文案完全解耦
ai_skills_registry 保留 用于其他通用技能(AI 对话、图片描述词生成等),剧本解析不再使用

运行时流程

容器启动
  └─ 各节点首次调用时懒加载 prompts/*.md(进程内缓存,全生命周期只读一次)

每次 LLM 调用
  └─ build_xxx_prompt(context, characters_context, ...)
       └─ 模板字符串.format(context=context, ...)  → 最终 system_prompt

公共模板加载器

所有节点共用同一个懒加载工具函数,进程内缓存,启动后只读一次:

# server/app/services/screenplay_extraction/nodes/__init__.py

from pathlib import Path
from functools import lru_cache

_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"

@lru_cache(maxsize=None)
def load_prompt_template(name: str) -> str:
    """加载 prompts/{name}.md,进程内永久缓存(重启容器生效)"""
    path = _PROMPTS_DIR / f"{name}.md"
    return path.read_text(encoding="utf-8")

Role Node 提示词

职责:全局一次性,从整个剧本中提取所有角色及其变体标签。
上下文注入:Raw VectorDB Top-20 原文片段。
模板文件prompts/role_prompt.md

# server/app/services/screenplay_extraction/nodes/role_node.py

from . import load_prompt_template

def build_role_prompt(context: str, custom_requirements: str = "") -> str:
    """加载模板 + 注入运行时变量"""
    tmpl = load_prompt_template("role_prompt")
    prompt = tmpl.format(context=context)
    if custom_requirements:
        prompt += f"\n\n## 用户特殊要求\n{custom_requirements}"
    return prompt

prompts/role_prompt.md 内容

你是专业影视剧本分析专家,专注从剧本中识别所有角色信息。

## 任务
从以下剧本片段中提取所有角色,包括主角、配角和群演。
为每个角色识别不同的**外形变体标签**(如年龄段、服装状态、特殊状态等),标签必须有明确的视觉差异。

## 识别要求
- 识别所有出现的角色(含画外音、旁白角色)
- 每个角色至少1个标签,若剧本中角色有不同年龄/状态则分别创建
- tag_key 使用英文小写(如 youth / adult / injured)
- tag_label 使用中文(如 少年 / 青年 / 受伤)
- 角色类型:main=主角,supporting=配角,extra=群演

## 输出格式(严格遵守,仅返回 JSON)
```json
{
  "characters": [
    {
      "name": "角色名",
      "description": "角色描述",
      "role_type": "main|supporting|extra",
      "is_offscreen": false,
      "meta_data": {"gender": "male|female", "personality": "性格描述"}
    }
  ],
  "character_tags": {
    "角色名": [
      {
        "tag_key": "youth",
        "tag_label": "少年",
        "description": "15岁,穿着布衣,活泼好动",
        "meta_data": {"age": 15, "clothing": "布衣", "mood": "活泼"}
      }
    ]
  }
}
```

## 剧本片段
{context}

Scene Node 提示词

职责:每场景独立执行,抽取当前场景的描述及时间/氛围变体标签。
上下文注入:当前场景名 + Raw VectorDB 该场景原文 Top-10 + 已知角色名列表。
模板文件prompts/scene_prompt.md

# server/app/services/screenplay_extraction/nodes/scene_node.py

from . import load_prompt_template

def build_scene_prompt(
    scene_name: str,
    context: str,
    known_characters: list[str]
) -> str:
    """加载模板 + 注入场景名、已知角色、原文片段"""
    char_list = "、".join(known_characters) if known_characters else "(暂无已知角色)"
    tmpl = load_prompt_template("scene_prompt")
    return tmpl.format(
        scene_name=scene_name,
        char_list=char_list,
        context=context,
    )

prompts/scene_prompt.md 内容

你是专业影视剧本分析专家,专注从剧本中识别拍摄场景信息。

## 当前任务
为场景「{scene_name}」提取描述和时间/氛围变体标签。

## 已知角色(本场景可能出现)
{char_list}

## 识别要求
- 为场景提取不同的时间段/天气/氛围标签,标签必须有明确的视觉/光照差异
- 至少1个标签(如仅一种时间则创建"常规"标签)
- tag_key 使用英文小写(如 daytime / night / rainy)
- tag_label 使用中文(如 白天 / 夜晚 / 雨天)
- 顶层键必须为 "locations"(数组),禁止使用 "location"(单对象)或 "scenes"

## 输出格式(严格遵守,仅返回 JSON)
```json
{
  "locations": [
    {
      "name": "{scene_name}",
      "location": "地点描述",
      "description": "场景整体描述",
      "meta_data": {
        "time_of_day": "morning|afternoon|evening|night",
        "scene_number": 1
      }
    }
  ],
  "location_tags": {
    "{scene_name}": [
      {
        "tag_key": "daytime",
        "tag_label": "白天",
        "description": "阳光明媚,鸟语花香",
        "meta_data": {"lighting": "natural", "weather": "sunny"}
      }
    ]
  }
}
```

> locations 数组始终只有一个元素(Scene Node 每次处理单个场景)。

## 当前场景原文片段
{context}

Prop Node 提示词

职责:每场景独立执行,抽取本场景涉及的道具及状态标签。
上下文注入:当前场景名 + Raw VectorDB 该场景原文 Top-10 + 已知角色名列表(约束 owner_character,防幻觉)。
模板文件prompts/prop_prompt.md

# server/app/services/screenplay_extraction/nodes/prop_node.py

from . import load_prompt_template

def build_prop_prompt(
    scene_name: str,
    context: str,
    known_characters: list[str]
) -> str:
    """加载模板 + 注入场景名、已知角色、原文片段"""
    char_list = "、".join(known_characters) if known_characters else "(暂无)"
    tmpl = load_prompt_template("prop_prompt")
    return tmpl.format(
        scene_name=scene_name,
        char_list=char_list,
        context=context,
    )

prompts/prop_prompt.md 内容

你是专业影视剧本分析专家,专注识别剧本中的道具和物品。

## 当前任务
识别场景「{scene_name}」中涉及的所有道具,包含互动道具和布景道具。

## 已知角色(owner_character 只能从此列表中选择)
{char_list}

## 识别要求
- **互动道具**(interactive):角色会使用/操作的物品(武器、工具、手机、钥匙等),必须填写 owner_character
- **布景道具**(set_dressing):场景装饰性物品(家具、花瓶、背景陈设等),必须填写 owner_location
- owner_character 必须从上方已知角色中选择,不得编造角色名
- 每个道具至少1个状态标签(如无明显差异则创建"常规"标签)
- 即使剧本未明确描述,也要根据场景合理推断存在的道具

## 输出格式(严格遵守,仅返回 JSON)
```json
{
  "props": [
    {
      "name": "道具名",
      "description": "道具描述",
      "prop_type": "interactive|set_dressing",
      "owner_character": "角色名(互动道具必填)",
      "owner_location": "场景名(布景道具必填)",
      "meta_data": {"material": "材质", "size": "尺寸"}
    }
  ],
  "prop_tags": {
    "道具名": [
      {
        "tag_key": "new",
        "tag_label": "崭新",
        "description": "刚打造,金光闪闪",
        "meta_data": {"condition": "new"}
      }
    ]
  }
}
```

## 当前场景原文片段
{context}

Shot Node 提示词

职责:每场景独立执行,将场景原文拆解为分镜序列。
上下文注入:Raw VectorDB 该场景原文 Top-10 + 已知角色/场景/道具列表(含各标签名)。
模板文件prompts/shot_prompt.md

注意:Shot Node 的 LLM 输出使用对象格式(含 action/position/is_visible),对应 storyboard_items 表的冗余字段,比当前实现更完整。

# server/app/services/screenplay_extraction/nodes/shot_node.py

from . import load_prompt_template

def build_shot_prompt(
    scene_name: str,
    context: str,
    characters_context: str,    # format_characters_context() 输出
    location_context: str,      # format_location_context() 输出
    props_context: str,         # format_props_context() 输出
    storyboard_count_hint: int = 0,
    custom_requirements: str = ""
) -> str:
    """加载模板 + 注入已知元素列表和原文片段"""
    count_hint = f"- 本场景目标分镜数量约 {storyboard_count_hint}\n" if storyboard_count_hint else ""
    extra_req = f"\n\n## 用户特殊要求\n{custom_requirements}" if custom_requirements else ""
    tmpl = load_prompt_template("shot_prompt")
    return tmpl.format(
        characters_context=characters_context,
        location_context=location_context,
        props_context=props_context,
        count_hint=count_hint,
        context=context,
        extra_req=extra_req,
    )


def format_characters_context(character_tag_id_map: dict[str, dict]) -> str:
    seen: dict[str, list[str]] = {}
    for key in character_tag_id_map:
        parts = key.rsplit("-", 1)
        if len(parts) == 2:
            seen.setdefault(parts[0], []).append(parts[1])
    lines = [f"- {n}(可用标签:{'、'.join(t)})" for n, t in seen.items()]
    return "\n".join(lines) if lines else "(无已知角色)"


def format_location_context(scene_name: str, location_tag_id_map: dict[str, dict]) -> str:
    tags = [k.rsplit("-", 1)[1] for k in location_tag_id_map if k.startswith(f"{scene_name}-")]
    return f"- {scene_name}(可用标签:{'、'.join(tags)})" if tags else f"- {scene_name}(标签:常规)"


def format_props_context(prop_tag_id_map: dict[str, dict]) -> str:
    seen: dict[str, list[str]] = {}
    for key in prop_tag_id_map:
        parts = key.rsplit("-", 1)
        if len(parts) == 2:
            seen.setdefault(parts[0], []).append(parts[1])
    lines = [f"- {n}(可用标签:{'、'.join(t)})" for n, t in seen.items()]
    return "\n".join(lines) if lines else "(无已知道具)"

prompts/shot_prompt.md 内容

你是专业影视分镜师,擅长将剧本片段拆解为可执行的分镜脚本。

## 已知元素(所有引用必须严格来自以下列表,不得编造)

### 角色列表
{characters_context}

### 当前场景
{location_context}

### 道具列表
{props_context}

## 分镜拆解要求

### 镜头参数
{count_hint}- shot_size 基础值:close_up / medium_shot / full_shot / long_shot
  扩展值(可选):extreme_wide_shot / wide_shot / medium_close_up / extreme_close_up / over_shoulder
- camera_movement 基础值:static / pan / tilt / zoom / dolly / tracking
  扩展值(可选):arc / crane / handheld

### 元素引用规则
- characters 和 props 使用**对象数组**格式(含 action / position / is_visible)
- 所有 name 必须与已知元素列表完全一致,不得编造
- tag_label 必须是该元素已知的标签名之一

### 对白(dialogues)提取规则
- 该分镜画面内发生的所有对白,**必须内嵌在对应分镜的 dialogues 数组中**
- **character_name 只能从上方已知角色列表中选择**,不得编造角色名
- 旁白/画外音:dialogue_type=3,character_name 填 null
- 内心OS:dialogue_type=2,其他角色听不到
- 同一分镜多条对白按发生顺序填写 sequence_order(从 0 开始)
- emotion 填写情绪词(兴奋 / 愤怒 / 悲伤 / 平静 / 疑惑 / 恐惧等),有助于 TTS 生成语调

## 输出格式(严格遵守,仅返回 JSON)
```json
{
  "storyboards": [
    {
      "title": "分镜标题",
      "description": "画面内容详细描述",
      "shooting_description": "景深、运镜、镜头语言描述",
      "shot_size": "medium_shot",
      "camera_movement": "static",
      "estimated_duration": 5.5,
      "order_index": 1,
      "start_time": 0.0,
      "end_time": 5.5,
      "meta_data": {"lighting": "自然光", "time_of_day": "上午"},
      "characters": [
        {"name": "孙悟空", "tag_label": "少年", "action": "挥舞金箍棒", "position": "center", "is_visible": true, "order": 0}
      ],
      "locations": [
        {"name": "花果山", "tag_label": "白天", "order": 0}
      ],
      "props": [
        {"name": "金箍棒", "tag_label": "崭新", "action": "被挥舞", "position": "foreground", "is_visible": true, "order": 0}
      ],
      "dialogues": [
        {"character_name": "孙悟空", "content": "今日我便要闯出一番天地!", "dialogue_type": 1, "sequence_order": 0, "emotion": "兴奋"},
        {"character_name": null, "content": "命运在此刻悄然转向。", "dialogue_type": 3, "sequence_order": 1, "emotion": "平静"}
      ]
    }
  ]
}
```

## 对白类型(dialogue_type)枚举
| 值 | 类型 | 说明 |
|----|------|------|
| 1 | normal | 角色间正常对话,其他角色可听到 |
| 2 | inner_monologue | 内心OS,角色内心独白,其他角色听不到 |
| 3 | narration | 旁白 / 画外音 / 解说,character_name 填 null |

## 当前场景原文片段
{context}
{extra_req}

提示词上下文构建流程

State 中的数据                      传入提示词
─────────────────────────────────────────────────────────────────
character_tag_id_map                → format_characters_context()
  {"孙悟空-少年": UUID, ...}          → "- 孙悟空(可用标签:少年、青年)"
                                    ↓ Shot Node 提示词的 characters_context

location_tag_id_map[scene_N]        → format_location_context()
  {"花果山-白天": UUID, ...}          → "- 花果山(可用标签:白天、夜晚)"
                                    ↓ Shot Node 提示词的 location_context

prop_tag_id_map[scene_N]            → format_props_context()
  {"金箍棒-崭新": UUID, ...}          → "- 金箍棒(可用标签:崭新、破损)"
                                    ↓ Shot Node 提示词的 props_context

对白(dialogues)直接内嵌在每个分镜对象中,由 Shot Node 一次性提取,无需独立节点。


Shot Node 输出 Schema 对比

参考基准screenplay_parsing.md v1.5.0(当前 skill 文档,字段权威来源)

字段 v1.5.0 格式(screenplay_parsing.md) LangGraph Shot Node(升级) 对应表列
characters ["孙悟空"](字符串数组)+ character_tags: {"孙悟空": "youth"} [{"name", "tag_label", "action", "position", "is_visible", "order"}] storyboard_items: element_name / tag_label / action_description / spatial_position / is_visible
locations ["花果山"](字符串数组) [{"name", "tag_label", "order"}] storyboard_items: element_name / tag_label
props ["金箍棒"](字符串数组) [{"name", "tag_label", "action", "position", "is_visible", "order"}] storyboard_items: element_name / tag_label / action_description / spatial_position / is_visible
dialogues [{"content", "dialogue_type", "character_name", "emotion", "sequence_order"}] 同 v1.5.0,内嵌在每个分镜对象中 storyboard_dialogues
shot_size close_up|medium_shot|full_shot|long_shot(4 值) 基础 4 值 + 可选扩展 5 值 shot_size
camera_movement static|pan|tilt|zoom|dolly|tracking(6 值) 基础 6 值 + 可选扩展 3 值(arc/crane/handheld) camera_movement

升级说明characters / locations / props 从字符串数组升级为对象数组,首次填充 storyboard_items 中长期空置的 action_descriptionspatial_positionis_visible 三列。dialogues 与 v1.5.0 字段名完全对齐,由 Shot Node 一次性提取,Aggregate Node 写入 storyboard_dialogues 表。


API 接口设计

修改现有解析接口

ScreenplayParseRequest 增加 mode 字段:

# server/app/schemas/screenplay.py

class ParseMode(str, Enum):
    DEFAULT = "default"      # 现有方案(单次 LLM 调用)
    LANGGRAPH = "langgraph"  # 新方案(LangGraph 多节点管道)

class ScreenplayParseRequest(BaseModel):
    custom_requirements: Optional[str] = None
    mode: ParseMode = ParseMode.DEFAULT  # 默认保持现有行为

API 路由变化

# server/app/api/v1/screenplays.py

@router.post("/{screenplay_id}/parse")
async def parse_screenplay(
    screenplay_id: UUID,
    request: ScreenplayParseRequest,
    ...
):
    if request.mode == ParseMode.LANGGRAPH:
        # 路由到新管道
        from app.tasks.screenplay_langgraph_task import parse_screenplay_langgraph_task
        task = parse_screenplay_langgraph_task.delay(
            screenplay_id=str(screenplay_id),
            content=screenplay_content,
            custom_requirements=request.custom_requirements
        )
        return SuccessResponse(
            data={"task_id": task.id, "mode": "langgraph"},
            message="LangGraph 抽取任务已提交"
        )
    else:
        # 现有逻辑不变
        ...

新 Celery 任务

# server/app/tasks/screenplay_langgraph_task.py

@shared_task(bind=True, max_retries=2, queue="ai")
def parse_screenplay_langgraph_task(
    self,
    screenplay_id: str,
    content: str,
    custom_requirements: str | None = None
):
    """LangGraph 剧本抽取任务"""
    try:
        result = run_async_task(
            _run_langgraph_extraction(screenplay_id, content, custom_requirements)
        )
        return result
    except Exception as e:
        raise self.retry(exc=e, countdown=120)


async def _run_langgraph_extraction(
    screenplay_id: str,
    content: str,
    custom_requirements: str | None
):
    task_id = str(uuid7())
    async_session_maker, engine = get_async_session()
    try:
        async with async_session_maker() as db:
            service = ScreenplayExtractionService(db)
            return await service.run(
                screenplay_id=screenplay_id,
                task_id=task_id,
                content=content,
                custom_requirements=custom_requirements
            )
    finally:
        await engine.dispose()

数据流说明

现有 store_parsed_elements 数据库写入全链路

通过逆向分析 screenplay_service.pystore_parsed_elements() 方法,明确 LangGraph 各节点的 PostgreSQL 写入职责边界。

现有方案(单次 LLM)写入顺序

LLM 返回 parsed_data(全量一次性 JSON)
    │
    ├─ ① project_characters       [父项目级] 角色去重创建/更新
    │      ↓ 返回 character_id_map: {name → UUID}
    │
    ├─ ② project_locations        [父项目级] 场景去重创建/更新
    │      ↓ 返回 location_id_map: {name → UUID}
    │
    ├─ ③ project_props            [父项目级] 道具去重创建/更新
    │      meta_data 含 prop_type / owner_character / owner_location
    │      ↓ 返回 prop_id_map: {name → UUID}
    │
    ├─ ④ screenplay_element_refs  [剧本级] 角色/场景/道具 → screenplay 引用
    │
    ├─ ⑤ project_element_tags     [父项目级] 三类元素标签
    │      character_tags / location_tags / prop_tags
    │      ⚠️ 无标签时自动补"常规"默认标签
    │      ↓ 返回 tag_id_maps: {name-label → UUID} + tag_id_to_label 反向映射
    │
    ├─ ⑥ storyboards              [子项目级] 分镜
    │      依赖 character_tag_ids + location_tag_ids + prop_tag_ids
    │      标签找不到时 fallback 到默认"常规"标签
    │      ↓ 写入 storyboard_items(含对白 dialogue_type/emotion/sequence_order)
    │
    └─ ⑦ project_resources       [子项目级] 每个 tag 的占位资源
           file_url = "placeholder://pending-ai-generation"
           等待后续 AI 生图填充

关键约束(影响 LangGraph 节点拆分)

约束 说明 LangGraph 影响
分镜依赖 tag_id_maps storyboards 的角色/场景/道具关联通过 tag_id 而非名称字符串 Shot Node 写 State 时需携带 tag_id,不能只存名称
道具含 owner_character project_props.meta_data.owner_character 需角色名 Prop Node 必须在 Role Node 之后执行,复用角色名集合
分镜标签 fallback 逻辑 找不到指定 tag → 尝试"常规"标签 → 仍找不到则跳过 Validate Node 需检查引用是否均有对应 tag_id
项目级别分离 角色/场景/道具写父项目,分镜写子项目 Aggregate Node 需区分 parent_project_id vs screenplay.project_id
resources 占位符 每个 tag 创建一条占位 project_resources Aggregate Node 统一处理,无需各节点分散调用

LangGraph 节点 × 数据库写入责任矩阵

节点 写入表 依赖的上游数据 写入 Result VectorDB
Init Node - Raw chunks
Role Node project_characters + project_element_tags(角色标签) - type=character(名称+tag_id)
Scene Node(新增) project_locations + project_element_tags(场景标签) character_id_map(可选引用) type=location(名称+tag_id)
Prop Node project_props + project_element_tags(道具标签) character_id_map(owner 引用) type=prop(名称+tag_id)
Shot Node 仅写 State / Result VectorDB(不写 PG) character_tag_ids + location_tag_ids + prop_tag_ids type=shot(含关联 tag_ids + 内嵌 dialogues)
Aggregate Node screenplay_element_refs + storyboards + storyboard_items + storyboard_dialogues + project_resources 全部上游 id_map + tag_id_maps -

设计关键:Role/Scene/Prop Node 在节点内直接写 PostgreSQL(增量),Shot Node 仅操作内存 State(对白已内嵌在分镜内),Aggregate Node 统一写分镜、对白和资源。


调整后的依赖链:角色 → 场景 → 道具 → 分镜 → 对白

┌─────────────────────────────────────────────────────────────────────────────┐
│  角色节点(全局,一次性)                                                    │
│                                                                              │
│  输入:Raw VectorDB 全文检索 Top-20                                          │
│  LLM 产出:characters[] + character_tags{}                                   │
│  DB 写入:project_characters + project_element_tags(CHARACTER)               │
│  State 输出:character_id_map {name→UUID}                                    │
│              character_tag_id_map {name-label→UUID}  ← 分镜节点必需          │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  场景节点(每场景一次)                                                       │
│                                                                              │
│  输入:Raw VectorDB 场景相关 Top-10                                          │
│        Result VectorDB type=character(了解哪些角色出现在此场景)             │
│  LLM 产出:location + location_tags{}(当前场景的时间/氛围变体)             │
│  DB 写入:project_locations + project_element_tags(LOCATION)                 │
│  State 输出:location_id_map[scene_N] {name→UUID}                           │
│              location_tag_id_map[scene_N] {name-label→UUID}                 │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  道具节点(每场景一次)                                                       │
│                                                                              │
│  输入:Raw VectorDB 道具相关 Top-10                                          │
│        Result VectorDB type=character(确定 owner_character 归属)           │
│  LLM 产出:props[] + prop_tags{}                                             │
│            ✅ owner_character 字段已有角色名可用(避免幻觉)                  │
│  DB 写入:project_props(meta_data 含 owner_character) +                    │
│            project_element_tags(PROP)                                        │
│  State 输出:prop_id_map[scene_N] {name→UUID}                               │
│              prop_tag_id_map[scene_N] {name-label→UUID}                     │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  分镜节点(每场景一次)                                                       │
│                                                                              │
│  输入:Raw VectorDB 拍摄相关 Top-10                                          │
│        Result VectorDB type=character + type=prop(复用已抽结果)            │
│        State 注入:character_tag_id_map + location_tag_id_map                │
│                     + prop_tag_id_map(直接传 tag_id 给 LLM 上下文)         │
│  LLM 产出:storyboards[]                                                     │
│            characters: [name]  →  在节点内解析为 character_tag_ids           │
│            locations: [name]   →  在节点内解析为 location_tag_ids            │
│            props: [name]       →  在节点内解析为 prop_tag_ids                │
│  DB 写入:❌ 不写 PG(由 Aggregate Node 统一写)                             │
│  State 输出:shot_list[scene_N]                                              │
│              ✅ 每条 shot 携带 resolved tag_ids(不是名称字符串)            │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │ (Validate Node 校验)
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  对白节点(每场景一次)                                                       │
│                                                                              │
│  输入:Raw VectorDB 对白相关 Top-10                                          │
│        Result VectorDB type=character(角色口吻/身份约束)                   │
│        State 注入:shot_list[scene_N](对白挂载到具体分镜下)                │
│  LLM 产出:dialogues[]                                                       │
│            dialogue_type: 1=普通/2=内心OS/3=旁白                            │
│            character_name / emotion / sequence_order                         │
│  DB 写入:❌ 不写 PG(由 Aggregate Node 统一写)                             │
│  State 输出:dialog_list[scene_N](已关联到 shot.order_index)               │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  汇总节点(全局一次)                                                         │
│                                                                              │
│  复用现有 ScreenplayService._create_storyboards_from_ai()                   │
│  DB 写入(按顺序):                                                          │
│    ① screenplay_element_refs  (角色/场景/道具 → 剧本引用关系)              │
│    ② storyboards              (分镜主记录,子项目级别)                     │
│    ③ storyboard_items         (对白,含 dialogue_type/emotion)             │
│    ④ project_resources        (每个 tag 的占位资源,等待 AI 生图)          │
│    ⑤ screenplay.parsing_status = COMPLETED                                  │
└─────────────────────────────────────────────────────────────────────────────┘

场景节点说明(对比原设计的变更)

原 ADR 设计中没有独立的"Scene Node",场景是在 Init Node 中提取列表、在 Prop/Shot Node 中隐式引用。

调整原因:现有 store_parsed_elementsproject_locations 写入独立于分镜之外,且场景标签(location_tags)需要在 Shot Node 之前就解析为 tag_id,否则分镜节点无法正确关联。

新增 Scene Node(在 Prop Node 之前):

@node_monitor("scene_node")
async def scene_node(state: ScreenplayExtractionState) -> dict:
    """
    每场景独立执行:
    - 抽取当前场景描述 + 标签(白天/夜晚/春天等)
    - 写入 project_locations + project_element_tags(LOCATION)
    - 输出 location_tag_id_map 供后续 Shot Node 使用
    """
    scene_name = state["scenes_list"][state["current_scene_idx"]]
    
    # 检索当前场景原文
    raw_chunks = chroma_store.get_raw_collection().query(
        query_texts=[f"{scene_name} 场景描述 环境"],
        n_results=10
    )
    # 复用已知角色(辅助判断哪些角色出现在该场景)
    known_chars = chroma_store.get_result_collection().get(
        where={"type": "character"}
    )
    
    location_result = await extract_location_with_llm(
        scene_name=scene_name,
        context=raw_chunks,
        known_characters=known_chars
    )
    
    # 写入 PG(复用现有 loc_repo.create_or_update)
    location_id, tag_id_map = await save_location_to_db(location_result)
    
    # 写入 Result VectorDB
    chroma_store.get_result_collection().add(
        ids=[f"loc_{scene_name}"],
        documents=[f"{scene_name}: {location_result['description']}"],
        embeddings=[await get_embedding(location_result['description'])],
        metadatas=[{"type": "location", "name": scene_name, "tag_ids": json.dumps(list(tag_id_map.values()))}]
    )
    
    return {
        f"location_tag_id_map_{state['current_scene_idx']}": tag_id_map
    }

更新后的节点执行顺序

Init → Role → [每场景循环: Scene → Prop → Shot → Validate → Dialog] → Aggregate

对应更新 graph_builder.py 中的边定义:

# 场景循环内部顺序:Scene → Prop → Shot
workflow.add_edge("prop", "shot")  # 改为:
workflow.add_edge("scene", "prop")
workflow.add_edge("prop", "shot")

# 循环路由
workflow.add_conditional_edges(
    "scene_loop",
    route_scene_loop,
    {"scene": "scene", "aggregate": "aggregate"}  # 原为 prop,改为 scene
)

场景循环数据流(更新版,含 Scene Node)

场景 N 开始
    │
    ▼ (Scene Node) ← 新增
Raw VectorDB 检索:"场景 N + 环境描述"  Top-10
Result VectorDB 检索:type=character(辅助确认出场角色)
    │
    ▼ LLM → location + location_tags → 写 project_locations + project_element_tags
    ▼ 写入 Result VectorDB (type=location,含 tag_ids)
    │
    ▼ (Prop Node)
Raw VectorDB 检索:"场景 N + 道具 物品"  Top-10
Result VectorDB 检索:type=character(确认 owner_character)
    │
    ▼ LLM → props + prop_tags → 写 project_props + project_element_tags
    ▼ 写入 Result VectorDB (type=prop,含 tag_ids)
    │
    ▼ (Shot Node)
Raw VectorDB 检索:"场景 N + 拍摄 分镜"  Top-10
Result VectorDB 检索:type=character + type=location + type=prop
    │
    ▼ LLM → storyboards(含 characters/locations/props 名称)
    ▼ 节点内解析名称 → tag_ids(角色/场景/道具 tag_id_maps)
    ▼ 写入 Result VectorDB (type=shot,携带已解析 tag_ids)
    │
    ▼ (Validate Node)
检查 shot 引用的 tag_ids 是否全部已在 Result VectorDB → Pass/Fail
    │   Pass / 超出最大重试
    ▼ 更新 scene_results_meta[N](对白已内嵌在 storyboards.dialogues 中)
    ▼ current_scene_idx += 1 → 回到 Scene Loop Node

最终汇总(Aggregate Node)—— 复用现有逻辑

# aggregate_node.py
# ✅ 直接复用 ScreenplayService.store_parsed_elements() 中的分镜写入逻辑
# 只需补充 screenplay_element_refs + storyboards + storyboard_items + project_resources

async def aggregate_node(state: ScreenplayExtractionState) -> dict:
    async with get_async_session() as db:
        screenplay_service = ScreenplayService(db)
        
        # 1. 构建 final_parsed_data(与现有方案 JSON 结构完全对齐)
        final_parsed_data = assemble_final_json(state)
        # {
        #   "characters": [...],     ← Role Node 已写 PG,此处仅用于 refs
        #   "character_tags": {...},
        #   "locations": [...],      ← Scene Node 已写 PG,此处仅用于 refs
        #   "location_tags": {...},
        #   "props": [...],          ← Prop Node 已写 PG,此处仅用于 refs
        #   "prop_tags": {...},
        #   "storyboards": [...]     ← Shot Node 产出(含内嵌 dialogues),此处首次写 PG
        # }
        
        # 2. 复用现有 store_parsed_elements(跳过已写入步骤①②③⑤)
        await screenplay_service.store_parsed_elements(
            screenplay_id=state["screenplay_id"],
            parsed_data=final_parsed_data,
            auto_create_elements=False,   # ✅ 角色/场景/道具已由各节点单独写入
            auto_create_tags=False,        # ✅ 标签也已由各节点单独写入
            auto_create_storyboards=True   # ✅ 只写分镜、对白、refs、resources
        )
        
    return {"status": "completed", "final_result": final_parsed_data}

依赖与部署

新增 Python 依赖

# requirements.txt 新增
langgraph>=0.2.0
llama-index-core>=0.11.0
llama-index-embeddings-openai>=0.2.0
chromadb>=0.5.0

安装命令

docker exec jointo-server-app pip install \
  "langgraph>=0.2.0" \
  "llama-index-core>=0.11.0" \
  "llama-index-embeddings-openai>=0.2.0" \
  "chromadb>=0.5.0"

Chroma 持久化目录

# server/app/core/config.py 新增
CHROMA_PERSIST_DIR: str = "/tmp/chroma_screenplay"  # 开发环境
# 生产环境建议挂载为 Docker volume

Docker Compose 变更

# docker-compose.yml
services:
  app:
    volumes:
      - chroma_data:/tmp/chroma_screenplay  # 新增 Chroma 持久化

volumes:
  chroma_data:  # 新增

迁移策略

双模式并行阶段(当前阶段)

  • mode=default(默认):现有 Celery + 单次 LLM,100% 流量
  • mode=langgraph(新):新管道,灰度测试,按需触发

质量验证指标

指标 现有方案基线 LangGraph 目标
角色识别准确率 ~85% >92%
道具归属正确率 ~70% >85%
分镜引用一致性 ~75% >90%
长剧本(>5000字)成功率 ~60% >90%

切换条件

满足以下条件后,将 mode=langgraph 设为默认值:

  1. 在 50+ 个真实剧本上验证质量指标达标
  2. 平均处理时间 < 现有方案 3 倍
  3. 错误率 < 5%

风险与对策

风险 概率 影响 对策
LLM 调用次数增加导致 Credit 消耗过多 场景循环节点合并调用;对话节点 batch 处理
Chroma 持久化目录磁盘占用过大 Aggregate Node 完成后自动 cleanup()
LangGraph 图状态体积过大(超长剧本) scene_results 按场景完成后立即持久化到 DB,State 只保留必要元数据
embedding 模型费用 复用项目现有 OpenAI API key 额度;text-embedding-3-small 成本极低
场景分割不准确导致循环混乱 Init Node 场景分割使用规则+LLM 双重策略;fallback 到整文一次性处理
自校验死循环(始终失败) 硬性限制 max_retries=2,超过后接受当前结果继续

优化建议

以下优化点为工程落地阶段的重点事项,按优先级排序。

1. LangGraph 状态体积优化(优先级:高)

问题scene_results 随长剧本线性膨胀,50 场景 × 每场景结果约 10KB = 500KB+ 的 State 内存压力,影响 LangGraph 序列化性能。

方案

# validate_node.py 通过后立即持久化,清理 State 内的完整结果
async def validate_node(state: ScreenplayExtractionState) -> dict:
    # ... 引用校验逻辑(Pass 分支)...
    
    scene_result = build_scene_result(state)  # storyboards 已含内嵌 dialogues
    
    # ✅ 立即写入 PostgreSQL(部分提交)
    await screenplay_service.upsert_scene_partial(
        screenplay_id=state["screenplay_id"],
        scene_idx=state["current_scene_idx"],
        scene_result=scene_result
    )
    
    # ✅ State 只保留必要 meta,释放大对象
    scene_meta = {
        "scene_name": scene_result["scene_name"],
        "scene_idx": state["current_scene_idx"],
        "char_names": [c["name"] for c in scene_result.get("characters", [])],
        "prop_names": [p["name"] for p in scene_result.get("props", [])]
    }
    
    return {
        "scene_results": state["scene_results"] + [scene_meta],  # 仅保留 meta
        "current_scene_idx": state["current_scene_idx"] + 1
    }

State 字段调整:将 scene_results: list[SceneResult] 拆为:

  • scene_results: list[SceneMeta](State 中只存轻量 meta)
  • PostgreSQL 中存完整 scene_detail(按场景增量写入)

2. 剧本结构化切片策略(优先级:高)

原"三级 Fallback 场景分割"已完全重新设计:利用 MarkdownNodeParser 的 MD 标题感知能力,将场景分割与向量切片合并为一个步骤,且完全无 LLM 调用

背景:MD 结构已由上游保证

ScreenplayFileParserService._format_as_markdown() 已将剧本标准化为:

## 第一场 海边

日景。海浪拍打着礁石...

张三看着远方,眼神迷离。

## 第二场 咖啡馆

夜景。灯光昏暗,烟雾弥漫...
  • ## 二级标题 = 场景边界(全大写行 → ##第X场 / INT. / EXT.##
  • 段落之间以空行分隔
  • 标题层级天然可用于结构切片

两级切片策略(MarkdownNodeParser + SentenceSplitter)

# vector_store/raw_vector_builder.py

from llama_index.core.node_parser import MarkdownNodeParser, SentenceSplitter
from llama_index.core import Document

class RawVectorBuilder:
    """
    两级切片策略:
    Level 1: MarkdownNodeParser  → 按 ## 标题切出场景级节点
    Level 2: SentenceSplitter    → 对超长场景进一步按段落切片
    
    优势:
    - 场景分割完全无 LLM 调用(100% 确定性)
    - scenes_list 直接从标题 metadata 提取
    - 每个 chunk 携带 scene_name metadata
    - 各抽取节点可按 scene_name 精确过滤检索
    """
    SCENE_CHUNK_MAX_TOKENS = 512   # 超过此值对场景内部进一步切片
    PARA_CHUNK_SIZE = 256          # 段落级 chunk 大小
    PARA_CHUNK_OVERLAP = 32        # 段落级 chunk 重叠

    async def build(self, content: str, chroma_store) -> tuple[list[str], list[str]]:
        """
        Returns:
            (chunk_ids, scenes_list)  ← scenes_list 从 MD 标题直接提取,无 LLM
        """
        # === Level 1: 按 ## 标题切场景 ===
        md_parser = MarkdownNodeParser(include_metadata=True)
        scene_nodes = md_parser.get_nodes_from_documents(
            [Document(text=content)]
        )
        
        # 从 header_path metadata 提取场景名(无 LLM)
        scenes_list = []
        for node in scene_nodes:
            # header_path 示例: "第一场 海边" 或 "INT. 咖啡馆 - 夜"
            scene_name = node.metadata.get("header_path", "").strip()
            if scene_name:
                scenes_list.append(scene_name)

        # fallback: 无标题时整文为一个"场景"
        if not scenes_list:
            scenes_list = ["全文"]
            scene_nodes[0].metadata["scene_name"] = "全文"

        # === Level 2: 超长场景内部切段落 ===
        para_splitter = SentenceSplitter(
            chunk_size=self.PARA_CHUNK_SIZE,
            chunk_overlap=self.PARA_CHUNK_OVERLAP
        )
        final_chunks = []

        for scene_idx, scene_node in enumerate(scene_nodes):
            scene_name = scenes_list[scene_idx] if scene_idx < len(scenes_list) else f"场景_{scene_idx}"

            if len(scene_node.text.split()) > self.SCENE_CHUNK_MAX_TOKENS:
                # 场景过长:切段落级 sub-chunks
                sub_nodes = para_splitter.get_nodes_from_documents(
                    [Document(text=scene_node.text)]
                )
                for sub_idx, sub in enumerate(sub_nodes):
                    sub.metadata.update({
                        "scene_name": scene_name,
                        "scene_idx": scene_idx,
                        "chunk_type": "paragraph",
                        "sub_idx": sub_idx
                    })
                    final_chunks.append(sub)
            else:
                # 场景长度合适:直接作为一个 chunk
                scene_node.metadata.update({
                    "scene_name": scene_name,
                    "scene_idx": scene_idx,
                    "chunk_type": "scene"
                })
                final_chunks.append(scene_node)

        # === 批量 Embedding + 写入 Chroma ===
        texts = [node.text for node in final_chunks]
        metadatas = [node.metadata for node in final_chunks]
        chunk_ids = [f"chunk_{i}" for i in range(len(final_chunks))]

        embeddings = await self._batch_embed(texts)  # 批量请求,内部限流+重试

        collection = chroma_store.get_raw_collection()
        collection.add(
            ids=chunk_ids,
            documents=texts,
            embeddings=embeddings,
            metadatas=metadatas
        )

        return chunk_ids, scenes_list

场景过滤检索(各抽取节点的核心改进)

有了 scene_name metadata,每个节点检索时不再依赖纯语义相似度,而是精确过滤到当前场景:

# 旧方式(可能召回其他场景内容)
raw_col.query(query_texts=["道具"], n_results=10)

# ✅ 新方式:过滤 + 语义双保障
raw_col.query(
    query_texts=["道具 物品"],
    n_results=10,
    where={"scene_name": current_scene_name}   # Chroma metadata 过滤
)

# 如果场景名未命中(容错):降级为全文语义检索
if not results["documents"][0]:
    raw_col.query(query_texts=["道具 物品"], n_results=10)

Init Node 简化后的实现

@node_monitor("init_node")
async def init_node(state: ScreenplayExtractionState) -> dict:
    """
    scenes_list 直接来自 MarkdownNodeParser 的标题 metadata
    ✅ 无 LLM 调用,无正则 Fallback
    """
    content = state["content"]
    task_id = state["task_id"]

    chroma_store = ChromaStore(task_id=task_id)
    builder = RawVectorBuilder()

    # build() 同时返回 chunk_ids 和 scenes_list
    chunk_ids, scenes_list = await builder.build(content, chroma_store)

    if not scenes_list:
        return node_error("init_node", ExtractionErrorCode.SCENE_SPLIT_FAILED, "未能提取场景列表")

    return {
        "scenes_list": scenes_list,
        "chunk_ids": chunk_ids,
        "total_chunks": len(chunk_ids),
        "current_scene_idx": 0
    }

切片结构示意

原始 MD 内容
│
├── ## 第一场 海边                    ← MarkdownNodeParser Level 1
│   ├── [场景整体 < 512 tokens]       → chunk_0 {scene_name="第一场 海边", chunk_type="scene"}
│
├── ## 第二场 皇宫(长场景)           ← MarkdownNodeParser Level 1
│   ├── 段落1(皇帝登场)             → chunk_1 {scene_name="第二场 皇宫", chunk_type="paragraph"}
│   ├── 段落2(对话)                 → chunk_2 {scene_name="第二场 皇宫", chunk_type="paragraph"}
│   └── 段落3(结尾动作)             → chunk_3 {scene_name="第二场 皇宫", chunk_type="paragraph"}
│
└── ## 第三场 咖啡馆                  ← MarkdownNodeParser Level 1
    └── [场景整体 < 512 tokens]       → chunk_4 {scene_name="第三场 咖啡馆", chunk_type="scene"}

Raw VectorDB:5 个 chunk,全部携带 scene_name 字段
scenes_list: ["第一场 海边", "第二场 皇宫", "第三场 咖啡馆"]  ← 零 LLM 成本

3. Embedding 批量异步优化(优先级:中)

问题:Init Node 对每个 chunk 单独调用 embedding API,长剧本(50+ chunks)会产生大量串行 API 调用,触发 OpenAI 429 Rate Limit。

方案:批量请求 + 并发控制 + 指数退避重试。

# raw_vector_builder.py

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

BATCH_SIZE = 20         # OpenAI 单次最多 2048 个输入,保守取 20
MAX_CONCURRENT = 3      # 最大并发批次数

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def get_embeddings_batch(texts: list[str]) -> list[list[float]]:
    """批量获取 embeddings,自动重试"""
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]


async def build(self, content: str, chroma_store: ChromaStore) -> list[str]:
    # 1. 切片
    nodes = splitter.get_nodes_from_documents([Document(text=content)])
    texts = [node.text for node in nodes]
    
    # 2. 按批次分组
    batches = [texts[i:i+BATCH_SIZE] for i in range(0, len(texts), BATCH_SIZE)]
    
    # 3. 并发控制(信号量限制最大并发)
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    
    async def process_batch(batch: list[str]) -> list[list[float]]:
        async with semaphore:
            return await get_embeddings_batch(batch)
    
    # 4. 并发执行所有批次
    batch_results = await asyncio.gather(*[process_batch(b) for b in batches])
    all_embeddings = [emb for batch in batch_results for emb in batch]
    
    # 5. 批量写入 Chroma
    collection = chroma_store.get_raw_collection()
    collection.add(
        ids=[f"chunk_{i}" for i in range(len(texts))],
        documents=texts,
        embeddings=all_embeddings,
        metadatas=[{"chunk_index": i} for i in range(len(texts))]
    )
    
    return [f"chunk_{i}" for i in range(len(texts))]

新增依赖tenacity(用于指数退避重试,项目可能已有)


4. Result VectorDB 元数据字段统一规范(优先级:中)

问题:各节点写入 Result VectorDB 的 metadata 字段命名不一致,导致下游查询需要大量适配代码。

统一 metadata schema

# vector_store/result_vector_store.py

from dataclasses import dataclass
from typing import Optional

@dataclass
class CharacterMeta:
    type: str = "character"       # 固定值
    name: str = ""
    role_type: str = ""           # main / supporting / extra
    scene_name: Optional[str] = None  # None 表示全局角色

@dataclass
class PropMeta:
    type: str = "prop"
    name: str = ""
    scene_name: str = ""
    owner_character: Optional[str] = None
    prop_type: str = ""           # interactive / set_dressing

@dataclass  
class ShotMeta:
    type: str = "shot"
    name: str = ""
    scene_name: str = ""
    sequence_order: int = 0
    related_characters: str = ""  # JSON 字符串(Chroma meta 只支持 str/int/float/bool)
    related_props: str = ""       # JSON 字符串

@dataclass
class DialogMeta:
    type: str = "dialog"
    name: str = ""
    scene_name: str = ""
    character_name: Optional[str] = None
    sequence_order: int = 0
    related_characters: str = ""  # JSON 字符串

统一查询接口

class ResultVectorStore:
    def query_by_type(self, collection, element_type: str, scene_name: str = None):
        """按类型和场景查询,统一 where 条件"""
        where = {"type": element_type}
        if scene_name:
            where["scene_name"] = scene_name
        return collection.get(where=where)

5. Chroma 清理策略增强(优先级:中)

问题:任务中断(进程 kill、容器重启)或失败时,cleanup() 不会被调用,导致 /tmp/chroma_screenplay 积累孤立数据。

三层清理策略

# chroma_store.py

class ChromaStore:
    
    def cleanup(self):
        """正常完成后清理"""
        try:
            self.client.delete_collection(f"raw_{self.task_id}")
            self.client.delete_collection(f"result_{self.task_id}")
            logger.info("Chroma 数据清理完成 | task_id=%s", self.task_id)
        except Exception as e:
            logger.warning("Chroma 清理失败(非致命)| task_id=%s | error=%s", self.task_id, e)
    
    @classmethod
    def cleanup_expired(cls, persist_dir: str, max_age_hours: int = 24):
        """定期清理超过指定时间的孤立数据(由定时任务调用)"""
        import os, time
        for task_dir in os.listdir(persist_dir):
            task_path = os.path.join(persist_dir, task_dir)
            mtime = os.path.getmtime(task_path)
            age_hours = (time.time() - mtime) / 3600
            if age_hours > max_age_hours:
                shutil.rmtree(task_path, ignore_errors=True)
                logger.info("清理孤立 Chroma 数据 | dir=%s | age=%.1fh", task_dir, age_hours)

定时清理任务(接入现有 Celery Beat):

# app/tasks/maintenance_tasks.py

@shared_task
def cleanup_chroma_expired_data():
    """每小时执行,清理超过 24h 的孤立 Chroma 数据"""
    ChromaStore.cleanup_expired(
        persist_dir=settings.CHROMA_PERSIST_DIR,
        max_age_hours=24
    )
# celery_beat_schedule 新增
"cleanup-chroma-hourly": {
    "task": "app.tasks.maintenance_tasks.cleanup_chroma_expired_data",
    "schedule": crontab(minute=0),  # 每小时整点
}

6. 节点级日志与耗时统计(优先级:低)

方案:实现通用节点装饰器,统一注入日志和耗时,不侵入各节点业务逻辑。

# screenplay_extraction/utils.py

import time
import functools
from app.core.logging import get_logger

logger = get_logger(__name__)

def node_monitor(node_name: str):
    """节点监控装饰器:自动记录耗时、状态、异常"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(state: dict) -> dict:
            task_id = state.get("task_id", "unknown")
            scene_idx = state.get("current_scene_idx", -1)
            start_time = time.monotonic()
            
            logger.info(
                "节点开始 | node=%s | task_id=%s | scene_idx=%s",
                node_name, task_id, scene_idx
            )
            
            try:
                result = await func(state)
                elapsed = time.monotonic() - start_time
                
                logger.info(
                    "节点完成 | node=%s | task_id=%s | scene_idx=%s | elapsed=%.2fs",
                    node_name, task_id, scene_idx, elapsed
                )
                return result
                
            except Exception as e:
                elapsed = time.monotonic() - start_time
                logger.error(
                    "节点异常 | node=%s | task_id=%s | scene_idx=%s | elapsed=%.2fs | error=%s",
                    node_name, task_id, scene_idx, elapsed, str(e),
                    exc_info=True
                )
                return {"status": "failed", "error": f"{node_name} 异常: {str(e)}"}
        
        return wrapper
    return decorator


# 使用方式(各节点统一装饰)
@node_monitor("role_node")
async def role_node(state: ScreenplayExtractionState) -> dict:
    # 纯业务逻辑,无需手动写日志和 try/except
    ...

监控指标建议(可接入现有 Prometheus):

  • screenplay_node_duration_seconds{node_name, status} — 节点耗时分布
  • screenplay_extraction_scene_total{screenplay_id} — 场景处理计数
  • screenplay_llm_retry_total{node_name} — 自校验重试次数

7. Validate Node 全量校验(优先级:高)

问题:当前实现只校验场景内最后一条 Shot 的引用,若同一场景有多条 Shot 存在悬空引用,均会漏检。

方案:循环遍历场景内所有 Shots,聚合 unknown_refs,并生成可读报告供调试。

# server/app/services/screenplay_extraction/nodes/validate_node.py

async def validate_node(state: ScreenplayExtractionState) -> dict:
    task_id = state["task_id"]
    current_idx = state["current_scene_idx"]
    scene_results = state["scene_results"]

    if not scene_results or current_idx >= len(scene_results):
        return {"validation_passed": True}

    current_scene = scene_results[current_idx]
    shots = current_scene.get("shots", [])

    if not shots:
        return {"validation_passed": True}

    # 获取已知名称集合
    chroma_store = ChromaStore(task_id=task_id)
    result_col = chroma_store.get_result_collection()
    existing = result_col.get(where={"type": {"$in": ["character", "prop", "location"]}})
    known_names = {m["name"] for m in existing.get("metadatas", []) if "name" in m}

    # ✅ 循环检查场景内所有 Shots
    all_unknown: dict[int, set[str]] = {}
    for shot_idx, shot in enumerate(shots):
        refs = (
            set(shot.get("characters", []))
            | set(shot.get("props", []))
            | set(shot.get("locations", []))
        )
        unknown = refs - known_names
        if unknown:
            all_unknown[shot_idx] = unknown

    if not all_unknown:
        return {"validation_passed": True, "retry_count": 0}

    # ✅ 生成 unknown_refs 报告(调试用)
    report = {
        "scene_idx": current_idx,
        "scene_name": current_scene.get("scene_name"),
        "unknown_refs_by_shot": {
            str(shot_idx): list(names)
            for shot_idx, names in all_unknown.items()
        },
        "total_unknown_shots": len(all_unknown),
        "total_unknown_refs": sum(len(v) for v in all_unknown.values())
    }
    logger.warning(
        "Validate Node 发现未知引用 | scene=%s | report=%s",
        current_scene.get("scene_name"), report
    )

    if state["retry_count"] < state["max_retries"]:
        return {
            "validation_passed": False,
            "retry_count": state["retry_count"] + 1,
            "last_validation_report": report  # ✅ 写入 State,Shot Node 重抽时可参考
        }

    # 超限:接受结果,记录警告
    logger.warning(
        "Validate Node 超过最大重试次数,接受当前结果 | scene=%s",
        current_scene.get("scene_name")
    )
    return {"validation_passed": True, "retry_count": 0, "last_validation_report": report}

State 新增字段

last_validation_report: Optional[dict]  # 最近一次校验报告,Shot Node 重抽时注入上下文

Shot Node 重抽时利用报告

# shot_node.py 重抽路径:将 unknown_refs 作为负样本提示注入 LLM
if state.get("last_validation_report"):
    unknown_hint = state["last_validation_report"].get("unknown_refs_by_shot", {})
    # 在 prompt 中追加:"以下引用在已知元素列表中不存在,请修正:{unknown_hint}"

8. 向量检索动态策略(优先级:中)

问题:固定 n_results(Role=20,其他=10)在以下场景会失效:

  • 长剧本(>100 chunks):Top-10 可能遗漏关键段落
  • 短剧本(<10 chunks):Top-20 返回重复内容,浪费 Token
  • 相似场景重叠(如同一地点出现 20 次):语义检索召回噪声过多

方案:基于 chunk 总量和节点类型动态计算 n_results,加入 similarity_threshold 过滤。

# vector_store/chroma_store.py

def dynamic_query(
    self,
    collection,
    query_texts: list[str],
    node_type: str,
    total_chunks: int,
    similarity_threshold: float = 0.75
) -> list[str]:
    """
    动态调整检索参数
    
    n_results 策略:
      role_node:   min(max(20, total_chunks // 5), 50)  # 全局节点多检索
      scene/prop/shot/dialog: min(max(10, total_chunks // 10), 30)  # 局部节点
    """
    base_n = {
        "role":   max(20, min(total_chunks // 5, 50)),
        "scene":  max(10, min(total_chunks // 10, 30)),
        "prop":   max(10, min(total_chunks // 10, 30)),
        "shot":   max(10, min(total_chunks // 10, 30)),
        "dialog": max(10, min(total_chunks // 10, 30)),
    }.get(node_type, 10)

    results = collection.query(
        query_texts=query_texts,
        n_results=base_n,
        include=["documents", "distances", "metadatas"]
    )

    # ✅ similarity_threshold 过滤(Chroma 距离为余弦距离,越小越相似)
    filtered_docs = []
    for doc, dist in zip(
        results["documents"][0],
        results["distances"][0]
    ):
        similarity = 1 - dist  # 余弦相似度
        if similarity >= similarity_threshold:
            filtered_docs.append(doc)

    return filtered_docs if filtered_docs else results["documents"][0][:3]  # 至少返回 3 条

配置项config.py 新增):

CHROMA_SIMILARITY_THRESHOLD: float = 0.75   # 相似度阈值,低于此过滤
CHROMA_ROLE_MAX_RESULTS: int = 50            # 角色节点最大检索数
CHROMA_LOCAL_MAX_RESULTS: int = 30           # 局部节点最大检索数

9. 节点异常增强:error_code + failed_node(优先级:中)

问题:节点异常只返回 status=failed 和通用 error 字符串,Celery 重试时无法定位是哪个节点、哪类错误。

方案:扩展错误结构,增加 error_codefailed_node

# screenplay_extraction/errors.py

from enum import IntEnum

class ExtractionErrorCode(IntEnum):
    # 通用
    UNKNOWN = 0
    # Init Node
    CHUNKING_FAILED = 1001
    EMBEDDING_FAILED = 1002
    SCENE_SPLIT_FAILED = 1003
    # Role Node
    ROLE_LLM_FAILED = 2001
    ROLE_DB_WRITE_FAILED = 2002
    # Scene Node
    SCENE_LLM_FAILED = 3001
    SCENE_DB_WRITE_FAILED = 3002
    # Prop Node
    PROP_LLM_FAILED = 4001
    PROP_DB_WRITE_FAILED = 4002
    # Shot Node(含内嵌对白提取)
    SHOT_LLM_FAILED = 5001
    SHOT_TAG_RESOLVE_FAILED = 5002
    # Aggregate Node
    AGGREGATE_DB_WRITE_FAILED = 6001
    AGGREGATE_STORYBOARD_FAILED = 6002
# 节点内统一错误返回格式
def node_error(node_name: str, code: ExtractionErrorCode, detail: str) -> dict:
    return {
        "status": "failed",
        "failed_node": node_name,
        "error_code": int(code),
        "error": f"[{code.name}] {detail}"
    }

# State 新增字段
class ScreenplayExtractionState(TypedDict):
    # ...
    failed_node: Optional[str]   # 失败的节点名称
    error_code: Optional[int]    # 错误码(对应 ExtractionErrorCode)
# Celery 任务重试时可基于 failed_node 定向重跑
async def _run_langgraph_extraction(...):
    result = await graph.ainvoke(initial_state)
    if result.get("status") == "failed":
        failed_node = result.get("failed_node", "unknown")
        error_code = result.get("error_code", 0)
        logger.error(
            "LangGraph 抽取失败 | screenplay_id=%s | failed_node=%s | error_code=%d",
            screenplay_id, failed_node, error_code
        )
        # 后续可实现:从 failed_node 断点续跑(当前版本整体重试)
        raise ExtractionFailedError(failed_node=failed_node, error_code=error_code)

10. 资源占位符命名与类型规范(优先级:低)

问题project_resources 占位记录缺少统一命名规则和细化类型标记,后续 AI 生图批量关联时难以精确筛选。

方案:规范 file_urlresource_namemeta_data 字段命名。

# aggregate_node.py 写 project_resources 时

PLACEHOLDER_URL = "placeholder://pending-ai-generation"

def build_resource_record(
    tag_id: UUID,
    element_type: ElementType,  # CHARACTER / LOCATION / PROP
    element_name: str,
    tag_label: str,
    screenplay_id: UUID,
    project_id: UUID,
    created_by: UUID
) -> dict:
    """统一的占位资源记录结构"""

    # ✅ 命名规则:{元素类型}_{元素名}_{标签}
    resource_name = f"{element_type.name.lower()}_{element_name}_{tag_label}"

    return {
        "project_id": project_id,
        "element_tag_id": tag_id,
        "resource_type": ELEMENT_TO_RESOURCE_TYPE[element_type],
        "resource_name": resource_name,       # ✅ 唯一可读名称
        "file_url": PLACEHOLDER_URL,
        "meta_data": {
            "source": "screenplay_parse_langgraph",
            "screenplay_id": str(screenplay_id),
            "element_type": element_type.name,
            "element_name": element_name,
            "tag_label": tag_label,
            "generation_status": "pending",    # ✅ 细化状态:pending/generating/done/failed
            "generation_priority": _calc_priority(element_type),  # CHARACTER>LOCATION>PROP
        },
        "created_by": created_by
    }

def _calc_priority(element_type: ElementType) -> int:
    """生成优先级:角色=1(最高),场景=2,道具=3"""
    return {ElementType.CHARACTER: 1, ElementType.LOCATION: 2, ElementType.PROP: 3}.get(element_type, 3)

AI 生图任务筛选条件(后续查询示例):

# 查询所有待生成的角色资源,按优先级排序
await resource_repo.find({
    "meta_data->>'generation_status'": "pending",
    "meta_data->>'screenplay_id'": screenplay_id,
}, order_by="meta_data->>'generation_priority' ASC")

11. 向量库冷存档策略(优先级:低)

问题cleanup() 在 Aggregate Node 完成后直接删除所有向量数据。若后续需要补抽(如用户要求重新生成分镜),需要重新切片和向量化,增加时间和 API 成本。

方案:引入冷存档机制,区分 Raw VectorDB 和 Result VectorDB 的保留策略。

# chroma_store.py

class CleanupMode(str, Enum):
    FULL = "full"        # 全量删除(开发/测试)
    RESULT_ONLY = "result_only"  # 只删 Result,保留 Raw(默认生产)
    ARCHIVE = "archive"  # Raw 冷存档(重命名 collection,不删除)

def cleanup(self, mode: CleanupMode = CleanupMode.RESULT_ONLY):
    """
    生产默认:只删 result collection,保留 raw collection。
    Raw VectorDB 可用于:
      - 用户请求重新抽取时直接复用(跳过切片和 embedding 步骤)
      - 历史追溯或增量补抽(如仅补充道具)
    """
    try:
        # ✅ 始终删除 result(抽取结果,任务特定,无需保留)
        self.client.delete_collection(f"result_{self.task_id}")
        logger.info("删除 Result VectorDB | task_id=%s", self.task_id)

        if mode == CleanupMode.FULL:
            self.client.delete_collection(f"raw_{self.task_id}")
            logger.info("删除 Raw VectorDB | task_id=%s", self.task_id)

        elif mode == CleanupMode.ARCHIVE:
            # 冷存档:重命名为 archive_{screenplay_id}_{timestamp}
            archive_name = f"archive_{self.task_id}_{int(time.time())}"
            # Chroma 不支持 rename,通过重建实现冷存档
            raw_data = self.client.get_collection(f"raw_{self.task_id}").get()
            archive_col = self.client.create_collection(archive_name)
            archive_col.add(**raw_data)
            self.client.delete_collection(f"raw_{self.task_id}")
            logger.info("Raw VectorDB 冷存档 | archive=%s", archive_name)

        # mode == RESULT_ONLY:保留 raw collection,什么都不做

    except Exception as e:
        logger.warning("Chroma cleanup 失败(非致命)| task_id=%s | error=%s", self.task_id, e)

配置项config.py):

CHROMA_CLEANUP_MODE: str = "result_only"  # full / result_only / archive
CHROMA_RAW_MAX_AGE_DAYS: int = 7          # result_only 模式下 raw 保留天数,超期由定时任务清理

定时任务更新(接入现有 Celery Beat):

@shared_task
def cleanup_chroma_expired_data():
    """定期清理超龄 raw collections(result_only 模式下残留)"""
    ChromaStore.cleanup_expired(
        persist_dir=settings.CHROMA_PERSIST_DIR,
        prefix="raw_",                             # 只清理 raw_ 前缀
        max_age_days=settings.CHROMA_RAW_MAX_AGE_DAYS
    )

实施计划

阶段一:基础设施(Week 1)

  • 安装新依赖,更新 requirements.txt
  • 实现 chroma_store.py(单元测试覆盖)
  • 实现 text_splitter.py(MD 切片验证)
  • 实现 graph_state.py

阶段二:节点开发(Week 2)

  • 实现 graph_builder.py(mock 节点,验证图结构和路由)
  • 实现 init_node.py + role_node.py
  • 实现 scene_loop_node.py + prop_node.py
  • 实现 shot_node.py + validate_node.py
  • 实现 aggregate_node.py(处理 storyboards + 内嵌 dialogues 写入 PG)

阶段三:集成(Week 3)

  • 实现 extraction_service.py
  • 实现 screenplay_langgraph_task.py(Celery 任务)
  • 修改 API 端点,接入 mode 参数路由
  • Docker Compose 更新(Chroma 持久化 volume)

阶段四:验证(Week 4)

  • 10 个短剧本(<2000 字)功能验证
  • 10 个长剧本(>5000 字)质量对比
  • 性能压测(并发 5 个任务)
  • 错误注入测试(LLM 返回格式错误的 fallback)

维护人员: 开发团队
最后更新: 2026-02-24(对齐现有 upload-and-parse 链路;Init Node 去除 MD 转换职责;新增文件转换复用说明)