2026-04-23 14:36:26 +08:00

1213 lines
49 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
标书内容生成模块 - 极速并发优化版
全局LLM_SEMAPHORE(上限20) + 并行解析 + 更高池上限。
流程:生成大纲 → 解析章节树 → 并发生成内容
"""
import re
import random
import sqlite3
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import config
from utils import ai_client, prompts as P
from utils import word_allocation as word_alloc
from utils import volume_chapters as vol_ch
from utils import diagram_intent as diagram_int
from utils import attachment_section as att_sec
from utils.outline_numbering import format_heading_display
logger = logging.getLogger(__name__)
# System prompt ("iron rules") injected into every body-writing AI call.
# It pins down: minimum word counts that must be backed by substance, the
# mandatory "我方" self-reference, banned boilerplate phrases and lead-in
# sentences, no restating of tender requirements or project background in
# section bodies, no fabricated "better than required" parameters, concrete
# verifiable measures only, plain-text formatting with (1)(2)(3) lists, and
# no bracket placeholders like [型号]/[数量] in the narrative.
# NOTE(review): some single CJK characters in this literal appear to have
# been stripped by an encoding issue — verify against the original file.
BID_WRITING_SYSTEM = (
    '你是一位资深的工程投标文件撰写专家,擅长以执行方视角撰写技术方案正文。'
    '撰写时必须遵守以下铁律:'
    '①【字数】用户规定的最低字数必须满足,但字数须由实质内容支撑,'
    '不得用重复背景、堆砌承诺或复述要求来凑字数;'
    '②【自称】投标方自称统一用"我方",禁用"我们""我公司"'
    '③【禁止套话】禁用:综上所述、首先其次再次、我们深信、高度重视、全力以赴、'
    '竭诚服务、不断优化、稳步推进、通过以上措施、我方将严格按照、我方承诺、'
    '确保圆满完成、切实保障;'
    '④【禁止前导句】严禁:本章节对应……、本小节主要说明……、'
    '以下将从……方面说明、针对招标方要求……、根据招标文件……我方将……——'
    '开头直接写实质内容;'
    '⑤【禁止复述要求】招标文件给出的技术参数、工程量、服务数量、规范标准等均视为'
    '已知条件,直接体现在方案中,禁止先复读要求再作答;'
    '不用"满足招标方提出的XXX要求""针对招标文件第X条"等句式;'
    '⑥【禁止重申背景——最常见的废稿场景】'
    '禁止在章节正文中出现项目名称、建设单位、建设地点、工程规模、合同工期等基本信息;'
    '尤其严禁将招标文件中的具体工程量数字(如"X条渠道""X公里""X座建筑物""X台设备"等)'
    '反复引入到各个章节开头作为背景铺垫——'
    '这类数字只能在专门的"项目概况/项目背景"章节出现一次,'
    '质量、安全、进度、技术方案、人员配置等专业章节一律直接展开专业内容;'
    '⑦【禁止虚构优越参数】严禁为了显示"超越"招标要求而捏造参数或数量:'
    '招标文件要求多少就按多少写,不得无依据地写成"优于要求""高于标准"'
    '如需体现竞争力,只能在工艺方法、管理措施、响应速度等可具体描述的维度展开,'
    '不得在规格数量上自行拔高;'
    '⑧【实质可检验】每项措施须给出具体做法、操作步骤、管理节点或时间节点;'
    '凡写数量、型号、吨位、强度、时限等量化内容,须能在招标文件或工程量清单摘要中找到依据,'
    '无依据处不写具体数字与型号,改用"按设计要求""与工况及进度相匹配""符合相应规范等级"等完整中文概括表述,'
    '不做空洞承诺;'
    '⑨【行文格式】纯文本,段落间空行分隔,列举用(1)(2)(3)编号,'
    '不用markdown符号不用连接词串联不用""作结尾。'
    '⑩【禁止占位符】方案叙述中严禁半角或全角方括号形式的未完稿待填(如[型号][数量][数值][X][Y]等),'
    '亦不得用「待填」「TBD」留白语义须用通顺的陈述句一次写清。'
    '若另有图示/表格专用输出规范要求使用约定标记,仅在该规范限定的标记内可使用方括号。'
)
# Volume tier key -> (base chars per ordinary subsection,
#                     chars per core/high-scoring chapter,
#                     display label, desired max_tokens for the AI call).
VOLUME_PRESETS = {
    'concise': (1200, 2500, '精简版', 5000),
    'standard': (2000, 4000, '标准版', 8000),
    'detailed': (3000, 5500, '详细版', 12000),
    'full': (4000, 7000, '充实版', 16000),
}
def _effective_volume() -> str:
    """
    Resolve the volume tier key used during generation.

    Simplified policy: if a target page count is configured, map it roughly
    onto one of the four tiers; otherwise keep the configured CONTENT_VOLUME.
    """
    target_pages = int(getattr(config, 'TARGET_PAGES', 0) or 0)
    fallback_tier = getattr(config, 'CONTENT_VOLUME', 'standard')
    return vol_ch.volume_key_from_target_pages(target_pages, fallback_tier)
# Hard max_tokens ceiling per model provider; _get_max_tokens clamps the
# preset's desired budget to these values.
_PROVIDER_TOKEN_LIMITS = {
    'deepseek': 8192,
    'qwen': 8192,
    'openai': 16384,
}
def _get_word_count_spec(volume: str) -> str:
    """Return the hard word-count requirement paragraph embedded into prompts
    for the given volume tier (falls back to the 'standard' preset)."""
    preset = VOLUME_PRESETS.get(volume, VOLUME_PRESETS['standard'])
    base, core = preset[0], preset[1]
    pages = int(getattr(config, 'TARGET_PAGES', 0) or 0)
    if pages > 0:
        page_note = f'\n- 目标页数:约 {pages} 页(按粗略换算生效)'
    else:
        page_note = ''
    segments = (
        f'- 字数硬性要求(必须达到,不达标将被退回重写):\n',
        f' · 一般小节:不少于 {base}\n',
        f' · 核心技术/重点评分章节:不少于 {core}\n',
        f'{page_note}',
        f'- 内容必须充分展开,每个要点均需具体阐述,不得一笔带过\n',
        f'- 宁多勿少,写满写透,篇幅不足是最严重的质量问题',
    )
    return ''.join(segments)
def _get_max_tokens(volume: str) -> int:
    """Max tokens for the AI call, clamped to the provider's hard ceiling."""
    preset = VOLUME_PRESETS.get(volume, VOLUME_PRESETS['standard'])
    desired = preset[3]
    provider = getattr(config, 'MODEL_PROVIDER', 'openai')
    ceiling = _PROVIDER_TOKEN_LIMITS.get(provider, 8192)
    return min(desired, ceiling)
