2026-04-24 18:53:49 +08:00

139 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
工程量清单解析模块:从 Excel / CSV / PDF / Word 文件中提取结构化文本。
"""
import csv
import logging
import re
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maximum number of characters returned (the text is truncated before it is
# handed to the AI for summarization).
MAX_BOQ_CHARS = 12000
def extract_boq_text(file_path: str) -> str:
    """Extract the raw structured text of a bill-of-quantities file.

    The reader is chosen from the lower-cased file suffix:
    ``.xlsx`` / ``.xls`` / ``.csv`` are handled by this module, while
    ``.pdf`` / ``.docx`` / ``.doc`` are delegated to helpers imported lazily
    from ``utils.file_utils``.

    Args:
        file_path: Path of the file to read.

    Returns:
        The extracted text, cut to at most ``MAX_BOQ_CHARS`` characters.

    Raises:
        ValueError: If the suffix is not one of the supported formats.
    """
    ext = Path(file_path).suffix.lower()

    # Formats handled locally.
    if ext == '.csv':
        return _extract_csv(file_path)[:MAX_BOQ_CHARS]
    if ext in {'.xls', '.xlsx'}:
        return _extract_excel(file_path)[:MAX_BOQ_CHARS]

    # Anything that is not a delegated document format is rejected here.
    if ext not in {'.pdf', '.docx', '.doc'}:
        raise ValueError(f'不支持的文件格式 {ext},请使用 xlsx/xls/csv/pdf/docx/doc')

    # Import only the reader that is actually needed.
    if ext == '.pdf':
        from utils.file_utils import _extract_pdf as reader
    elif ext == '.docx':
        from utils.file_utils import _extract_docx as reader
    else:
        from utils.file_utils import _extract_doc as reader
    return reader(file_path)[:MAX_BOQ_CHARS]
def extract_boq_pages(file_path: str) -> list[str]:
    """Return the bill-of-quantities text split into "pages".

    For ``.pdf`` files the result comes from
    ``utils.file_utils.extract_pdf_pages``. Every other format goes through
    ``extract_boq_text`` and is returned as a single-element list holding the
    whole text (an empty string when nothing was extracted).

    Args:
        file_path: Path of the file to read.

    Returns:
        A list of text segments; never an empty list for non-PDF input.
    """
    if Path(file_path).suffix.lower() != '.pdf':
        whole = extract_boq_text(file_path)
        # Falsy results collapse to a single empty string.
        return [whole or '']

    from utils.file_utils import extract_pdf_pages
    return extract_pdf_pages(file_path)
# ─── Excel ────────────────────────────────────────────────────────────────
def _extract_excel(file_path: str) -> str:
    """Extract text from every non-empty sheet of an Excel workbook.

    Legacy ``.xls`` files are sent straight to ``_extract_xls_fallback``:
    openpyxl refuses that format, so without this shortcut the resulting
    error would be wrapped as a parse failure and the xlrd reader would never
    be tried. The same fallback is used when openpyxl is not installed.

    Args:
        file_path: Path to the ``.xlsx`` / ``.xls`` file.

    Returns:
        The text blocks of all non-blank sheets joined by a blank line.

    Raises:
        RuntimeError: If the workbook cannot be read or parsed.
    """
    if Path(file_path).suffix.lower() == '.xls':
        return _extract_xls_fallback(file_path)

    try:
        import openpyxl
    except ImportError:
        return _extract_xls_fallback(file_path)

    try:
        wb = openpyxl.load_workbook(file_path, data_only=True, read_only=True)
        try:
            parts = []
            for name in wb.sheetnames:
                block = _sheet_to_text(wb[name], name)
                if block.strip():
                    parts.append(block)
        finally:
            # Read-only workbooks keep the file open until closed explicitly,
            # so release it even when a sheet fails to convert.
            wb.close()
        return '\n\n'.join(parts)
    except Exception as e:
        raise RuntimeError(f'Excel 解析失败:{e}') from e
def _sheet_to_text(ws, sheet_name: str) -> str:
    """Render one worksheet as pipe-separated text.

    Rows whose cells are all empty and columns that are empty in every kept
    row are dropped. The first output line is the sheet name, followed by one
    line per remaining row with cells joined by ``' | '``.

    Args:
        ws: Worksheet-like object exposing ``iter_rows(values_only=True)``.
        sheet_name: Name written as the first line of the block.

    Returns:
        The rendered block, or ``''`` when the sheet has no content.
    """
    kept = []
    for record in ws.iter_rows(values_only=True):
        texts = [str(value).strip() if value is not None else ''
                 for value in record]
        if any(texts):
            kept.append(texts)
    if not kept:
        return ''

    # Pad every row to the widest one so columns can be inspected uniformly.
    width = max(map(len, kept))
    padded = [texts + [''] * (width - len(texts)) for texts in kept]

    # Indices of columns that hold at least one non-empty cell.
    used = [idx for idx, column in enumerate(zip(*padded)) if any(column)]
    if not used:
        return ''

    out = [f'{sheet_name}']
    for texts in padded:
        joined = ' | '.join(texts[idx] for idx in used)
        # Skip lines that consist of nothing but separators and whitespace.
        if joined.replace('|', '').strip():
            out.append(joined)
    return '\n'.join(out)
def _extract_xls_fallback(file_path: str) -> str:
    """Read a legacy ``.xls`` workbook with xlrd (requires ``xlrd<2``).

    Each sheet becomes a block that starts with the sheet name, followed by
    one line per row containing its non-empty cells joined by ``' | '``.
    Blocks are separated by a blank line.

    Args:
        file_path: Path to the workbook.

    Returns:
        The concatenated text of all sheets.

    Raises:
        RuntimeError: On any failure, including xlrd being unavailable.
    """
    try:
        import xlrd  # type: ignore

        book = xlrd.open_workbook(file_path)
        sections = []
        for sheet in book.sheets():
            body = [f'{sheet.name}']
            for row_idx in range(sheet.nrows):
                values = (str(sheet.cell_value(row_idx, col_idx)).strip()
                          for col_idx in range(sheet.ncols))
                # Empty cells are dropped rather than kept as placeholders.
                joined = ' | '.join(filter(None, values))
                if joined:
                    body.append(joined)
            sections.append('\n'.join(body))
        return '\n\n'.join(sections)
    except Exception as e:
        raise RuntimeError(f'.xls 解析失败,请另存为 .xlsx 后重试:{e}') from e
# ─── CSV ─────────────────────────────────────────────────────────────────
def _extract_csv(file_path: str) -> str:
    """Read a CSV file as pipe-separated text, trying several encodings.

    Encodings are attempted in order: ``utf-8-sig``, ``gbk``, ``utf-8``,
    ``gb18030``, ``latin-1``. The first one that decodes the whole file wins.
    Cells are stripped, empty cells are dropped, and rows left with no cells
    are skipped.

    Args:
        file_path: Path to the CSV file.

    Returns:
        One line per non-empty row, cells joined by ``' | '``.

    Raises:
        RuntimeError: If reading fails for a reason other than decoding, or
            if none of the encodings can decode the file.
    """
    for codec in ('utf-8-sig', 'gbk', 'utf-8', 'gb18030', 'latin-1'):
        try:
            rendered = []
            with open(file_path, 'r', encoding=codec, newline='') as handle:
                for record in csv.reader(handle):
                    fields = [cell for cell in map(str.strip, record) if cell]
                    if fields:
                        rendered.append(' | '.join(fields))
            return '\n'.join(rendered)
        except UnicodeError:
            # Wrong guess (covers UnicodeDecodeError); try the next encoding.
            continue
        except Exception as e:
            raise RuntimeError(f'CSV 解析失败:{e}') from e
    raise RuntimeError('CSV 文件编码不支持,请另存为 UTF-8 格式后重试')