2026-04-23 14:37:19 +08:00

578 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
工程量清单本地分析(从 bill-worker.js Phase 2/3 移植)。
Phase 2按页关键字筛选清单页Phase 3正则解析分部与清单项。
"""
from __future__ import annotations
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
# Column-header keywords; filter_bill_pages flags a page when at least two occur.
BILL_KW = ['项目编码', '项目名称', '工程量', '计量单位', '综合单价', '清单编码']
# Section keywords; one occurrence is enough for filter_bill_pages to flag a page.
SEC_KW = ['分部分项', '分类分项', '措施项目', '其他项目', '工程量清单计价']
# Fee/tax keywords; filter_bill_pages leaves out an unflagged in-between page
# that contains two or more of these and no 9-digit run.
FEE_PAGE_KW = [
    '规费', '税金', '社会保险费', '住房公积金', '养老保险',
    '工伤保险', '失业保险', '医疗保险', '教育费附加', '城市维护建设税',
]
# Line starts with a multi-level sequence number ("1.1", "1.2.3", ...) plus whitespace.
ITEM_START = re.compile(r'^\d+(\.\d+)+\s')
# A 9-12 digit code or a B-code (B + 5-6 digits) that follows line start or
# whitespace and is itself followed by whitespace.
CODE_INLINE = re.compile(r'(?:^|\s)(\d{9,12}|(?<![A-Za-z])B\d{5,6})\s')
# Line starts directly with a code followed by whitespace.
CODE_START_RE = re.compile(r'^(\d{9,12}|B\d{5,6})\s')
# Line starts with a 1-4 digit sequence number, whitespace, then a code.
SEQ_CODE_RE = re.compile(r'^\d{1,4}\s+(\d{9,12}|(?<![A-Za-z])B\d{5,6})\s')
# Page marker of the form "-- 3 of 10 --".
PAGE_MARK = re.compile(r'^--\s*\d+\s+of\s+\d+\s*--')
# Table header row: 序号 followed by a column title.
HEADER_RE = re.compile(r'^序号\s+(项目编码|项目名称)')
# Line starting with a single column title followed by whitespace.
HEADER_KW = re.compile(
    r'^(项目编码|项目名称|清单编码|计量单位|综合单价|工程量|合\s*价|金额|序号)\s'
)
# Prefixes that start a new logical line when followed by a space or U+3000
# (see _is_new_line_trigger).
# NOTE(review): the first ten entries are empty strings in this copy of the
# file; they look like characters lost in extraction - confirm against the
# original before relying on them.
CATEGORY_MARKERS = [
    '', '', '', '', '', '', '', '', '', '',
    '(一)', '(二)', '(三)', '(四)', '(五)',
]
# Code: an inline 9-12 digit number or a B-code (letter prefixes such as GB are excluded).
CODE_RE = re.compile(r'(?<![A-Za-z])(\d{9,12}|(?<![A-Za-z])B\d{5,6})')
# Recognised unit tokens. parse_bill_text scans them in this order when
# stripping a trailing unit from an item name.
# NOTE(review): many entries are empty strings in this copy of the file. An
# empty alternative lets UNIT_RE match an empty unit and makes every name
# "end with" the first token - presumably lost characters; confirm against
# the original.
UNIT_TOKENS = [
    '', '', 'm3', 'm2', 'km', 'hm2', '', '', 't', 'kg',
    '', '', '', '', '', '', '', '', '', '', '',
    '', '', '', '', '', '', '', '', '', '', '',
    '', '', '', '', '延m', '', '', 'm',
]
# Set form of UNIT_TOKENS for whole-token membership tests.
UNIT_SET = frozenset(UNIT_TOKENS)
_unit_escaped = [re.escape(u) for u in UNIT_TOKENS]
# A unit token preceded by line start or whitespace and followed by
# whitespace, a digit, or end of string. Group 1 is the unit itself.
UNIT_RE = re.compile(r'(?:^|\s)(' + '|'.join(_unit_escaped) + r')(?=\s|\d|$)')
# Subtotal/total rows; parse_bill_text skips every logical line this is found in.
# NOTE(review): as written, the first alternative matches any line that
# contains 计 - likely a lost leading character; confirm against the original.
SKIP_RE = re.compile(r'\s*计|小\s*计|本页小计|总\s*计|价税合计')
# Three or four groups of 2-4 digits joined by hyphens; see _fold_dash_codes.
_DASH_CODE = re.compile(
    r'(\d{2,4})[-](\d{2,4})[-](\d{2,4})(?:[-](\d{2,4}))?'
)
# Names that mark a fee item on exact match after whitespace removal (is_fee_item).
_EXACT_FEE_ITEM = frozenset([
    '规费', '税金', '利润', '增值税', '暂列金额', '暂估价', '计日工',
    '总承包服务费', '企业管理费', '甲供材料保管费', '价税合计',
])
# Substrings that mark a fee item wherever they occur in the name (is_fee_item).
_FEE_KW = [
    '安全文明', '文明施工费', '环境保护费', '临时设施费',
    '夜间施工增加费', '夜间施工费',
    '冬雨季施工增加费', '冬雨季施工费',
    '二次搬运费', '大型机械设备进出场', '大型机械进出场',
    '施工排水降水', '排水降水费',
    '已完工程及设备保护', '已完工程保护费',
    '工程排污费', '社会保障费', '住房公积金',
    '工伤保险', '劳动保险', '意外伤害保险', '建筑工程保险',
    '城市维护建设税', '城市建设维护税',
    '教育费附加', '地方教育附加',
    '材料暂估', '专业工程暂估',
    '超高施工增加费', '安全防护费',
    '措施项目费', '其他项目费', '不可竞争费',
]
# Substrings that make a short line count as a category title (is_cat_title).
_CAT_KW = [
    '土建', '建筑', '结构', '装饰', '装修', '安装', '给排水', '暖通', '空调', '通风',
    '电气', '强电', '弱电', '消防', '智能化', '幕墙', '门窗', '园林', '绿化', '景观',
    '市政', '道路', '桥梁', '管网', '基础', '地基', '桩基', '主体', '屋面', '防水',
    '保温', '钢结构', '排水', '给水', '照明', '动力', '防雷', '电梯', '人防', '室外',
    '附属', '分部', '工程', '措施', '清单', '土石方', '混凝土', '砌筑', '模板', '脚手架',
    '水利', '河道', '管道', '阀门', '设备', '仪表', '自动化', '通信', '网络',
    '拆除', '外墙', '内墙', '楼地面', '天棚', '吊顶', '栏杆', '屋顶', '涂料', '抹灰',
    '廊道', '阀门井', '蓄水池', '泵站', '供水', '引水', '水源', '渠道', '闸门',
    '围栏', '警示', '检修', '管线', '配电', '水池', '水塔', '取水', '净水',
]
# Titles that mark a fee category on exact match after whitespace removal
# (is_fee_cat_title).
_EXACT_FEE_CAT = frozenset([
    '规费', '税金', '利润', '增值税', '暂列金额', '暂估价', '计日工',
    '总承包服务费', '企业管理费', '价税合计',
    '措施项目费', '其他项目费', '不可竞争费',
])
# Substrings that mark a fee category wherever they occur (is_fee_cat_title).
_FEE_CAT_KW = [
    '措施项目费', '其他项目费', '不可竞争费',
    '规费汇总', '税金汇总', '费率', '费用汇总', '费用合计',
    '暂列金额', '暂估价', '计日工', '总承包服务费',
    '安全文明施工费', '社会保障费', '住房公积金',
    '工伤保险', '教育费附加', '城市维护建设税',
]
# A spec keyword immediately followed by a colon; split_name_and_spec cuts the
# raw name at the first such match.
_SPEC_KW_RE = re.compile(
    r'(材质|规格|型号|品牌|颜色|尺寸|厚度|直径|管径|强度|等级|类别|类型|做法|要求|标准|内容|工作内容|土壤|含量|配合比|工艺|方式|形式|范围|部位|位置|高度|宽度|长度|深度|坡度|截面|跨度|运距|开挖|回填|混凝土|钢筋|压实)[:]'
)
def _fold_dash_codes(line: str) -> str:
    """Collapse hyphen-separated digit groups in *line* into one contiguous code.

    Every _DASH_CODE match is replaced by its digit groups joined together,
    but only when the joined string is 9 to 12 characters long; any other
    match is left exactly as it was.
    """
    def _join_groups(match: re.Match) -> str:
        # The optional fourth group is None when absent, so drop empty parts.
        digits = ''.join(part for part in match.groups() if part)
        return digits if 9 <= len(digits) <= 12 else match.group(0)

    return _DASH_CODE.sub(_join_groups, line)
def is_fee_item(name: str) -> bool:
    """Return True when *name* denotes a fee/tax line rather than a bill item.

    All whitespace is removed first. The result is True when the compacted
    name equals an entry of _EXACT_FEE_ITEM or contains any keyword from
    _FEE_KW. An empty or missing name is never a fee item.
    """
    if not name:
        return False
    compact = re.sub(r'\s+', '', name)
    return compact in _EXACT_FEE_ITEM or any(kw in compact for kw in _FEE_KW)
def split_name_and_spec(raw_name: str) -> tuple[str, str]:
    """Split a raw item name into ``(name, spec)``.

    Three cut points are tried in order, and the first one found past the
    start of the string wins:

    1. a numbered entry: digits, a separator, then a CJK character;
    2. a spec keyword followed by a colon (_SPEC_KW_RE);
    3. a parenthesised number such as "(1)".

    Both halves are stripped. When nothing matches, the whole input is
    returned as the name with an empty spec; empty input yields ``('', '')``.
    """
    if not raw_name:
        return '', ''

    # Evaluated lazily so a later pattern only runs when the earlier ones miss.
    finders = (
        lambda: re.search(r'\d+[.、.)\uFF09]\s*[\u4e00-\u9fff]', raw_name),
        lambda: _SPEC_KW_RE.search(raw_name),
        lambda: re.search(r'[(]\d+[)]', raw_name),
    )
    for find in finders:
        hit = find()
        if hit and hit.start() > 0:
            cut = hit.start()
            return raw_name[:cut].strip(), raw_name[cut:].strip()
    return raw_name, ''
def is_cat_title(text: str) -> bool:
    """Return True when *text* contains any category keyword from _CAT_KW.

    Empty or missing text is never a category title. This matches the guard
    used by is_fee_item and is_fee_cat_title, and avoids a TypeError when
    None is passed.
    """
    if not text:
        return False
    return any(keyword in text for keyword in _CAT_KW)
def is_fee_cat_title(text: str) -> bool:
    """Return True when *text* names a fee category rather than a work section.

    All whitespace is removed first. The result is True when the compacted
    title equals an entry of _EXACT_FEE_CAT or contains any keyword from
    _FEE_CAT_KW. Empty or missing text is never a fee category.
    """
    if not text:
        return False
    compact = re.sub(r'\s+', '', text)
    return compact in _EXACT_FEE_CAT or any(kw in compact for kw in _FEE_CAT_KW)
def _is_new_line_trigger(raw: str) -> bool:
    """Return True when *raw* must start a new logical line.

    That is the case when the line begins with a multi-level sequence number
    (ITEM_START), a bill code (CODE_START_RE), a sequence number followed by
    a bill code (SEQ_CODE_RE), or any CATEGORY_MARKERS entry followed by a
    space or an ideographic space (U+3000).
    """
    if any(pattern.match(raw) for pattern in (ITEM_START, CODE_START_RE, SEQ_CODE_RE)):
        return True
    return any(
        raw.startswith((marker + ' ', marker + '\u3000'))
        for marker in CATEGORY_MARKERS
    )
def _merge_logic_lines(raw_lines: list[str]) -> list[str]:
    """Join cleaned physical lines into logical lines (one per row or title)."""
    logic_lines: list[str] = []
    current_line = ''
    for raw in raw_lines:
        # Drop blanks, page markers, table headers and page furniture.
        if not raw or PAGE_MARK.match(raw):
            continue
        if HEADER_RE.match(raw) or HEADER_KW.match(raw):
            continue
        if re.match(r'^(元)|^款章节号|^备注$|^第\d+页', raw):
            continue
        starts_new = _is_new_line_trigger(raw) or (
            CODE_INLINE.search(raw) and len(raw) > 15
        )
        # An over-long accumulated line (> 300 chars) is also closed so one
        # missed boundary cannot swallow the rest of the document.
        if starts_new or (current_line and len(current_line) > 300):
            if current_line:
                logic_lines.append(current_line)
            current_line = raw
        else:
            current_line = current_line + ' ' + raw if current_line else raw
    if current_line:
        logic_lines.append(current_line)
    return logic_lines


def _append_spec(item: dict[str, Any], text: str) -> None:
    """Append *text* to the item's spec, ';'-separated when a spec already exists."""
    existing = item.get('spec') or ''
    item['spec'] = existing + (';' + text if existing else text)


