ADR 07: LangGraph Screenplay Intelligent Extraction System (Simplified, No Vector Store)

Status: Accepted
Date: 2026-02-24
Deciders: Development team


Table of Contents

  1. Background and Problem
  2. Decision
  3. System Architecture
  4. Database Table Relationships
  5. Module Design
  6. LangGraph State and Nodes
  7. Prompt Templates per Node
  8. API Design
  9. Data Flow
  10. Execution and Recovery Strategy
  11. Dependencies and Deployment
  12. Risks and Mitigations
  13. Implementation Plan

Background and Problem

Bottlenecks of the Existing Approach

The current screenplay extraction approach (AIService.parse_screenplay) has the following structural problems:

| Problem | Current state | Impact |
|---|---|---|
| Single giant prompt | Full screenplay text plus 7 extraction tasks stuffed into one LLM request | Accuracy degrades noticeably on long scripts (>10k characters) |
| No dependency chain | Prop and storyboard extraction cannot reuse already-extracted character/location results | Inconsistent character names, wrong prop ownership |
| No self-validation | No compensation mechanism when the LLM emits malformed output or inconsistent references | Frequent JSON parsing failures |
| No checkpoint recovery | A failed Celery task can only be retried as a whole | Failures on long scripts are expensive |
| Context loss | No information passed between scenes | Poor narrative continuity across storyboards |

Motivation for the New Approach

Introduce a LangGraph multi-node intelligent extraction pipeline:

  • Dependency-chain management: characters → locations → props → storyboards, each step reusing earlier results
  • Per-segment context passing: each location segment is processed independently with its full text passed directly, so no information is lost
  • Self-validation loop: logical validation after storyboard extraction, with automatic re-extraction on failure (at most 2 retries)
  • Node-level checkpoint recovery: the LangGraph Postgres checkpointer supports re-running a single node
  • Phased migration: the new approach is an optional mode=langgraph and does not break existing APIs

Decision

Adopt a LangGraph + PostgreSQL pure in-memory state workflow, without introducing a vector store.

Rationale for Technology Choices

| Component | Choice | Rationale |
|---|---|---|
| Flow control | LangGraph | Native support for stateful graphs, conditional edges, loops, node-level retries |
| Text splitting | MarkdownNodeParser + SentenceSplitter | Uses MD heading awareness; segment splitting costs zero LLM calls |
| Context passing | LangGraph State (in memory) | Segment text is passed directly in the State |
| LLM calls | OpenAI SDK (existing) | Reuses the project's existing AIProvider |
| Structured storage | PostgreSQL (existing) | Source of truth; nodes write directly |
| Task scheduling | Celery (existing) | The LangGraph pipeline is wrapped as a Celery task |
| Checkpoint recovery | LangGraph Postgres Checkpointer | thread_id = task_id, supports resume |

Explicitly Excluded

  • No Chroma/Qdrant/pgvector: the screenplay is already split by location segment; each segment is 500-2000 characters and can be passed to the LLM directly, so vector retrieval is unnecessary
  • No persistent provider_cache: PostgreSQL is the single data source; the Provider is fully stateless
  • No memory/vector/hybrid mode switching: only the single MemoryContextProvider path is kept

System Architecture

Precondition: MD Conversion and OSS Storage Are Already in Place

⚠️ ScreenplayFileParserService (the upload-and-parse endpoint) already implements the full MD conversion and OSS storage chain. The LangGraph design does not reimplement this module; it reuses it as-is.

User uploads a file (MD / TXT / PDF / DOCX / RTF / DOC / PPTX)
    │
    ▼ ScreenplayFileParserService
    ├── TXT / MD       → read directly
    ├── DOCX           → parsed with python-docx, Heading → MD heading
    ├── PDF            → pdfplumber + PyMuPDF with watermark removal
    ├── RTF            → parsed with striprtf
    └── DOC / PPTX     → parsed with textract / python-pptx
    │
    ▼ _format_as_markdown()
    Normalized to Markdown (all-caps lines → ## headings)
    │
    ▼ screenplay.file_url = OSS MD URL  ← single source of truth

Conclusion: the LangGraph Init Node consumes the content at screenplay.file_url directly, with no further format conversion.


Overall Architecture Diagram

flowchart TB
    subgraph Stage1["Stage 1 (implemented): upload & MD normalization"]
        Upload["POST /v1/screenplays/upload-and-parse"]
        Parser["ScreenplayFileParserService"]
        Process["any format → standard MD → OSS storage"]
        Upload --> Parser --> Process
    end

    subgraph Stage2["Stage 2 (new): LangGraph intelligent extraction"]
        API["POST /{screenplay_id}/parse?mode=langgraph"]
        Download["① download MD content from screenplay.file_url"]
        CeleryTask["② submit Celery task"]
        API --> Download --> CeleryTask
    end

    subgraph LangGraphWorkflow["ScreenplayExtractionService (LangGraph workflow)"]
        InitNode["Init Node\nsplit MD into location segments\nwrite PG screenplay_locations"]
        RoleNode["Role Node\nglobal character extraction\nwrite PG + State"]
        LocationLoop{"unprocessed location segments left?"}

        subgraph LocationIteration["per-location-segment loop"]
            LocationNode["Location Node\nshooting location + tag extraction\nwrite PG"]
            PropNode["Prop Node\nprop extraction\nwrite PG"]
            ShotNode["Shot Node\nstoryboard extraction (incl. dialogue)\nState only"]
            ValidateNode["Validate Node\nreference validation\nPass/Fail routing"]
        end

        AggNode["Aggregate Node\nwrite storyboards/dialogues/refs\nwrite project_resources"]

        InitNode --> RoleNode --> LocationLoop
        LocationLoop -->|"segments remain"| LocationNode
        LocationNode --> PropNode --> ShotNode --> ValidateNode
        ValidateNode -->|"Pass / retry limit"| LocationLoop
        ValidateNode -->|"Fail (retry<2)"| ShotNode
        LocationLoop -->|"all done"| AggNode
    end

    Stage1 -->|"screenplay.file_url (OSS MD URL)"| Stage2
    Stage2 -->|"MD plain text + location_texts"| LangGraphWorkflow

Database Table Relationships

Tables Involved

Stage 1: Upload & File Parsing

| Table | Operation | Notes |
|---|---|---|
| projects | READ / UPDATE | Read parent project info |
| screenplays | INSERT | Create the screenplay master record, parsing_status=PENDING |
| attachments | INSERT | Store the original uploaded file |

Stage 2: AI Extraction Writes (LangGraph nodes)

| Table | Operation | Written content | Scope |
|---|---|---|---|
| screenplay_locations | INSERT | Location segments split by the Init Node | Screenplay |
| project_characters | UPSERT | AI-extracted characters (deduplicated by name) | Parent project |
| project_locations | UPSERT | AI-extracted shooting locations (deduplicated by name) | Parent project |
| project_props | UPSERT | AI-extracted props (deduplicated by name) | Parent project |
| screenplay_element_refs | INSERT | Logical references between screenplay and project elements | — |
| project_element_tags | INSERT | Element variant tags | Parent project |
| storyboards | INSERT | Storyboard master records | Child project |
| storyboard_items | INSERT | Storyboard ↔ element-tag association details | — |
| storyboard_dialogues | INSERT | Storyboard dialogue records | — |
| project_resources | INSERT (optional) | Placeholder resource per element tag | Parent project |
| screenplays | UPDATE | Backfill parsing_status=COMPLETED | — |

Project Hierarchy and Data Ownership

Parent project (parent_project_id IS NULL)
├── project_characters   ← characters (shared across screenplays)
├── project_locations    ← shooting locations (shared across screenplays)
├── project_props        ← props (shared across screenplays)
├── project_element_tags ← element variant tags
├── project_resources    ← asset resources
│
└── Child project (parent_project_id = parent project id)
    ├── screenplays          ← screenplay (bound one-to-one to the child project)
    ├── screenplay_locations ← location segments (written by the Init Node)
    ├── storyboards          ← storyboards
    ├── storyboard_items     ← storyboard element associations
    └── storyboard_dialogues ← storyboard dialogues

Table Relationship ER Diagram

erDiagram
    projects {
        UUID id PK
        UUID parent_project_id FK
        UUID screenplay_id FK
    }
    screenplays {
        UUID screenplay_id PK
        UUID project_id FK
        int parsing_status
        int character_count
        int location_count
    }
    screenplay_locations {
        UUID location_id PK
        UUID screenplay_id FK
        int location_idx
        string location_name
        text location_text
    }
    project_characters {
        UUID character_id PK
        UUID project_id FK
        UUID default_tag_id FK
        string name
    }
    project_locations {
        UUID location_id PK
        UUID project_id FK
        UUID default_tag_id FK
        string name
    }
    project_props {
        UUID prop_id PK
        UUID project_id FK
        UUID default_tag_id FK
        string name
    }
    screenplay_element_refs {
        UUID ref_id PK
        UUID screenplay_id FK
        int element_type
        UUID element_id
        int order_index
    }
    project_element_tags {
        UUID tag_id PK
        UUID project_id FK
        int element_type
        UUID element_id
        string tag_label
    }
    storyboards {
        UUID storyboard_id PK
        UUID project_id FK
        jsonb meta_data
        int order_index
    }
    storyboard_items {
        UUID item_id PK
        UUID storyboard_id FK
        UUID element_tag_id FK
        string element_name
        string tag_label
    }
    storyboard_dialogues {
        UUID dialogue_id PK
        UUID storyboard_id FK
        UUID character_id FK
        int dialogue_type
        string content
    }
    project_resources {
        UUID project_resource_id PK
        UUID project_id FK
        UUID element_tag_id FK
        string file_url
    }

    projects ||--o{ projects : "parent_project_id"
    projects ||--o| screenplays : "screenplay_id"
    projects ||--o{ project_characters : "parent-project project_id"
    projects ||--o{ project_locations : "parent-project project_id"
    projects ||--o{ project_props : "parent-project project_id"
    projects ||--o{ project_element_tags : "parent-project project_id"
    projects ||--o{ storyboards : "child-project project_id"

    screenplays ||--o{ screenplay_locations : "screenplay_id"
    screenplays ||--o{ screenplay_element_refs : "screenplay_id"

    screenplay_element_refs }o--|| project_characters : "element_id(type=1)"
    screenplay_element_refs }o--|| project_locations : "element_id(type=2)"
    screenplay_element_refs }o--|| project_props : "element_id(type=3)"

    project_characters ||--o{ project_element_tags : "element_id(type=1)"
    project_locations ||--o{ project_element_tags : "element_id(type=2)"
    project_props ||--o{ project_element_tags : "element_id(type=3)"

    storyboards ||--o{ storyboard_items : "storyboard_id"
    storyboards ||--o{ storyboard_dialogues : "storyboard_id"

    storyboard_items }o--|| project_element_tags : "element_tag_id"
    storyboard_dialogues }o--o| project_characters : "character_id (optional)"

    project_element_tags ||--o{ project_resources : "element_tag_id"

Write Order and Dependency Chain

The AI extraction nodes write in order:

Init Node:
① screenplay_locations (INSERT) ← location_id, location_idx, location_name, location_text

Role Node:
② project_characters / project_element_tags (CHARACTER)
         │  (UPSERT, returns character_id_map + character_tag_id_map)

Location Node (per location segment):
③ project_locations + project_element_tags (LOCATION)
         │  (UPSERT, returns location_tag_id_map)

Prop Node (per location segment):
④ project_props + project_element_tags (PROP)
         │  (UPSERT, returns prop_tag_id_map)

Aggregate Node (once, globally):
⑤ screenplay_element_refs  (INSERT)
⑥ storyboards              (INSERT)
⑦ storyboard_items         (INSERT, element_tag_id)
⑧ storyboard_dialogues     (INSERT, character_id optional)
⑨ project_resources        (INSERT placeholders)
⑩ screenplays              (UPDATE: parsing_status=COMPLETED)

Module Design

Directory Structure

server/app/services/screenplay_extraction/
├── __init__.py
├── extraction_service.py          # the single external entry point
├── graph_state.py                 # LangGraph state definitions (TypedDict)
├── graph_builder.py               # graph construction: node registration + edges
├── errors.py                      # error-code enum + node_error() helper
├── utils.py                       # node_monitor decorator, shared utilities
├── nodes/
│   ├── __init__.py                # load_prompt_template() lazy loader
│   ├── init_node.py               # MD splitting + write screenplay_locations
│   ├── role_node.py               # global character extraction
│   ├── location_node.py           # shooting location/tag extraction (per segment)
│   ├── prop_node.py               # prop extraction (per segment)
│   ├── shot_node.py               # storyboard extraction (incl. dialogue, per segment)
│   ├── validate_node.py           # reference-consistency validation
│   └── aggregate_node.py          # aggregation + PostgreSQL writes
└── prompts/                       # prompt template files (picked up on container restart)
    ├── role_prompt.md
    ├── location_prompt.md
    ├── prop_prompt.md
    └── shot_prompt.md

server/app/tasks/
└── screenplay_langgraph_task.py   # Celery task (async_to_sync)
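Every node below is wrapped in the node_monitor decorator from utils.py, whose implementation this ADR does not spell out. A minimal sketch of what such a decorator might look like (the timing/error behavior here is an assumption, not the project's actual code; it only illustrates how node failures can be folded into the State's failed_node/error fields):

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger(__name__)


def node_monitor(node_name: str):
    """Hypothetical sketch: time each node and convert uncaught
    exceptions into the failed_node/error fields defined by the State."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(state: dict) -> dict:
            started = time.monotonic()
            try:
                result = await fn(state)
                logger.info("%s finished in %.2fs", node_name, time.monotonic() - started)
                return result
            except Exception as exc:
                logger.exception("%s failed", node_name)
                return {"status": "failed", "failed_node": node_name, "error": str(exc)}
        return wrapper
    return decorator


@node_monitor("demo_node")
async def demo_node(state: dict) -> dict:
    if state.get("boom"):
        raise ValueError("boom")
    return {"status": "running"}


ok = asyncio.run(demo_node({}))
failed = asyncio.run(demo_node({"boom": True}))
```

Returning an error partial-state instead of re-raising lets the graph's routing decide what to do with a failed node, at the cost of needing explicit status checks downstream.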

Why a Dedicated prompts/ Directory?

| Aspect | Explanation |
|---|---|
| Prompts decoupled from logic | Copy changes touch only .md files; node code stays untouched |
| Effective on container restart | docker restart jointo-server-app picks up changes |
| Consistent with project style | screenplay_parsing.md is also file-based |
| Readable | Plain Markdown, free of Python string-indentation noise |

extraction_service.py

class ScreenplayExtractionService:
    """Single external entry point for LangGraph screenplay extraction."""

    async def run(
        self,
        screenplay_id: str,
        task_id: str,
        content: str,
        custom_requirements: str | None = None,
        parent_project_id: str | None = None   # owner of shared elements; needed by Role/Location/Prop nodes
    ) -> dict:
        graph = build_extraction_graph()
        config = {
            "configurable": {"thread_id": task_id},
            "metadata": {"run_id": str(uuid.uuid4())}
        }
        initial_state = ScreenplayExtractionState(
            state_version=1,
            task_id=task_id,
            screenplay_id=screenplay_id,
            parent_project_id=parent_project_id,
            content=content,
            custom_requirements=custom_requirements or "",
            location_ids=[],
            location_texts={},           # location_id → location_text
            current_location_idx=0,
            character_ids=[],
            character_tag_id_map={},     # "character name-tag" → UUID
            location_results=[],
            validation_passed=False,
            retry_count=0,
            max_retries=2,
            last_validation_report=None,
            status="running",
            failed_node=None,
            error_code=None,
            error=None,
            final_result=None
        )
        return await graph.ainvoke(initial_state, config=config)

graph_builder.py

def build_extraction_graph() -> CompiledGraph:
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

    workflow = StateGraph(ScreenplayExtractionState)

    workflow.add_node("init",      init_node)
    workflow.add_node("role",      role_node)
    workflow.add_node("location",  location_node)
    workflow.add_node("prop",      prop_node)
    workflow.add_node("shot",      shot_node)
    workflow.add_node("validate",  validate_node)
    workflow.add_node("aggregate", aggregate_node)

    workflow.set_entry_point("init")
    workflow.add_edge("init", "role")

    workflow.add_conditional_edges(
        "role", route_location_loop,
        {"location": "location", "aggregate": "aggregate"}
    )
    workflow.add_edge("location", "prop")
    workflow.add_edge("prop", "shot")
    workflow.add_edge("shot", "validate")
    workflow.add_conditional_edges(
        "validate", route_validation,
        {"location": "location", "shot": "shot", "aggregate": "aggregate"}
    )
    workflow.add_edge("aggregate", END)

    # Note: in recent langgraph-checkpoint-postgres releases, from_conn_string()
    # is an async context manager; enter it (and call setup() once) before compiling.
    checkpointer = AsyncPostgresSaver.from_conn_string(settings.LANGGRAPH_CHECKPOINT_DB)
    return workflow.compile(checkpointer=checkpointer)


def route_location_loop(state: ScreenplayExtractionState) -> str:
    if state["current_location_idx"] < len(state["location_ids"]):
        return "location"
    return "aggregate"


def route_validation(state: ScreenplayExtractionState) -> str:
    # The Validate Node already advances current_location_idx on Pass (and on
    # retry exhaustion), so route on the updated index directly; comparing
    # against idx + 1 here would skip the final location segment.
    if state.get("validation_passed", False):
        return "location" if state["current_location_idx"] < len(state["location_ids"]) else "aggregate"
    return "shot"
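Both routing functions are pure functions of the State, which makes them trivial to unit-test without a graph or database. A quick standalone check of the loop router's behavior (the routing logic is restated here so the snippet is self-contained):

```python
def route_location_loop(state: dict) -> str:
    # Same logic as the router above, restated standalone for the demo:
    # keep looping while unprocessed location segments remain.
    if state["current_location_idx"] < len(state["location_ids"]):
        return "location"
    return "aggregate"


print(route_location_loop({"current_location_idx": 0, "location_ids": ["a", "b"]}))  # location
print(route_location_loop({"current_location_idx": 2, "location_ids": ["a", "b"]}))  # aggregate
```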

LangGraph State and Nodes

Global State Definition

# server/app/services/screenplay_extraction/graph_state.py

class LocationResult(TypedDict):
    location_id: str                       # location_id in the screenplay_locations table
    location_idx: int
    project_location_id: str | None        # location_id in the project_locations table
    location_tag_id_map: dict[str, str]    # "location name-tag" → UUID
    prop_ids: list[str]
    prop_tag_id_map: dict[str, str]        # "prop name-tag" → UUID
    shot_ids: list[str]                    # filled after the Aggregate Node writes to PG


class ScreenplayExtractionState(TypedDict):
    state_version: int                     # currently: 1

    # Task identity
    task_id: str
    screenplay_id: str
    parent_project_id: str | None          # parent project that owns shared elements

    # Input (content is cleared after the Init Node to free memory)
    content: str | None
    custom_requirements: str

    # Init Node output
    location_ids: list[str]                # location_id list from the PG screenplay_locations table
    location_texts: dict[str, str]         # location_id → location_text (in-memory context)
    current_location_idx: int

    # Role Node output (global)
    character_ids: list[str]
    character_tag_id_map: dict[str, str]   # "character name-tag" → UUID

    # Per-location-segment results (accumulated)
    location_results: list[LocationResult]

    # Validation and retry
    validation_passed: bool
    retry_count: int
    max_retries: int
    last_validation_report: dict | None

    # Status
    status: str                            # running / completed / failed
    failed_node: str | None
    error_code: int | None
    error: str | None

    # Final output (statistics only)
    final_result: dict | None

Do NOT store full character objects, full prop objects, or other large objects in the State. All full data is read and written through PostgreSQL; the State holds only IDs and control fields.

Node Responsibility Matrix

| Node | Frequency | Writes to PG | Context read | Key State fields returned |
|---|---|---|---|---|
| Init Node | once | screenplay_locations | content (full MD) | location_ids, location_texts, content=None |
| Role Node | once | project_characters + project_element_tags | all location_texts merged | character_ids, character_tag_id_map |
| Location Node | per location segment | project_locations + project_element_tags | location_texts[location_id] + known character names | location_results[N].location_* |
| Prop Node | per location segment | project_props + project_element_tags | location_texts[location_id] + known character names | location_results[N].prop_* |
| Shot Node | per location segment | none (State only) | location_texts[location_id] + character/location/prop tag_id_maps | location_results[N].shot_ids (buffered storyboard data) |
| Validate Node | per location segment | — | shot references in State + known tag_id_maps | validation_passed, retry_count, last_validation_report |
| Aggregate Node | once | screenplay_element_refs + storyboards + storyboard_items + storyboard_dialogues + project_resources + screenplays | full State | status=completed, final_result |

Init Node

# nodes/init_node.py

from llama_index.core.node_parser import MarkdownNodeParser, SentenceSplitter
from llama_index.core import Document

LOCATION_MAX_TOKENS = 512   # segments longer than this are further split into paragraphs

@node_monitor("init_node")
async def init_node(state: ScreenplayExtractionState) -> dict:
    """
    Responsibilities:
    1. Split the screenplay into location segments at ## headings via
       MarkdownNodeParser (zero LLM calls)
    2. Split over-long segments into paragraphs with SentenceSplitter
    3. Write the PG screenplay_locations table
    4. Clear content to free memory
    """
    content = state["content"]
    screenplay_id = state["screenplay_id"]

    # Level 1: split into location segments at ## headings
    md_parser = MarkdownNodeParser(include_metadata=True)
    location_nodes = md_parser.get_nodes_from_documents([Document(text=content)])

    locations_list = []
    for node in location_nodes:
        location_name = node.metadata.get("header_path", "").strip()
        if location_name:
            locations_list.append((location_name, node.text))

    if not locations_list:
        locations_list = [("全文", content)]

    # Level 2: paragraph-split over-long segments (merged text serves as context)
    # (applying para_splitter to over-long segments is elided in this sketch)
    para_splitter = SentenceSplitter(chunk_size=LOCATION_MAX_TOKENS, chunk_overlap=32)
    location_texts: dict[str, str] = {}

    # Write the PG screenplay_locations table
    async with get_async_session() as db:
        location_repo = ScreenplayLocationRepository(db)
        location_ids = []
        for idx, (location_name, location_text) in enumerate(locations_list):
            location = await location_repo.create(
                screenplay_id=screenplay_id,
                location_idx=idx,
                location_name=location_name,
                location_text=location_text
            )
            location_ids.append(str(location.location_id))
            location_texts[str(location.location_id)] = location_text

    return {
        "location_ids": location_ids,
        "location_texts": location_texts,
        "current_location_idx": 0,
        "content": None,           # clear the raw content to free memory
    }
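For intuition, the Level-1 split that the Init Node delegates to MarkdownNodeParser can be approximated with a few lines of stdlib code. This mimics the "split at ## headings" behavior only; it is not llama_index's implementation and ignores nesting and metadata:

```python
import re


def split_by_headings(md: str) -> list[tuple[str, str]]:
    """Split a Markdown screenplay into (heading, segment_text) pairs at ## headings."""
    parts = re.split(r"(?m)^##\s+(.+)$", md)
    # parts = [preamble, heading1, body1, heading2, body2, ...]
    segments = []
    for i in range(1, len(parts), 2):
        segments.append((parts[i].strip(), parts[i + 1].strip()))
    return segments


md = "## 花果山\n群猴嬉戏。\n\n## 水帘洞\n悟空称王。"
segments = split_by_headings(md)
print(segments)  # [('花果山', '群猴嬉戏。'), ('水帘洞', '悟空称王。')]
```

Because the split is purely structural, it is deterministic and free, which is exactly why the design insists on normalizing every upload to Markdown headings in Stage 1.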

Role Node

# nodes/role_node.py

@node_monitor("role_node")
async def role_node(state: ScreenplayExtractionState) -> dict:
    """
    Responsibility: global character extraction (one-shot)
    Context: all location-segment texts merged (characters are screenplay-level
             and need a full-text view)
    Writes PG: project_characters + project_element_tags (CHARACTER)
    """
    # Merge all location-segment texts as the character-extraction context
    all_text = "\n\n".join(state["location_texts"].values())

    # If the full text is too long (> 8000 chars), keep the first 6000
    # (main characters usually appear early)
    if len(all_text) > 8000:
        all_text = all_text[:6000] + "\n\n[...content truncated; the text above covers the main characters' entrances...]"

    role_result = await extract_roles_with_llm(all_text, state["custom_requirements"])

    async with get_async_session() as db:
        char_repo = CharacterRepository(db)
        character_ids, character_tag_id_map = await char_repo.upsert_characters(
            project_id=state["parent_project_id"],
            role_result=role_result
        )

    return {
        "character_ids": character_ids,
        "character_tag_id_map": character_tag_id_map,
    }
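upsert_characters is expected to return the pair (character_ids, character_tag_id_map). An in-memory sketch of that contract (the repository and its SQL are not shown in this ADR, so this is an assumption about behavior; only the return shape matters to downstream nodes):

```python
import uuid


def upsert_characters_in_memory(existing: dict, role_result: dict):
    """Dedupe characters by name and mint stable IDs, mimicking the assumed
    (character_ids, character_tag_id_map) return contract of the repository."""
    character_ids = []
    tag_id_map = {}
    for char in role_result["characters"]:
        # setdefault = "upsert": reuse the ID if the name already exists
        char_id = existing.setdefault(char["name"], str(uuid.uuid4()))
        character_ids.append(char_id)
        for tag in role_result["character_tags"].get(char["name"], []):
            tag_id_map[f"{char['name']}-{tag['tag_label']}"] = str(uuid.uuid4())
    return character_ids, tag_id_map


role_result = {
    "characters": [{"name": "孙悟空"}],
    "character_tags": {"孙悟空": [{"tag_label": "少年"}, {"tag_label": "成年"}]},
}
ids, tag_map = upsert_characters_in_memory({}, role_result)
```

Deduplicating by name at the parent-project level is what lets multiple screenplays under the same project share one character record.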

Location Node

# nodes/location_node.py

@node_monitor("location_node")
async def location_node(state: ScreenplayExtractionState) -> dict:
    """
    Responsibility: runs once per location segment; extracts the shooting-location
    description and tags
    Context: current segment text + known character names
    Writes PG: project_locations + project_element_tags (LOCATION)
    """
    idx = state["current_location_idx"]
    location_id = state["location_ids"][idx]
    location_text = state["location_texts"][location_id]
    known_chars = list({k.rsplit("-", 1)[0] for k in state["character_tag_id_map"]})

    location_result = await extract_location_with_llm(
        location_name=_get_location_name(location_id, state),
        context=location_text,
        known_characters=known_chars
    )

    async with get_async_session() as db:
        loc_repo = ProjectLocationRepository(db)
        project_location_id, location_tag_id_map = await loc_repo.upsert_location(
            project_id=state["parent_project_id"],
            location_result=location_result
        )

    # Update location_results
    location_results = list(state["location_results"])
    if idx >= len(location_results):
        location_results.append(LocationResult(
            location_id=location_id, location_idx=idx,
            project_location_id=str(project_location_id),
            location_tag_id_map=location_tag_id_map,
            prop_ids=[], prop_tag_id_map={}, shot_ids=[]
        ))
    else:
        location_results[idx]["project_location_id"] = str(project_location_id)
        location_results[idx]["location_tag_id_map"] = location_tag_id_map

    return {"location_results": location_results}

Validate Node

# nodes/validate_node.py

@node_monitor("validate_node")
async def validate_node(state: ScreenplayExtractionState) -> dict:
    """
    Full validation: do all shot references in this location segment
    exist in the known tag_id_maps?
    """
    idx = state["current_location_idx"]
    current_location = state["location_results"][idx]
    shots = current_location.get("_pending_shots", [])

    if not shots:
        return {"validation_passed": True, "current_location_idx": idx + 1}

    # Build the set of known names
    known_names: set[str] = set()
    known_names.update(k.rsplit("-", 1)[0] for k in state["character_tag_id_map"])
    known_names.update(k.rsplit("-", 1)[0] for k in current_location.get("prop_tag_id_map", {}))
    known_names.update(k.rsplit("-", 1)[0] for k in current_location.get("location_tag_id_map", {}))

    all_unknown: dict[int, list[str]] = {}
    for shot_idx, shot in enumerate(shots):
        refs = (
            set(c["name"] for c in shot.get("characters", []))
            | set(p["name"] for p in shot.get("props", []))
            | set(l["name"] for l in shot.get("locations", []))
        )
        unknown = refs - known_names
        if unknown:
            all_unknown[shot_idx] = list(unknown)

    if not all_unknown:
        return {
            "validation_passed": True,
            "retry_count": 0,
            "current_location_idx": idx + 1
        }

    report = {
        "location_idx": idx,
        "unknown_refs_by_shot": {str(k): v for k, v in all_unknown.items()},
        "total_unknown_refs": sum(len(v) for v in all_unknown.values())
    }

    if state["retry_count"] < state["max_retries"]:
        return {
            "validation_passed": False,
            "retry_count": state["retry_count"] + 1,
            "last_validation_report": report
        }

    # Retry limit reached: accept the current result and advance to the next segment
    logger.warning("Validate exceeded max retries | location_idx=%d | report=%s", idx, report)
    return {
        "validation_passed": True,
        "retry_count": 0,
        "last_validation_report": report,
        "current_location_idx": idx + 1
    }
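The heart of the Validate Node is a set difference between the names a shot references and the names known from the tag_id_maps. Exercised standalone with toy data:

```python
def unknown_refs(shots: list[dict], known_names: set[str]) -> dict[int, list[str]]:
    """Return {shot_idx: [unknown names]}, the same shape as the node's report."""
    result = {}
    for shot_idx, shot in enumerate(shots):
        refs = (
            {c["name"] for c in shot.get("characters", [])}
            | {p["name"] for p in shot.get("props", [])}
            | {l["name"] for l in shot.get("locations", [])}
        )
        missing = refs - known_names
        if missing:
            result[shot_idx] = sorted(missing)
    return result


shots = [
    {"characters": [{"name": "孙悟空"}], "props": [{"name": "金箍棒"}]},
    {"characters": [{"name": "六耳猕猴"}]},   # hallucinated: never extracted
]
report = unknown_refs(shots, {"孙悟空", "金箍棒"})
print(report)  # {1: ['六耳猕猴']}
```

A non-empty report is fed back into the Shot Node's retry prompt as unknown_refs_hint, which is why the report keys by shot index.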

Prompt Templates per Node

Shared Template Loader

# nodes/__init__.py

from pathlib import Path
from functools import lru_cache

_PROMPTS_DIR = Path(__file__).parent.parent / "prompts"

@lru_cache(maxsize=None)
def load_prompt_template(name: str) -> str:
    """Cached for the process lifetime (restart the container to pick up changes)."""
    return (_PROMPTS_DIR / f"{name}.md").read_text(encoding="utf-8")

Role Node Prompt (prompts/role_prompt.md)

def build_role_prompt(context: str, custom_requirements: str = "") -> str:
    tmpl = load_prompt_template("role_prompt")
    # The template body contains literal JSON braces, so substitute with
    # str.replace instead of str.format (which would choke on them)
    prompt = tmpl.replace("{context}", context)
    if custom_requirements:
        prompt += f"\n\n## Special user requirements\n{custom_requirements}"
    return prompt

Template content:

You are a professional film/TV screenplay analyst focused on identifying all character information in a screenplay.

## Task
Extract all characters from the screenplay excerpt below, including leads, supporting roles, and extras.
For each character, identify its distinct **appearance variant tags** (age stage, clothing state, special conditions, etc.).

## Requirements
- Identify every character that appears (including voice-over and narrator roles)
- At least 1 tag per character; create separate tags for different ages/states
- tag_key in lowercase English (e.g. youth / adult / injured)
- tag_label in Chinese (e.g. 少年 / 青年 / 受伤)
- Role types: main=lead, supporting=supporting, extra=background

## Output format (strict; return JSON only)
```json
{
  "characters": [
    {
      "name": "character name",
      "description": "character description",
      "role_type": "main|supporting|extra",
      "is_offscreen": false,
      "meta_data": {"gender": "male|female", "personality": "personality description"}
    }
  ],
  "character_tags": {
    "character name": [
      {
        "tag_key": "youth",
        "tag_label": "少年",
        "description": "15 years old, coarse-cloth outfit, lively",
        "meta_data": {"age": 15, "clothing": "布衣"}
      }
    ]
  }
}
```

## Screenplay excerpt
{context}

Location Node Prompt (prompts/location_prompt.md)

def build_location_prompt(location_name: str, context: str, known_characters: list[str]) -> str:
    char_list = "、".join(known_characters) if known_characters else "(no known characters yet)"
    tmpl = load_prompt_template("location_prompt")
    # The template body contains literal JSON braces, so substitute with
    # str.replace instead of str.format
    for key, val in {"location_name": location_name, "char_list": char_list, "context": context}.items():
        tmpl = tmpl.replace("{" + key + "}", val)
    return tmpl

Template content:

You are a professional film/TV screenplay analyst focused on identifying shooting-location information.

## Current task
For the location segment "{location_name}", extract the shooting-location description and its time/atmosphere variant tags.

## Known characters (may appear in this segment)
{char_list}

## Requirements
- Extract distinct time-of-day/weather/atmosphere tags for the shooting location; each tag must have a clear visual/lighting difference
- At least 1 tag (if only one time of day appears, create a "常规" (regular) tag)
- tag_key in lowercase English (e.g. daytime / night / rainy)
- tag_label in Chinese (e.g. 白天 / 夜晚 / 雨天)
- The top-level key must be "locations" (an array)

## Output format (strict; return JSON only)
```json
{
  "locations": [
    {
      "name": "{location_name}",
      "location": "location description",
      "description": "overall shooting-location description",
      "meta_data": {"time_of_day": "morning|afternoon|evening|night"}
    }
  ],
  "location_tags": {
    "{location_name}": [
      {
        "tag_key": "daytime",
        "tag_label": "白天",
        "description": "bright sunshine, birdsong and blossoms",
        "meta_data": {"lighting": "natural", "weather": "sunny"}
      }
    ]
  }
}
```

## Current location segment text
{context}

Prop Node Prompt (prompts/prop_prompt.md)

def build_prop_prompt(location_name: str, context: str, known_characters: list[str]) -> str:
    char_list = "、".join(known_characters) if known_characters else "(none)"
    tmpl = load_prompt_template("prop_prompt")
    # The template body contains literal JSON braces, so substitute with
    # str.replace instead of str.format
    for key, val in {"location_name": location_name, "char_list": char_list, "context": context}.items():
        tmpl = tmpl.replace("{" + key + "}", val)
    return tmpl

Template content:

You are a professional film/TV screenplay analyst focused on identifying props and objects in a screenplay.

## Current task
Identify all props involved in location segment "{location_name}", covering both interactive props and set dressing.

## Known characters (owner_character may only be chosen from this list)
{char_list}

## Requirements
- **Interactive props** (interactive): objects a character uses or operates; owner_character is required
- **Set dressing** (set_dressing): decorative objects of the shooting location; owner_location is required
- owner_character must come from the known characters above; never invent character names
- At least 1 condition tag per prop

## Output format (strict; return JSON only)
```json
{
  "props": [
    {
      "name": "prop name",
      "description": "prop description",
      "prop_type": "interactive|set_dressing",
      "owner_character": "character name (required for interactive props)",
      "owner_location": "location name (required for set dressing)",
      "meta_data": {"material": "material"}
    }
  ],
  "prop_tags": {
    "prop name": [
      {
        "tag_key": "new",
        "tag_label": "崭新",
        "description": "freshly forged, gleaming gold",
        "meta_data": {"condition": "new"}
      }
    ]
  }
}
```

## Current location segment text
{context}

Shot Node Prompt (prompts/shot_prompt.md)

def build_shot_prompt(
    location_name: str,
    context: str,
    characters_context: str,
    location_context: str,
    props_context: str,
    storyboard_count_hint: int = 0,
    custom_requirements: str = "",
    unknown_refs_hint: str = ""
) -> str:
    count_hint = f"- Target roughly {storyboard_count_hint} storyboards for this segment\n" if storyboard_count_hint else ""
    extra_req = f"\n\n## Special user requirements\n{custom_requirements}" if custom_requirements else ""
    retry_hint = f"\n\n## ⚠️ The previous validation found references outside the known lists; fix them\n{unknown_refs_hint}" if unknown_refs_hint else ""
    tmpl = load_prompt_template("shot_prompt")
    # The template body contains literal JSON braces, so substitute with
    # str.replace instead of str.format
    for key, val in {
        "characters_context": characters_context,
        "location_context": location_context,
        "props_context": props_context,
        "count_hint": count_hint,
        "context": context,
        "extra_req": extra_req,
        "retry_hint": retry_hint,
    }.items():
        tmpl = tmpl.replace("{" + key + "}", val)
    return tmpl


def format_characters_context(character_tag_id_map: dict[str, str]) -> str:
    seen: dict[str, list[str]] = {}
    for key in character_tag_id_map:
        parts = key.rsplit("-", 1)
        if len(parts) == 2:
            seen.setdefault(parts[0], []).append(parts[1])
    lines = [f"- {n}(可用标签:{'、'.join(t)})" for n, t in seen.items()]
    return "\n".join(lines) if lines else "(无已知角色)"


def format_location_context(location_name: str, location_tag_id_map: dict[str, str]) -> str:
    tags = [k.rsplit("-", 1)[1] for k in location_tag_id_map if k.startswith(f"{location_name}-")]
    return f"- {location_name}(可用标签:{'、'.join(tags)})" if tags else f"- {location_name}(标签:常规)"


def format_props_context(prop_tag_id_map: dict[str, str]) -> str:
    seen: dict[str, list[str]] = {}
    for key in prop_tag_id_map:
        parts = key.rsplit("-", 1)
        if len(parts) == 2:
            seen.setdefault(parts[0], []).append(parts[1])
    lines = [f"- {n}(可用标签:{'、'.join(t)})" for n, t in seen.items()]
    return "\n".join(lines) if lines else "(无已知道具)"
Template content:

You are a professional storyboard artist skilled at breaking a screenplay segment into an executable shot list.

## Known elements (all references MUST come from the lists below; never invent)

### Character list
{characters_context}

### Current shooting location
{location_context}

### Prop list
{props_context}

## Storyboard requirements

### Shot parameters
{count_hint}- shot_size: close_up / medium_shot / full_shot / long_shot / extreme_close_up / over_shoulder
- camera_movement: static / pan / tilt / zoom / dolly / tracking / arc / crane / handheld

### Element reference rules
- characters and props use the **object array** format (with action / position / is_visible)
- Every name must exactly match the known element lists; never invent names
- tag_label must be one of that element's known tags

### Dialogue extraction rules
- All dialogue occurring within a shot is **embedded in that shot's dialogues array**
- character_name may only come from the known character list above
- Narration/voice-over: dialogue_type=3 with character_name null
- Inner monologue: dialogue_type=2
- Multiple lines in one shot get sequence_order in order of occurrence (starting at 0)

## Output format (strict; return JSON only)
```json
{
  "storyboards": [
    {
      "title": "storyboard title",
      "description": "detailed description of the frame",
      "shooting_description": "depth of field, camera movement, shot language",
      "shot_size": "medium_shot",
      "camera_movement": "static",
      "estimated_duration": 5.5,
      "order_index": 1,
      "meta_data": {"lighting": "natural light"},
      "characters": [
        {"name": "孙悟空", "tag_label": "少年", "action": "挥舞金箍棒", "position": "center", "is_visible": true, "order": 0}
      ],
      "locations": [
        {"name": "花果山", "tag_label": "白天", "order": 0}
      ],
      "props": [
        {"name": "金箍棒", "tag_label": "崭新", "action": "被挥舞", "position": "foreground", "is_visible": true, "order": 0}
      ],
      "dialogues": [
        {"character_name": "孙悟空", "content": "今日我便要闯出一番天地!", "dialogue_type": 1, "sequence_order": 0, "emotion": "兴奋"},
        {"character_name": null, "content": "命运在此刻悄然转向。", "dialogue_type": 3, "sequence_order": 1, "emotion": "平静"}
      ]
    }
  ]
}
```

| dialogue_type | Type | Description |
|---|---|---|
| 1 | normal | Normal dialogue between characters |
| 2 | inner_monologue | Inner monologue, inaudible to other characters |
| 3 | narration | Narration/voice-over; character_name is null |

## Current location segment text
{context}
{extra_req}
{retry_hint}
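format_characters_context groups the composite tag keys back into one per-name line of available tags, which is what the shot prompt's {characters_context} placeholder receives. A quick standalone check of that grouping (logic restated so the snippet is self-contained; the Chinese output literals match the builder above):

```python
def format_characters_context(character_tag_id_map: dict[str, str]) -> str:
    # Same grouping logic as the builder above, restated standalone
    seen: dict[str, list[str]] = {}
    for key in character_tag_id_map:
        parts = key.rsplit("-", 1)
        if len(parts) == 2:
            seen.setdefault(parts[0], []).append(parts[1])
    lines = [f"- {n}(可用标签:{'、'.join(t)})" for n, t in seen.items()]
    return "\n".join(lines) if lines else "(无已知角色)"


ctx = format_characters_context({"孙悟空-少年": "id1", "孙悟空-成年": "id2"})
print(ctx)  # - 孙悟空(可用标签:少年、成年)
```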

API Design

Changes to the Existing Parse Endpoint

# server/app/schemas/screenplay.py

class ParseMode(str, Enum):
    DEFAULT   = "default"     # existing approach (single LLM call)
    LANGGRAPH = "langgraph"   # new approach (LangGraph multi-node)

class ScreenplayParseRequest(BaseModel):
    custom_requirements: Optional[str] = None
    mode: ParseMode = ParseMode.DEFAULT   # preserves existing behavior by default

API Route Changes

# server/app/api/v1/screenplays.py

@router.post("/{screenplay_id}/parse")
async def parse_screenplay(
    screenplay_id: UUID,
    request: ScreenplayParseRequest,
    ...
):
    if request.mode == ParseMode.LANGGRAPH:
        from app.tasks.screenplay_langgraph_task import parse_screenplay_langgraph_task
        task = parse_screenplay_langgraph_task.delay(
            screenplay_id=str(screenplay_id),
            content=screenplay_content,
            custom_requirements=request.custom_requirements
        )
        return SuccessResponse(
            data={"task_id": task.id, "mode": "langgraph"},
            message="LangGraph extraction task submitted"
        )
    else:
        # existing logic unchanged
        ...
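In the langgraph branch the endpoint returns immediately with the Celery task id rather than waiting for extraction to finish. An illustrative success payload (the task id value is made up for illustration):

```json
{
  "data": {"task_id": "f1e2d3c4-...", "mode": "langgraph"},
  "message": "LangGraph extraction task submitted"
}
```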

Celery Task (screenplay_langgraph_task.py)

# server/app/tasks/screenplay_langgraph_task.py

from asgiref.sync import async_to_sync

class ScreenplayLangGraphTask(Task):
    """LangGraph screenplay extraction Celery task."""

    def run(
        self,
        screenplay_id: str,
        content: str,
        custom_requirements: str | None = None
    ):
        task_id = self.request.id or str(uuid7())
        service = ScreenplayExtractionService()
        try:
            # Avoid asyncio.run(), which conflicts with Celery's event-loop handling
            return async_to_sync(service.run)(
                screenplay_id=screenplay_id,
                task_id=task_id,
                content=content,
                custom_requirements=custom_requirements
            )
        except Exception as e:
            logger.error(
                "LangGraph task failed | task_id=%s | error=%s", task_id, e,
                exc_info=True
            )
            raise
        finally:
            # finally ensures cleanup runs only after the checkpoint is saved
            async_to_sync(cleanup_task_resources)(task_id)


parse_screenplay_langgraph_task = app.register_task(ScreenplayLangGraphTask())

数据流说明

LangGraph 节点 × 数据库写入责任矩阵

| 节点 | 写入表 | 依赖的上游数据 |
|------|--------|----------------|
| Init Node | screenplay_locations | content(MD 全文) |
| Role Node | project_characters + project_element_tags | location_texts(所有地点片段文本合并) |
| Location Node | project_locations + project_element_tags | location_texts[location_id] + 已知角色名 |
| Prop Node | project_props + project_element_tags | location_texts[location_id] + character_tag_id_map |
| Shot Node | 无(仅写 State) | location_texts[location_id] + 全部 tag_id_map |
| Validate Node | 无(仅写 State) | State 中的 shot 引用 + tag_id_map |
| Aggregate Node | screenplay_element_refs + storyboards + storyboard_items + storyboard_dialogues + project_resources + screenplays | State 全量 |

设计关键:Role/Location/Prop Node 在节点内直接增量写 PostgreSQL。Shot Node 仅操作内存 State(对白内嵌在分镜内)。Aggregate Node 统一写分镜、对白和资源。
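上文多次出现的 tag_id_map,其形状与 Shot Node 产出的"名称 → tag_id"解析逻辑可用如下草图说明(字段名与 ID 仅为示意,非最终实现):

```python
# 示意:各 tag_id_map 均为"标签名 → project_element_tags.id"的字典(ID 为虚构示例)
character_tag_id_map = {"李明": "tag-uuid-001", "王芳": "tag-uuid-002"}


def resolve_refs(names: list[str], tag_id_map: dict[str, str]) -> tuple[list[str], list[str]]:
    """把 LLM 产出的名称列表解析为 tag_id,返回 (已解析 id, 未知名称)。"""
    resolved: list[str] = []
    unknown: list[str] = []
    for name in names:
        tag_id = tag_id_map.get(name)
        if tag_id is not None:
            resolved.append(tag_id)
        else:
            unknown.append(name)
    return resolved, unknown


ids, missing = resolve_refs(["李明", "神秘人"], character_tag_id_map)
# missing 非空即意味着 Shot Node 产出了未知引用,由 Validate Node 触发重抽
```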


依赖链:角色 → 拍摄地点 → 道具 → 分镜

┌─────────────────────────────────────────────────────────────────────┐
│ Role Node(全局,一次性)                                            │
│                                                                      │
│ 上下文:所有 location_texts 合并(前 6000 字)                       │
│ LLM 产出:characters[] + character_tags{}                            │
│ DB 写入:project_characters + project_element_tags(CHARACTER)        │
│ State 输出:character_ids, character_tag_id_map                      │
└──────────────────────────────┬──────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Location Node(每地点片段一次)                                       │
│                                                                      │
│ 上下文:location_texts[location_id](当前地点片段完整文本)          │
│         已知角色名列表(来自 character_tag_id_map)                   │
│ LLM 产出:location + location_tags{}                                 │
│ DB 写入:project_locations + project_element_tags(LOCATION)          │
│ State 输出:location_results[N].location_tag_id_map                  │
└──────────────────────────────┬──────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Prop Node(每地点片段一次)                                           │
│                                                                      │
│ 上下文:location_texts[location_id](当前地点片段完整文本)          │
│         已知角色名(约束 owner_character,防幻觉)                    │
│ LLM 产出:props[] + prop_tags{}                                      │
│ DB 写入:project_props + project_element_tags(PROP)                  │
│ State 输出:location_results[N].prop_tag_id_map                      │
└──────────────────────────────┬──────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Shot Node(每地点片段一次)                                           │
│                                                                      │
│ 上下文:location_texts[location_id](当前地点片段完整文本)          │
│         character_tag_id_map(格式化为可读列表)                     │
│         location_tag_id_map(当前地点片段)                          │
│         prop_tag_id_map(当前地点片段)                              │
│ LLM 产出:storyboards[](含内嵌 dialogues)                          │
│ DB 写入:❌ 不写 PG(由 Aggregate Node 统一写)                      │
│ State 输出:location_results[N]._pending_shots                       │
└──────────────────────────────┬──────────────────────────────────────┘
                               │ (Validate Node 校验)
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Aggregate Node(全局一次)                                            │
│                                                                      │
│ 复用现有 ScreenplayService.store_parsed_elements() 分镜写入逻辑      │
│ DB 写入顺序:                                                         │
│   ① screenplay_element_refs(角色/地点/道具 → 剧本引用)            │
│   ② storyboards(分镜主记录)                                        │
│   ③ storyboard_items(含 action/position/is_visible)               │
│   ④ storyboard_dialogues(dialogue_type/emotion/sequence_order)    │
│   ⑤ project_resources(占位符,等待 AI 生图)                       │
│   ⑥ screenplays.parsing_status = COMPLETED                          │
└─────────────────────────────────────────────────────────────────────┘
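上图中 Validate Node → Shot Node 的重抽回路,本质是一个条件边路由函数。下面是一个最小草图(state 字段名沿用本文约定,函数名为示意,最终以 graph_builder.py 为准):

```python
MAX_SHOT_RETRIES = 2  # 对应"自校验闭环:失败自动重抽(最多 2 次)"


def route_after_validate(state: dict) -> str:
    """Validate Node 之后的条件边:存在未知引用且未超重试上限时回到 Shot Node。"""
    report = state.get("last_validation_report") or {}
    unknown_refs = report.get("unknown_refs", [])
    retries = state.get("shot_retry_count", 0)
    if unknown_refs and retries < MAX_SHOT_RETRIES:
        return "shot_node"       # 携带校验报告重抽分镜
    return "aggregate_node"      # 校验通过,或达到上限后接受当前结果继续


decision = route_after_validate(
    {"last_validation_report": {"unknown_refs": ["神秘人"]}, "shot_retry_count": 0}
)
```

在 graph_builder.py 中,该函数可作为条件边(add_conditional_edges)的路由回调接入。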

运行与恢复策略

LangGraph Checkpointer 配置

import uuid

# thread_id 固定 = task_id,同一任务重试可从断点 Resume
config = {
    "configurable": {"thread_id": task_id},
    "metadata": {"run_id": str(uuid.uuid4())}
}

# PostgreSQL Schema 初始化(一次性)
# CREATE SCHEMA IF NOT EXISTS langgraph;
# GRANT ALL ON SCHEMA langgraph TO <db_user>;
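"同一 task_id 重试即 Resume"的语义可以封装为一个小工具函数示意(函数名为假设):

```python
import uuid


def build_checkpoint_config(task_id: str) -> dict:
    """thread_id 固定为 task_id:重试时命中同一 thread,从最近 checkpoint 续跑;
    run_id 每次执行重新生成,用于区分同一 thread 下的多次运行。"""
    return {
        "configurable": {"thread_id": task_id},
        "metadata": {"run_id": str(uuid.uuid4())},
    }


cfg_first = build_checkpoint_config("task-123")
cfg_retry = build_checkpoint_config("task-123")
# 两次调用 thread_id 相同(可 Resume),run_id 不同(可区分两次运行)
```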

错误码体系

# errors.py

from enum import IntEnum


class ExtractionErrorCode(IntEnum):
    UNKNOWN = 9999
    # Init Node(地点片段切分)
    LOCATION_SPLIT_FAILED   = 1001
    LOCATION_SPLIT_DB_WRITE = 1002
    # Role Node
    ROLE_LLM_FAILED    = 2001
    ROLE_DB_WRITE      = 2002
    # Location Node
    LOCATION_LLM_FAILED = 3001
    LOCATION_DB_WRITE   = 3002
    # Prop Node
    PROP_LLM_FAILED    = 4001
    PROP_DB_WRITE      = 4002
    # Shot Node
    SHOT_LLM_FAILED    = 5001
    SHOT_TAG_RESOLVE   = 5002
    # Aggregate Node
    AGGREGATE_DB_WRITE = 6001


def node_error(node_name: str, code: ExtractionErrorCode, detail: str) -> dict:
    return {
        "status": "failed",
        "failed_node": node_name,
        "error_code": int(code),
        "error": f"[{code.name}] {detail}"
    }
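node_error 返回的失败结构示例如下(为保持片段自包含,这里内联了枚举与函数的最小复刻):

```python
from enum import IntEnum


class ExtractionErrorCode(IntEnum):
    SHOT_LLM_FAILED = 5001  # errors.py 枚举的最小子集


def node_error(node_name: str, code: ExtractionErrorCode, detail: str) -> dict:
    return {
        "status": "failed",
        "failed_node": node_name,
        "error_code": int(code),
        "error": f"[{code.name}] {detail}",
    }


err = node_error("shot_node", ExtractionErrorCode.SHOT_LLM_FAILED, "LLM 返回非法 JSON")
# err 会被写入 State,图执行据此终止并上报 failed_node / error_code
```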

节点监控装饰器

# utils.py

import functools
import logging
import time

from .errors import ExtractionErrorCode, node_error

logger = logging.getLogger(__name__)


def node_monitor(node_name: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(state: dict) -> dict:
            task_id = state.get("task_id", "unknown")
            location_idx = state.get("current_location_idx", -1)
            start = time.monotonic()
            logger.info("节点开始 | node=%s | task_id=%s | location_idx=%s", node_name, task_id, location_idx)
            try:
                result = await func(state)
                logger.info("节点完成 | node=%s | task_id=%s | elapsed=%.2fs",
                            node_name, task_id, time.monotonic() - start)
                return result
            except Exception as e:
                logger.error("节点异常 | node=%s | task_id=%s | elapsed=%.2fs | error=%s",
                             node_name, task_id, time.monotonic() - start, e, exc_info=True)
                return node_error(node_name, ExtractionErrorCode.UNKNOWN, str(e))
        return wrapper
    return decorator
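装饰器的使用方式如下(demo_node 为示意节点;为能独立运行,这里最小复刻了 node_monitor):

```python
import asyncio
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")


def node_monitor(node_name: str):
    """与 utils.py 同构的最小复刻:记录节点耗时,异常时返回失败 State。"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(state: dict) -> dict:
            start = time.monotonic()
            logger.info("节点开始 | node=%s | task_id=%s", node_name, state.get("task_id"))
            try:
                result = await func(state)
                logger.info("节点完成 | node=%s | elapsed=%.2fs", node_name, time.monotonic() - start)
                return result
            except Exception as e:
                logger.error("节点异常 | node=%s | error=%s", node_name, e, exc_info=True)
                return {"status": "failed", "failed_node": node_name, "error": str(e)}
        return wrapper
    return decorator


@node_monitor("demo_node")
async def demo_node(state: dict) -> dict:
    return {"status": "ok", "seen": state["task_id"]}


result = asyncio.run(demo_node({"task_id": "t-1"}))
```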

依赖与部署

新增 Python 依赖

# requirements.txt 新增
langgraph>=0.2.0
langgraph-checkpoint-postgres>=0.1.0
llama-index-core>=0.11.0            # MarkdownNodeParser + SentenceSplitter
tenacity>=8.0.0                     # 指数退避重试
asgiref>=3.0.0                      # async_to_sync(Celery 集成)

配置项(最小集)

# server/app/core/config.py
SCREENPLAY_STATE_VERSION: int = 1
LANGGRAPH_CHECKPOINT_DB: str = "postgresql+asyncpg://..."
LANGGRAPH_CHECKPOINT_SCHEMA: str = "langgraph"

迁移策略

  • mode=default(默认):现有 Celery + 单次 LLM,100% 流量
  • mode=langgraph(新):新管道,灰度测试,按需触发
| 质量指标 | 现有基线 | LangGraph 目标 |
|----------|----------|----------------|
| 角色识别准确率 | ~85% | >92% |
| 道具归属正确率 | ~70% | >85% |
| 分镜引用一致性 | ~75% | >90% |
| 长剧本(>5000字)成功率 | ~60% | >90% |

满足上述指标后,将 mode=langgraph 设为默认值。


风险与对策

| 风险 | 对策 |
|------|------|
| 超长剧本 Role Node 上下文超限 | 全文超 8000 字时取前 6000 字(角色通常早期出场) |
| 单地点片段文本过长 | Init Node 用 SentenceSplitter 对超长片段子切片后合并传递 |
| State 膨胀(location_texts 过大) | Aggregate Node 完成后清空 location_texts;监控 State 大小 < 10MB |
| 任务中断无法恢复 | Postgres Checkpointer + thread_id = task_id 支持断点 Resume |
| Celery 多进程 Provider 状态丢失 | Provider 完全无状态,每节点从 PG 重建 |
| 自校验死循环 | 硬性 max_retries=2,超过后接受当前结果继续 |
| Shot Node 引用幻觉 | last_validation_report 注入重抽 Prompt,提示具体未知引用 |
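针对"State 膨胀"一行的对策,可以在 Aggregate Node 收尾时主动瘦身并估算 State 大小(阈值与字段名沿用本文约定,函数为示意):

```python
import json

STATE_SIZE_LIMIT = 10 * 1024 * 1024  # 10MB,对应风险表中的监控阈值


def shrink_state_after_aggregate(state: dict) -> dict:
    """Aggregate Node 写库完成后清空大文本字段,避免 checkpoint 膨胀。"""
    slim = dict(state)
    slim["location_texts"] = {}  # 原文已落库 PostgreSQL,State 中不再需要
    return slim


def state_size_bytes(state: dict) -> int:
    """以 JSON 序列化后的字节数近似估算 State 占用,超限时告警。"""
    return len(json.dumps(state, ensure_ascii=False).encode("utf-8"))


state = {"task_id": "t-1", "location_texts": {"loc-1": "很长的地点片段原文" * 500}}
state = shrink_state_after_aggregate(state)
```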

实施计划

Week 1

  • 搭建 graph_state.pygraph_builder.pyerrors.pyutils.py
  • 实现 init_node.py(MarkdownNodeParser 地点片段切分,写 screenplay_locations 表,清空 content
  • 实现 role_node.py(全文合并抽取 + 角色名标准化去重)

Week 2

  • 实现 location_node.py(拍摄地点标签,写 project_locations
  • 实现 prop_node.py(道具标签,写 project_props
  • 实现 shot_node.py(分镜 + 内嵌对白,仅写 State,携带 tag_id 上下文)
  • 实现 validate_node.py(全量校验 + last_validation_report
  • 实现 aggregate_node.py(汇总统计,写分镜/对白/resources)

Week 3

  • 实现 extraction_service.py(Checkpointer 配置 + ainvoke
  • 实现 screenplay_langgraph_task.pyasync_to_sync + finally cleanup)
  • 修改 ScreenplayParseRequest,接入 mode=langgraph 路由
  • 初始化 langgraph PostgreSQL Schema

Week 4

  • 10 个短剧本(< 2000 字)功能验证
  • 10 个长剧本(> 5000 字)质量对比
  • 并发 5 任务压测 + 故障注入测试(中断恢复)
  • 质量指标达标后灰度切流

备注

  • 未来若需引入向量检索优化,应新增独立 ADR(ADR-0XX),不在本 ADR 内混合
  • 本 ADR 唯一实现路径为"无向量库纯内存方案",确保开发口径一致
  • location_texts 在 Aggregate Node 完成后应主动清空,避免 Checkpoint 存储膨胀

维护人员: 开发团队
最后更新: 2026-02-25