def _get_min_chars(volume: str) -> int:
    """Minimum char count that triggers auto-continuation: 65% of the tier's
    base subsection size, slightly below target so several rounds can top up."""
    preset = VOLUME_PRESETS.get(volume, VOLUME_PRESETS['standard'])
    return int(preset[0] * 0.65)
# Mapping from Chinese numerals to integers, used when parsing outline
# chapter numbers such as "一、" / "十二、".
# Fix: the single-character keys had been stripped to '' (all colliding into
# one empty key); restored to match the numeral classes used by the parser
# regexes ([一二三四五六七八九十百]).
CN_NUM_MAP = {
    '一': 1, '二': 2, '三': 3, '四': 4, '五': 5,
    '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
    '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15,
}
# ─── 大纲生成 ─────────────────────────────────────────────────────────────
def generate_outline(db_path: str, project_id: int) -> None:
    """Background task: generate the bid outline and store it in bid_sections.

    Flow: mark the project 'outline_generating' -> load parsed tender data ->
    choose a prompt (with or without scoring requirements) -> call the LLM ->
    parse and renumber the outline -> persist the normalized outline text and
    the section rows -> mark 'outline_done'. Any failure is recorded on the
    project as 'outline_error' with the message.
    """
    conn = sqlite3.connect(db_path)
    try:
        _set_project_status(conn, project_id, 'outline_generating')
        td = _get_tender_data(conn, project_id)
        if not td:
            raise ValueError('尚未解析招标文件,请先解析')
        summary = td['summary'] or ''
        rating = td['rating_requirements'] or ''
        # Scoring requirements, when available, steer the outline structure.
        if rating:
            prompt = P.get_outlines_with_rating_prompt(summary, rating)
        else:
            prompt = P.get_outlines_prompt(summary or td['raw_text'] or '')
        outline_text = ai_client.chat(
            prompt,
            temperature=0.5,
            max_tokens=4096,
            request_timeout=getattr(config, 'OUTLINE_REQUEST_TIMEOUT', 300),
        )
        # Parse sections, renumber automatically, save the normalized text.
        bid_title, sections, normalized_text = _parse_outline(outline_text)
        _save_outline_text(conn, project_id, normalized_text)
        _save_sections(conn, project_id, sections)
        _set_project_status(conn, project_id, 'outline_done')
        logger.info(f'项目 {project_id} 大纲生成完成,共 {len(sections)}')
    except Exception as e:
        logger.exception(f'大纲生成失败 project_id={project_id}')
        _set_project_status(conn, project_id, 'outline_error', str(e))
    finally:
        conn.close()
# ─── 章节内容生成 ──────────────────────────────────────────────────────────
def generate_section(db_path: str, project_id: int, section_id: int,
                     anon_requirements: str = '',
                     enable_figure: bool = False,
                     enable_table: bool = False) -> None:
    """Background task: generate body content for one section.

    Single-section entry point: reads its own context (section row, tender
    data, outline, word allocation) from the DB, closes the connection before
    the long-running AI call, then delegates to _generate_one. Errors are
    written back onto the section row as status 'error'.
    """
    conn = sqlite3.connect(db_path)
    try:
        section = _get_section(conn, section_id)
        if not section:
            raise ValueError(f'Section {section_id} 不存在')
        td = _get_tender_data(conn, project_id)
        outline_text = _get_outline_text(conn, project_id)
        if not outline_text.strip():
            raise ValueError('当前项目尚无可用大纲,请先保存或生成大纲')
        summary = (td or {}).get('summary', '')
        boq_summary = (td or {}).get('boq_summary', '')
        # Release the connection early: generation below is long-running and
        # _generate_one opens its own short-lived connections.
        conn.close()
        conn = None
        tender_kind = (td or {}).get('tender_kind', 'engineering') or 'engineering'
        outline_head = outline_text.strip().splitlines()[0][:50] if outline_text.strip() else ''
        logger.info(
            f'章节生成读取大纲 project_id={project_id}, section_id={section_id}, '
            f'outline_len={len(outline_text)}, outline_head="{outline_head}"'
        )
        # Per-section word-count override (None when no allocation applies).
        alloc_map = _project_allocation_map(db_path, project_id)
        override = alloc_map.get(section_id) if alloc_map else None
        _generate_one(db_path, section, summary, outline_text,
                      anon_requirements, enable_figure, enable_table,
                      boq_summary, tender_kind, override)
    except Exception as e:
        logger.exception(f'章节生成失败 section_id={section_id}')
        _update_section_status_safe(db_path, section_id, 'error', str(e))
    finally:
        if conn:
            conn.close()
