"""
Bid-document content generation module (high-concurrency edition).

A global LLM_SEMAPHORE (cap 20) + parallel parsing + a higher pool limit.
Pipeline: generate outline -> parse the section tree -> generate content concurrently.
"""
import re
import random
import sqlite3
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import config
from utils import ai_client, prompts as P
from utils import word_allocation as word_alloc
from utils import volume_chapters as vol_ch
from utils import diagram_intent as diagram_int
from utils import attachment_section as att_sec
from utils.outline_numbering import format_heading_display

logger = logging.getLogger(__name__)

# Default system prompt for body-text generation. This is runtime text sent to
# the model, so it is kept verbatim (Chinese).
BID_WRITING_SYSTEM = (
    '你是一位资深的工程投标文件撰写专家,擅长以执行方视角撰写技术方案正文。'
    '撰写时必须遵守以下铁律:'
    '①【字数】用户规定的最低字数必须满足,但字数须由实质内容支撑,'
    '不得用重复背景、堆砌承诺或复述要求来凑字数;'
    '②【自称】投标方自称统一用"我方",禁用"我们""我公司";'
    '③【禁止套话】禁用:综上所述、首先其次再次、我们深信、高度重视、全力以赴、'
    '竭诚服务、不断优化、稳步推进、通过以上措施、我方将严格按照、我方承诺、'
    '确保圆满完成、切实保障;'
    '④【禁止前导句】严禁:本章节对应……、本小节主要说明……、'
    '以下将从……方面说明、针对招标方要求……、根据招标文件……我方将……——'
    '开头直接写实质内容;'
    '⑤【禁止复述要求】招标文件给出的技术参数、工程量、服务数量、规范标准等均视为'
    '已知条件,直接体现在方案中,禁止先复读要求再作答;'
    '不用"满足招标方提出的XXX要求""针对招标文件第X条"等句式;'
    '⑥【禁止重申背景——最常见的废稿场景】'
    '禁止在章节正文中出现项目名称、建设单位、建设地点、工程规模、合同工期等基本信息;'
    '尤其严禁将招标文件中的具体工程量数字(如"X条渠道""X公里""X座建筑物""X台设备"等)'
    '反复引入到各个章节开头作为背景铺垫——'
    '这类数字只能在专门的"项目概况/项目背景"章节出现一次,'
    '质量、安全、进度、技术方案、人员配置等专业章节一律直接展开专业内容;'
    '⑦【禁止虚构优越参数】严禁为了显示"超越"招标要求而捏造参数或数量:'
    '招标文件要求多少就按多少写,不得无依据地写成"优于要求""高于标准";'
    '如需体现竞争力,只能在工艺方法、管理措施、响应速度等可具体描述的维度展开,'
    '不得在规格数量上自行拔高;'
    '⑧【实质可检验】每项措施须给出具体做法、操作步骤、管理节点或时间节点;'
    '凡写数量、型号、吨位、强度、时限等量化内容,须能在招标文件或工程量清单摘要中找到依据,'
    '无依据处不写具体数字与型号,改用"按设计要求""与工况及进度相匹配""符合相应规范等级"等完整中文概括表述,'
    '不做空洞承诺;'
    '⑨【行文格式】纯文本,段落间空行分隔,列举用(1)(2)(3)编号,'
    '不用markdown符号,不用连接词串联,不用"等"作结尾。'
    '⑩【禁止占位符】方案叙述中严禁半角或全角方括号形式的未完稿待填(如[型号][数量][数值][X][Y]等),'
    '亦不得用「待填」「TBD」留白;语义须用通顺的陈述句一次写清。'
    '若另有图示/表格专用输出规范要求使用约定标记,仅在该规范限定的标记内可使用方括号。'
)

# Volume presets: key -> (base sub-section chars, core chapter chars, label, desired max_tokens)
VOLUME_PRESETS = {
    'concise': (1200, 2500, '精简版', 5000),
    'standard': (2000, 4000, '标准版', 8000),
    'detailed': (3000, 5500, '详细版', 12000),
    'full': (4000, 7000, '充实版', 16000),
}


def _effective_volume() -> str:
    """Return the volume preset key used during generation.

    Reads ``config.TARGET_PAGES`` (0 when missing or falsy) and
    ``config.CONTENT_VOLUME`` (default ``'standard'``) and delegates the
    mapping to ``vol_ch.volume_key_from_target_pages``.
    """
    pages = int(getattr(config, 'TARGET_PAGES', 0) or 0)
    return vol_ch.volume_key_from_target_pages(
        pages,
        getattr(config, 'CONTENT_VOLUME', 'standard'),
    )


# Hard max_tokens ceiling per model provider.
_PROVIDER_TOKEN_LIMITS = {
    'deepseek': 8192,
    'qwen': 8192,
    'openai': 16384,
}


def _get_word_count_spec(volume: str) -> str:
    """Return the word-count requirement paragraph embedded in the prompt.

    Unknown ``volume`` keys fall back to the ``'standard'`` preset. When
    ``config.TARGET_PAGES`` is positive, a target-page bullet is inserted on
    its own line between the count requirements and the closing bullets.
    """
    base, core, _, _ = VOLUME_PRESETS.get(volume, VOLUME_PRESETS['standard'])
    pages = int(getattr(config, 'TARGET_PAGES', 0) or 0)
    # The note carries its own trailing newline so the following bullet is not
    # glued onto the same line (and no stray blank line precedes it).
    page_note = f'- 目标页数:约 {pages} 页(按粗略换算生效)\n' if pages > 0 else ''
    return (
        f'- 字数硬性要求(必须达到,不达标将被退回重写):\n'
        f' · 一般小节:不少于 {base} 字\n'
        f' · 核心技术/重点评分章节:不少于 {core} 字\n'
        f'{page_note}'
        f'- 内容必须充分展开,每个要点均需具体阐述,不得一笔带过\n'
        f'- 宁多勿少,写满写透,篇幅不足是最严重的质量问题'
    )


def _get_max_tokens(volume: str) -> int:
    """Return max_tokens for the preset, capped by the provider's ceiling.

    The provider is read from ``config.MODEL_PROVIDER`` (default
    ``'openai'``); providers missing from ``_PROVIDER_TOKEN_LIMITS`` are
    capped at 8192.
    """
    _, _, _, tokens = VOLUME_PRESETS.get(volume, VOLUME_PRESETS['standard'])
    provider = getattr(config, 'MODEL_PROVIDER', 'openai')
    limit = _PROVIDER_TOKEN_LIMITS.get(provider, 8192)
    return min(tokens, limit)


