"""
Tender document parsing module.

Pipeline: extract text → generate summary → extract rating requirements →
structured JSON.
"""
import json
import logging
import re
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from functools import partial

from utils import ai_client, prompts as P
from utils.file_utils import extract_text, truncate_text
from utils.tender_kind_sections import (
    get_tender_kind_classify_prompt,
    parse_tender_kind_response,
)

logger = logging.getLogger(__name__)

# Error shown to the user when no text could be extracted from an upload.
_EMPTY_TEXT_ERROR = '未能从文件中提取到有效内容,请检查文件格式'


def _now() -> str:
    """Return the current local time as text suitable for an SQL parameter.

    sqlite3's implicit ``datetime`` adapter is deprecated since Python 3.12.
    This produces exactly the text that adapter used to emit
    (``isoformat(' ')``), so values already stored stay comparable.
    """
    return datetime.now().isoformat(' ')


def parse_boq_file(db_path: str, project_id: int, file_path: str, file_name: str) -> None:
    """Parse a bill-of-quantities (BOQ) file and persist the result.

    Steps: extract page texts → local structural analysis → AI summary →
    write to ``tender_data``. Any failure is logged and recorded as
    ``boq_status='error'`` instead of being raised to the caller.

    ``boq_status`` values written here: ``parsing`` → ``done`` / ``error``.

    Args:
        db_path: Path of the SQLite database file.
        project_id: Project whose ``tender_data`` row is updated.
        file_path: Location of the uploaded BOQ file.
        file_name: Original file name, stored alongside the result.
    """
    # Imported lazily, as in the original code.
    from utils.bill_analysis import analyze_boq_pages, categories_to_prompt_appendix
    from utils.boq_parser import extract_boq_pages

    conn = sqlite3.connect(db_path)
    try:
        _set_boq_status(conn, project_id, 'parsing', '正在提取工程量清单文本...')
        page_texts = extract_boq_pages(file_path)
        boq_text = '\n'.join(page_texts).strip()
        if not boq_text:
            raise ValueError(_EMPTY_TEXT_ERROR)

        _set_boq_status(conn, project_id, 'parsing', '正在本地解析清单结构...')
        analysis = analyze_boq_pages(page_texts)
        boq_analysis_json = json.dumps(analysis, ensure_ascii=False)

        # The structured appendix is only built when the analysis is not
        # flagged as 'scanned' or 'no_bill_pages'.
        structured = ''
        if not analysis.get('scanned') and not analysis.get('no_bill_pages'):
            structured = categories_to_prompt_appendix(analysis)

        _set_boq_status(conn, project_id, 'parsing', '正在生成工程量清单摘要...')
        # Only the first 10000 characters are sent to the model.
        summary_prompt = P.get_boq_summary_prompt(boq_text[:10000], structured)
        boq_summary = ai_client.chat(summary_prompt, temperature=0.2, max_tokens=2048)

        # NOTE(review): this is a plain UPDATE, so it presumably relies on the
        # tender_data row already existing for the project — confirm.
        cur = conn.cursor()
        cur.execute(
            '''
            UPDATE tender_data
            SET boq_file_name=?, boq_text=?, boq_summary=?, boq_analysis_json=?,
                boq_status='done', boq_error='', updated_at=?
            WHERE project_id=?
            ''',
            # At most 12000 characters of the raw text are stored.
            (file_name, boq_text[:12000], boq_summary, boq_analysis_json,
             _now(), project_id),
        )
        conn.commit()
        logger.info('项目 %s 工程量清单解析完成', project_id)
    except Exception as e:
        # Top-level boundary of a background job: log and record, never raise.
        logger.exception('工程量清单解析失败 project_id=%s', project_id)
        _set_boq_status(conn, project_id, 'error', str(e))
    finally:
        conn.close()


def _set_boq_status(conn, project_id, status, message=''):
    """Write ``boq_status`` / ``boq_error`` for a project and commit."""
    cur = conn.cursor()
    cur.execute(
        '''
        UPDATE tender_data
        SET boq_status=?, boq_error=?, updated_at=?
        WHERE project_id=?
        ''',
        (status, message, _now(), project_id),
    )
    conn.commit()