# Speed optimization: reduced from 5 to 3 continuation rounds — fewer extra
# AI calls; with the improved prompt a section more often fills in one pass.
MAX_CONTINUE_ROUNDS = 3
# Per-round continuation target cap: matched to the realistic Chinese output
# of DeepSeek/Qwen under an 8192 max_tokens budget, slightly conservative so
# each round can actually be written in full.
_CONTINUE_CHUNK_CAP = 2800
# How many trailing chars of existing content are fed back as context.
_CONTINUE_TAIL_CHARS = 2200
def _auto_continue(content: str, min_chars: int, max_tok: int, title: str,
                   system: str = BID_WRITING_SYSTEM) -> str:
    """
    Auto-continuation: when first-pass content is below the word target,
    issue standalone follow-up calls until the target is met or rounds run out.

    The full original prompt is NOT resent (it is long and would squeeze the
    output budget); only the tail of the existing content is supplied as
    context so the model focuses on continuing the text.
    """
    for round_i in range(MAX_CONTINUE_ROUNDS):
        if len(content) >= min_chars:
            break
        remaining = min_chars - len(content)
        # A shortfall this small is not worth another AI round trip.
        if remaining <= 200:
            break
        # Ask for only part of the deficit per round; several smaller rounds
        # reach the total more reliably than one oversized request.
        chunk_goal = min(remaining, _CONTINUE_CHUNK_CAP)
        tail = (
            content[-_CONTINUE_TAIL_CHARS:]
            if len(content) > _CONTINUE_TAIL_CHARS
            else content
        )
        cont_prompt = (
            f'以下是投标文件「{title}」小节已撰写的部分内容(末尾段落):\n\n'
            f'{tail}\n\n'
            f'━━━━━━━━━━━━━━━━━━━━━━━━━\n'
            f'当前累计 {len(content)} 字,本节最低要求 {min_chars} 字,'
            f'全文总差额约 {remaining} 字。\n'
            f'请紧接上文末尾继续撰写,要求:\n'
            f'(1) 不重复、不复述上文已有段落,自然衔接续写\n'
            f'(2) 深入展开实施细节、技术参数、岗位、设备、流程与验收要点\n'
            f'(3) 保持"我方"口吻禁止AI套话与前导说明句\n'
            f'(4) 直接输出续写正文,不写"续写如下"等引导语\n'
            f'(5) 本轮续写不少于 {chunk_goal} 字,尽量写满\n'
        )
        logger.info(
            f'[续写] "{title}"{round_i+1}'
            f'({len(content)}/{min_chars}字, 差{remaining}字, 本轮目标≥{chunk_goal}字)'
        )
        try:
            extra = ai_client.chat(
                cont_prompt,
                system=system,
                temperature=0.7,
                max_tokens=max_tok,
            )
        except Exception as e:
            # Best-effort: keep whatever was written so far.
            logger.warning(f'[续写] "{title}"{round_i+1}轮失败: {e}')
            break
        # A near-empty reply means the model has nothing left to add.
        if not extra or len(extra.strip()) < 80:
            logger.info(f'[续写] "{title}"{round_i+1}轮返回内容过短,终止')
            break
        content = content.rstrip() + '\n\n' + extra.strip()
        logger.info(
            f'[续写] "{title}"{round_i+1}轮完成,'
            f'+{len(extra.strip())}字,累计{len(content)}'
        )
    logger.info(f'"{title}" 最终字数:{len(content)}')
    return content
def _build_writing_system(anon_requirements: str = '') -> str:
    """Build the writing system prompt, appending anonymous-bid compliance
    rules (highest priority) when any are provided."""
    extra_rules = anon_requirements.strip()
    if extra_rules:
        return (
            BID_WRITING_SYSTEM
            + '\n\n【暗标合规要求(最高优先级,每个章节均须严格遵守)】\n'
            + extra_rules
        )
    return BID_WRITING_SYSTEM
def _get_knowledge_context(title: str) -> str:
    """Retrieve knowledge-base snippets related to the section title for the
    AI to reference while writing.

    Silently returns '' when the knowledge module is not installed or the
    search yields nothing — the knowledge base is strictly optional.
    """
    try:
        # Imported lazily so the module works without the knowledge package.
        from modules.knowledge import search
        chunks = search(title, top_k=config.TOP_K_KNOWLEDGE)
        if not chunks:
            return ''
        parts = []
        for i, chunk in enumerate(chunks, 1):
            # Cap each snippet at 600 chars to keep the prompt size bounded.
            parts.append(f'[参考片段{i}]\n{chunk[:600]}')
        return (
            '\n\n【企业知识库参考内容(以下摘自历史投标文件,仅供参考,'
            '须结合本项目实际情况重新撰写,禁止直接照抄)】\n'
            + '\n\n'.join(parts)
        )
    except Exception:
        # Deliberate best-effort: any failure degrades to "no context".
        return ''