def _get_min_chars(volume: str) -> int:
    """Return the length below which continuation is triggered.

    This is 65% of the preset's base sub-section character count.
    """
    base, _, _, _ = VOLUME_PRESETS.get(volume, VOLUME_PRESETS['standard'])
    return int(base * 0.65)


# Chinese numeral -> integer mapping (covers 1..15).
CN_NUM_MAP = {
    '一': 1, '二': 2, '三': 3, '四': 4, '五': 5,
    '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
    '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15,
}


# ─── Outline generation ──────────────────────────────────────────────────

def generate_outline(db_path: str, project_id: int) -> None:
    """Background task: generate the bid outline and store it in bid_sections.

    Sets the project's outline status to ``outline_generating``, asks the
    model for an outline (using the rating-aware prompt when rating
    requirements exist), parses/renumbers it, saves both the normalized text
    and the section rows, then sets ``outline_done``. Any exception is logged
    and recorded as ``outline_error``; nothing is re-raised.
    """
    conn = sqlite3.connect(db_path)
    try:
        _set_project_status(conn, project_id, 'outline_generating')

        td = _get_tender_data(conn, project_id)
        if not td:
            raise ValueError('尚未解析招标文件,请先解析')

        summary = td['summary'] or ''
        rating = td['rating_requirements'] or ''

        if rating:
            prompt = P.get_outlines_with_rating_prompt(summary, rating)
        else:
            prompt = P.get_outlines_prompt(summary or td['raw_text'] or '')

        outline_text = ai_client.chat(
            prompt,
            temperature=0.5,
            max_tokens=4096,
            request_timeout=getattr(config, 'OUTLINE_REQUEST_TIMEOUT', 300),
        )

        # Parse the sections, renumber them, and persist the normalized text.
        _bid_title, sections, normalized_text = _parse_outline(outline_text)
        _save_outline_text(conn, project_id, normalized_text)
        _save_sections(conn, project_id, sections)
        _set_project_status(conn, project_id, 'outline_done')
        logger.info(f'项目 {project_id} 大纲生成完成,共 {len(sections)} 节')
    except Exception as e:
        logger.exception(f'大纲生成失败 project_id={project_id}')
        _set_project_status(conn, project_id, 'outline_error', str(e))
    finally:
        conn.close()


# ─── Section content generation ──────────────────────────────────────────

def generate_section(db_path: str, project_id: int, section_id: int,
                     anon_requirements: str = '',
                     enable_figure: bool = False,
                     enable_table: bool = False) -> None:
    """Background task: generate content for one section.

    Single-section entry point that loads its own context (section row,
    tender data, outline text) and then delegates to ``_generate_one``.
    Failures are logged and written to the section row as status ``error``.
    """
    conn = sqlite3.connect(db_path)
    try:
        section = _get_section(conn, section_id)
        if not section:
            raise ValueError(f'Section {section_id} 不存在')

        td = _get_tender_data(conn, project_id)
        outline_text = _get_outline_text(conn, project_id)
        if not outline_text.strip():
            raise ValueError('当前项目尚无可用大纲,请先保存或生成大纲')
        summary = (td or {}).get('summary', '')
        boq_summary = (td or {}).get('boq_summary', '')
        # Release the connection before the model calls start; the ``finally``
        # clause below only closes it when this point was not reached.
        conn.close()
        conn = None

        tender_kind = (td or {}).get('tender_kind', 'engineering') or 'engineering'
        outline_head = outline_text.strip().splitlines()[0][:50] if outline_text.strip() else ''
        logger.info(
            f'章节生成读取大纲 project_id={project_id}, section_id={section_id}, '
            f'outline_len={len(outline_text)}, outline_head="{outline_head}"'
        )
        alloc_map = _project_allocation_map(db_path, project_id)
        override = alloc_map.get(section_id) if alloc_map else None
        _generate_one(db_path, section, summary, outline_text,
                      anon_requirements, enable_figure, enable_table,
                      boq_summary, tender_kind, override)
    except Exception as e:
        logger.exception(f'章节生成失败 section_id={section_id}')
        _update_section_status_safe(db_path, section_id, 'error', str(e))
    finally:
        if conn:
            conn.close()


# Maximum number of continuation rounds (reduced from 5 to 3 to limit extra
# model calls).
MAX_CONTINUE_ROUNDS = 3
# Upper bound on the character goal requested in a single continuation round.
_CONTINUE_CHUNK_CAP = 2800
# Number of trailing characters of existing content passed as context.
_CONTINUE_TAIL_CHARS = 2200


def _auto_continue(content: str, min_chars: int, max_tok: int, title: str,
                   system: str = BID_WRITING_SYSTEM) -> str:
    """Extend ``content`` with follow-up model calls until it is long enough.

    Up to ``MAX_CONTINUE_ROUNDS`` rounds are attempted. Each round sends only
    the last ``_CONTINUE_TAIL_CHARS`` characters of the existing text (not the
    original prompt) and asks for at most ``_CONTINUE_CHUNK_CAP`` more
    characters. The loop stops early when the text reaches ``min_chars``,
    when the shortfall is 200 characters or fewer, when a call raises, or
    when a round returns fewer than 80 non-blank characters.

    Returns the (possibly extended) content.
    """
    for round_i in range(MAX_CONTINUE_ROUNDS):
        if len(content) >= min_chars:
            break

        remaining = min_chars - len(content)
        if remaining <= 200:
            break

        # Ask for only part of the shortfall per round; rounds accumulate.
        chunk_goal = min(remaining, _CONTINUE_CHUNK_CAP)

        tail = (
            content[-_CONTINUE_TAIL_CHARS:]
            if len(content) > _CONTINUE_TAIL_CHARS
            else content
        )

        cont_prompt = (
            f'以下是投标文件「{title}」小节已撰写的部分内容(末尾段落):\n\n'
            f'{tail}\n\n'
            f'━━━━━━━━━━━━━━━━━━━━━━━━━\n'
            f'当前累计 {len(content)} 字,本节最低要求 {min_chars} 字,'
            f'全文总差额约 {remaining} 字。\n'
            f'请紧接上文末尾继续撰写,要求:\n'
            f'(1) 不重复、不复述上文已有段落,自然衔接续写\n'
            f'(2) 深入展开实施细节、技术参数、岗位、设备、流程与验收要点\n'
            f'(3) 保持"我方"口吻,禁止AI套话与前导说明句\n'
            f'(4) 直接输出续写正文,不写"续写如下"等引导语\n'
            f'(5) 本轮续写不少于 {chunk_goal} 字,尽量写满\n'
        )

        logger.info(
            f'[续写] "{title}" 第{round_i+1}轮 '
            f'({len(content)}/{min_chars}字, 差{remaining}字, 本轮目标≥{chunk_goal}字)'
        )

        try:
            extra = ai_client.chat(
                cont_prompt,
                system=system,
                temperature=0.7,
                max_tokens=max_tok,
            )
        except Exception as e:
            logger.warning(f'[续写] "{title}" 第{round_i+1}轮失败: {e}')
            break

        if not extra or len(extra.strip()) < 80:
            logger.info(f'[续写] "{title}" 第{round_i+1}轮返回内容过短,终止')
            break

        content = content.rstrip() + '\n\n' + extra.strip()
        logger.info(
            f'[续写] "{title}" 第{round_i+1}轮完成,'
            f'+{len(extra.strip())}字,累计{len(content)}字'
        )

    logger.info(f'"{title}" 最终字数:{len(content)}')
    return content