def parse_tender_file(db_path: str, project_id: int, file_path: str, file_name: str) -> None:
    """Parse a tender document and persist the result.

    After text extraction, three independent AI tasks (summary, rating
    requirements, tender-kind classification) run in parallel. The rating
    JSON step depends on the rating requirements and runs afterwards.
    Any failure is logged and recorded as ``status='error'`` instead of
    being raised to the caller.

    NOTE(review): the original author's note says LLM calls are additionally
    throttled by a global semaphore (default 40, per config); that limit is
    not visible in this file.

    ``status`` values written here: ``parsing`` → ``done`` / ``error``.

    Args:
        db_path: Path of the SQLite database file.
        project_id: Project whose ``tender_data`` row is written.
        file_path: Location of the uploaded tender document.
        file_name: Original file name, stored alongside the result.
    """
    conn = sqlite3.connect(db_path)
    try:
        _set_status(conn, project_id, 'parsing', '正在提取文件文本...')

        # Step 1: extract the raw text.
        raw_text = extract_text(file_path)
        # Fail early on empty extraction, consistent with parse_boq_file,
        # rather than sending empty prompts to the model.
        if not raw_text or not raw_text.strip():
            raise ValueError(_EMPTY_TEXT_ERROR)
        raw_text = truncate_text(raw_text, 60000)
        # The classification prompt only receives the first 15000 characters.
        excerpt = (raw_text or '')[:15000]

        _set_status(conn, project_id, 'parsing', '并行生成摘要、评分要求和类型识别...')

        def _run_summary():
            prompt = P.get_project_summary_prompt(raw_text)
            return ai_client.chat(prompt, temperature=0.3, max_tokens=4096)

        def _run_rating():
            prompt = P.get_rating_requirements_prompt(raw_text)
            return ai_client.chat(prompt, temperature=0.2, max_tokens=4096)

        def _run_kind():
            prompt = get_tender_kind_classify_prompt(excerpt)
            raw = ai_client.chat(prompt, temperature=0.1, max_tokens=32)
            return parse_tender_kind_response(raw)

        # Steps 2-3: run the three independent AI tasks concurrently.
        with ThreadPoolExecutor(max_workers=3, thread_name_prefix='parse') as executor:
            future_summary = executor.submit(_run_summary)
            future_rating = executor.submit(_run_rating)
            future_kind = executor.submit(_run_kind)

            # result() re-raises any exception from the worker thread.
            summary = future_summary.result()
            rating_md = future_rating.result()
            tender_kind = future_kind.result()

        logger.info('项目 %s 招标文件类型识别为: %s', project_id, tender_kind)

        _set_status(conn, project_id, 'parsing', '正在结构化评分数据...')

        # Step 4: JSON structuring depends on rating_md, so it runs afterwards.
        rating_json_prompt = P.get_rating_json_prompt(rating_md)
        rating_json_raw = ai_client.chat(rating_json_prompt, temperature=0.1, max_tokens=2048)
        rating_json_str = _clean_json(rating_json_raw)

        _upsert_tender_data(conn, project_id, file_name, raw_text, summary,
                            rating_md, rating_json_str, tender_kind)

        # Fill in default diagram/anonymisation settings on the project.
        # COALESCE keeps any value that is already set.
        # Future: add AI extraction of diagram intent and anon rules from raw_text.
        cur = conn.cursor()
        cur.execute(
            '''
            UPDATE projects
            SET enable_figure = COALESCE(enable_figure, 1),
                enable_table = COALESCE(enable_table, 1),
                anon_requirements = COALESCE(anon_requirements, '不得出现投标人身份信息')
            WHERE id = ?
            ''',
            (project_id,),
        )
        conn.commit()

        _set_status(conn, project_id, 'done', '解析完成(已同步生成设置)')
        logger.info('项目 %s 招标文件解析完成(并行加速完成,生成设置已打通)', project_id)
    except Exception as e:
        # Top-level boundary of a background job: log and record, never raise.
        logger.exception('解析失败 project_id=%s', project_id)
        _set_status(conn, project_id, 'error', str(e))
    finally:
        conn.close()


# ─── Internal helpers ──────────────────────────────────────────────────────

def _set_status(conn, project_id, status, message=''):
    """Insert or update the ``status`` / ``error_message`` of a project row.

    ``updated_at`` is only written on the update path, as in the original
    statement.
    """
    cur = conn.cursor()
    cur.execute(
        '''
        INSERT INTO tender_data (project_id, status, error_message)
        VALUES (?, ?, ?)
        ON CONFLICT(project_id) DO UPDATE SET
            status=excluded.status,
            error_message=excluded.error_message,
            updated_at=?
        ''',
        (project_id, status, message, _now()),
    )
    conn.commit()


def _upsert_tender_data(conn, project_id, file_name, raw_text, summary, rating_md,
                        rating_json_str, tender_kind: str = 'engineering'):
    """Insert or update the parsed tender result and mark it ``done``.

    On conflict the ``excluded.`` qualifier reuses the values of the
    attempted insert, so each value is bound only once.
    """
    cur = conn.cursor()
    cur.execute(
        '''
        INSERT INTO tender_data
            (project_id, file_name, raw_text, summary,
             rating_requirements, rating_json, tender_kind,
             status, error_message)
        VALUES (?, ?, ?, ?, ?, ?, ?, 'done', '')
        ON CONFLICT(project_id) DO UPDATE SET
            file_name=excluded.file_name,
            raw_text=excluded.raw_text,
            summary=excluded.summary,
            rating_requirements=excluded.rating_requirements,
            rating_json=excluded.rating_json,
            tender_kind=excluded.tender_kind,
            status='done',
            error_message='',
            updated_at=?
        ''',
        (project_id, file_name, raw_text, summary, rating_md,
         rating_json_str, tender_kind, _now()),
    )
    conn.commit()


def _is_valid_json(text: str) -> bool:
    """Return True when *text* parses as JSON."""
    try:
        json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass.
        return False
    return True


def _clean_json(raw: str) -> str:
    """Try to extract a JSON string from an AI reply.

    Markdown code fences are stripped first. If the remainder is not valid
    JSON, the widest ``{...}`` span is tried, then the widest ``[...]`` span.

    Args:
        raw: Model reply; ``None`` is treated as empty text.

    Returns:
        The first candidate that parses as JSON, otherwise the fence-stripped
        text unchanged (which may not be valid JSON).
    """
    # Remove markdown code fences.
    text = re.sub(r'```(?:json)?\s*', '', raw or '')
    text = text.replace('```', '').strip()

    if _is_valid_json(text):
        return text

    # Objects are tried before arrays so existing results do not change.
    for pattern in (r'\{[\s\S]*\}', r'\[[\s\S]*\]'):
        found = re.search(pattern, text)
        if found and _is_valid_json(found.group(0)):
            return found.group(0)
    return text