def _leaf_rows_for_allocation(conn, project_id: int) -> list:
cur = conn.cursor()
cur.execute(
'SELECT id, section_title FROM bid_sections WHERE project_id=? AND is_leaf=1 '
'ORDER BY order_index',
(project_id,),
)
return [{'id': r[0], 'section_title': (r[1] or '').strip()} for r in cur.fetchall()]
def _project_allocation_map(db_path: str, project_id: int) -> Optional[Dict[int, Dict[str, Any]]]:
    """Compute per-leaf target word counts from the allocation rules and the
    full leaf table.

    Returns a mapping of section id -> allocation override dict, or None when
    there are no leaves or the computation fails; callers then fall back to
    the uniform volume presets. (Exact None-vs-allocation semantics for the
    no-rating / target-pages case are delegated to word_alloc.)
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            leaves = _leaf_rows_for_allocation(conn, project_id)
            if not leaves:
                return None
            td = _get_tender_data(conn, project_id)
            rating_raw = (td or {}).get('rating_json') or ''
            vol = _effective_volume()
            rules = word_alloc.load_rules()
            return word_alloc.compute_leaf_allocations(vol, leaves, rating_raw, rules)
        finally:
            conn.close()
    except Exception as e:
        # Best-effort: allocation is an enhancement, never fail generation on it.
        logger.warning('字数分配计算失败,回退统一篇幅: %s', e)
        return None
def _build_diagram_addon_for_leaf(
    section: dict,
    outline_text: str,
    enable_figure: bool,
    enable_table: bool,
) -> str:
    """Leaf node: render the figure/table output spec for this section from
    the diagram intent stack (gated by the project-level switches)."""
    agent = diagram_int.get_diagram_agent()
    leaf_title = section.get('section_title') or ''
    return agent.render_for_section(
        leaf_title,
        outline_text or '',
        enable_figure,
        enable_table,
    )
def _strip_line_serial_numbers(text: str) -> str:
"""
去除正文行首的纯序号(如 1. / 2、 / 370) / 12 ),保留正文语义。
"""
if not text:
return text
cleaned_lines = []
for line in text.splitlines():
cleaned = re.sub(r'^\s*\d{1,4}(?:[\..、)\s]+)\s*', '', line)
cleaned_lines.append(cleaned)
return '\n'.join(cleaned_lines)
def _generate_one(db_path: str, section: dict, summary: str, outline_text: str,
                  anon_requirements: str = '',
                  enable_figure: bool = False,
                  enable_table: bool = False,
                  boq_summary: str = '',
                  tender_kind: str = 'engineering',
                  allocation_override: Optional[Dict[str, Any]] = None) -> None:
    """
    Core generation: pure AI calls plus result write-back.

    Holds no long-lived DB connection (all writes go through the *_safe
    helpers that open a fresh WAL connection), so it is safe to run many of
    these concurrently on a thread pool.

    Three generation paths:
      - attachment-style leaf with stacked charts: chart-stack prompts only;
      - ordinary leaf: full body prompt + auto-continuation to word target;
      - non-leaf: short chapter intro (or empty when no intro prompt exists).
    """
    section_id = section['id']
    is_leaf = bool(section['is_leaf'])
    title = section['section_title']
    writing_system = _build_writing_system(anon_requirements)
    # Attachment-section rules decide whether this leaf is rendered as
    # charts-only instead of prose.
    _att_rules = att_sec.get_attachment_rules_cached()
    _att_mode = att_sec.attachment_leaf_body_mode(_att_rules) if (
        is_leaf and att_sec.is_attachment_only_section(title, _att_rules)
    ) else ''
    _is_attachment_title = bool(is_leaf and bool(_att_mode))
    _use_stack_charts = bool(
        _is_attachment_title and att_sec.use_attachment_stack_charts_body(_att_rules),
    )
    diagram_addon = ''
    # Ordinary leaves may get a figure/table spec appended to the prompt;
    # chart-stack leaves build their own chart prompts instead.
    if is_leaf and (enable_figure or enable_table) and not _use_stack_charts:
        diagram_addon = _build_diagram_addon_for_leaf(
            section, outline_text, enable_figure, enable_table,
        )
    _update_section_status_safe(db_path, section_id, 'generating')
    try:
        if is_leaf:
            if _use_stack_charts:
                # Plan the chart stack for this attachment section.
                stack = diagram_int.get_diagram_agent().plan(
                    title, outline_text, enable_figure, enable_table,
                )
                if _att_mode == 'single_chart_only' and len(stack) > 1:
                    stack = [stack[0]]
                # Fallback: pick one figure/table kind when planning came
                # back empty but charts are enabled.
                if not stack and (enable_figure or enable_table):
                    fk = att_sec.pick_single_figure_or_table(
                        title, enable_figure, enable_table, _att_rules,
                    )
                    if fk:
                        stack = diagram_int.make_fallback_stack(fk)
                if not stack or (not enable_figure and not enable_table):
                    # Charts disabled or unplannable: short placeholder text.
                    prompt = P.get_attachment_chart_disabled_prompt(title)
                    stack_sys = P.ATTACHMENT_STACK_CHARTS_SYSTEM
                    if (anon_requirements or '').strip():
                        stack_sys = stack_sys + '\n\n【合规】\n' + (anon_requirements or '')[:2000]
                    content = ai_client.chat(
                        prompt,
                        system=stack_sys,
                        temperature=0.45,
                        max_tokens=512,
                    )
                else:
                    labels = diagram_int.stack_compact_labels(stack)
                    prompt = P.get_attachment_stack_charts_prompt(
                        summary, outline_text, title, labels,
                    )
                    stack_sys = P.ATTACHMENT_STACK_CHARTS_SYSTEM
                    if (anon_requirements or '').strip():
                        stack_sys = stack_sys + '\n\n【合规】\n' + (anon_requirements or '')[:2000]
                    content = ai_client.chat(
                        prompt,
                        system=stack_sys,
                        temperature=0.5,
                        max_tokens=3072,
                    )
                content = _strip_line_serial_numbers(content)
                _update_section_content_safe(db_path, section_id, content, '')
            else:
                # Ordinary leaf body: word target from the allocation
                # override when present, else from the volume tier presets.
                volume = _effective_volume()
                if allocation_override:
                    wc_spec = allocation_override['word_count_spec']
                    max_tok = int(allocation_override.get('max_tokens') or _get_max_tokens(volume))
                    tgt = int(allocation_override.get('target_chars') or 0)
                    min_chars = word_alloc.continuation_threshold(tgt) if tgt > 0 else _get_min_chars(volume)
                else:
                    wc_spec = _get_word_count_spec(volume)
                    max_tok = _get_max_tokens(volume)
                    min_chars = _get_min_chars(volume)
                prompt = P.get_section_detail_prompt(
                    summary, outline_text, title,
                    word_count_spec=wc_spec,
                    boq_summary=boq_summary,
                    tender_kind=tender_kind or 'engineering',
                )
                if _is_attachment_title:
                    prompt = prompt + P.get_attachment_chapter_emphasis_hint()
                # Knowledge base: inject related snippets from past bids.
                knowledge_ctx = _get_knowledge_context(title)
                if knowledge_ctx:
                    prompt = prompt + knowledge_ctx
                if diagram_addon:
                    prompt = prompt + diagram_addon
                content = ai_client.chat(
                    prompt,
                    system=writing_system,
                    temperature=0.7,
                    max_tokens=max_tok,
                )
                # Top up until min_chars is reached (bounded rounds).
                content = _auto_continue(content, min_chars, max_tok, title,
                                         system=writing_system)
                content = _strip_line_serial_numbers(content)
                _update_section_content_safe(db_path, section_id, content, '')
        else:
            # Non-leaf: short chapter intro; an empty prompt means no intro.
            prompt = P.get_section_intro_prompt(summary, outline_text, title)
            if prompt:
                intro = ai_client.chat(
                    prompt,
                    system=writing_system,
                    temperature=0.4,
                    max_tokens=1024,
                )
            else:
                intro = ''
            intro = _strip_line_serial_numbers(intro)
            _update_section_content_safe(db_path, section_id, '', intro)
        _update_section_status_safe(db_path, section_id, 'done')
        logger.info(f'Section {section_id} "{title}" 生成完成')
    except Exception as e:
        logger.exception(f'章节生成失败 section_id={section_id}')
        _update_section_status_safe(db_path, section_id, 'error', str(e))
def generate_all_sections(db_path: str, project_id: int,
                          anon_requirements: str = '',
                          enable_figure: bool = False,
                          enable_table: bool = False) -> None:
    """
    Background task: generate every pending section of a project concurrently
    (total AI concurrency is bounded by the global LLM semaphore).

    Strategy: phase 1 generates non-leaf intros (fast), phase 2 generates
    leaf bodies (the expensive part). Worker count comes from
    config.MAX_CONCURRENT_SECTIONS, then halved against the LLM limit to
    leave headroom for continuation rounds and knowledge retrieval.

    NOTE(review): if an exception occurs between sqlite3.connect and
    conn.close() (other than the handled outline check), the connection is
    not closed before the outer except — consider a try/finally; verify.
    """
    try:
        conn = sqlite3.connect(db_path)
        cur = conn.cursor()
        # Load all sections; already-done ones are filtered out below.
        cur.execute('''
            SELECT id, section_number, section_title, level, is_leaf, content, intro_content, status
            FROM bid_sections WHERE project_id=? ORDER BY order_index
        ''', (project_id,))
        rows = cur.fetchall()
        td = _get_tender_data(conn, project_id)
        outline_text = _get_outline_text(conn, project_id)
        if not outline_text.strip():
            conn.close()
            raise ValueError('当前项目尚无可用大纲,请先保存或生成大纲')
        summary = (td or {}).get('summary', '')
        boq_summary = (td or {}).get('boq_summary', '')
        tender_kind = (td or {}).get('tender_kind', 'engineering') or 'engineering'
        outline_head = outline_text.strip().splitlines()[0][:50] if outline_text.strip() else ''
        logger.info(
            f'全量生成读取大纲 project_id={project_id}, outline_len={len(outline_text)}, outline_head="{outline_head}"'
        )
        alloc_map = _project_allocation_map(db_path, project_id)
        conn.close()
        all_sections = [
            {'id': r[0], 'section_number': r[1], 'section_title': r[2],
             'level': r[3], 'is_leaf': r[4], 'content': r[5], 'intro_content': r[6], 'status': r[7]}
            for r in rows
        ]
        # Only unfinished sections (pending / error get regenerated).
        sections = [s for s in all_sections if s.get('status') != 'done']
        if not sections:
            logger.info(f'项目 {project_id} 所有章节已生成完成,无需重新生成')
            return
        # Split: non-leaf intros (short) vs leaf bodies (long-running).
        non_leaf = [s for s in sections if not s['is_leaf']]
        leaf = [s for s in sections if s['is_leaf']]
        workers = max(1, config.MAX_CONCURRENT_SECTIONS)
        # Respect the global LLM cap; continuation rounds add extra calls.
        llm_limit = getattr(config, 'LLM_CONCURRENCY_LIMIT', 20)
        workers = min(workers, max(1, llm_limit // 2))  # leave headroom for continuation/knowledge calls
        logger.info(
            f'项目 {project_id} 开始并发生成: '
            f'{len(non_leaf)} 个章节引言 + {len(leaf)} 个叶节点, '
            f'并发数={workers} (LLM上限={llm_limit})'
        )
        # Phase 1: non-leaf intros (usually quick).
        if non_leaf:
            _concurrent_generate(db_path, non_leaf, summary, outline_text, workers,
                                 anon_requirements, enable_figure, enable_table,
                                 boq_summary, tender_kind, None)
        # Phase 2: leaf bodies (the bulk of the work).
        if leaf:
            _concurrent_generate(db_path, leaf, summary, outline_text, workers,
                                 anon_requirements, enable_figure, enable_table,
                                 boq_summary, tender_kind, alloc_map)
        # Tally results for the log line.
        conn = sqlite3.connect(db_path)
        cur = conn.cursor()
        cur.execute('''
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN status='done' THEN 1 ELSE 0 END) as done,
                SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) as errors
            FROM bid_sections WHERE project_id=?
        ''', (project_id,))
        total, done, errors = cur.fetchone()
        conn.close()
        logger.info(f'项目 {project_id} 全量生成完成: {done}/{total} 成功, {errors} 失败')
    except Exception as e:
        logger.exception(f'全量生成失败 project_id={project_id}')
def _concurrent_generate(db_path: str, sections: list, summary: str,
                         outline_text: str, workers: int,
                         anon_requirements: str = '',
                         enable_figure: bool = False,
                         enable_table: bool = False,
                         boq_summary: str = '',
                         tender_kind: str = 'engineering',
                         alloc_map: Optional[Dict[int, Dict[str, Any]]] = None) -> None:
    """Generate a batch of sections concurrently on a thread pool; failures
    of individual sections are logged and do not abort the batch."""
    with ThreadPoolExecutor(max_workers=workers, thread_name_prefix='gen') as executor:
        pending = {}
        for sec in sections:
            override = alloc_map.get(sec['id']) if alloc_map else None
            future = executor.submit(
                _generate_one, db_path, sec, summary, outline_text,
                anon_requirements, enable_figure, enable_table,
                boq_summary, tender_kind, override,
            )
            pending[future] = sec
        for future in as_completed(pending):
            sec = pending[future]
            try:
                future.result()
            except Exception as e:
                logger.error(f'章节 {sec["id"]} "{sec["section_title"]}" 异常: {e}')
# ─── 大纲解析 ─────────────────────────────────────────────────────────────
_CN_NUMS_LIST = [
'', '', '', '', '', '', '', '', '', '', '',
'十一', '十二', '十三', '十四', '十五', '十六', '十七', '十八', '十九', '二十',
]
def _renumber_sections(sections: list) -> list:
"""
对章节列表按层级顺序重新编号,确保删除/增减章节后序号连续。
level 1 → 整数字符串 "1","2",...
level 2 → "1.1","1.2",...
level 3 → "1.1.1","1.1.2",...
level 4 → "1.1.1.1",...
直接修改传入列表中各节点的 number 字段,并返回该列表。
"""
counters = [0] * 5 # 索引 0-3 对应 level 1-4
for s in sections:
level = s['level']
idx = level - 1
counters[idx] += 1
for j in range(idx + 1, len(counters)):
counters[j] = 0
if level == 1:
s['number'] = str(counters[0])
else:
s['number'] = '.'.join(str(counters[i]) for i in range(level))
return sections
def _sections_to_outline_text(bid_title: str, sections: list) -> str:
    """Rebuild outline text from the section list: level-1 rows rendered like
    「一、」, children like「1.1 」, matching the AI table-of-contents example;
    each level is indented with full-width spaces."""
    out_lines = [bid_title] if bid_title else []
    for node in sections:
        depth = int(node.get('level', 1))
        heading = format_heading_display(
            depth, node.get('number', ''), node.get('title', ''),
        )
        out_lines.append('\u3000' * (depth - 1) + heading)
    return '\n'.join(out_lines)
def _parse_outline(text: str):
    """
    Parse outline text into a section list and renumber it automatically
    (fixes non-contiguous numbering after chapters were deleted).

    Returns (bid_title, sections_list, normalized_text); each section is
    {number, title, level, is_leaf, order_index}. Leaf flags are set after
    renumbering by checking for any deeper number sharing the prefix.
    """
    lines = text.strip().split('\n')
    bid_title = ''
    sections = []
    order = 0
    # Treat a leading non-chapter line as the document title.
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped:
            continue
        is_chapter_line = (
            bool(re.match(r'^[一二三四五六七八九十百第]', stripped))
            or bool(re.match(r'^\d+(?:[..、]\s*|\s+)?\S+', stripped))
        )
        if not is_chapter_line:
            bid_title = stripped
            lines = lines[i + 1:]
            break
        break
    chapter_counter = 0
    for line in lines:
        raw_line = line.rstrip('\n')
        stripped = raw_line.strip()
        if not stripped:
            continue
        # Drop full/half-width indentation before matching numbering so
        # indented rows like "  1.1 标题" are still recognized.
        parse_line = stripped.lstrip('\u3000\u2003\u2002\u00a0 \t')
        # Level 1: Chinese numeral + enumeration punctuation.
        m1 = re.match(r'^([一二三四五六七八九十百]+)[、。.]\s*(.*)', parse_line)
        if m1:
            cn = m1.group(1)
            title = m1.group(2).strip()
            chapter_counter = CN_NUM_MAP.get(cn, chapter_counter + 1)
            sections.append({
                'number': str(chapter_counter),
                'title': title,
                'level': 1,
                'is_leaf': True,
                'order_index': order,
            })
            order += 1
            continue
        # Levels 2-4 must be matched BEFORE the plain "digits + separator"
        # level-1 rule, otherwise "1.1 标题" would be misread as level 1
        # with title ".1 标题" and inflate the section count.
        m_num = re.match(r'^(\d+(?:\.\d+)+)\s+(.*)', parse_line)
        if m_num:
            num_str = m_num.group(1)
            title = m_num.group(2).strip()
            level = num_str.count('.') + 1
            sections.append({
                'number': num_str,
                'title': title,
                'level': min(level, 4),  # clamp anything deeper to level 4
                'is_leaf': True,
                'order_index': order,
            })
            order += 1
            continue
        # Level 1: Arabic numeral with optional separator — supports
        # "1 标题", "1.标题", "1标题"; dotted forms were handled above.
        m1_en = re.match(r'^(\d+)(?:[、。..]\s*|\s+)?(.*)', parse_line)
        if m1_en:
            chapter_no = int(m1_en.group(1))
            title = (m1_en.group(2) or '').strip()
            title = re.sub(r'^[、。..\s]+', '', title)
            if title:
                chapter_counter = chapter_no
                sections.append({
                    'number': str(chapter_counter),
                    'title': title,
                    'level': 1,
                    'is_leaf': True,
                    'order_index': order,
                })
                order += 1
                continue
        # Fallback: un-numbered line — infer level from indentation
        # (supports "titles only" outlines).
        indent_full = len(re.match(r'^[\u3000 ]*', raw_line).group(0))
        # Convention: 1 full-width space / 2 half-width spaces = 1 level.
        level = min(max(1, (indent_full // 2) + 1), 4)
        if level == 1:
            chapter_counter += 1
            number = str(chapter_counter)
        else:
            # Placeholder number; _renumber_sections fixes it below.
            number = '1.' * (level - 1) + '1'
        sections.append({
            'number': number.strip('.'),
            'title': parse_line,
            'level': level,
            'is_leaf': True,
            'order_index': order,
        })
        order += 1
    # Renumber (core fix: keeps numbering contiguous after deletions).
    _renumber_sections(sections)
    # Mark non-leaf nodes AFTER renumbering so prefix matching is correct.
    nums = [s['number'] for s in sections]
    for s in sections:
        prefix = s['number'] + '.'
        if any(n.startswith(prefix) for n in nums):
            s['is_leaf'] = False
    # Rebuild the normalized outline text (written back to the database).
    normalized_text = _sections_to_outline_text(bid_title, sections)
    return bid_title, sections, normalized_text
# ─── 数据库工具 ───────────────────────────────────────────────────────────
def _get_tender_data(conn, project_id):
cur = conn.cursor()
cur.execute(
"SELECT summary, rating_requirements, rating_json, raw_text, boq_summary, tender_kind "
"FROM tender_data WHERE project_id=?",
(project_id,)
)
row = cur.fetchone()
if row:
return {
'summary': row[0],
'rating_requirements': row[1],
'rating_json': row[2],
'raw_text': row[3],
'boq_summary': row[4] or '',
'tender_kind': row[5] or 'engineering',
}
return None
def _get_outline_text(conn, project_id):
cur = conn.cursor()
cur.execute("SELECT outline FROM tender_data WHERE project_id=?", (project_id,))
row = cur.fetchone()
return row[0] if row and row[0] else ''
def _save_outline_text(conn, project_id, outline_text):
cur = conn.cursor()
# 兜底:若 tender_data 尚未初始化,先补齐空记录,避免 UPDATE 0 行导致“假保存成功”
cur.execute(
"INSERT OR IGNORE INTO tender_data (project_id, status) VALUES (?, 'pending')",
(project_id,),
)
cur.execute(
"UPDATE tender_data SET outline=?, updated_at=? WHERE project_id=?",
(outline_text, datetime.now(), project_id),
)
conn.commit()
def _save_sections(conn, project_id, sections):
cur = conn.cursor()
# 清除旧章节
cur.execute("DELETE FROM bid_sections WHERE project_id=?", (project_id,))
for s in sections:
cur.execute('''
INSERT INTO bid_sections
(project_id, section_number, section_title, level, is_leaf, order_index, status)
VALUES (?, ?, ?, ?, ?, ?, 'pending')
''', (project_id, s['number'], s['title'], s['level'], 1 if s['is_leaf'] else 0, s['order_index']))
conn.commit()
def _get_section(conn, section_id):
cur = conn.cursor()
cur.execute(
"SELECT id, section_number, section_title, level, is_leaf, content, intro_content FROM bid_sections WHERE id=?",
(section_id,)
)
row = cur.fetchone()
if row:
return {
'id': row[0], 'section_number': row[1], 'section_title': row[2],
'level': row[3], 'is_leaf': row[4], 'content': row[5], 'intro_content': row[6]
}
return None
def _update_section_status(conn, section_id, status, error=''):
cur = conn.cursor()
cur.execute(
"UPDATE bid_sections SET status=?, error_message=?, updated_at=? WHERE id=?",
(status, error, datetime.now(), section_id)
)
conn.commit()
def _update_section_content(conn, section_id, content, intro_content):
cur = conn.cursor()
cur.execute(
"UPDATE bid_sections SET content=?, intro_content=?, updated_at=? WHERE id=?",
(content, intro_content, datetime.now(), section_id)
)
conn.commit()
# ─── 线程安全的数据库操作(每次独立开关连接,启用 WAL──────────────────
def _db_connect(db_path: str) -> sqlite3.Connection:
"""创建启用 WAL 模式的连接,适合多线程并发写入"""
conn = sqlite3.connect(db_path, timeout=30, check_same_thread=False)
conn.execute('PRAGMA journal_mode=WAL')
return conn
def _update_section_status_safe(db_path, section_id, status, error=''):
    """Thread-safe wrapper: open a short-lived WAL connection, update the
    section status, and always close the connection."""
    db = _db_connect(db_path)
    try:
        _update_section_status(db, section_id, status, error)
    finally:
        db.close()
def _update_section_content_safe(db_path, section_id, content, intro_content):
    """Thread-safe wrapper: open a short-lived WAL connection, write the
    section content, and always close the connection."""
    db = _db_connect(db_path)
    try:
        _update_section_content(db, section_id, content, intro_content)
    finally:
        db.close()
def _set_project_status(conn, project_id, status, error=''):
cur = conn.cursor()
cur.execute(
"UPDATE projects SET outline_status=?, outline_error=?, updated_at=? WHERE id=?",
(status, error, datetime.now(), project_id)
)
conn.commit()
# ─── AI自动填充小章节 ───────────────────────────────────────────────────────
def expand_outline(
    outline_text: str,
    summary: str = '',
    rating_requirements: str = '',
    project_id: int = 0,
    target_pages: int = 0,
) -> str:
    """
    Auto-fill sub-chapters under the user-provided main chapter titles.

    target_pages bounds the total number of sub-chapter rows (linear mapping
    via volume_chapters, +/-10%). It must be passed by the caller for THIS
    request (e.g. the front-end's target page count) rather than relying on
    the in-process global config, which may be stale under multiple processes
    and previously produced outlines with hundreds of rows. 0 means no cap
    (same as target pages being disabled).

    Returns the expanded outline text; the input is returned unchanged when
    no main chapters can be found.
    """
    lines = outline_text.strip().split('\n')
    bid_title = ''
    main_chapters = []
    # Extract the bid title: a long (>50 chars) first line without chapter
    # formatting is treated as the document title.
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped:
            continue
        is_chapter_format = re.match(r'^[一二三四五六七八九十百第]', stripped) or re.match(r'^\d+[..、\s]', stripped)
        if not is_chapter_format and len(stripped) > 50:
            bid_title = stripped
            lines = lines[i + 1:]
            break
        break
    # Extract level-1 chapters only.
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue
        # Skip level-2+ rows (dotted numbering) up front.
        if re.match(r'^\d+(?:\.\d+)+', stripped):
            continue
        # Chinese-numeral forms: "一、标题", "第X章 标题", "一 标题".
        m1_cn = re.match(r'^([一二三四五六七八九十百]+)[、。..\s]+\s*(.*)', stripped)
        if not m1_cn:
            m1_cn = re.match(r'^第([一二三四五六七八九十百]+)[章节]\s*(.*)', stripped)
        if not m1_cn:
            m1_cn = re.match(r'^([一二三四五六七八九十百]+)(?![一二三四五六七八九十百])\s+(.*)', stripped)
        # Arabic-numeral forms: "1、标题", "第1章 标题", "1 标题", "1标题".
        m1_en = re.match(r'^(\d+)[、。..\s]+\s*(.*)', stripped)
        if not m1_en:
            m1_en = re.match(r'^第(\d+)[章节]\s*(.*)', stripped)
        if not m1_en:
            m1_en = re.match(r'^(\d+)(?!\d)\s+(.*)', stripped)
        if not m1_en:
            m1_en = re.match(r'^(\d+)([^\d].*)', stripped)
        if m1_cn or m1_en:
            title = (m1_cn.group(2) if m1_cn else m1_en.group(2)).strip()
            title = re.sub(r'^[、。..\s]+', '', title)
            if title:
                main_chapters.append({'title': title})
        else:
            # Short un-numbered lines are also accepted as main chapters.
            if 0 < len(stripped) < 50:
                main_chapters.append({'title': stripped})
    if not main_chapters:
        logger.warning(f'expand_outline未找到主章节输入大纲{outline_text[:200]}')
        return outline_text
    expanded_lines = []
    if bid_title:
        expanded_lines.append(bid_title)
    # Expand sub-chapters concurrently (attachment-style chapters skip AI).
    results: List[Optional[str]] = [None] * len(main_chapters)
    chapters_to_expand: List[Tuple[int, Dict[str, Any]]] = []
    for idx, chapter in enumerate(main_chapters):
        ct = chapter['title']
        if att_sec.should_skip_expand_subchapters(ct):
            results[idx] = ''
            label = att_sec.parse_attachment_label(ct)
            if label:
                logger.info(
                    'expand_outline 跳过附件主章节小章节填充: title=%r attachment_label=%r',
                    ct, label,
                )
            else:
                logger.info(
                    'expand_outline 跳过附件主章节小章节填充: title=%r',
                    ct,
                )
        else:
            chapters_to_expand.append((idx, chapter))
    if chapters_to_expand:
        tp = max(0, int(target_pages or 0))
        per_main: Optional[List[int]] = None
        # When target pages is set, compute the total sub-chapter budget and
        # distribute it across the expandable main chapters.
        if tp > 0:
            k_exp = len(chapters_to_expand)
            n_total = vol_ch.subchapter_total_effective(tp, k_exp, random.Random())
            per_main = vol_ch.allocate_subchapters_to_mains(n_total, k_exp)
        # AI calls are guarded by the global semaphore, so the local worker
        # count may approach the LLM cap (default 12-20).
        max_workers = min(len(chapters_to_expand), getattr(config, 'MAX_CONCURRENT_SECTIONS', 15))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_chapter = {
                executor.submit(
                    _generate_sub_chapters,
                    ch['title'],
                    summary,
                    rating_requirements,
                    idx + 1,
                    project_id,
                    per_main[i] if per_main else None,
                ): (idx, ch['title'])
                for i, (idx, ch) in enumerate(chapters_to_expand)
            }
            for future in as_completed(future_to_chapter):
                idx, title = future_to_chapter[future]
                try:
                    results[idx] = future.result()
                    logger.info(f'主章节扩展成功: {title}')
                except Exception as e:
                    # One failed chapter degrades to "no sub-chapters".
                    logger.error(f'主章节扩展失败: {title}, 错误: {e}')
                    results[idx] = ''
    # Assemble the final outline: renumbered main chapter + its sub-rows.
    for idx, chapter in enumerate(main_chapters):
        chapter_num = idx + 1
        cn_num = _CN_NUMS_LIST[chapter_num] if chapter_num < len(_CN_NUMS_LIST) else str(chapter_num)
        expanded_lines.append(f'{cn_num}{chapter["title"]}')
        if results[idx]:
            expanded_lines.append(results[idx])
    return '\n'.join(expanded_lines)
def _extract_title_text(title: str) -> str:
"""从标题中提取纯文本内容,去除序号和标点符号。"""
text = re.sub(r'^[一二三四五六七八九十百]+[、。.]\s*', '', title.strip())
text = re.sub(r'^\d+(?:\.\d+)*[、。.]?\s*', '', text)
text = re.sub(r'^\s*[、。,,;:]+\s*', '', text)
text = re.sub(r'\s*[、。,,;:]+\s*$', '', text)
return text.strip()
def _generate_sub_chapters(
    chapter_title: str,
    summary: str,
    rating_requirements: str,
    chapter_num: int,
    project_id: int = 0,
    max_subchapters: Optional[int] = None,
) -> str:
    """Generate the sub-chapter outline for one main chapter.

    Calls the LLM with an outline prompt (optionally enriched with BOQ lines
    pulled from the summary), then parses/normalizes the response: rejects
    zero-based numbering like 1.0, infers level from numbering depth or
    indentation, renumbers everything as chapter_num.X[.Y[.Z]], and indents
    deeper levels with full-width spaces. Returns '' on failure or when
    max_subchapters is 0 or negative.
    """
    if max_subchapters is not None and max_subchapters <= 0:
        return ''
    boq_summary = _get_boq_summary_for_chapter(chapter_title, summary)
    prompt = P.get_chapter_outline_prompt(
        summary, chapter_title, rating_requirements, max_subchapters=max_subchapters
    )
    if boq_summary:
        prompt += (
            '\n\n【工程量清单关键信息】\n'
            f'{boq_summary}\n\n请严格根据工程量清单中的工程项目生成子章节,确保每个子章节都与具体工程内容对应。'
        )
    try:
        response = ai_client.chat(
            prompt,
            system='你是一位专业的标书大纲生成专家。请根据主章节标题和工程量清单内容生成合适的子章节列表,严格遵守编号规则:'
            '绝对禁止出现1.0、2.0、1.0.1等0开头编号'
            '二级从X.1开始三级从X.1.1开始四级从X.1.1.1开始;'
            '只输出子章节,不重复主章节标题。',
            temperature=0.5,
            max_tokens=2048,
            request_timeout=getattr(config, 'OUTLINE_REQUEST_TIMEOUT', 300),
        )
        logger.info(f'_generate_sub_chapters AI响应章节={chapter_title},长度={len(response)}')
        main_title_text = _extract_title_text(chapter_title)
        lines = response.strip().split('\n')
        # Running counters per level, used to renumber the output rows.
        level_counts = {1: 0, 2: 0, 3: 0, 4: 0}
        result_lines = []
        for line in lines:
            if not line or not line.strip():
                continue
            # Count leading full/half-width spaces as an indentation hint.
            indent_count = 0
            remaining = line
            while remaining and (remaining[0] == '\u3000' or remaining[0] == ' '):
                indent_count += 1
                remaining = remaining[1:]
            # Strip markdown artifacts (#, *, >, -) the model may emit.
            remaining = re.sub(r'^[\s#*>\-]+', '', remaining).strip()
            if not remaining:
                continue
            m = re.match(r'^(\d+(?:\.\d+)*)[、。..]?\s*(.*)', remaining)
            if m:
                original_num = m.group(1)
                parts = original_num.split('.')
                # Reject zero-based components like 1.0 or 1.0.1.
                has_invalid_zero = any(i > 0 and part and part[0] == '0' for i, part in enumerate(parts))
                if has_invalid_zero:
                    continue
                if len(parts) > 1:
                    # Dotted depth determines the level ("X.1" -> level 1 here,
                    # relative to the main chapter).
                    level = len(parts) - 1
                else:
                    # Single number: fall back to indentation.
                    if indent_count == 0:
                        level = 1
                    elif indent_count <= 2:
                        level = 2
                    else:
                        level = 3
                title = m.group(2).strip()
            else:
                m_cn = re.match(r'^([一二三四五六七八九十百]+)[、。..]\s*(.*)', remaining)
                if m_cn:
                    title = m_cn.group(2).strip()
                    level = 1
                else:
                    # Un-numbered row: level from indentation.
                    title = remaining
                    if indent_count == 0:
                        level = 1
                    elif indent_count <= 2:
                        level = 2
                    else:
                        level = 3
            title = _extract_title_text(title)
            # Drop empty/too-short rows and echoes of the main chapter title.
            if not title or len(title) < 2:
                continue
            if main_title_text and _extract_title_text(title) == main_title_text:
                continue
            level = min(max(level, 1), 3)
            level_counts[level] += 1
            # A row at this level resets all deeper counters.
            for l in range(level + 1, 5):
                level_counts[l] = 0
            if level == 1:
                num = f'{chapter_num}.{level_counts[1]}'
                indent = ''
            elif level == 2:
                num = f'{chapter_num}.{level_counts[1]}.{level_counts[2]}'
                indent = '\u3000'
            else:
                num = f'{chapter_num}.{level_counts[1]}.{level_counts[2]}.{level_counts[3]}'
                indent = '\u3000\u3000'
            result_lines.append(f'{indent}{num} {title}')
        # Enforce the per-chapter row cap when one was allocated.
        if max_subchapters is not None and max_subchapters > 0 and len(result_lines) > max_subchapters:
            result_lines = result_lines[:max_subchapters]
        return '\n'.join(result_lines)
    except Exception:
        logger.exception(f'生成子章节失败 chapter={chapter_title}')
        return ''
def _get_boq_summary_for_chapter(chapter_title: str, summary: str) -> str:
"""
从摘要中提取与施工方案相关的工程量清单信息。
"""
if not summary:
return ''
boq_keywords = [
'项目编码', '清单编码', '编码', '编号', '序号', '项目编号', '清单编号',
'项目名称', '清单名称', '名称', '工程名称', '清单项目名称', '分项名称',
'计量单位', '单位', '计量', '工程量', '数量', '清单数量', '清单工程量',
'综合单价', '单价', '投标单价', '综合价', '合价', '金额', '合计金额', '综合合价', '合计', '总价', '小计',
'项目特征', '项目特征描述', '特征描述', '做法说明', '工程内容', '工作内容', '详述', '说明', '特征', '项目特征及内容',
'施工内容', '工艺要求', '技术措施', '施工要求', '施工方法'
]
lines = summary.strip().split('\n')
boq_lines = []
for line in lines:
if any(keyword in line for keyword in boq_keywords):
boq_lines.append(line.strip())
if boq_lines:
return '\n'.join(boq_lines[:20])
return ''