def _build_writing_system(anon_requirements: str = '') -> str:
    """Build the system prompt, appending anonymous-bid rules when given.

    Returns ``BID_WRITING_SYSTEM`` unchanged when ``anon_requirements`` is
    blank; otherwise appends a compliance section containing the stripped
    requirements text.
    """
    anon = anon_requirements.strip()
    if not anon:
        return BID_WRITING_SYSTEM
    return (
        BID_WRITING_SYSTEM
        + '\n\n【暗标合规要求(最高优先级,每个章节均须严格遵守)】\n'
        + anon
    )


def _get_knowledge_context(title: str) -> str:
    """Return knowledge-base reference text for a section title, or ''.

    Calls ``modules.knowledge.search`` with ``config.TOP_K_KNOWLEDGE`` and
    formats each returned chunk (truncated to 600 characters) as a numbered
    reference snippet. This is best-effort: any failure (including the module
    being unavailable) yields an empty string.
    """
    try:
        from modules.knowledge import search
        chunks = search(title, top_k=config.TOP_K_KNOWLEDGE)
        if not chunks:
            return ''
        parts = [
            f'[参考片段{i}]\n{chunk[:600]}'
            for i, chunk in enumerate(chunks, 1)
        ]
        return (
            '\n\n【企业知识库参考内容(以下摘自历史投标文件,仅供参考,'
            '须结合本项目实际情况重新撰写,禁止直接照抄)】\n'
            + '\n\n'.join(parts)
        )
    except Exception as exc:
        # Deliberately non-fatal; leave a trace for diagnosis only.
        logger.debug('知识库检索跳过: %s', exc)
        return ''


def _leaf_rows_for_allocation(conn, project_id: int) -> list:
    """Return ``{'id', 'section_title'}`` dicts for the project's leaf
    sections, ordered by ``order_index`` (titles stripped, NULL -> '')."""
    cur = conn.cursor()
    cur.execute(
        'SELECT id, section_title FROM bid_sections WHERE project_id=? AND is_leaf=1 '
        'ORDER BY order_index',
        (project_id,),
    )
    return [{'id': r[0], 'section_title': (r[1] or '').strip()} for r in cur.fetchall()]


