139 lines
4.8 KiB
Python
139 lines
4.8 KiB
Python
"""
|
||
工程量清单解析模块:从 Excel / CSV / PDF / Word 文件中提取结构化文本。
|
||
"""
|
||
import csv
|
||
import logging
|
||
import re
|
||
from pathlib import Path
|
||
|
||
logger = logging.getLogger(__name__)

# Maximum number of characters returned by extract_boq_text; the text is
# truncated to this length before it is handed to the AI for summarising.
MAX_BOQ_CHARS = 12_000
|
||
|
||
|
||
def extract_boq_text(file_path: str) -> str:
    """Extract the raw structured text of a bill-of-quantities (BOQ) file.

    The parser is selected from the lower-cased file extension. Supported
    extensions: .xlsx / .xls / .csv / .pdf / .docx / .doc.

    Args:
        file_path: Path of the file to parse.

    Returns:
        The extracted text, cut to at most ``MAX_BOQ_CHARS`` characters.

    Raises:
        ValueError: If the extension is not one of the supported ones.
    """
    suffix = Path(file_path).suffix.lower()

    # Reject unknown formats up front so the dispatch below only sees
    # extensions that have a parser.
    if suffix not in ('.xlsx', '.xls', '.csv', '.pdf', '.docx', '.doc'):
        raise ValueError(f'不支持的文件格式 {suffix},请使用 xlsx/xls/csv/pdf/docx/doc')

    if suffix == '.csv':
        content = _extract_csv(file_path)
    elif suffix in ('.xlsx', '.xls'):
        content = _extract_excel(file_path)
    elif suffix == '.pdf':
        # utils.file_utils is imported lazily, only by the branch that uses it.
        from utils.file_utils import _extract_pdf
        content = _extract_pdf(file_path)
    elif suffix == '.docx':
        from utils.file_utils import _extract_docx
        content = _extract_docx(file_path)
    else:  # '.doc' is the only extension left
        from utils.file_utils import _extract_doc
        content = _extract_doc(file_path)

    return content[:MAX_BOQ_CHARS]
|
||
|
||
|
||
def extract_boq_pages(file_path: str) -> list[str]:
    """Return the BOQ text split into "pages".

    For a ``.pdf`` file the result of ``utils.file_utils.extract_pdf_pages``
    is returned unchanged (one segment per page, per the original note).
    Every other format yields a single-element list holding the text from
    ``extract_boq_text``; a falsy result is replaced by an empty string.

    Args:
        file_path: Path of the file to parse.

    Returns:
        A list of text segments.
    """
    if Path(file_path).suffix.lower() != '.pdf':
        # Non-PDF formats have no page notion here: wrap the full text.
        return [extract_boq_text(file_path) or '']

    from utils.file_utils import extract_pdf_pages
    return extract_pdf_pages(file_path)
|
||
|
||
|
||
# ─── Excel ────────────────────────────────────────────────────────────────
|
||
|
||
def _extract_excel(file_path: str) -> str:
    """Extract text from every non-empty sheet of an Excel workbook.

    ``.xlsx`` files are read with openpyxl (cached values, read-only mode).
    Legacy ``.xls`` files are sent straight to ``_extract_xls_fallback``,
    because openpyxl rejects that format. The same fallback is used when
    openpyxl is not installed.

    Args:
        file_path: Path of the workbook.

    Returns:
        The per-sheet text blocks joined by a blank line; sheets that
        convert to empty text are skipped.

    Raises:
        RuntimeError: If the workbook cannot be parsed.
    """
    # openpyxl only understands the OOXML formats; handing it a .xls file
    # always fails, so the xlrd path must be chosen by extension.
    if Path(file_path).suffix.lower() == '.xls':
        return _extract_xls_fallback(file_path)

    try:
        import openpyxl
    except ImportError:
        return _extract_xls_fallback(file_path)

    try:
        wb = openpyxl.load_workbook(file_path, data_only=True, read_only=True)
        try:
            parts = []
            for name in wb.sheetnames:
                block = _sheet_to_text(wb[name], name)
                if block.strip():
                    parts.append(block)
        finally:
            # Read-only workbooks keep the file open until closed explicitly;
            # release it even when a sheet fails to convert.
            wb.close()
        return '\n\n'.join(parts)
    except Exception as e:
        raise RuntimeError(f'Excel 解析失败:{e}') from e
|
||
|
||
|
||
def _sheet_to_text(ws, sheet_name: str) -> str:
|
||
"""将一个 Sheet 转为管道分隔文本,自动过滤全空行和全空列。"""
|
||
raw_rows = []
|
||
for row in ws.iter_rows(values_only=True):
|
||
cells = ['' if v is None else str(v).strip() for v in row]
|
||
if any(cells):
|
||
raw_rows.append(cells)
|
||
|
||
if not raw_rows:
|
||
return ''
|
||
|
||
# 对齐列数
|
||
max_cols = max(len(r) for r in raw_rows)
|
||
raw_rows = [r + [''] * (max_cols - len(r)) for r in raw_rows]
|
||
|
||
# 找出有内容的列索引
|
||
active_cols = [j for j in range(max_cols)
|
||
if any(raw_rows[i][j] for i in range(len(raw_rows)))]
|
||
if not active_cols:
|
||
return ''
|
||
|
||
lines = [f'【{sheet_name}】']
|
||
for row in raw_rows:
|
||
line = ' | '.join(row[j] for j in active_cols)
|
||
if line.replace('|', '').strip():
|
||
lines.append(line)
|
||
return '\n'.join(lines)
|
||
|
||
|
||
def _extract_xls_fallback(file_path: str) -> str:
|
||
"""旧版 .xls 使用 xlrd 兜底(需安装 xlrd<2)"""
|
||
try:
|
||
import xlrd # type: ignore
|
||
wb = xlrd.open_workbook(file_path)
|
||
parts = []
|
||
for sheet in wb.sheets():
|
||
lines = [f'【{sheet.name}】']
|
||
for rx in range(sheet.nrows):
|
||
cells = [str(sheet.cell_value(rx, cx)).strip()
|
||
for cx in range(sheet.ncols)]
|
||
line = ' | '.join(c for c in cells if c)
|
||
if line:
|
||
lines.append(line)
|
||
parts.append('\n'.join(lines))
|
||
return '\n\n'.join(parts)
|
||
except Exception as e:
|
||
raise RuntimeError(f'.xls 解析失败,请另存为 .xlsx 后重试:{e}') from e
|
||
|
||
|
||
# ─── CSV ─────────────────────────────────────────────────────────────────
|
||
|
||
def _extract_csv(file_path: str) -> str:
|
||
encodings = ['utf-8-sig', 'gbk', 'utf-8', 'gb18030', 'latin-1']
|
||
for enc in encodings:
|
||
try:
|
||
lines = []
|
||
with open(file_path, 'r', encoding=enc, newline='') as f:
|
||
for row in csv.reader(f):
|
||
line = ' | '.join(c.strip() for c in row if c.strip())
|
||
if line:
|
||
lines.append(line)
|
||
return '\n'.join(lines)
|
||
except (UnicodeDecodeError, UnicodeError):
|
||
continue
|
||
except Exception as e:
|
||
raise RuntimeError(f'CSV 解析失败:{e}') from e
|
||
raise RuntimeError('CSV 文件编码不支持,请另存为 UTF-8 格式后重试')
|