def _parse_coded_item(code: str, rest: str) -> dict[str, Any]:
    """Build an item dict from a bill code and the text that follows it."""
    name, unit, quantity, spec = '', '', '', ''
    unit_match = UNIT_RE.search(rest)
    if unit_match:
        # Slice by the match's own span. Searching for the matched text again
        # with str.find() can hit an earlier, non-unit occurrence (e.g. " m"
        # inside " mm") and cut the name and quantity at the wrong place.
        raw_name = rest[:unit_match.start()].strip()
        unit = unit_match.group(1)
        after_unit = rest[unit_match.end():].strip()
        qm = re.match(r'^([\d,.]+)', after_unit)
        if qm:
            quantity = qm.group(1)
            tail = after_unit[qm.end():].strip()
            if tail:
                # Skip the numeric columns after the quantity (prices,
                # percentages); whatever remains is kept as spec text.
                tail_tokens = tail.split()
                si = 0
                while si < len(tail_tokens) and re.match(r'^[\d,.%\-]+$', tail_tokens[si]):
                    si += 1
                spec_tail = ' '.join(tail_tokens[si:]).strip()
                if spec_tail:
                    spec = spec_tail
        ns_name, ns_spec = split_name_and_spec(raw_name)
        name = ns_name
        if ns_spec:
            spec = ns_spec + (';' + spec if spec else '')
    else:
        # Fallback: look for a whole token that is a unit, scanning from the
        # right and never treating the first token as the unit.
        tokens = rest.split()
        found_unit_idx = -1
        for ti in range(len(tokens) - 1, 0, -1):
            if tokens[ti] in UNIT_SET:
                found_unit_idx = ti
                break
        if found_unit_idx >= 1:
            ns_name, ns_spec = split_name_and_spec(' '.join(tokens[:found_unit_idx]))
            name = ns_name
            if ns_spec:
                spec = ns_spec
            unit = tokens[found_unit_idx]
            after_tokens = tokens[found_unit_idx + 1:]
            if after_tokens and re.match(r'^[\d,.]+$', after_tokens[0]):
                quantity = after_tokens[0]
                si = 1
                while si < len(after_tokens) and re.match(r'^[\d,.%\-]+$', after_tokens[si]):
                    si += 1
                spec_tail = ' '.join(after_tokens[si:]).strip()
                if spec_tail:
                    spec = spec + ';' + spec_tail if spec else spec_tail
        else:
            name = rest
    name = re.sub(r'\s+', '', name).strip()
    # A unit glued to the end of the name is split off; the first matching
    # token in UNIT_TOKENS order wins.
    for u in UNIT_TOKENS:
        if name.endswith(u) and len(name) > len(u):
            unit = unit or u
            name = name[: len(name) - len(u)]
            break
    return {'code': code, 'name': name, 'unit': unit, 'quantity': quantity, 'spec': spec}