def _project_allocation_map(db_path: str, project_id: int) -> Optional[Dict[int, Dict[str, Any]]]:
    """Compute per-leaf word-count targets for a project.

    Loads the leaf rows and the tender's ``rating_json`` and delegates to
    ``word_alloc.compute_leaf_allocations``. Returns ``None`` when the project
    has no leaf sections or when anything fails (the failure is logged as a
    warning so callers fall back to the uniform volume preset).
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            leaves = _leaf_rows_for_allocation(conn, project_id)
            if not leaves:
                return None
            td = _get_tender_data(conn, project_id)
            rating_raw = (td or {}).get('rating_json') or ''
            vol = _effective_volume()
            rules = word_alloc.load_rules()
            return word_alloc.compute_leaf_allocations(vol, leaves, rating_raw, rules)
        finally:
            conn.close()
    except Exception as e:
        logger.warning('字数分配计算失败,回退统一篇幅: %s', e)
        return None


def _build_diagram_addon_for_leaf(
    section: dict,
    outline_text: str,
    enable_figure: bool,
    enable_table: bool,
) -> str:
    """Return the figure/table prompt add-on for a leaf section.

    Thin wrapper around ``diagram_int.get_diagram_agent().render_for_section``
    that passes the section title, the outline text and the two feature
    switches.
    """
    return diagram_int.get_diagram_agent().render_for_section(
        section.get('section_title') or '',
        outline_text or '',
        enable_figure,
        enable_table,
    )


# Leading serial number at the start of a line: 1-4 digits followed by one or
# more separator/space characters. The ``(?!\.\d)`` guard keeps decimals and
# multi-level numbers such as "3.5" or "1.1" intact instead of chopping off
# their integer part.
_LINE_SERIAL_RE = re.compile(r'^\s*\d{1,4}(?!\.\d)(?:[\..、)\s]+)\s*')


def _strip_line_serial_numbers(text: str) -> str:
    """Remove a plain leading serial number from every line of ``text``.

    Falsy input is returned unchanged. Lines are split with
    ``str.splitlines`` and re-joined with ``'\\n'``.
    """
    if not text:
        return text
    return '\n'.join(_LINE_SERIAL_RE.sub('', line) for line in text.splitlines())


def _generate_one(db_path: str, section: dict, summary: str, outline_text: str,
                  anon_requirements: str = '',
                  enable_figure: bool = False,
                  enable_table: bool = False,
                  boq_summary: str = '',
                  tender_kind: str = 'engineering',
                  allocation_override: Optional[Dict[str, Any]] = None) -> None:
    """Generate one section with the model and write the result to the DB.

    Holds no long-lived DB connection (every write goes through the ``*_safe``
    helpers), so it can be submitted to a thread pool.

    Three paths exist:
      * leaf + attachment "stack charts" mode: a short chart-only body;
      * ordinary leaf: full body text, followed by ``_auto_continue``;
      * non-leaf: a short chapter introduction stored in ``intro_content``.

    ``allocation_override`` may supply ``word_count_spec`` (required when the
    dict is given), ``max_tokens`` and ``target_chars`` for this section.
    Exceptions raised inside the generation step are logged and stored as
    status ``error``.
    """
    section_id = section['id']
    is_leaf = bool(section['is_leaf'])
    title = section['section_title']
    writing_system = _build_writing_system(anon_requirements)

    _att_rules = att_sec.get_attachment_rules_cached()
    _att_mode = att_sec.attachment_leaf_body_mode(_att_rules) if (
        is_leaf and att_sec.is_attachment_only_section(title, _att_rules)
    ) else ''
    _is_attachment_title = bool(is_leaf and bool(_att_mode))
    _use_stack_charts = bool(
        _is_attachment_title and att_sec.use_attachment_stack_charts_body(_att_rules),
    )

    diagram_addon = ''
    if is_leaf and (enable_figure or enable_table) and not _use_stack_charts:
        diagram_addon = _build_diagram_addon_for_leaf(
            section, outline_text, enable_figure, enable_table,
        )

    _update_section_status_safe(db_path, section_id, 'generating')
    try:
        if is_leaf:
            if _use_stack_charts:
                stack = diagram_int.get_diagram_agent().plan(
                    title, outline_text, enable_figure, enable_table,
                )
                if _att_mode == 'single_chart_only' and len(stack) > 1:
                    stack = [stack[0]]
                if not stack and (enable_figure or enable_table):
                    fk = att_sec.pick_single_figure_or_table(
                        title, enable_figure, enable_table, _att_rules,
                    )
                    if fk:
                        stack = diagram_int.make_fallback_stack(fk)

                # Both chart prompts share the same system prompt; compliance
                # text (first 2000 characters) is appended when provided.
                stack_sys = P.ATTACHMENT_STACK_CHARTS_SYSTEM
                anon_text = anon_requirements or ''
                if anon_text.strip():
                    stack_sys = stack_sys + '\n\n【合规】\n' + anon_text[:2000]

                if not stack or (not enable_figure and not enable_table):
                    prompt = P.get_attachment_chart_disabled_prompt(title)
                    content = ai_client.chat(
                        prompt,
                        system=stack_sys,
                        temperature=0.45,
                        max_tokens=512,
                    )
                else:
                    labels = diagram_int.stack_compact_labels(stack)
                    prompt = P.get_attachment_stack_charts_prompt(
                        summary, outline_text, title, labels,
                    )
                    content = ai_client.chat(
                        prompt,
                        system=stack_sys,
                        temperature=0.5,
                        max_tokens=3072,
                    )
                content = _strip_line_serial_numbers(content)
                _update_section_content_safe(db_path, section_id, content, '')
            else:
                volume = _effective_volume()
                if allocation_override:
                    wc_spec = allocation_override['word_count_spec']
                    max_tok = int(allocation_override.get('max_tokens') or _get_max_tokens(volume))
                    tgt = int(allocation_override.get('target_chars') or 0)
                    min_chars = word_alloc.continuation_threshold(tgt) if tgt > 0 else _get_min_chars(volume)
                else:
                    wc_spec = _get_word_count_spec(volume)
                    max_tok = _get_max_tokens(volume)
                    min_chars = _get_min_chars(volume)
                prompt = P.get_section_detail_prompt(
                    summary, outline_text, title,
                    word_count_spec=wc_spec,
                    boq_summary=boq_summary,
                    tender_kind=tender_kind or 'engineering',
                )
                if _is_attachment_title:
                    prompt = prompt + P.get_attachment_chapter_emphasis_hint()
                # Knowledge-base lookup: inject related snippets as reference.
                knowledge_ctx = _get_knowledge_context(title)
                if knowledge_ctx:
                    prompt = prompt + knowledge_ctx
                if diagram_addon:
                    prompt = prompt + diagram_addon
                content = ai_client.chat(
                    prompt,
                    system=writing_system,
                    temperature=0.7,
                    max_tokens=max_tok,
                )
                content = _auto_continue(content, min_chars, max_tok, title,
                                         system=writing_system)
                content = _strip_line_serial_numbers(content)
                _update_section_content_safe(db_path, section_id, content, '')
        else:
            prompt = P.get_section_intro_prompt(summary, outline_text, title)
            if prompt:
                intro = ai_client.chat(
                    prompt,
                    system=writing_system,
                    temperature=0.4,
                    max_tokens=1024,
                )
            else:
                intro = ''
            intro = _strip_line_serial_numbers(intro)
            _update_section_content_safe(db_path, section_id, '', intro)

        _update_section_status_safe(db_path, section_id, 'done')
        logger.info(f'Section {section_id} "{title}" 生成完成')
    except Exception as e:
        logger.exception(f'章节生成失败 section_id={section_id}')
        _update_section_status_safe(db_path, section_id, 'error', str(e))


def generate_all_sections(db_path: str, project_id: int,
                          anon_requirements: str = '',
                          enable_figure: bool = False,
                          enable_table: bool = False) -> None:
    """Background task: generate every unfinished section of a project.

    Sections whose status is already ``done`` are skipped. Non-leaf sections
    (chapter introductions) are generated first, then leaf sections (body
    text); each phase runs in a thread pool. The pool size is
    ``config.MAX_CONCURRENT_SECTIONS`` clamped to at least 1 and to at most
    half of ``config.LLM_CONCURRENCY_LIMIT`` (default 20).

    All exceptions are logged and swallowed; nothing is raised to the caller.
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            cur = conn.cursor()
            cur.execute('''
                SELECT id, section_number, section_title, level, is_leaf,
                       content, intro_content, status
                FROM bid_sections
                WHERE project_id=?
                ORDER BY order_index
            ''', (project_id,))
            rows = cur.fetchall()

            td = _get_tender_data(conn, project_id)
            outline_text = _get_outline_text(conn, project_id)
        finally:
            # Always release the connection, including when a query raises.
            conn.close()

        if not outline_text.strip():
            raise ValueError('当前项目尚无可用大纲,请先保存或生成大纲')
        summary = (td or {}).get('summary', '')
        boq_summary = (td or {}).get('boq_summary', '')
        tender_kind = (td or {}).get('tender_kind', 'engineering') or 'engineering'
        outline_head = outline_text.strip().splitlines()[0][:50] if outline_text.strip() else ''
        logger.info(
            f'全量生成读取大纲 project_id={project_id}, outline_len={len(outline_text)}, outline_head="{outline_head}"'
        )
        alloc_map = _project_allocation_map(db_path, project_id)

        all_sections = [
            {'id': r[0], 'section_number': r[1], 'section_title': r[2],
             'level': r[3], 'is_leaf': r[4], 'content': r[5],
             'intro_content': r[6], 'status': r[7]}
            for r in rows
        ]

        # Only unfinished sections are processed (pending / error are retried).
        sections = [s for s in all_sections if s.get('status') != 'done']
        if not sections:
            logger.info(f'项目 {project_id} 所有章节已生成完成,无需重新生成')
            return

        # Group: non-leaf (short introductions) vs. leaf (body text).
        non_leaf = [s for s in sections if not s['is_leaf']]
        leaf = [s for s in sections if s['is_leaf']]

        workers = max(1, config.MAX_CONCURRENT_SECTIONS)
        # Use at most half of the global LLM limit, leaving headroom for
        # continuation rounds and knowledge lookups.
        llm_limit = getattr(config, 'LLM_CONCURRENCY_LIMIT', 20)
        workers = min(workers, max(1, llm_limit // 2))
        logger.info(
            f'项目 {project_id} 开始并发生成: '
            f'{len(non_leaf)} 个章节引言 + {len(leaf)} 个叶节点, '
            f'并发数={workers} (LLM上限={llm_limit})'
        )

        # Phase 1: chapter introductions (no per-section allocation).
        if non_leaf:
            _concurrent_generate(db_path, non_leaf, summary, outline_text, workers,
                                 anon_requirements, enable_figure, enable_table,
                                 boq_summary, tender_kind, None)

        # Phase 2: leaf body text (the bulk of the work).
        if leaf:
            _concurrent_generate(db_path, leaf, summary, outline_text, workers,
                                 anon_requirements, enable_figure, enable_table,
                                 boq_summary, tender_kind, alloc_map)

        # Summarize the outcome.
        conn = sqlite3.connect(db_path)
        try:
            cur = conn.cursor()
            cur.execute('''
                SELECT COUNT(*) as total,
                       SUM(CASE WHEN status='done' THEN 1 ELSE 0 END) as done,
                       SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) as errors
                FROM bid_sections WHERE project_id=?
            ''', (project_id,))
            total, done, errors = cur.fetchone()
        finally:
            conn.close()
        # SUM() yields NULL when no row matches; report 0 instead of None.
        done = done or 0
        errors = errors or 0
        logger.info(f'项目 {project_id} 全量生成完成: {done}/{total} 成功, {errors} 失败')

    except Exception:
        logger.exception(f'全量生成失败 project_id={project_id}')


def _concurrent_generate(db_path: str, sections: list, summary: str,
                         outline_text: str, workers: int,
                         anon_requirements: str = '',
                         enable_figure: bool = False,
                         enable_table: bool = False,
                         boq_summary: str = '',
                         tender_kind: str = 'engineering',
                         alloc_map: Optional[Dict[int, Dict[str, Any]]] = None) -> None:
    """Run ``_generate_one`` for a batch of sections in a thread pool.

    ``alloc_map`` (section id -> allocation dict) supplies the per-section
    override when present. Exceptions escaping a worker are logged and do not
    stop the remaining sections.
    """
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp defensively.
    with ThreadPoolExecutor(max_workers=max(1, workers), thread_name_prefix='gen') as pool:
        futures = {}
        for s in sections:
            override = alloc_map.get(s['id']) if alloc_map else None
            f = pool.submit(_generate_one, db_path, s, summary, outline_text,
                            anon_requirements, enable_figure, enable_table,
                            boq_summary, tender_kind, override)
            futures[f] = s
        for f in as_completed(futures):
            s = futures[f]
            try:
                f.result()
            except Exception as e:
                logger.error(f'章节 {s["id"]} "{s["section_title"]}" 异常: {e}')


# ─── Outline parsing ─────────────────────────────────────────────────────
# Chinese numerals indexed by chapter number (index 0 unused, covers 1..20).
_CN_NUMS_LIST = [
    '', '一', '二', '三', '四', '五', '六', '七', '八', '九', '十',
    '十一', '十二', '十三', '十四', '十五', '十六', '十七', '十八', '十九', '二十',
]


def _renumber_sections(sections: list) -> list:
    """Renumber sections in document order so numbers are contiguous.

    level 1 -> "1", "2", ...; level 2 -> "1.1", "1.2", ...; and so on.
    Incrementing a level's counter resets all deeper counters. Each dict's
    ``number`` field is overwritten in place and the same list is returned.
    Levels are expected to be in 1..5 (the counter list has five slots).
    """
    counters = [0] * 5
    for sec in sections:
        depth = sec['level']
        slot = depth - 1
        counters[slot] += 1
        for deeper in range(slot + 1, len(counters)):
            counters[deeper] = 0
        sec['number'] = '.'.join(str(counters[i]) for i in range(depth))
    return sections


def _sections_to_outline_text(bid_title: str, sections: list) -> str:
    """Render the section list back into outline text.

    The optional bid title comes first; every section becomes one line
    produced by ``format_heading_display`` and indented with one full-width
    space per level below the first.
    """
    lines = []
    if bid_title:
        lines.append(bid_title)
    for s in sections:
        level = int(s.get('level', 1))
        title = s.get('title', '')
        number = s.get('number', '')
        line = format_heading_display(level, number, title)
        indent = '\u3000' * (level - 1)
        lines.append(f'{indent}{line}')
    return '\n'.join(lines)


def _indent_level(raw_line: str) -> int:
    """Infer a heading level (1..4) from a line's leading indentation.

    Convention: one full-width space or two half-width spaces equal one
    indentation level, so a full-width space counts as two width units.
    """
    prefix = re.match(r'^[\u3000 ]*', raw_line).group(0)
    width = sum(2 if ch == '\u3000' else 1 for ch in prefix)
    return min(max(1, (width // 2) + 1), 4)


def _parse_outline(text: str):
    """Parse outline text into sections and renumber them.

    Returns ``(bid_title, sections, normalized_text)`` where every section is
    ``{number, title, level, is_leaf, order_index}``.

    The first non-blank line is taken as the bid title unless it looks like a
    chapter line. Each remaining non-blank line is matched, in this order,
    against: Chinese-numeral chapter ("一、"), dotted multi-level number
    ("1.1 "), plain Arabic chapter number, and finally an unnumbered line
    whose level is inferred from indentation. After renumbering, a section is
    marked non-leaf when another section's number starts with its number plus
    a dot.
    """
    lines = text.strip().split('\n')
    bid_title = ''
    sections = []
    order = 0

    # Only the first non-blank line is inspected for the title.
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped:
            continue
        is_chapter_line = (
            bool(re.match(r'^[一二三四五六七八九十百第]', stripped))
            or bool(re.match(r'^\d+(?:[..、]\s*|\s+)?\S+', stripped))
        )
        if not is_chapter_line:
            bid_title = stripped
            lines = lines[i + 1:]
        break

    chapter_counter = 0

    for line in lines:
        raw_line = line.rstrip('\n')
        stripped = raw_line.strip()
        if not stripped:
            continue

        # Drop leading full-/half-width indentation before matching numbers.
        parse_line = stripped.lstrip('\u3000\u2003\u2002\u00a0 \t')

        # Level 1: Chinese numeral followed by a separator.
        m1 = re.match(r'^([一二三四五六七八九十百]+)[、。.]\s*(.*)', parse_line)
        if m1:
            cn = m1.group(1)
            title = m1.group(2).strip()
            chapter_counter = CN_NUM_MAP.get(cn, chapter_counter + 1)
            sections.append({
                'number': str(chapter_counter),
                'title': title,
                'level': 1,
                'is_leaf': True,
                'order_index': order,
            })
            order += 1
            continue

        # Levels 2-4 must be tried before the plain-number rule, otherwise
        # "1.1 title" would be read as chapter 1 with title ".1 title".
        m_num = re.match(r'^(\d+(?:\.\d+)+)\s+(.*)', parse_line)
        if m_num:
            num_str = m_num.group(1)
            title = m_num.group(2).strip()
            level = num_str.count('.') + 1
            sections.append({
                'number': num_str,
                'title': title,
                'level': min(level, 4),
                'is_leaf': True,
                'order_index': order,
            })
            order += 1
            continue

        # Level 1: Arabic number with an optional separator.
        m1_en = re.match(r'^(\d+)(?:[、。..]\s*|\s+)?(.*)', parse_line)
        if m1_en:
            chapter_no = int(m1_en.group(1))
            title = (m1_en.group(2) or '').strip()
            title = re.sub(r'^[、。..\s]+', '', title)
            if title:
                chapter_counter = chapter_no
                sections.append({
                    'number': str(chapter_counter),
                    'title': title,
                    'level': 1,
                    'is_leaf': True,
                    'order_index': order,
                })
                order += 1
            # A numbered line without any title text carries no heading.
            continue

        # Fallback: unnumbered line, level inferred from indentation.
        level = _indent_level(raw_line)
        if level == 1:
            chapter_counter += 1
            number = str(chapter_counter)
        else:
            # Placeholder only; real numbers are assigned by the renumbering.
            number = '1.' * (level - 1) + '1'
        sections.append({
            'number': number.strip('.'),
            'title': parse_line,
            'level': level,
            'is_leaf': True,
            'order_index': order,
        })
        order += 1

    # Renumber so that numbers stay contiguous after edits/deletions.
    _renumber_sections(sections)

    # Mark non-leaf nodes (after renumbering so prefixes are reliable).
    nums = [s['number'] for s in sections]
    for s in sections:
        prefix = s['number'] + '.'
        if any(n.startswith(prefix) for n in nums):
            s['is_leaf'] = False

    # Rebuild the normalized outline text (written back to the database).
    normalized_text = _sections_to_outline_text(bid_title, sections)

    return bid_title, sections, normalized_text


# ─── Database helpers ────────────────────────────────────────────────────

def _get_tender_data(conn, project_id):
    """Return the project's tender_data row as a dict, or None if absent.

    ``boq_summary`` defaults to '' and ``tender_kind`` to 'engineering' when
    the stored value is falsy; the other fields are returned as stored.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT summary, rating_requirements, rating_json, raw_text, boq_summary, tender_kind "
        "FROM tender_data WHERE project_id=?",
        (project_id,)
    )
    row = cur.fetchone()
    if row:
        return {
            'summary': row[0],
            'rating_requirements': row[1],
            'rating_json': row[2],
            'raw_text': row[3],
            'boq_summary': row[4] or '',
            'tender_kind': row[5] or 'engineering',
        }
    return None


def _get_outline_text(conn, project_id):
    """Return the stored outline text for the project, or '' if none."""
    cur = conn.cursor()
    cur.execute("SELECT outline FROM tender_data WHERE project_id=?", (project_id,))
    row = cur.fetchone()
    return row[0] if row and row[0] else ''


def _save_outline_text(conn, project_id, outline_text):
    """Store the outline text on the project's tender_data row and commit."""
    cur = conn.cursor()
    # Make sure a row exists first; otherwise the UPDATE would touch zero rows
    # and the save would silently do nothing.
    cur.execute(
        "INSERT OR IGNORE INTO tender_data (project_id, status) VALUES (?, 'pending')",
        (project_id,),
    )
    cur.execute(
        "UPDATE tender_data SET outline=?, updated_at=? WHERE project_id=?",
        (outline_text, datetime.now(), project_id),
    )
    conn.commit()


def _save_sections(conn, project_id, sections):
    """Replace the project's bid_sections rows with ``sections`` and commit.

    Existing rows of the project are deleted first; new rows are inserted
    with status ``pending``.
    """
    cur = conn.cursor()
    # Remove the previous sections of this project.
    cur.execute("DELETE FROM bid_sections WHERE project_id=?", (project_id,))
    for s in sections:
        cur.execute('''
            INSERT INTO bid_sections
                (project_id, section_number, section_title, level, is_leaf, order_index, status)
            VALUES (?, ?, ?, ?, ?, ?, 'pending')
        ''', (project_id, s['number'], s['title'], s['level'],
              1 if s['is_leaf'] else 0, s['order_index']))
    conn.commit()


def _get_section(conn, section_id):
    """Return one bid_sections row as a dict, or None when it does not exist."""
    cur = conn.cursor()
    cur.execute(
        "SELECT id, section_number, section_title, level, is_leaf, content, intro_content FROM bid_sections WHERE id=?",
        (section_id,)
    )
    row = cur.fetchone()
    if row:
        return {
            'id': row[0], 'section_number': row[1], 'section_title': row[2],
            'level': row[3], 'is_leaf': row[4], 'content': row[5], 'intro_content': row[6]
        }
    return None


def _update_section_status(conn, section_id, status, error=''):
    """Set a section's status and error message, refresh updated_at, commit."""
    cur = conn.cursor()
    cur.execute(
        "UPDATE bid_sections SET status=?, error_message=?, updated_at=? WHERE id=?",
        (status, error, datetime.now(), section_id)
    )
    conn.commit()


def _update_section_content(conn, section_id, content, intro_content):
    """Set a section's content and intro_content, refresh updated_at, commit."""
    cur = conn.cursor()
    cur.execute(
        "UPDATE bid_sections SET content=?, intro_content=?, updated_at=? WHERE id=?",
        (content, intro_content, datetime.now(), section_id)
    )
    conn.commit()


# ─── Per-call connections for worker threads (WAL enabled) ───────────────

def _db_connect(db_path: str) -> sqlite3.Connection:
    """Open a connection with a 30 s busy timeout and WAL journal mode."""
    conn = sqlite3.connect(db_path, timeout=30, check_same_thread=False)
    conn.execute('PRAGMA journal_mode=WAL')
    return conn


def _update_section_status_safe(db_path, section_id, status, error=''):
    """Update a section's status using a short-lived connection of its own."""
    conn = _db_connect(db_path)
    try:
        _update_section_status(conn, section_id, status, error)
    finally:
        conn.close()


def _update_section_content_safe(db_path, section_id, content, intro_content):
    """Update a section's content using a short-lived connection of its own."""
    conn = _db_connect(db_path)
    try:
        _update_section_content(conn, section_id, content, intro_content)
    finally:
        conn.close()


def _set_project_status(conn, project_id, status, error=''):
    """Set the project's outline_status / outline_error and commit."""
    cur = conn.cursor()
    cur.execute(
        "UPDATE projects SET outline_status=?, outline_error=?, updated_at=? WHERE id=?",
        (status, error, datetime.now(), project_id)
    )
    conn.commit()


# ─── AI auto-fill of sub-chapters ────────────────────────────────────────

def expand_outline(
    outline_text: str,
    summary: str = '',
    rating_requirements: str = '',
    project_id: int = 0,
    target_pages: int = 0,
) -> str:
    """Fill in sub-chapters for the main chapters the user supplied.

    Main chapters are extracted from ``outline_text`` (lines numbered like
    "1.1" are ignored; unnumbered lines shorter than 50 characters also count
    as main chapters). Chapters for which
    ``att_sec.should_skip_expand_subchapters`` is true are kept without
    sub-chapters; the others are expanded concurrently through
    ``_generate_sub_chapters``.

    ``target_pages``: when positive, a total sub-chapter budget is computed
    via ``vol_ch`` and split across the expanded chapters; 0 means no limit.
    It should be passed by the caller rather than read from process-global
    config.

    Returns the expanded outline text, or ``outline_text`` unchanged when no
    main chapter is found.
    """
    lines = outline_text.strip().split('\n')
    bid_title = ''
    main_chapters = []

    # Bid title: the first non-blank line, if it is not chapter-like and is
    # longer than 50 characters.
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped:
            continue
        is_chapter_format = re.match(r'^[一二三四五六七八九十百第]', stripped) or re.match(r'^\d+[..、\s]', stripped)
        if not is_chapter_format and len(stripped) > 50:
            bid_title = stripped
            lines = lines[i + 1:]
        break

    # Collect level-1 chapters.
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue

        # Skip level-2 and deeper headings first.
        if re.match(r'^\d+(?:\.\d+)+', stripped):
            continue

        m1_cn = re.match(r'^([一二三四五六七八九十百]+)[、。..\s]+\s*(.*)', stripped)
        if not m1_cn:
            m1_cn = re.match(r'^第([一二三四五六七八九十百]+)[章节]\s*(.*)', stripped)
        if not m1_cn:
            m1_cn = re.match(r'^([一二三四五六七八九十百]+)(?![一二三四五六七八九十百])\s+(.*)', stripped)

        m1_en = re.match(r'^(\d+)[、。..\s]+\s*(.*)', stripped)
        if not m1_en:
            m1_en = re.match(r'^第(\d+)[章节]\s*(.*)', stripped)
        if not m1_en:
            m1_en = re.match(r'^(\d+)(?!\d)\s+(.*)', stripped)
        if not m1_en:
            m1_en = re.match(r'^(\d+)([^\d].*)', stripped)

        if m1_cn or m1_en:
            title = (m1_cn.group(2) if m1_cn else m1_en.group(2)).strip()
            title = re.sub(r'^[、。..\s]+', '', title)
            if title:
                main_chapters.append({'title': title})
        else:
            # Unnumbered short lines are accepted as main chapters too.
            if 0 < len(stripped) < 50:
                main_chapters.append({'title': stripped})

    if not main_chapters:
        logger.warning(f'expand_outline未找到主章节,输入大纲:{outline_text[:200]}')
        return outline_text

    expanded_lines = []
    if bid_title:
        expanded_lines.append(bid_title)

    # Expand main chapters concurrently (attachment chapters are skipped).
    results: List[Optional[str]] = [None] * len(main_chapters)
    chapters_to_expand: List[Tuple[int, Dict[str, Any]]] = []
    for idx, chapter in enumerate(main_chapters):
        ct = chapter['title']
        if att_sec.should_skip_expand_subchapters(ct):
            results[idx] = ''
            label = att_sec.parse_attachment_label(ct)
            if label:
                logger.info(
                    'expand_outline 跳过附件主章节小章节填充: title=%r attachment_label=%r',
                    ct,
                    label,
                )
            else:
                logger.info(
                    'expand_outline 跳过附件主章节小章节填充: title=%r',
                    ct,
                )
        else:
            chapters_to_expand.append((idx, chapter))

    if chapters_to_expand:
        tp = max(0, int(target_pages or 0))
        per_main: Optional[List[int]] = None
        if tp > 0:
            k_exp = len(chapters_to_expand)
            n_total = vol_ch.subchapter_total_effective(tp, k_exp, random.Random())
            per_main = vol_ch.allocate_subchapters_to_mains(n_total, k_exp)

        # ThreadPoolExecutor raises ValueError for max_workers <= 0, so a zero
        # or negative configured value is clamped to 1.
        max_workers = max(
            1,
            min(len(chapters_to_expand), getattr(config, 'MAX_CONCURRENT_SECTIONS', 15)),
        )
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_chapter = {
                executor.submit(
                    _generate_sub_chapters,
                    ch['title'],
                    summary,
                    rating_requirements,
                    idx + 1,
                    project_id,
                    per_main[i] if per_main else None,
                ): (idx, ch['title'])
                for i, (idx, ch) in enumerate(chapters_to_expand)
            }

            for future in as_completed(future_to_chapter):
                idx, title = future_to_chapter[future]
                try:
                    results[idx] = future.result()
                    logger.info(f'主章节扩展成功: {title}')
                except Exception as e:
                    logger.error(f'主章节扩展失败: {title}, 错误: {e}')
                    results[idx] = ''

    # Assemble the result: "<Chinese numeral>、<title>" followed by its
    # sub-chapter lines; chapters beyond the numeral table use Arabic digits.
    for idx, chapter in enumerate(main_chapters):
        chapter_num = idx + 1
        cn_num = _CN_NUMS_LIST[chapter_num] if chapter_num < len(_CN_NUMS_LIST) else str(chapter_num)
        expanded_lines.append(f'{cn_num}、{chapter["title"]}')
        if results[idx]:
            expanded_lines.append(results[idx])

    return '\n'.join(expanded_lines)


def _extract_title_text(title: str) -> str:
    """Return the bare heading text with numbering and edge punctuation removed.

    Strips, in order: a leading Chinese-numeral index, a leading Arabic
    (possibly dotted) index, leading punctuation, and trailing punctuation.
    """
    text = re.sub(r'^[一二三四五六七八九十百]+[、。.]\s*', '', title.strip())
    text = re.sub(r'^\d+(?:\.\d+)*[、。.]?\s*', '', text)
    text = re.sub(r'^\s*[、。,,;;::]+\s*', '', text)
    text = re.sub(r'\s*[、。,,;;::]+\s*$', '', text)
    return text.strip()


def _indent_to_sub_level(indent_count: int) -> int:
    """Map a count of leading space characters to a sub-chapter level (1..3)."""
    if indent_count == 0:
        return 1
    if indent_count <= 2:
        return 2
    return 3


def _normalize_sub_chapter_lines(
    response: str,
    chapter_num: int,
    main_title_text: str = '',
    max_subchapters: Optional[int] = None,
) -> List[str]:
    """Turn a raw model reply into renumbered sub-chapter lines.

    Every usable line is assigned a relative level (1..3) from its own number
    depth or, failing that, its indentation, and is then renumbered under
    ``chapter_num`` ("N.1", "N.1.1", "N.1.1.1"). Dropped lines: blank lines,
    numbers with a zero-leading component after the first, titles shorter
    than two characters, and titles equal to ``main_title_text``.

    A level is never allowed to be deeper than one below the deepest level
    that already exists, so no "N.0.x" numbers can be produced. When
    ``max_subchapters`` is positive the result is truncated to that many
    lines.
    """
    level_counts = {1: 0, 2: 0, 3: 0, 4: 0}
    result_lines: List[str] = []

    for line in response.strip().split('\n'):
        if not line or not line.strip():
            continue

        # Count leading full-/half-width spaces as the indentation hint.
        indent_count = 0
        remaining = line
        while remaining and (remaining[0] == '\u3000' or remaining[0] == ' '):
            indent_count += 1
            remaining = remaining[1:]

        # Remove list/markdown decorations in front of the heading.
        remaining = re.sub(r'^[\s#*>\-]+', '', remaining).strip()
        if not remaining:
            continue

        m = re.match(r'^(\d+(?:\.\d+)*)[、。..]?\s*(.*)', remaining)
        if m:
            parts = m.group(1).split('.')
            has_invalid_zero = any(
                i > 0 and part and part[0] == '0' for i, part in enumerate(parts)
            )
            if has_invalid_zero:
                continue
            if len(parts) > 1:
                level = len(parts) - 1
            else:
                level = _indent_to_sub_level(indent_count)
            title = m.group(2).strip()
        else:
            m_cn = re.match(r'^([一二三四五六七八九十百]+)[、。..]\s*(.*)', remaining)
            if m_cn:
                title = m_cn.group(2).strip()
                level = 1
            else:
                title = remaining
                level = _indent_to_sub_level(indent_count)

        title = _extract_title_text(title)
        if not title or len(title) < 2:
            continue
        if main_title_text and _extract_title_text(title) == main_title_text:
            continue

        level = min(max(level, 1), 3)
        # A heading cannot be deeper than its (missing) parent allows;
        # otherwise the parent counter would still be 0 in the number.
        if level_counts[1] == 0:
            level = 1
        elif level == 3 and level_counts[2] == 0:
            level = 2

        level_counts[level] += 1
        for deeper in range(level + 1, 5):
            level_counts[deeper] = 0

        if level == 1:
            num = f'{chapter_num}.{level_counts[1]}'
            indent = ''
        elif level == 2:
            num = f'{chapter_num}.{level_counts[1]}.{level_counts[2]}'
            indent = '\u3000'
        else:
            num = f'{chapter_num}.{level_counts[1]}.{level_counts[2]}.{level_counts[3]}'
            indent = '\u3000\u3000'

        result_lines.append(f'{indent}{num} {title}')

    if max_subchapters is not None and max_subchapters > 0 and len(result_lines) > max_subchapters:
        result_lines = result_lines[:max_subchapters]
    return result_lines


def _generate_sub_chapters(
    chapter_title: str,
    summary: str,
    rating_requirements: str,
    chapter_num: int,
    project_id: int = 0,
    max_subchapters: Optional[int] = None,
) -> str:
    """Generate the sub-chapter outline for one main chapter.

    Returns the renumbered sub-chapter lines joined by newlines, or '' when
    ``max_subchapters`` is given and not positive, or when the model call or
    parsing fails (the failure is logged). ``project_id`` is accepted for
    interface compatibility and is not used here.
    """
    if max_subchapters is not None and max_subchapters <= 0:
        return ''
    boq_summary = _get_boq_summary_for_chapter(chapter_title, summary)

    prompt = P.get_chapter_outline_prompt(
        summary, chapter_title, rating_requirements, max_subchapters=max_subchapters
    )
    if boq_summary:
        prompt += (
            '\n\n【工程量清单关键信息】\n'
            f'{boq_summary}\n\n请严格根据工程量清单中的工程项目生成子章节,确保每个子章节都与具体工程内容对应。'
        )

    try:
        response = ai_client.chat(
            prompt,
            system='你是一位专业的标书大纲生成专家。请根据主章节标题和工程量清单内容生成合适的子章节列表,严格遵守编号规则:'
                   '绝对禁止出现1.0、2.0、1.0.1等0开头编号;'
                   '二级从X.1开始,三级从X.1.1开始,四级从X.1.1.1开始;'
                   '只输出子章节,不重复主章节标题。',
            temperature=0.5,
            max_tokens=2048,
            request_timeout=getattr(config, 'OUTLINE_REQUEST_TIMEOUT', 300),
        )
        logger.info(f'_generate_sub_chapters AI响应章节={chapter_title},长度={len(response)}')

        main_title_text = _extract_title_text(chapter_title)
        result_lines = _normalize_sub_chapter_lines(
            response, chapter_num, main_title_text, max_subchapters,
        )
        return '\n'.join(result_lines)
    except Exception:
        logger.exception(f'生成子章节失败 chapter={chapter_title}')
        return ''


def _get_boq_summary_for_chapter(chapter_title: str, summary: str) -> str:
    """Extract bill-of-quantities related lines from the tender summary.

    A summary line is kept when it contains any of the keywords below; at
    most the first 20 matching lines are returned, joined by newlines.
    Returns '' for an empty summary or when nothing matches.
    ``chapter_title`` is currently not used for filtering.
    """
    if not summary:
        return ''

    boq_keywords = [
        '项目编码', '清单编码', '编码', '编号', '序号', '项目编号', '清单编号',
        '项目名称', '清单名称', '名称', '工程名称', '清单项目名称', '分项名称',
        '计量单位', '单位', '计量',
        '工程量', '数量', '清单数量', '清单工程量',
        '综合单价', '单价', '投标单价', '综合价',
        '合价', '金额', '合计金额', '综合合价', '合计', '总价', '小计',
        '项目特征', '项目特征描述', '特征描述', '做法说明', '工程内容',
        '工作内容', '详述', '说明', '特征', '项目特征及内容',
        '施工内容', '工艺要求', '技术措施', '施工要求', '施工方法'
    ]

    boq_lines = [
        line.strip()
        for line in summary.strip().split('\n')
        if any(keyword in line for keyword in boq_keywords)
    ]

    if boq_lines:
        return '\n'.join(boq_lines[:20])
    return ''