207 lines
8.3 KiB
Python
207 lines
8.3 KiB
Python
"""
Tender document parsing module.

Pipeline: extract text → generate summary → extract scoring requirements → structured JSON
"""
|
||
import json
|
||
import logging
|
||
import re
|
||
import sqlite3
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime
|
||
from functools import partial
|
||
|
||
from utils import ai_client, prompts as P
|
||
from utils.file_utils import extract_text, truncate_text
|
||
from utils.tender_kind_sections import (
|
||
get_tender_kind_classify_prompt,
|
||
parse_tender_kind_response,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def parse_boq_file(db_path: str, project_id: int, file_path: str, file_name: str) -> None:
    """Parse a bill-of-quantities (BOQ) file and store the result in ``tender_data``.

    Meant to be run in a background thread (per the original note). Steps:
    extract per-page text, run the local structural analysis, ask the AI
    client for a summary, then write text, summary and analysis JSON to the
    project's ``tender_data`` row.

    ``boq_status`` moves none → parsing → done / error. On failure the
    exception text is written to ``boq_error``; nothing is re-raised.

    Args:
        db_path: Path of the SQLite database file to open.
        project_id: Project whose ``tender_data`` row is updated.
        file_path: Location of the BOQ file to parse.
        file_name: Name stored in ``boq_file_name``.
    """
    from utils.bill_analysis import analyze_boq_pages, categories_to_prompt_appendix
    from utils.boq_parser import extract_boq_pages

    conn = sqlite3.connect(db_path)
    try:
        _set_boq_status(conn, project_id, 'parsing', '正在提取工程量清单文本...')

        page_texts = extract_boq_pages(file_path)
        boq_text = '\n'.join(page_texts).strip()
        if not boq_text:
            raise ValueError('未能从文件中提取到有效内容,请检查文件格式')

        _set_boq_status(conn, project_id, 'parsing', '正在本地解析清单结构...')
        analysis = analyze_boq_pages(page_texts)
        boq_analysis_json = json.dumps(analysis, ensure_ascii=False)

        # The structured appendix is only built when the analysis is not
        # flagged as scanned and not flagged as having no bill pages.
        structured = ''
        if not analysis.get('scanned') and not analysis.get('no_bill_pages'):
            structured = categories_to_prompt_appendix(analysis)

        _set_boq_status(conn, project_id, 'parsing', '正在生成工程量清单摘要...')

        # Only the first 10000 characters go into the prompt; the database
        # keeps the first 12000 (see the UPDATE below).
        summary_prompt = P.get_boq_summary_prompt(boq_text[:10000], structured)
        boq_summary = ai_client.chat(summary_prompt, temperature=0.2, max_tokens=2048)

        cur = conn.cursor()
        cur.execute('''
            UPDATE tender_data
            SET boq_file_name=?, boq_text=?, boq_summary=?, boq_analysis_json=?,
                boq_status='done', boq_error='', updated_at=?
            WHERE project_id=?
        ''', (file_name, boq_text[:12000], boq_summary, boq_analysis_json,
              datetime.now(), project_id))
        conn.commit()

        # This is a plain UPDATE: when the project has no tender_data row it
        # matches nothing and the result is dropped. Report that instead of
        # claiming success.
        if cur.rowcount == 0:
            logger.warning('项目 %s 没有 tender_data 记录,工程量清单解析结果未能写入', project_id)
        else:
            logger.info('项目 %s 工程量清单解析完成', project_id)

    except Exception as e:
        # Thread boundary: log with traceback and record the failure.
        logger.exception('工程量清单解析失败 project_id=%s', project_id)
        try:
            _set_boq_status(conn, project_id, 'error', str(e))
        except sqlite3.Error:
            # The status write can fail too (e.g. the database error that
            # caused the failure); do not let it escape the worker thread.
            logger.exception('无法写入工程量清单错误状态 project_id=%s', project_id)
    finally:
        conn.close()
|
||
|
||
|
||
def _set_boq_status(conn, project_id, status, message=''):
    """Record the BOQ parsing state of one project and commit right away.

    Issues a plain UPDATE on ``tender_data`` (no insert), so a project
    without a row is left untouched. ``message`` is written to ``boq_error``
    regardless of ``status``, and ``updated_at`` is set to the current time.
    """
    bindings = (status, message, datetime.now(), project_id)
    conn.cursor().execute('''
        UPDATE tender_data SET boq_status=?, boq_error=?, updated_at=?
        WHERE project_id=?
    ''', bindings)
    conn.commit()
|
||
|
||
|
||
def parse_tender_file(db_path: str, project_id: int, file_path: str, file_name: str) -> None:
    """Parse a tender document and persist the results for one project.

    Meant to be run in a background thread (per the original note). After
    text extraction, three independent AI tasks run in parallel on a small
    thread pool: project summary, scoring requirements, and tender-kind
    classification. The scoring requirements are then turned into JSON in a
    sequential step because that step depends on the scoring text.

    NOTE(review): the original docstring says a global LLM semaphore
    (default 40, matching config) caps total concurrency; that mechanism is
    not visible in this function — confirm in ``ai_client``.

    ``status`` moves pending → parsing → done / error. On failure the
    exception text is stored as the error message; nothing is re-raised.

    Args:
        db_path: Path of the SQLite database file to open.
        project_id: Project whose records are written.
        file_path: Location of the tender file to parse.
        file_name: Name stored alongside the parsed data.
    """
    conn = sqlite3.connect(db_path)
    try:
        _set_status(conn, project_id, 'parsing', '正在提取文件文本...')

        # 1. Extract the raw text. Stop early when nothing usable came out,
        #    otherwise the AI calls below would run on an empty document and
        #    the project would still be marked as done.
        raw_text = extract_text(file_path)
        if not raw_text or not raw_text.strip():
            raise ValueError('未能从文件中提取到有效内容,请检查文件格式')
        raw_text = truncate_text(raw_text, 60000)
        # Classification only needs the leading part of the document.
        excerpt = (raw_text or '')[:15000]

        _set_status(conn, project_id, 'parsing', '并行生成摘要、评分要求和类型识别...')

        # 2. Three independent AI tasks; none of them touches ``conn``.
        def _run_summary():
            prompt = P.get_project_summary_prompt(raw_text)
            return ai_client.chat(prompt, temperature=0.3, max_tokens=4096)

        def _run_rating():
            prompt = P.get_rating_requirements_prompt(raw_text)
            return ai_client.chat(prompt, temperature=0.2, max_tokens=4096)

        def _run_kind():
            prompt = get_tender_kind_classify_prompt(excerpt)
            raw = ai_client.chat(prompt, temperature=0.1, max_tokens=32)
            return parse_tender_kind_response(raw)

        # 3. Bounded pool: one worker per task. ``result()`` re-raises any
        #    exception from a task, which sends us to the handler below.
        with ThreadPoolExecutor(max_workers=3, thread_name_prefix='parse') as executor:
            future_summary = executor.submit(_run_summary)
            future_rating = executor.submit(_run_rating)
            future_kind = executor.submit(_run_kind)

            summary = future_summary.result()
            rating_md = future_rating.result()
            tender_kind = future_kind.result()

        logger.info('项目 %s 招标文件类型识别为: %s', project_id, tender_kind)

        _set_status(conn, project_id, 'parsing', '正在结构化评分数据...')

        # 4. JSON structuring depends on rating_md, so it runs sequentially.
        rating_json_prompt = P.get_rating_json_prompt(rating_md)
        rating_json_raw = ai_client.chat(rating_json_prompt, temperature=0.1, max_tokens=2048)
        rating_json_str = _clean_json(rating_json_raw)

        # 5. Persist the parsed data.
        _upsert_tender_data(conn, project_id, file_name, raw_text,
                            summary, rating_md, rating_json_str, tender_kind)

        # 6. Fill in default figure/table/anonymity settings on the project,
        #    keeping any value that is already set (COALESCE).
        # Future: add an AI extraction prompt for diagram intent and
        # anonymity rules from raw_text.
        cur = conn.cursor()
        cur.execute('''
            UPDATE projects SET
                enable_figure = COALESCE(enable_figure, 1),
                enable_table = COALESCE(enable_table, 1),
                anon_requirements = COALESCE(anon_requirements, '不得出现投标人身份信息')
            WHERE id = ?
        ''', (project_id,))
        conn.commit()

        _set_status(conn, project_id, 'done', '解析完成(已同步生成设置)')
        logger.info('项目 %s 招标文件解析完成(并行加速完成,生成设置已打通)', project_id)

    except Exception as e:
        # Thread boundary: log with traceback and record the failure.
        logger.exception('解析失败 project_id=%s', project_id)
        try:
            _set_status(conn, project_id, 'error', str(e))
        except sqlite3.Error:
            # The status write can fail too (e.g. the database error that
            # caused the failure); do not let it escape the worker thread.
            logger.exception('无法写入解析错误状态 project_id=%s', project_id)
    finally:
        conn.close()
|
||
|
||
|
||
# ─── 内部工具 ──────────────────────────────────────────────────────────────
|
||
|
||
def _set_status(conn, project_id, status, message=''):
    """Upsert the parsing status of one project in ``tender_data`` and commit.

    Inserts a row with the given status and error message when the project
    has none yet; on a ``project_id`` conflict it updates status and error
    message and sets ``updated_at`` to the current time.
    """
    upsert_sql = '''
        INSERT INTO tender_data (project_id, status, error_message)
        VALUES (?, ?, ?)
        ON CONFLICT(project_id) DO UPDATE SET status=?, error_message=?, updated_at=?
    '''
    insert_args = (project_id, status, message)
    update_args = (status, message, datetime.now())
    conn.cursor().execute(upsert_sql, insert_args + update_args)
    conn.commit()
|
||
|
||
|
||
def _upsert_tender_data(conn, project_id, file_name, raw_text,
                        summary, rating_md, rating_json_str,
                        tender_kind: str = 'engineering'):
    """Insert or refresh the parsed tender data of one project and commit.

    The row is written with status 'done' and an empty error message. When a
    row for ``project_id`` already exists, the same fields are overwritten
    and ``updated_at`` is set to the current time.
    """
    # The six data fields are bound twice: once for INSERT, once for the
    # DO UPDATE branch.
    fields = (file_name, raw_text, summary, rating_md, rating_json_str, tender_kind)
    bindings = (project_id, *fields, *fields, datetime.now())
    conn.cursor().execute('''
        INSERT INTO tender_data
            (project_id, file_name, raw_text, summary, rating_requirements, rating_json,
             tender_kind, status, error_message)
        VALUES (?, ?, ?, ?, ?, ?, ?, 'done', '')
        ON CONFLICT(project_id) DO UPDATE SET
            file_name=?, raw_text=?, summary=?, rating_requirements=?,
            rating_json=?, tender_kind=?, status='done', error_message='', updated_at=?
    ''', bindings)
    conn.commit()
|
||
|
||
|
||
def _clean_json(raw: str) -> str:
    """Best-effort extraction of a JSON document from an AI reply.

    Markdown code fences are stripped first. If the remaining text is valid
    JSON it is returned as is. Otherwise the widest ``{...}`` span is tried,
    then the widest ``[...]`` span, and the first one that parses is
    returned. When nothing parses, the fence-stripped text is returned
    unchanged so the caller can still store or inspect it.

    Args:
        raw: Text returned by the model; ``None`` or an empty string is
            accepted and yields ``''``.

    Returns:
        A JSON string when one could be isolated, otherwise the cleaned text.
    """
    if not raw:
        return ''

    # Drop markdown code fences (with or without a "json" language tag).
    text = re.sub(r'```(?:json)?\s*', '', raw)
    text = text.replace('```', '').strip()

    try:
        json.loads(text)
    except json.JSONDecodeError:
        pass
    else:
        return text

    # The reply has prose around the payload: try an object first, then a
    # top-level array.
    for pattern in (r'\{[\s\S]*\}', r'\[[\s\S]*\]'):
        match = re.search(pattern, text)
        if match is None:
            continue
        candidate = match.group(0)
        try:
            json.loads(candidate)
        except ValueError:
            continue
        return candidate

    return text
|