2026-04-23 14:36:26 +08:00

214 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文件处理工具:从 PDF / Word 文件中提取纯文本
"""
import os
import logging
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def extract_text(file_path: str) -> str:
    """Extract plain text from a document, choosing the extractor by extension.

    The extension is compared case-insensitively. ``.pdf``, ``.docx`` and
    ``.doc`` are supported.

    Args:
        file_path: Path of the file to read.

    Returns:
        The text returned by the extension-specific extractor.

    Raises:
        ValueError: If the extension is not one of the three supported ones.
    """
    ext = Path(file_path).suffix.lower()

    # Reject unknown types up front so the dispatch below only sees known ones.
    if ext not in ('.pdf', '.docx', '.doc'):
        raise ValueError(f'不支持的文件类型: {ext}')

    if ext == '.pdf':
        return _extract_pdf(file_path)
    if ext == '.docx':
        return _extract_docx(file_path)
    return _extract_doc(file_path)
def _extract_pdf(file_path: str) -> str:
    """Return the text of a PDF, trying pypdf first and pdfminer second.

    pypdf's output is used only when it contains non-whitespace text. If
    pypdf raises, or yields nothing but whitespace, the file is handed to
    pdfminer instead.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The extracted text. With pypdf, the non-empty page texts are joined
        with newlines. May be an empty string when pdfminer finds no text.

    Raises:
        RuntimeError: If the pdfminer fallback fails.
    """
    try:
        from pypdf import PdfReader

        page_texts = (page.extract_text() for page in PdfReader(file_path).pages)
        joined = '\n'.join(chunk for chunk in page_texts if chunk)
        if joined.strip():
            return joined
        # Whitespace-only result: fall through to pdfminer.
    except Exception as e:
        logger.warning(f'pypdf 提取失败: {e},尝试 pdfminer')

    try:
        from pdfminer.high_level import extract_text as pm_extract

        return pm_extract(file_path) or ''
    except Exception as e:
        logger.error(f'pdfminer 提取失败: {e}')
        raise RuntimeError(f'PDF 文本提取失败: {e}')
def extract_pdf_pages(file_path: str) -> list[str]:
    """Extract the text of a PDF page by page.

    Used for filtering bill-of-quantities pages. pypdf is tried first and
    yields one stripped string per page ('' for a page without text). If
    pypdf raises, or every page comes back empty, the whole document is
    extracted with pdfminer instead and returned as a single-element list.

    Args:
        file_path: Path of the PDF file.

    Returns:
        A list of page texts from pypdf, or a one-element list holding the
        stripped pdfminer output (which may be the empty string).

    Raises:
        RuntimeError: If the pdfminer fallback fails.
    """
    try:
        from pypdf import PdfReader

        reader = PdfReader(file_path)
        # Deliberately sequential. Every page belongs to the same PdfReader,
        # which resolves objects lazily by seeking and reading one shared
        # stream; pypdf does not document that as safe for concurrent use, so
        # running extract_text() from several threads risks interleaved
        # reads. The extraction itself is Python-level, CPU-bound work, so a
        # thread pool would gain little under the GIL anyway.
        pages = [(page.extract_text() or '').strip() for page in reader.pages]
        if any(pages):
            return pages
        # No page produced text: fall through to pdfminer.
    except Exception as e:
        logger.warning(f'pypdf 按页提取失败: {e},尝试 pdfminer')

    try:
        from pdfminer.high_level import extract_text as pm_extract

        # pdfminer returns the document as one string, so page boundaries are
        # not preserved on this path.
        blob = (pm_extract(file_path) or '').strip()
        return [blob]
    except Exception as e:
        logger.error(f'pdfminer 提取失败: {e}')
        raise RuntimeError(f'PDF 文本提取失败: {e}') from e
def _extract_docx(file_path: str) -> str:
    """Extract the text of a ``.docx`` document using python-docx.

    Non-blank paragraphs come first, in document order, followed by one line
    per table row. A row line is the row's non-blank cell texts, stripped and
    joined with single spaces; rows with no text are skipped.

    Args:
        file_path: Path of the ``.docx`` file.

    Returns:
        All collected lines joined with newlines.

    Raises:
        RuntimeError: If python-docx cannot be imported or the file cannot
            be read.
    """
    try:
        from docx import Document

        document = Document(file_path)

        lines = [p.text for p in document.paragraphs if p.text.strip()]

        for tbl in document.tables:
            for row in tbl.rows:
                stripped_cells = (cell.text.strip() for cell in row.cells)
                row_line = ' '.join(value for value in stripped_cells if value)
                if row_line:
                    lines.append(row_line)

        return '\n'.join(lines)
    except Exception as e:
        logger.error(f'.docx 提取失败: {e}')
        raise RuntimeError(f'Word 文本提取失败: {e}')
def _doc_via_win32com(abs_path: str, file_path: str) -> 'str | None':
    """Read a .doc through Word COM automation; None if unavailable or failed."""
    try:
        import win32com.client
        import pythoncom
    except ImportError:
        logger.info('pywin32 未安装,跳过 win32com 方案')
        return None

    try:
        pythoncom.CoInitialize()
        word = None
        try:
            word = win32com.client.Dispatch('Word.Application')
            word.Visible = False
            doc = word.Documents.Open(abs_path, ReadOnly=True)
            text = doc.Range().Text
            doc.Close(False)
            logger.info(f'.doc 通过 win32com 提取成功: {file_path}')
            return text or ''
        finally:
            # Compare against None explicitly rather than relying on the
            # truthiness of a COM proxy object.
            if word is not None:
                try:
                    word.Quit()
                except Exception:
                    # Best-effort shutdown: a failure to quit Word must not
                    # mask the extraction result or the original error.
                    pass
            pythoncom.CoUninitialize()
    except Exception as e:
        logger.warning(f'win32com 提取 .doc 失败: {e}')
        return None


def _doc_via_libreoffice(abs_path: str, file_path: str) -> 'str | None':
    """Convert a .doc to text with headless LibreOffice; None if that fails."""
    try:
        import shutil
        import subprocess
        import tempfile

        tmp_dir = tempfile.mkdtemp()
        try:
            # The executable name differs between installations; try both.
            for soffice_cmd in ('soffice', 'libreoffice'):
                try:
                    result = subprocess.run(
                        [soffice_cmd, '--headless', '--convert-to', 'txt:Text',
                         '--outdir', tmp_dir, abs_path],
                        capture_output=True, text=True, timeout=60,
                    )
                except FileNotFoundError:
                    # This executable name is not on PATH; try the next one.
                    continue
                except subprocess.TimeoutExpired:
                    logger.warning('LibreOffice 转换超时')
                    break

                if result.returncode != 0:
                    continue

                txt_file = os.path.join(tmp_dir, Path(file_path).stem + '.txt')
                if os.path.exists(txt_file):
                    with open(txt_file, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                    logger.info(f'.doc 通过 LibreOffice 提取成功: {file_path}')
                    return content
        finally:
            # mkdtemp() leaves deletion to the caller; remove the directory on
            # every exit path so repeated calls do not accumulate temp dirs.
            shutil.rmtree(tmp_dir, ignore_errors=True)
    except Exception as e:
        logger.warning(f'LibreOffice 提取 .doc 失败: {e}')
    return None


def _extract_doc(file_path: str) -> str:
    """Extract text from a legacy ``.doc`` file.

    Strategies are tried in order and the first one that produces a result
    wins:

    1. Word automation through win32com (Windows with Microsoft Word).
    2. Headless LibreOffice conversion to plain text.
    3. python-docx, which can read some files that carry a ``.doc`` name but
       are actually stored in the XML-based format. Its result is used only
       when it contains non-whitespace text.

    Args:
        file_path: Path of the ``.doc`` file.

    Returns:
        The extracted text. Strategies 1 and 2 may return an empty string.

    Raises:
        RuntimeError: If every strategy fails. The message asks the user to
            re-save the file as ``.docx`` and upload it again.
    """
    abs_path = str(Path(file_path).resolve())

    for strategy in (_doc_via_win32com, _doc_via_libreoffice):
        text = strategy(abs_path, file_path)
        # None means "strategy unavailable or failed"; an empty string is a
        # valid result and is returned as-is.
        if text is not None:
            return text

    try:
        result = _extract_docx(file_path)
        if result.strip():
            logger.info(f'.doc 通过 python-docx 兼容读取成功: {file_path}')
            return result
    except Exception as e:
        logger.warning(f'python-docx 兼容读取 .doc 失败: {e}')

    raise RuntimeError(
        '无法读取 .doc 格式文件。请在 Word 中打开该文件,'
        '选择「另存为」→「Word 文档 (.docx)」后重新上传。'
    )
def truncate_text(text: str, max_chars: int = 60000) -> str:
    """Cut over-long text so it stays within the AI token budget.

    Args:
        text: The text to limit.
        max_chars: Maximum number of characters kept from ``text``.

    Returns:
        ``text`` unchanged when it has at most ``max_chars`` characters;
        otherwise its first ``max_chars`` characters followed by a marker
        saying the content was truncated (so the result is longer than
        ``max_chars`` by the length of that marker).
    """
    if len(text) > max_chars:
        return text[:max_chars] + '\n\n...[文档内容已截断,仅展示前段]'
    return text
def split_text_chunks(text: str, chunk_size: int = 2000, overlap: int = 200) -> list[str]:
    """Split text into fixed-size, overlapping chunks (for the knowledge base).

    Consecutive chunks start ``chunk_size - overlap`` characters apart, so
    each chunk repeats the last ``overlap`` characters of the previous one.
    Splitting stops as soon as a chunk reaches the end of the text, so no
    trailing chunk is emitted that only repeats the tail of its predecessor.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Characters shared between neighbouring chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in order; an empty list for empty text.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. Without
            this check a non-positive step would never advance and the loop
            would not terminate.
    """
    if chunk_size <= 0:
        raise ValueError('chunk_size must be positive')
    if not 0 <= overlap < chunk_size:
        raise ValueError('overlap must satisfy 0 <= overlap < chunk_size')

    step = chunk_size - overlap
    total = len(text)
    chunks = []
    for start in range(0, total, step):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= total:
            # This chunk already covers the tail; another one would be a pure
            # duplicate of its last characters.
            break
    return chunks
def allowed_file(filename: str) -> bool:
    """Tell whether a file name ends in an accepted extension.

    The part after the last dot is compared case-insensitively against
    ``pdf``, ``doc`` and ``docx``. A name without any dot is rejected.

    Args:
        filename: The file name to check.

    Returns:
        True if the extension is accepted, False otherwise.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in {'pdf', 'doc', 'docx'}
def safe_filename(filename: str) -> str:
    """Build a sanitized file name.

    Every character that is not a word character, a character in the range
    U+4E00-U+9FFF, a dot or a hyphen is replaced with an underscore. The
    length of the name is unchanged.

    Args:
        filename: The original file name.

    Returns:
        The sanitized name.
    """
    import re

    disallowed = re.compile(r'[^\w\u4e00-\u9fff.\-]')
    return disallowed.sub('_', filename)