578 lines
22 KiB
Python
578 lines
22 KiB
Python
"""
|
||
工程量清单本地分析(从 bill-worker.js Phase 2/3 移植)。
|
||
Phase 2:按页关键字筛选清单页;Phase 3:正则解析分部与清单项。
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import re
|
||
from typing import Any
|
||
|
||
# Module-level logger; the functions below only emit debug-level statistics through it.
logger = logging.getLogger(__name__)

# --- Page-filter keywords (read by filter_bill_pages) -----------------------
# Column-header keywords: a page containing at least two of them is flagged.
BILL_KW = ['项目编码', '项目名称', '工程量', '计量单位', '综合单价', '清单编码']
# Section-title keywords: a single hit flags the page.
SEC_KW = ['分部分项', '分类分项', '措施项目', '其他项目', '工程量清单计价']
# Fee/tax keywords: an unflagged page between two bill pages is left out when
# it contains two or more of these and no 9-digit run.
FEE_PAGE_KW = [
    '规费', '税金', '社会保险费', '住房公积金', '养老保险',
    '工伤保险', '失业保险', '医疗保险', '教育费附加', '城市维护建设税',
]

# --- Line-classification patterns (read by the line-merging phase) ----------
# Line starts with a multi-level number such as "1.2" or "1.2.3" plus whitespace.
ITEM_START = re.compile(r'^\d+(\.\d+)+\s')
# A 9-12 digit code, or "B" + 5-6 digits, preceded by start/whitespace and
# followed by whitespace, anywhere in the line.
CODE_INLINE = re.compile(r'(?:^|\s)(\d{9,12}|(?<![A-Za-z])B\d{5,6})\s')
# The same kind of code at the very start of the line.
CODE_START_RE = re.compile(r'^(\d{9,12}|B\d{5,6})\s')
# A 1-4 digit sequence number, whitespace, then a code.
SEQ_CODE_RE = re.compile(r'^\d{1,4}\s+(\d{9,12}|(?<![A-Za-z])B\d{5,6})\s')
# Page separator of the form "-- 3 of 10 --".
PAGE_MARK = re.compile(r'^--\s*\d+\s+of\s+\d+\s*--')
# Table header rows; lines matching either pattern are dropped before merging.
HEADER_RE = re.compile(r'^序号\s+(项目编码|项目名称)')
HEADER_KW = re.compile(
    r'^(项目编码|项目名称|清单编码|计量单位|综合单价|工程量|合\s*价|金额|序号)\s'
)
# Ordinal prefixes; a line starting with one of them followed by a space
# (ASCII or U+3000) starts a new logical line.
CATEGORY_MARKERS = [
    '一', '二', '三', '四', '五', '六', '七', '八', '九', '十',
    '(一)', '(二)', '(三)', '(四)', '(五)',
]

# Code: 9-12 digits or a B-code anywhere in the line (letter prefixes such as "GB" are excluded).
CODE_RE = re.compile(r'(?<![A-Za-z])(\d{9,12}|(?<![A-Za-z])B\d{5,6})')

# Recognised measurement units. Plain 'm' is listed last, so the longer
# m-prefixed units are tried first by the ordered alternation in UNIT_RE.
UNIT_TOKENS = [
    'm³', 'm²', 'm3', 'm2', 'km', 'hm2', '㎡', '㎥', 't', 'kg',
    '个', '台', '套', '组', '根', '块', '片', '张', '只', '吨', '项',
    '处', '座', '件', '段', '条', '把', '扇', '口', '圈', '道', '孔',
    '对', '副', '樘', '方', '延m', '株', '棵', 'm',
]
# Set form for exact whole-token membership tests.
UNIT_SET = frozenset(UNIT_TOKENS)
_unit_escaped = [re.escape(u) for u in UNIT_TOKENS]
# A unit preceded by start/whitespace and followed by whitespace, a digit or end of string.
UNIT_RE = re.compile(r'(?:^|\s)(' + '|'.join(_unit_escaped) + r')(?=\s|\d|$)')

# Total / subtotal rows; logical lines matching this are skipped by the parser.
SKIP_RE = re.compile(r'合\s*计|小\s*计|本页小计|总\s*计|价税合计')

# Three or four groups of 2-4 digits separated by hyphen-like characters;
# _fold_dash_codes joins them when the result is 9-12 digits long.
_DASH_CODE = re.compile(
    r'(\d{2,4})[-‐–](\d{2,4})[-‐–](\d{2,4})(?:[-‐–](\d{2,4}))?'
)

# --- Fee-item detection (read by is_fee_item) -------------------------------
# Names that count as fee items only on an exact match (after whitespace removal).
_EXACT_FEE_ITEM = frozenset([
    '规费', '税金', '利润', '增值税', '暂列金额', '暂估价', '计日工',
    '总承包服务费', '企业管理费', '甲供材料保管费', '价税合计',
])
# Substrings that mark a name as a fee item wherever they occur.
_FEE_KW = [
    '安全文明', '文明施工费', '环境保护费', '临时设施费',
    '夜间施工增加费', '夜间施工费',
    '冬雨季施工增加费', '冬雨季施工费',
    '二次搬运费', '大型机械设备进出场', '大型机械进出场',
    '施工排水降水', '排水降水费',
    '已完工程及设备保护', '已完工程保护费',
    '工程排污费', '社会保障费', '住房公积金',
    '工伤保险', '劳动保险', '意外伤害保险', '建筑工程保险',
    '城市维护建设税', '城市建设维护税',
    '教育费附加', '地方教育附加',
    '材料暂估', '专业工程暂估',
    '超高施工增加费', '安全防护费',
    '措施项目费', '其他项目费', '不可竞争费',
]

# --- Category-title detection (read by is_cat_title / is_fee_cat_title) -----
# Substrings that make a short line eligible as a category (section) title.
_CAT_KW = [
    '土建', '建筑', '结构', '装饰', '装修', '安装', '给排水', '暖通', '空调', '通风',
    '电气', '强电', '弱电', '消防', '智能化', '幕墙', '门窗', '园林', '绿化', '景观',
    '市政', '道路', '桥梁', '管网', '基础', '地基', '桩基', '主体', '屋面', '防水',
    '保温', '钢结构', '排水', '给水', '照明', '动力', '防雷', '电梯', '人防', '室外',
    '附属', '分部', '工程', '措施', '清单', '土石方', '混凝土', '砌筑', '模板', '脚手架',
    '水利', '河道', '管道', '阀门', '设备', '仪表', '自动化', '通信', '网络',
    '拆除', '外墙', '内墙', '楼地面', '天棚', '吊顶', '栏杆', '屋顶', '涂料', '抹灰',
    '廊道', '阀门井', '蓄水池', '泵站', '供水', '引水', '水源', '渠道', '闸门',
    '围栏', '警示', '检修', '管线', '配电', '水池', '水塔', '取水', '净水',
]

# Titles treated as fee categories on an exact match (after whitespace removal).
_EXACT_FEE_CAT = frozenset([
    '规费', '税金', '利润', '增值税', '暂列金额', '暂估价', '计日工',
    '总承包服务费', '企业管理费', '价税合计',
    '措施项目费', '其他项目费', '不可竞争费',
])
# Substrings that mark a title as a fee category wherever they occur.
_FEE_CAT_KW = [
    '措施项目费', '其他项目费', '不可竞争费',
    '规费汇总', '税金汇总', '费率', '费用汇总', '费用合计',
    '暂列金额', '暂估价', '计日工', '总承包服务费',
    '安全文明施工费', '社会保障费', '住房公积金',
    '工伤保险', '教育费附加', '城市维护建设税',
]

# A specification keyword immediately followed by a colon; split_name_and_spec
# cuts the raw name at the first such keyword that is not at position 0.
_SPEC_KW_RE = re.compile(
    r'(材质|规格|型号|品牌|颜色|尺寸|厚度|直径|管径|强度|等级|类别|类型|做法|要求|标准|内容|工作内容|土壤|含量|配合比|工艺|方式|形式|范围|部位|位置|高度|宽度|长度|深度|坡度|截面|跨度|运距|开挖|回填|混凝土|钢筋|压实)[::]'
)
|
||
|
||
|
||
def _fold_dash_codes(line: str) -> str:
    """Join dash-separated digit groups into a single code where plausible.

    Every ``_DASH_CODE`` match (three or four digit groups separated by
    hyphen-like characters) is replaced by the concatenation of its groups
    when that concatenation is 9 to 12 digits long; any other match is
    left exactly as it was.
    """

    def _join_groups(match: re.Match) -> str:
        # The optional fourth group is None when absent, so it is filtered out.
        digits = ''.join(part for part in match.groups() if part)
        return digits if 9 <= len(digits) <= 12 else match.group(0)

    return _DASH_CODE.sub(_join_groups, line)
|
||
|
||
|
||
def is_fee_item(name: str) -> bool:
    """Return True when an item name denotes a fee/tax entry rather than work.

    Whitespace is removed first. The name then counts as a fee item if it
    equals an entry of ``_EXACT_FEE_ITEM`` or contains any ``_FEE_KW``
    substring. An empty name is never a fee item.
    """
    if not name:
        return False
    compact = re.sub(r'\s+', '', name)
    return compact in _EXACT_FEE_ITEM or any(kw in compact for kw in _FEE_KW)
|
||
|
||
|
||
def split_name_and_spec(raw_name: str) -> tuple[str, str]:
    """Split a raw item name into ``(name, spec)``.

    Three cut points are tried in order, and the first one found at a
    position greater than zero wins:

    1. a numbered clause (digits, a separator, then a CJK character);
    2. a specification keyword followed by a colon (``_SPEC_KW_RE``);
    3. a parenthesised number.

    If none applies the whole input is returned as the name with an empty
    spec. An empty input yields ``('', '')``.
    """
    if not raw_name:
        return '', ''

    # Lazily evaluated so that later patterns are only searched when needed.
    finders = (
        lambda: re.search(r'\d+[.、.)\uFF09]\s*[\u4e00-\u9fff]', raw_name),
        lambda: _SPEC_KW_RE.search(raw_name),
        lambda: re.search(r'[((]\d+[))]', raw_name),
    )
    for find in finders:
        hit = find()
        if hit is None:
            continue
        cut = hit.start()
        if cut > 0:
            return raw_name[:cut].strip(), raw_name[cut:].strip()
    return raw_name, ''
|
||
|
||
|
||
def is_cat_title(text: str) -> bool:
    """Return True when *text* contains at least one ``_CAT_KW`` keyword."""
    for keyword in _CAT_KW:
        if keyword in text:
            return True
    return False
|
||
|
||
|
||
def is_fee_cat_title(text: str) -> bool:
    """Return True when a title names a fee category.

    Whitespace is removed first. The title then matches if it equals an
    entry of ``_EXACT_FEE_CAT`` or contains any ``_FEE_CAT_KW`` substring.
    An empty title never matches.
    """
    if not text:
        return False
    compact = re.sub(r'\s+', '', text)
    return compact in _EXACT_FEE_CAT or any(kw in compact for kw in _FEE_CAT_KW)
|
||
|
||
|
||
def _is_new_line_trigger(raw: str) -> bool:
    """Return True when *raw* must start a new logical line.

    That is the case when the line begins with a multi-level item number,
    with a code, with a sequence number followed by a code, or with one of
    ``CATEGORY_MARKERS`` followed by an ASCII or ideographic (U+3000) space.
    """
    if any(pattern.match(raw) for pattern in (ITEM_START, CODE_START_RE, SEQ_CODE_RE)):
        return True
    return any(
        raw.startswith((marker + ' ', marker + '\u3000'))
        for marker in CATEGORY_MARKERS
    )
|
||
|
||
|
||
def parse_bill_text(text: str) -> dict[str, Any]:
    """Parse the text of the bill pages into categories and bill items.

    The work is done in five passes:

    1. Normalise every physical line (tabs to spaces, strip, fold
       dash-separated codes).
    2. Merge physical lines into logical lines, dropping page marks and
       table headers.
    3. Classify each logical line as a coded item, an uncoded item
       (name + unit + quantity), a category title, or continuation text that
       is appended to the current item's ``spec``.
    4. Remove items whose name is a fee item.
    5. Merge items with the same name inside a category, summing the
       quantities when every one of them parses as a number.

    Args:
        text: Concatenated text of the bill pages, one physical line per
            ``'\\n'``-separated row.

    Returns:
        A dict with ``'project_summary'`` (``{'remark': str}``) and
        ``'categories'``: the non-empty categories, each
        ``{'name': str, 'items': [...]}`` where every item has the string
        fields ``code``, ``name``, ``unit``, ``quantity`` and ``spec``.
    """
    # ---- Pass 1: normalise physical lines ---------------------------------
    raw_lines = []
    for physical in text.split('\n'):
        line = physical.replace('\t', ' ').strip()
        line = _fold_dash_codes(line)
        raw_lines.append(line)

    # ---- Pass 2: merge physical lines into logical lines ------------------
    logic_lines: list[str] = []
    current_line = ''

    for raw in raw_lines:
        if not raw or PAGE_MARK.match(raw):
            continue
        if HEADER_RE.match(raw) or HEADER_KW.match(raw):
            continue
        if re.match(r'^(元)|^款章节号|^备注$|^第\d+页', raw):
            continue

        if _is_new_line_trigger(raw):
            if current_line:
                logic_lines.append(current_line)
            current_line = raw
        elif CODE_INLINE.search(raw) and len(raw) > 15:
            # A long line carrying a code somewhere inside also starts an item.
            if current_line:
                logic_lines.append(current_line)
            current_line = raw
        else:
            if current_line and len(current_line) > 300:
                # Cap runaway merges: flush once the logical line exceeds 300 chars.
                logic_lines.append(current_line)
                current_line = raw
            else:
                current_line = current_line + ' ' + raw if current_line else raw
    if current_line:
        logic_lines.append(current_line)

    logger.debug('合并后 %s 条逻辑行(原始 %s 行)', len(logic_lines), len(raw_lines))

    # ---- Pass 3: classify logical lines -----------------------------------
    categories: list[dict[str, Any]] = []
    cur_cat: dict[str, Any] | None = None
    cur_item: dict[str, Any] | None = None

    for line in logic_lines:
        if SKIP_RE.search(line):
            continue

        # Leading sequence number: either multi-level such as "1.1.1.1 ", or a
        # 1-4 digit sequence number + whitespace + a code of 9 or more digits.
        # This avoids stripping a leading 9-12 digit bill code itself (the
        # original JS pattern \d+(\.\d+)* would swallow the code).
        stripped = line.strip()
        m_hier = re.match(r'^\d+(?:\.\d+)+\s+', stripped)
        if m_hier:
            stripped = stripped[m_hier.end():].strip()
        elif re.match(r'^\d{1,4}\s+\d{9}', stripped):
            stripped = re.sub(r'^\d{1,4}\s+', '', stripped, count=1).strip()
        if not stripped:
            # Nothing was left after removing the number: keep the original text.
            stripped = line.strip()
        if not stripped:
            continue

        cm = CODE_RE.search(stripped)
        if cm:
            # -- Coded item: flush the previous item and start a new one.
            if cur_item and cur_cat:
                cur_cat['items'].append(cur_item)
            if not cur_cat:
                cur_cat = {'name': '未分类', 'items': []}
                categories.append(cur_cat)

            code = cm.group(1)
            rest = stripped[cm.end():].strip()
            name, unit, quantity, spec = '', '', '', ''

            unit_match = UNIT_RE.search(rest)
            if unit_match:
                # Slice at the match offsets. Searching for the matched text
                # again could hit an earlier occurrence that the regex had
                # rejected (for example " m" inside " mm"), which would split
                # the name and the quantity at the wrong place.
                raw_name = rest[:unit_match.start()].strip()
                unit = unit_match.group(1)
                after_unit = rest[unit_match.end():].strip()
                qm = re.match(r'^([\d,.]+)', after_unit)
                if qm:
                    quantity = qm.group(1)
                    tail = after_unit[qm.end():].strip()
                    if tail:
                        # Skip the purely numeric columns after the quantity;
                        # whatever text remains is kept as spec.
                        tail_tokens = tail.split()
                        si = 0
                        while si < len(tail_tokens) and re.match(r'^[\d,.%\-]+$', tail_tokens[si]):
                            si += 1
                        spec_tail = ' '.join(tail_tokens[si:]).strip()
                        if spec_tail:
                            spec = spec_tail
                ns_name, ns_spec = split_name_and_spec(raw_name)
                name = ns_name
                if ns_spec:
                    spec = ns_spec + (';' + spec if spec else '')
            else:
                # Fallback: look for a whole token that is a unit, scanning
                # from the right and never considering the first token.
                tokens = [t for t in rest.split() if t]
                found_unit_idx = -1
                for ti in range(len(tokens) - 1, 0, -1):
                    if tokens[ti] in UNIT_SET:
                        found_unit_idx = ti
                        break
                if found_unit_idx >= 1:
                    raw_name_str = ' '.join(tokens[:found_unit_idx])
                    ns_name, ns_spec = split_name_and_spec(raw_name_str)
                    name = ns_name
                    if ns_spec:
                        spec = ns_spec
                    unit = tokens[found_unit_idx]
                    after_tokens = tokens[found_unit_idx + 1:]
                    if after_tokens and re.match(r'^[\d,.]+$', after_tokens[0]):
                        quantity = after_tokens[0]
                        si = 1
                        while si < len(after_tokens) and re.match(r'^[\d,.%\-]+$', after_tokens[si]):
                            si += 1
                        spec_tail = ' '.join(after_tokens[si:]).strip()
                        if spec_tail:
                            spec = spec + ';' + spec_tail if spec else spec_tail
                else:
                    name = rest

            name = re.sub(r'\s+', '', name).strip()
            # A unit glued to the end of the name is split off; an already
            # detected unit takes precedence over the glued one.
            for u in UNIT_TOKENS:
                if name.endswith(u) and len(name) > len(u):
                    unit = unit or u
                    name = name[: len(name) - len(u)]
                    break

            cur_item = {'code': code, 'name': name, 'unit': unit, 'quantity': quantity, 'spec': spec}
            continue

        if len(stripped) > 4:
            # -- Uncoded item: "<name with CJK text> <unit> <quantity>".
            uni_match = UNIT_RE.search(stripped)
            if uni_match:
                # Match offsets are used here for the same reason as above.
                before_unit = stripped[:uni_match.start()].strip()
                after_unit = stripped[uni_match.end():].strip()
                has_qty = bool(re.match(r'^[\d,.]+', after_unit))
                if (
                    2 <= len(before_unit) <= 50
                    and has_qty
                    and re.search(r'[\u4e00-\u9fff]', before_unit)
                ):
                    if cur_item and cur_cat:
                        cur_cat['items'].append(cur_item)
                    if not cur_cat:
                        cur_cat = {'name': '未分类', 'items': []}
                        categories.append(cur_cat)
                    unit_fb = uni_match.group(1)
                    qm = re.match(r'^([\d,.]+)', after_unit)
                    quantity_fb = qm.group(1) if qm else ''
                    ns_name, ns_spec = split_name_and_spec(before_unit)
                    name_fb = re.sub(r'\s+', '', ns_name).strip()
                    spec_fb = ns_spec or ''
                    cur_item = {'code': '', 'name': name_fb, 'unit': unit_fb, 'quantity': quantity_fb, 'spec': spec_fb}
                    continue

        if 2 < len(stripped) < 60 and not CODE_RE.search(stripped):
            # -- Short line without a code: data remnant or category title.
            if UNIT_RE.search(stripped) and re.search(r'\d+\.?\d*\s*$', stripped):
                # Contains a unit and ends with a number: treated as leftover
                # item data, never as a title.
                if cur_item:
                    cur_item['spec'] = (cur_item.get('spec') or '') + (
                        ';' + stripped if cur_item.get('spec') else stripped
                    )
                continue
            if is_cat_title(stripped) and not UNIT_RE.search(stripped) and not is_fee_cat_title(stripped):
                if cur_item and cur_cat:
                    cur_cat['items'].append(cur_item)
                    cur_item = None
                clean_title = re.sub(
                    r'\s+(座|个|项|处|m|km|段|条)\s+\d+[\d.]*\s*$', '', stripped
                ).strip()
                cur_cat = {'name': clean_title, 'items': []}
                categories.append(cur_cat)
                continue

            # NOTE(review): the second pattern matches any line that starts
            # with a digit or a Chinese numeral; a parenthesised ordinal may
            # have been intended - confirm against the original JS.
            if re.match(r'^[一二三四五六七八九十]+\s', stripped) or re.match(
                r'^([一二三四五六七八九十\d]+)', stripped
            ):
                clean_title = re.sub(r'\s+(座|个|项|处)\s+\d+[\d.]*\s*$', '', stripped).strip()
                if is_fee_cat_title(clean_title):
                    continue
                if cur_item and cur_cat:
                    cur_cat['items'].append(cur_item)
                    cur_item = None
                cur_cat = {'name': clean_title, 'items': []}
                categories.append(cur_cat)
                continue

        # -- Anything else is continuation text for the current item.
        if cur_item and len(stripped) > 1:
            cur_item['spec'] = (cur_item.get('spec') or '') + (
                ';' + stripped if cur_item.get('spec') else stripped
            )

    if cur_item and cur_cat:
        cur_cat['items'].append(cur_item)

    # ---- Pass 4: drop fee items -------------------------------------------
    fee_filtered = 0
    for cat in categories:
        if cat.get('items'):
            before = len(cat['items'])
            cat['items'] = [it for it in cat['items'] if not is_fee_item(it.get('name', ''))]
            fee_filtered += before - len(cat['items'])
    if fee_filtered:
        logger.debug('费用项过滤: 移除 %s 项', fee_filtered)

    # ---- Pass 5: merge same-named items inside each category --------------
    total_before_merge = 0
    total_after_merge = 0
    for cat in categories:
        items = cat.get('items') or []
        if not items:
            continue
        total_before_merge += len(items)
        name_map: dict[str, dict[str, Any]] = {}
        for item in items:
            key = re.sub(r'\s+', '', (item.get('name') or '')).strip()
            if not key:
                # Items without a name are dropped by the merge.
                continue
            if key not in name_map:
                name_map[key] = {
                    'code': item.get('code') or '',
                    'name': item['name'],
                    'unit': item.get('unit') or '',
                    'quantity': item.get('quantity') or '',
                    'spec': item.get('spec') or '',
                    '_quantities': [item['quantity']] if item.get('quantity') else [],
                    '_specs': [item['spec']] if item.get('spec') else [],
                }
            else:
                m = name_map[key]
                # The first non-empty code and unit win; quantities and
                # distinct specs are accumulated.
                if not m['code'] and item.get('code'):
                    m['code'] = item['code']
                if not m['unit'] and item.get('unit'):
                    m['unit'] = item['unit']
                if item.get('quantity'):
                    m['_quantities'].append(item['quantity'])
                if item.get('spec') and item['spec'] not in m['_specs']:
                    m['_specs'].append(item['spec'])

        merged_items: list[dict[str, str]] = []
        for m in name_map.values():
            qlist = m['_quantities']
            if len(qlist) > 1:
                nums = []
                ok = True
                for q in qlist:
                    try:
                        nums.append(float(q.replace(',', '')))
                    except ValueError:
                        ok = False
                        break
                if ok:
                    s = sum(nums)
                    m['quantity'] = str(int(s)) if s % 1 == 0 else f'{s:.2f}'
                else:
                    # At least one quantity is not numeric: list them instead.
                    m['quantity'] = '; '.join(qlist)
            elif len(qlist) == 1:
                m['quantity'] = qlist[0]

            if m['_specs']:
                # Each spec is capped at 120 chars, the joined spec at 300.
                trimmed = [s[:120] + '...' if len(s) > 120 else s for s in m['_specs']]
                m['spec'] = '; '.join(trimmed)
                if len(m['spec']) > 300:
                    m['spec'] = m['spec'][:300] + '...'
            for k in ('_quantities', '_specs'):
                m.pop(k, None)
            merged_items.append(
                {k: m[k] for k in ('code', 'name', 'unit', 'quantity', 'spec')}
            )
        cat['items'] = merged_items
        total_after_merge += len(merged_items)

    merged_count = total_before_merge - total_after_merge
    if merged_count > 0:
        logger.debug('按名称合并: %s → %s 项', total_before_merge, total_after_merge)

    # Only categories that still hold items are returned.
    valid = [c for c in categories if c.get('items')]
    total_items = sum(len(c['items']) for c in valid)
    logger.debug(
        '最终结果: %s 分部, %s 清单项', len(valid), total_items
    )

    return {
        'project_summary': {
            'remark': f'本地解析:{len(valid)} 个分部,{total_items} 个清单项(合并前 {total_before_merge} 项)',
        },
        'categories': valid,
    }
|
||
|
||
|
||
def filter_bill_pages(page_texts: list[str]) -> tuple[list[str], dict[str, Any]]:
    """Select the pages that belong to the bill of quantities.

    A page is flagged when it contains at least two ``BILL_KW`` keywords,
    any ``SEC_KW`` keyword, or a run of nine digits. Unflagged pages lying
    between the first and the last flagged page are flagged as well, unless
    they hold 30 or fewer non-blank characters or look like a fee page (two
    or more ``FEE_PAGE_KW`` hits and no nine-digit run).

    Args:
        page_texts: Text of each page, in page order; falsy entries are
            treated as empty pages.

    Returns:
        ``(bill_page_texts, meta)``. ``meta`` always carries ``total_pages``,
        ``scanned`` and ``no_bill_pages``. When fewer than 50 characters are
        present overall it is marked as scanned with ``reason='noText'``;
        otherwise it also carries ``bill_page_indices`` and ``bill_pages``.
    """
    page_count = len(page_texts)
    meta: dict[str, Any] = {'total_pages': page_count, 'scanned': False, 'no_bill_pages': False}

    if sum(len(page or '') for page in page_texts) < 50:
        meta['scanned'] = True
        meta['reason'] = 'noText'
        return [], meta

    def _looks_like_bill(page: str) -> bool:
        # Header keywords (two or more), a section keyword, or a 9-digit run.
        if sum(1 for kw in BILL_KW if kw in page) >= 2:
            return True
        if any(kw in page for kw in SEC_KW):
            return True
        return bool(re.search(r'\d{9}', page))

    flags = [
        bool((page or '').strip()) and _looks_like_bill(page)
        for page in page_texts
    ]

    # Fill the gaps between the first and last flagged page.
    hits = [idx for idx, flagged in enumerate(flags) if flagged]
    if len(hits) >= 2:
        for idx in range(hits[0], hits[-1] + 1):
            if flags[idx]:
                continue
            page = page_texts[idx] or ''
            if len(page.strip()) <= 30:
                continue
            fee_hits = sum(1 for kw in FEE_PAGE_KW if kw in page)
            if fee_hits >= 2 and not re.search(r'\d{9}', page):
                continue
            flags[idx] = True

    selected = [idx for idx, flagged in enumerate(flags) if flagged]
    bill_texts = [page_texts[idx] for idx in selected]
    if not bill_texts:
        meta['no_bill_pages'] = True

    meta['bill_page_indices'] = selected
    meta['bill_pages'] = len(bill_texts)
    return bill_texts, meta
|
||
|
||
|
||
def analyze_boq_pages(page_texts: list[str]) -> dict[str, Any]:
    """Run page filtering followed by ``parse_bill_text`` on the bill pages.

    Args:
        page_texts: Text of each page, in page order; falsy entries count as
            empty.

    Returns:
        A dict that always has ``scanned``, ``project_summary``,
        ``categories`` and ``_meta`` (``method``, ``total_pages``,
        ``bill_pages``). Early exits additionally carry ``totalPages`` plus
        ``reason`` (too little text) or ``no_bill_pages`` (no bill page
        found); the normal result records ``bill_page_indices`` in ``_meta``.
    """
    page_count = len(page_texts)

    def _meta(bill_pages: int, **extra: Any) -> dict[str, Any]:
        # Common metadata block shared by all three result shapes.
        return {
            'method': 'python-local',
            'total_pages': page_count,
            'bill_pages': bill_pages,
            **extra,
        }

    char_count = 0
    for page in page_texts:
        char_count += len(page or '')

    # Too little text overall: report a probable scan without parsing.
    if char_count < 50:
        return {
            'scanned': True,
            'reason': 'noText',
            'totalPages': page_count,
            'project_summary': {'remark': '文本过少,疑似扫描件或未提取到文字'},
            'categories': [],
            '_meta': _meta(0),
        }

    bill_texts, filter_meta = filter_bill_pages(page_texts)
    if not bill_texts:
        extra: dict[str, Any] = {}
        if 'no_bill_pages' in filter_meta:
            extra['no_bill_pages'] = filter_meta['no_bill_pages']
        return {
            'scanned': False,
            'no_bill_pages': True,
            'totalPages': page_count,
            'project_summary': {'remark': '未识别到清单相关页面'},
            'categories': [],
            '_meta': _meta(0, **extra),
        }

    result: dict[str, Any] = {'scanned': False}
    result.update(parse_bill_text('\n'.join(bill_texts)))
    result['_meta'] = _meta(
        len(bill_texts),
        bill_page_indices=filter_meta.get('bill_page_indices', []),
    )
    return result
|
||
|
||
|
||
def categories_to_prompt_appendix(
    analysis: dict[str, Any],
    max_chars: int = 3000,
    max_per_cat: int = 40,
) -> str:
    """Condense the local parse result into short text for an AI summary prompt.

    Each category yields a bracketed header line followed by one line per
    item ("code name unit quantity", with "-" standing in for a missing
    code). At most ``max_per_cat`` items are listed per category, followed
    by an omission note when more exist. The final text is cut to
    ``max_chars`` characters and a truncation marker is appended when it
    was longer.
    """
    out: list[str] = []
    for category in analysis.get('categories') or []:
        title = category.get('name', '')
        entries = category.get('items') or []
        out.append(f'【{title}】')
        for entry in entries[:max_per_cat]:
            code = entry.get('code') or '-'
            label = entry.get('name') or ''
            unit = entry.get('unit') or ''
            qty = entry.get('quantity') or ''
            out.append(f' {code} {label} {unit} {qty}'.strip())
        if len(entries) > max_per_cat:
            out.append(f' …共 {len(entries)} 条,此处省略其余')

    appendix = '\n'.join(out).strip()
    if len(appendix) <= max_chars:
        return appendix
    return appendix[:max_chars] + '\n…(附录已截断)'
|