MedResearcher/src/parse.py
iomgaa 099159dfb7 feat: 新增PDF解析功能模块
- pdf_parser.py: PDF解析主程序,支持命令行参数和并发处理
- src/parse.py: PDF解析核心模块,提供PDFParser类
  * 支持OCR API调用,将PDF转换为Markdown格式
  * 内置HTTP会话管理、连接池优化和重试机制
  * 支持并发处理和详细进度显示
  * 完善的错误处理和日志记录功能
2025-08-24 15:07:42 +08:00

451 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PDF解析模块
该模块提供PDFParser类用于将PDF文件通过OCR API转换为Markdown格式。
支持并发处理、进度显示、错误处理等功能。
"""
import io
import logging
import os
import re
import tempfile
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from urllib.parse import urljoin

import requests
class PDFParser:
    """Convert a directory of PDF files to Markdown through an OCR HTTP API.

    Each PDF is uploaded to the OCR endpoint. The ZIP archive referenced by
    the JSON reply is downloaded and unpacked into
    ``<markdown_dir>/<pdf stem>/``, and the extracted ``*.md`` files are then
    stripped of numeric-only lines and leading numeric prefixes. PDFs are
    processed concurrently in a thread pool that shares one HTTP session.
    """

    # Default OCR endpoint; download links in API replies are resolved
    # against whatever endpoint the instance is configured with.
    DEFAULT_OCR_API_URL = "http://100.106.4.14:7861/parse"

    # Timeout (seconds) for one OCR upload request: 1800 s = 30 minutes.
    OCR_TIMEOUT_SECONDS = 1800

    # A line made only of digits, optionally followed by whitespace.
    _NUMBER_ONLY_LINE = re.compile(r'^\d+\s*$')
    # A "<digits><whitespace>" prefix at the start of a line.
    _LEADING_NUMBER = re.compile(r'^\d+\s+')

    def __init__(self, pdf_dir: str = "dataset/pdfs", parallel: int = 3,
                 markdown_dir: str = "dataset/markdowns",
                 ocr_api_url: str = DEFAULT_OCR_API_URL):
        """Store the configuration and build the shared HTTP session.

        Args:
            pdf_dir (str): Directory that is scanned for PDF files.
            parallel (int): Number of worker threads used by
                ``parse_all_pdfs``.
            markdown_dir (str): Directory that receives one sub-folder of
                extracted output per PDF.
            ocr_api_url (str): URL of the OCR upload endpoint. Download links
                returned by the API are resolved relative to this URL.
        """
        from requests.adapters import HTTPAdapter

        self.pdf_dir = Path(pdf_dir)
        self.parallel = parallel
        self.markdown_dir = Path(markdown_dir)
        self.ocr_api_url = ocr_api_url

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'MedResearcher-PDFParser/1.0'
        })

        # Transport-level retries are disabled (max_retries=0) because
        # _make_request_with_retry implements its own retry schedule.
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=0
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

        # basicConfig does nothing if the root logger is already configured.
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')

    def close(self) -> None:
        """Close the shared HTTP session and release its pooled connections."""
        self.session.close()

    def __enter__(self):
        """Return the parser itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP session when the ``with`` block ends."""
        self.close()

    def _scan_pdf_files(self) -> List[Path]:
        """List the PDF files directly inside ``self.pdf_dir``.

        The ``.pdf`` extension is matched case-insensitively so that files
        such as ``REPORT.PDF`` are found on case-sensitive file systems too.
        The result is sorted to make the processing order reproducible.

        Returns:
            List[Path]: Paths of the PDF files found (possibly empty).

        Raises:
            FileNotFoundError: ``self.pdf_dir`` is not an existing directory.
        """
        if not self.pdf_dir.is_dir():
            raise FileNotFoundError(f"PDF目录不存在: {self.pdf_dir}")

        pdf_files = sorted(
            entry for entry in self.pdf_dir.iterdir()
            if entry.is_file() and entry.name.lower().endswith(".pdf")
        )
        logging.info(f"发现 {len(pdf_files)} 个PDF文件待处理")
        return pdf_files

    def _prepare_output_dir(self) -> Path:
        """Create the Markdown output directory if it does not exist yet.

        Returns:
            Path: The Markdown output directory.
        """
        self.markdown_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Markdown输出目录已准备: {self.markdown_dir}")
        return self.markdown_dir

    def _call_ocr_api(self, pdf_file: Path) -> Optional[Dict]:
        """Upload one PDF to the OCR endpoint and return the decoded reply.

        Args:
            pdf_file (Path): PDF file to upload.

        Returns:
            Optional[Dict]: The JSON body when the HTTP status is 200 and its
            ``success`` field is truthy; ``None`` otherwise. Every exception
            is logged and converted to ``None``.
        """
        try:
            with open(pdf_file, 'rb') as f:
                files = {
                    'file': (pdf_file.name, f, 'application/pdf')
                }
                # The handle must stay open for the whole call: the retry
                # helper rewinds and re-reads it on every new attempt.
                response = self._make_request_with_retry(
                    self.ocr_api_url,
                    files=files,
                    timeout=self.OCR_TIMEOUT_SECONDS
                )

            if response.status_code != 200:
                logging.error(f"OCR API请求失败状态码: {response.status_code} - {pdf_file.name}")
                return None

            response_data = response.json()
            if response_data.get('success', False):
                logging.debug(f"OCR API调用成功: {pdf_file.name}")
                return response_data

            logging.warning(f"OCR API处理失败: {pdf_file.name} - {response_data.get('message', 'Unknown error')}")
            return None
        except Exception as e:
            logging.error(f"调用OCR API时发生错误: {pdf_file.name} - {e}")
            return None

    def _download_and_extract_zip(self, download_url: str, pdf_file: Path) -> bool:
        """Download the result archive and unpack it next to the other outputs.

        The archive is extracted into ``<markdown_dir>/<pdf stem>/`` and the
        extracted Markdown files are cleaned afterwards. A cleaning failure is
        only logged; it does not turn a successful extraction into a failure.

        Args:
            download_url (str): Absolute URL of the ZIP archive.
            pdf_file (Path): Source PDF; its stem names the output folder.

        Returns:
            bool: ``True`` if the archive was downloaded and extracted.
        """
        try:
            response = self._make_request_with_retry(download_url, timeout=60)
            if response.status_code != 200:
                logging.error(f"下载ZIP失败状态码: {response.status_code} - {pdf_file.name}")
                return False

            output_subdir = self.markdown_dir / pdf_file.stem
            output_subdir.mkdir(parents=True, exist_ok=True)

            # The payload is already fully in memory, so open it from a
            # buffer instead of going through a temporary file on disk.
            with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
                zip_ref.extractall(output_subdir)
            logging.debug(f"ZIP文件解压成功: {pdf_file.name} -> {output_subdir}")

            if not self._clean_markdown_files(output_subdir):
                logging.warning(f"Markdown文件清洗失败但解压成功: {pdf_file.name}")
            return True
        except zipfile.BadZipFile as e:
            logging.error(f"ZIP文件损坏: {pdf_file.name} - {e}")
            return False
        except Exception as e:
            logging.error(f"下载或解压ZIP时发生错误: {pdf_file.name} - {e}")
            return False

    @classmethod
    def _clean_markdown_lines(cls, lines: List[str]) -> List[str]:
        """Return ``lines`` with numeric-only lines and numeric prefixes removed.

        Lines holding nothing but digits (plus trailing whitespace) are
        dropped. A leading ``<digits><whitespace>`` prefix is removed from
        every other line. Blank lines are kept (normalised to ``"\\n"``) so
        the paragraph structure survives.
        """
        cleaned = []
        for line in lines:
            content = line.rstrip('\n\r')
            if cls._NUMBER_ONLY_LINE.match(content):
                continue
            content = cls._LEADING_NUMBER.sub('', content)
            cleaned.append(content + '\n' if content.strip() else '\n')
        return cleaned

    def _clean_markdown_files(self, output_subdir: Path) -> bool:
        """Clean every ``*.md`` file directly inside ``output_subdir`` in place.

        Numeric-only lines and leading numeric prefixes are removed
        (presumably line/page numbers left by the OCR step); blank lines are
        preserved. A failure on one file is logged and the remaining files are
        still processed.

        Args:
            output_subdir (Path): Folder holding the extracted Markdown files.

        Returns:
            bool: ``True`` if there was nothing to clean or every file was
            cleaned; ``False`` if at least one file could not be processed.
        """
        try:
            md_files = list(output_subdir.glob("*.md"))
            if not md_files:
                logging.debug(f"未找到Markdown文件进行清洗: {output_subdir}")
                return True

            failures = 0
            for md_file in md_files:
                try:
                    with open(md_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    with open(md_file, 'w', encoding='utf-8') as f:
                        f.writelines(self._clean_markdown_lines(lines))
                    logging.debug(f"Markdown文件清洗完成: {md_file.name}")
                except Exception as e:
                    logging.error(f"清洗Markdown文件时发生错误: {md_file.name} - {e}")
                    failures += 1

            if failures:
                return False
            logging.info(f"成功清洗 {len(md_files)} 个Markdown文件: {output_subdir}")
            return True
        except Exception as e:
            logging.error(f"清洗Markdown文件时发生错误: {output_subdir} - {e}")
            return False

    def _process_single_pdf(self, pdf_file: Path) -> bool:
        """Run the full pipeline (OCR, download, extract, clean) for one PDF.

        A PDF whose output folder already exists and is non-empty is treated
        as done and skipped.

        Args:
            pdf_file (Path): PDF file to process.

        Returns:
            bool: ``True`` on success or skip, ``False`` on any failure.
        """
        try:
            if not pdf_file.exists() or pdf_file.stat().st_size == 0:
                logging.warning(f"PDF文件不存在或为空: {pdf_file}")
                return False

            output_subdir = self.markdown_dir / pdf_file.stem
            if output_subdir.exists() and any(output_subdir.iterdir()):
                logging.info(f"输出文件夹已存在且非空,跳过处理: {pdf_file.stem}")
                return True

            api_response = self._call_ocr_api(pdf_file)
            if not api_response:
                return False

            download_url = api_response.get('download_url')
            if not download_url:
                logging.error(f"API响应中缺少下载URL: {pdf_file.name}")
                return False

            # Resolve the link against the configured endpoint so the host is
            # defined in one place; absolute links are passed through as-is.
            full_download_url = urljoin(self.ocr_api_url, download_url)
            logging.debug(f"完整下载URL: {full_download_url}")

            return self._download_and_extract_zip(full_download_url, pdf_file)
        except Exception as e:
            logging.error(f"处理PDF文件时发生错误: {pdf_file.name} - {e}")
            return False

    @staticmethod
    def _seekable_uploads(files: Optional[Dict]) -> list:
        """Return ``(file_object, start_offset)`` for each seekable upload.

        Values of ``files`` may be bare file objects or tuples whose second
        item is the file object. Anything that cannot report a position
        (bytes, strings, pipes) is skipped.
        """
        uploads = []
        if not isinstance(files, dict):
            return uploads
        for value in files.values():
            is_spec = isinstance(value, (tuple, list)) and len(value) > 1
            fileobj = value[1] if is_spec else value
            if hasattr(fileobj, 'seek') and hasattr(fileobj, 'tell'):
                try:
                    uploads.append((fileobj, fileobj.tell()))
                except (OSError, ValueError):
                    # Stream exposes seek/tell but is not really seekable;
                    # it simply cannot be rewound for a retry.
                    pass
        return uploads

    def _make_request_with_retry(self, url: str, files: Optional[Dict] = None,
                                 max_retries: int = 5, timeout: int = 180,
                                 backoff_factor: float = 1.0) -> "requests.Response":
        """Send a request, retrying on HTTP 500 and on request exceptions.

        A POST is sent when ``files`` is non-empty, a GET otherwise. Wait
        times before the next attempt (for ``backoff_factor`` 1.0):
        HTTP 500 -> 10, 15, 20, 25 s; timeout -> 5, 7, 9, 11 s;
        connection error -> 5, 10, 20, 40 s; other request errors ->
        3, 6, 12, 24 s.

        Args:
            url (str): Request URL.
            files (Optional[Dict]): Multipart upload specification for POST.
            max_retries (int): Maximum number of attempts; values below 1 are
                treated as 1 so that a response is always produced.
            timeout (int): Per-attempt timeout in seconds.
            backoff_factor (float): Multiplier applied to every wait time;
                negative products are clamped to 0.

        Returns:
            requests.Response: The first response whose status is not 500, or
            the last 500 response once the attempts are used up.

        Raises:
            requests.RequestException: The final attempt raised.
        """
        attempts = max(1, max_retries)
        uploads = self._seekable_uploads(files)

        for attempt in range(attempts):
            is_last_attempt = attempt == attempts - 1

            # An earlier attempt has read the upload streams to EOF; without
            # rewinding, every retry would send empty files.
            for fileobj, offset in uploads:
                fileobj.seek(offset)

            try:
                if files:
                    response = self.session.post(url, files=files, timeout=timeout)
                else:
                    response = self.session.get(url, timeout=timeout)
            except requests.exceptions.Timeout as e:
                if is_last_attempt:
                    logging.error(f"请求超时,已达到最大重试次数: {e}")
                    raise
                delay = max(0.0, min(15, 5 + (attempt * 2)) * backoff_factor)
                logging.warning(f"请求超时,{delay:g}秒后重试 (第{attempt + 1}次): {e}")
                time.sleep(delay)
                continue
            except requests.exceptions.ConnectionError as e:
                if is_last_attempt:
                    logging.error(f"连接错误,已达到最大重试次数: {e}")
                    raise
                delay = max(0.0, min(60, 5 * (2 ** attempt)) * backoff_factor)
                logging.warning(f"连接错误,{delay:g}秒后重试 (第{attempt + 1}次): {e}")
                time.sleep(delay)
                continue
            except requests.RequestException as e:
                if is_last_attempt:
                    logging.error(f"请求失败,已达到最大重试次数: {e}")
                    raise
                delay = max(0.0, min(30, 3 * (2 ** attempt)) * backoff_factor)
                logging.warning(f"请求失败,{delay:g}秒后重试 (第{attempt + 1}次): {e}")
                time.sleep(delay)
                continue

            # Only HTTP 500 is retried; every other status goes to the caller.
            if response.status_code != 500:
                return response
            if is_last_attempt:
                logging.error(f"服务器内部错误,已达到最大重试次数: HTTP {response.status_code}")
                return response  # hand back the error response, do not raise
            delay = max(0.0, min(30, 10 + (attempt * 5)) * backoff_factor)
            logging.warning(f"服务器内部错误,{delay:g}秒后重试 (第{attempt + 1}次)")
            time.sleep(delay)

    def parse_all_pdfs(self) -> Dict[str, int]:
        """Process every PDF in ``self.pdf_dir`` concurrently.

        A one-line progress indicator is printed to stdout while the files
        complete; failed files are listed in the log afterwards.

        Returns:
            Dict[str, int]: ``{'success': n, 'failed': n, 'total': n}``.

        Raises:
            FileNotFoundError: The PDF directory does not exist.
        """
        try:
            pdf_files = self._scan_pdf_files()
            if not pdf_files:
                logging.warning("未找到PDF文件")
                return {'success': 0, 'failed': 0, 'total': 0}

            self._prepare_output_dir()

            total_files = len(pdf_files)
            success_count = 0
            failed_count = 0
            failed_files = []

            logging.info(f"开始并发处理 {total_files} 个PDF文件")
            logging.info(f"并发数: {self.parallel} (降低并发数以避免服务器过载)")
            logging.info(f"请求超时: {self.OCR_TIMEOUT_SECONDS}秒 (适配服务器处理时间)")
            logging.info("重试次数: 5次 (智能重试策略)")

            with ThreadPoolExecutor(max_workers=self.parallel) as executor:
                future_to_pdf = {
                    executor.submit(self._process_single_pdf, pdf_file): pdf_file
                    for pdf_file in pdf_files
                }

                completed_count = 0
                for future in as_completed(future_to_pdf):
                    pdf_file = future_to_pdf[future]
                    # Keep the progress line short for very long file names.
                    filename = pdf_file.name[:50] + '...' if len(pdf_file.name) > 50 else pdf_file.name
                    try:
                        success = future.result()
                        completed_count += 1
                        if success:
                            success_count += 1
                            status = "✓"
                        else:
                            failed_count += 1
                            failed_files.append({
                                'filename': pdf_file.name,
                                'path': str(pdf_file)
                            })
                            status = "✗"
                        progress = (completed_count / total_files) * 100
                        print(f"\r[{completed_count:3d}/{total_files}] {progress:5.1f}% {status} {filename}", end='', flush=True)
                    except Exception as e:
                        failed_count += 1
                        completed_count += 1
                        failed_files.append({
                            'filename': pdf_file.name,
                            'path': str(pdf_file),
                            'error': str(e)
                        })
                        progress = (completed_count / total_files) * 100
                        print(f"\r[{completed_count:3d}/{total_files}] {progress:5.1f}% ✗ {filename} (Error: {str(e)[:30]})", end='', flush=True)

            print()  # finish the in-place progress line

            if failed_files:
                logging.warning(f"以下 {len(failed_files)} 个PDF文件处理失败:")
                for file_info in failed_files:
                    logging.warning(f" - {file_info['filename']}")
                    if 'error' in file_info:
                        logging.warning(f" 错误: {file_info['error']}")

            stats = {
                'success': success_count,
                'failed': failed_count,
                'total': total_files
            }
            logging.info(f"PDF解析完成! 成功: {success_count}/{total_files} ({success_count/total_files*100:.1f}%)")
            if failed_count > 0:
                logging.warning(f"失败: {failed_count}/{total_files} ({failed_count/total_files*100:.1f}%)")
            return stats
        except Exception as e:
            logging.error(f"批量处理PDF文件时发生错误: {e}")
            raise