def _merge_items_by_name(items: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Merge items whose names are equal once whitespace is removed."""
    name_map: dict[str, dict[str, Any]] = {}
    for item in items:
        key = re.sub(r'\s+', '', (item.get('name') or '')).strip()
        if not key:
            # Items without a name are dropped.
            continue
        entry = name_map.get(key)
        if entry is None:
            name_map[key] = {
                'code': item.get('code') or '',
                'name': item['name'],
                'unit': item.get('unit') or '',
                'quantity': item.get('quantity') or '',
                'spec': item.get('spec') or '',
                '_quantities': [item['quantity']] if item.get('quantity') else [],
                '_specs': [item['spec']] if item.get('spec') else [],
            }
            continue
        # Later duplicates only fill gaps and contribute quantities/specs.
        if not entry['code'] and item.get('code'):
            entry['code'] = item['code']
        if not entry['unit'] and item.get('unit'):
            entry['unit'] = item['unit']
        if item.get('quantity'):
            entry['_quantities'].append(item['quantity'])
        if item.get('spec') and item['spec'] not in entry['_specs']:
            entry['_specs'].append(item['spec'])

    merged_items: list[dict[str, str]] = []
    for entry in name_map.values():
        qlist = entry['_quantities']
        if len(qlist) > 1:
            try:
                total = sum(float(q.replace(',', '')) for q in qlist)
            except ValueError:
                # At least one quantity is not a number: list them instead.
                entry['quantity'] = '; '.join(qlist)
            else:
                entry['quantity'] = str(int(total)) if total % 1 == 0 else f'{total:.2f}'
        elif len(qlist) == 1:
            entry['quantity'] = qlist[0]
        if entry['_specs']:
            trimmed = [s[:120] + '...' if len(s) > 120 else s for s in entry['_specs']]
            entry['spec'] = '; '.join(trimmed)
            if len(entry['spec']) > 300:
                entry['spec'] = entry['spec'][:300] + '...'
        merged_items.append(
            {k: entry[k] for k in ('code', 'name', 'unit', 'quantity', 'spec')}
        )
    return merged_items


def parse_bill_text(text: str) -> dict[str, Any]:
    """Parse bill-of-quantities text into categories of bill items.

    Steps:

    1. Clean each physical line (tabs to spaces, strip, fold dashed codes) and
       join the lines into logical lines.
    2. Walk the logical lines. A line holding a bill code starts a coded item;
       a codeless "name unit quantity" line starts an item with an empty code;
       a short title line starts a category; anything else is appended to the
       current item's spec.
    3. Remove items whose name is a fee item (is_fee_item).
    4. Within each category, merge items that share a name.

    Args:
        text: Plain text of the bill pages, lines separated by ``'\\n'``.

    Returns:
        A dict with ``'project_summary'`` (holding a ``'remark'`` string) and
        ``'categories'``: the categories that still contain items, each
        ``{'name': str, 'items': [...]}`` where every item has the keys
        ``code``, ``name``, ``unit``, ``quantity`` and ``spec``.
    """
    raw_lines = [
        _fold_dash_codes(physical.replace('\t', ' ').strip())
        for physical in text.split('\n')
    ]
    logic_lines = _merge_logic_lines(raw_lines)
    logger.debug('合并后 %s 条逻辑行(原始 %s 行)', len(logic_lines), len(raw_lines))

    categories: list[dict[str, Any]] = []
    cur_cat: dict[str, Any] | None = None
    cur_item: dict[str, Any] | None = None
    for line in logic_lines:
        if SKIP_RE.search(line):
            continue
        # Strip a leading sequence number: either multi-level ("1.1.1.1 ") or
        # a 1-4 digit number followed by whitespace and a code of 9+ digits.
        # A line that starts directly with a 9-12 digit bill code must keep
        # it (the original JS pattern \d+(\.\d+)* would swallow the code).
        stripped = line.strip()
        m_hier = re.match(r'^\d+(?:\.\d+)+\s+', stripped)
        if m_hier:
            stripped = stripped[m_hier.end():].strip()
        elif re.match(r'^\d{1,4}\s+\d{9}', stripped):
            stripped = re.sub(r'^\d{1,4}\s+', '', stripped, count=1).strip()
        if not stripped:
            stripped = line.strip()
        if not stripped:
            continue

        # Case 1: the line carries a bill code -> a new coded item.
        cm = CODE_RE.search(stripped)
        if cm:
            if cur_item and cur_cat:
                cur_cat['items'].append(cur_item)
            if not cur_cat:
                cur_cat = {'name': '未分类', 'items': []}
                categories.append(cur_cat)
            cur_item = _parse_coded_item(cm.group(1), stripped[cm.end():].strip())
            continue

        # Case 2: no code, but the line reads "<CJK name> <unit> <quantity>".
        if len(stripped) > 4:
            uni_match = UNIT_RE.search(stripped)
            if uni_match:
                # Same span-based slicing as for coded items.
                before_unit = stripped[:uni_match.start()].strip()
                after_unit = stripped[uni_match.end():].strip()
                qm = re.match(r'^([\d,.]+)', after_unit)
                if (
                    2 <= len(before_unit) <= 50
                    and qm
                    and re.search(r'[\u4e00-\u9fff]', before_unit)
                ):
                    if cur_item and cur_cat:
                        cur_cat['items'].append(cur_item)
                    if not cur_cat:
                        cur_cat = {'name': '未分类', 'items': []}
                        categories.append(cur_cat)
                    ns_name, ns_spec = split_name_and_spec(before_unit)
                    cur_item = {
                        'code': '',
                        'name': re.sub(r'\s+', '', ns_name).strip(),
                        'unit': uni_match.group(1),
                        'quantity': qm.group(1),
                        'spec': ns_spec or '',
                    }
                    continue

        # Case 3: short codeless lines - spec fragments or category titles.
        if 2 < len(stripped) < 60 and not CODE_RE.search(stripped):
            if UNIT_RE.search(stripped) and re.search(r'\d+\.?\d*\s*$', stripped):
                # Has a unit and ends in a number: attach to the current item.
                if cur_item:
                    _append_spec(cur_item, stripped)
                continue
            if is_cat_title(stripped) and not UNIT_RE.search(stripped) and not is_fee_cat_title(stripped):
                if cur_item and cur_cat:
                    cur_cat['items'].append(cur_item)
                cur_item = None
                clean_title = re.sub(
                    r'\s+(座|个|项|处|m|km|段|条)\s+\d+[\d.]*\s*$', '', stripped
                ).strip()
                cur_cat = {'name': clean_title, 'items': []}
                categories.append(cur_cat)
                continue
            if re.match(r'^[一二三四五六七八九十]+\s', stripped) or re.match(
                r'^[一二三四五六七八九十\d]+', stripped
            ):
                clean_title = re.sub(r'\s+(座|个|项|处)\s+\d+[\d.]*\s*$', '', stripped).strip()
                if is_fee_cat_title(clean_title):
                    continue
                if cur_item and cur_cat:
                    cur_cat['items'].append(cur_item)
                cur_item = None
                cur_cat = {'name': clean_title, 'items': []}
                categories.append(cur_cat)
                continue

        # Case 4: anything else continues the current item's spec.
        if cur_item and len(stripped) > 1:
            _append_spec(cur_item, stripped)
    if cur_item and cur_cat:
        cur_cat['items'].append(cur_item)

    # Remove fee/tax rows that were parsed as items.
    fee_filtered = 0
    for cat in categories:
        if cat.get('items'):
            before = len(cat['items'])
            cat['items'] = [it for it in cat['items'] if not is_fee_item(it.get('name', ''))]
            fee_filtered += before - len(cat['items'])
    if fee_filtered:
        logger.debug('费用项过滤: 移除 %s', fee_filtered)

    # Merge same-named items inside each category.
    total_before_merge = 0
    total_after_merge = 0
    for cat in categories:
        items = cat.get('items') or []
        if not items:
            continue
        total_before_merge += len(items)
        cat['items'] = _merge_items_by_name(items)
        total_after_merge += len(cat['items'])
    if total_before_merge - total_after_merge > 0:
        # The two counts are separated so they cannot be read as one number.
        logger.debug('按名称合并: %s -> %s', total_before_merge, total_after_merge)

    valid = [c for c in categories if c.get('items')]
    total_items = sum(len(c['items']) for c in valid)
    logger.debug(
        '最终结果: %s 分部, %s 清单项', len(valid), total_items
    )
    return {
        'project_summary': {
            'remark': f'本地解析:{len(valid)} 个分部,{total_items} 个清单项(合并前 {total_before_merge} 项)',
        },
        'categories': valid,
    }
def filter_bill_pages(page_texts: list[str]) -> tuple[list[str], dict[str, Any]]:
    """Select the bill-of-quantities pages from per-page text.

    A page is flagged when it contains at least two BILL_KW keywords, any
    SEC_KW keyword, or a run of nine digits. Unflagged pages lying between
    the first and last flagged page are then pulled in as well, unless they
    are blank, hold 30 or fewer non-blank characters, or look like a fee page
    (two or more FEE_PAGE_KW hits and no nine-digit run).

    Args:
        page_texts: Text of each page in order; entries may be empty or None.

    Returns:
        ``(bill_page_texts, meta)``. ``meta`` always contains ``total_pages``,
        ``scanned``, ``no_bill_pages``, ``bill_page_indices`` and
        ``bill_pages``. When the whole document has fewer than 50 characters
        it is treated as a scan: ``scanned`` is True, ``reason`` is
        ``'noText'`` and no pages are returned.
    """
    n = len(page_texts)
    meta: dict[str, Any] = {'total_pages': n, 'scanned': False, 'no_bill_pages': False}
    if sum(len(t or '') for t in page_texts) < 50:
        meta['scanned'] = True
        meta['reason'] = 'noText'
        # Same keys as the normal return path, so callers can read them
        # without checking which path produced the dict.
        meta['bill_page_indices'] = []
        meta['bill_pages'] = 0
        return [], meta

    # Pass 1: flag pages that look like bill pages on their own.
    bill_flags = [False] * n
    for i, page in enumerate(page_texts):
        t = page or ''
        if not t.strip():
            continue
        header_hits = sum(1 for k in BILL_KW if k in t)
        if header_hits >= 2 or any(k in t for k in SEC_KW) or re.search(r'\d{9}', t):
            bill_flags[i] = True

    # Pass 2: fill the gaps between the first and last flagged page.
    flagged = [i for i, flag in enumerate(bill_flags) if flag]
    if len(flagged) > 1:
        for i in range(flagged[0] + 1, flagged[-1]):
            if bill_flags[i]:
                continue
            body = (page_texts[i] or '').strip()
            if len(body) <= 30:
                continue
            t = page_texts[i] or ''
            fee_hits = sum(1 for kw in FEE_PAGE_KW if kw in t)
            if fee_hits >= 2 and not re.search(r'\d{9}', t):
                continue
            bill_flags[i] = True

    bill_texts = [page for page, flag in zip(page_texts, bill_flags) if flag]
    if not bill_texts:
        meta['no_bill_pages'] = True
    meta['bill_page_indices'] = [i for i, flag in enumerate(bill_flags) if flag]
    meta['bill_pages'] = len(bill_texts)
    return bill_texts, meta
def analyze_boq_pages(page_texts: list[str]) -> dict[str, Any]:
    """Chain page filtering and parse_bill_text into one result dict.

    The result always carries a ``_meta`` entry; per the original docstring
    it is meant for persistence and the front end. Three shapes are returned:

    * fewer than 50 characters in total: ``scanned`` is True with reason
      ``'noText'`` and no categories;
    * no bill page found: ``no_bill_pages`` is True and no categories;
    * otherwise: the parse_bill_text output plus ``scanned`` False and
      ``_meta`` with the selected page count and indices.
    """
    total_pages = len(page_texts)

    def _meta(bill_pages: int, **extra: Any) -> dict[str, Any]:
        # Fields shared by every return path, followed by path-specific ones.
        return {
            'method': 'python-local',
            'total_pages': total_pages,
            'bill_pages': bill_pages,
            **extra,
        }

    if sum(len(t or '') for t in page_texts) < 50:
        return {
            'scanned': True,
            'reason': 'noText',
            'totalPages': total_pages,
            'project_summary': {'remark': '文本过少,疑似扫描件或未提取到文字'},
            'categories': [],
            '_meta': _meta(0),
        }

    bill_texts, fmeta = filter_bill_pages(page_texts)
    if not bill_texts:
        extra = {'no_bill_pages': fmeta['no_bill_pages']} if 'no_bill_pages' in fmeta else {}
        return {
            'scanned': False,
            'no_bill_pages': True,
            'totalPages': total_pages,
            'project_summary': {'remark': '未识别到清单相关页面'},
            'categories': [],
            '_meta': _meta(0, **extra),
        }

    result: dict[str, Any] = {'scanned': False}
    result.update(parse_bill_text('\n'.join(bill_texts)))
    result['_meta'] = _meta(
        len(bill_texts),
        bill_page_indices=fmeta.get('bill_page_indices', []),
    )
    return result
def categories_to_prompt_appendix(
    analysis: dict[str, Any],
    max_chars: int = 3000,
    max_per_cat: int = 40,
) -> str:
    """Condense the local parse result into short text for an AI summary prompt.

    Each category contributes its name on one line, followed by one indented
    line per item (``code name unit quantity``; a missing code is shown as
    ``-``). At most *max_per_cat* items are listed per category; when more
    exist, a line stating the total count is added.

    Args:
        analysis: Result dict; only its ``'categories'`` list is read.
        max_chars: Length after which the text is cut and a truncation notice
            is appended (the notice itself is not counted).
        max_per_cat: Maximum number of item lines per category.

    Returns:
        The appendix text, or an empty string when there are no categories.
    """
    lines: list[str] = []
    for cat in analysis.get('categories') or []:
        name = cat.get('name', '')
        items = cat.get('items') or []
        lines.append(f'{name}')
        for it in items[:max_per_cat]:
            code = it.get('code') or '-'
            n = it.get('name') or ''
            u = it.get('unit') or ''
            q = it.get('quantity') or ''
            # rstrip, not strip: drop the padding left by empty unit/quantity
            # but keep the leading indent that sets item lines apart from
            # category names.
            lines.append(f' {code} {n} {u} {q}'.rstrip())
        if len(items) > max_per_cat:
            lines.append(f' …共 {len(items)} 条,此处省略其余')
    text = '\n'.join(lines).strip()
    if len(text) > max_chars:
        return text[:max_chars] + '\n…(附录已截断)'
    return text