feat: 新增PDF解析功能模块

- pdf_parser.py: PDF解析主程序,支持命令行参数和并发处理
- src/parse.py: PDF解析核心模块,提供PDFParser类
  * 支持OCR API调用,将PDF转换为Markdown格式
  * 内置HTTP会话管理、连接池优化和重试机制
  * 支持并发处理和详细进度显示
  * 完善的错误处理和日志记录功能
This commit is contained in:
iomgaa 2025-08-24 15:07:42 +08:00
parent 8d6d217c2f
commit 099159dfb7
2 changed files with 541 additions and 0 deletions

90
pdf_parser.py Normal file
View File

@ -0,0 +1,90 @@
import argparse
from src.parse import PDFParser
def setup_args():
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace: Parsed command-line options.
    """
    epilog_text = '''
使用示例:
%(prog)s # 使用默认参数
%(prog)s --pdf-dir dataset/pdfs # 指定PDF目录
%(prog)s --parallel 10 # 设置并行度为10
%(prog)s --markdown-dir output/markdowns # 指定输出目录
'''
    arg_parser = argparse.ArgumentParser(
        description='PDF解析工具 - 用于将PDF文件通过OCR API转换为Markdown格式',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    # (flag, extra kwargs) pairs keep the three options in one place.
    option_specs = [
        ('--pdf-dir',
         dict(default="dataset/pdfs",
              help='PDF文件目录 (默认: dataset/pdfs)')),
        ('--parallel',
         dict(type=int, default=5,
              help='并发处理线程数 (默认: 5降低并发避免服务器过载)')),
        ('--markdown-dir',
         dict(default="dataset/markdowns",
              help='Markdown输出目录 (默认: dataset/markdowns)')),
    ]
    for flag, kwargs in option_specs:
        arg_parser.add_argument(flag, **kwargs)
    return arg_parser.parse_args()
def main():
    """Run the PDF-to-Markdown conversion tool.

    Parses CLI arguments, drives ``PDFParser`` over the input directory
    and prints a summary of the results.

    Returns:
        int: Process exit code — 0 on success, 1 on any handled error.
    """
    try:
        # Parse command-line arguments
        args = setup_args()
        # Initialize the PDF parser with the requested configuration
        parser = PDFParser(
            pdf_dir=args.pdf_dir,
            parallel=args.parallel,
            markdown_dir=args.markdown_dir
        )
        print("=== PDF解析工具启动 ===")
        print(f"PDF目录: {args.pdf_dir}")
        print(f"并发数: {args.parallel}")
        print(f"输出目录: {args.markdown_dir}")
        print("========================")
        # Run the conversion over every PDF found
        print("开始处理PDF文件...")
        stats = parser.parse_all_pdfs()
        print("\n=== 解析完成 ===")
        print(f"总数: {stats['total']} 个文件")
        # BUG FIX: the original used `print(...) if total > 0 else "成功: 0 个"`,
        # which silently discarded the else-branch string instead of printing it.
        if stats['total'] > 0:
            print(f"成功: {stats['success']} 个 ({stats['success']/stats['total']*100:.1f}%)")
            print(f"失败: {stats['failed']} 个 ({stats['failed']/stats['total']*100:.1f}%)")
        else:
            print("成功: 0 个")
            print("失败: 0 个")
        print("================")
        return 0
    except FileNotFoundError as e:
        print(f"错误: 找不到指定的目录 - {e}")
        return 1
    except ValueError as e:
        print(f"错误: 参数值无效 - {e}")
        return 1
    except Exception as e:
        print(f"错误: 程序执行异常 - {e}")
        return 1
if __name__ == "__main__":
    # exit(code) simply raises SystemExit(code); raise it directly.
    raise SystemExit(main())

451
src/parse.py Normal file
View File

@ -0,0 +1,451 @@
"""PDF解析模块
该模块提供PDFParser类用于将PDF文件通过OCR API转换为Markdown格式
支持并发处理进度显示错误处理等功能
"""
import requests
import logging
import os
import time
import zipfile
import tempfile
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional, Tuple
class PDFParser:
    """Convert PDF files to Markdown through a remote OCR API.

    The parser scans a directory of PDFs, uploads each file to the OCR
    service, downloads the resulting ZIP archive, extracts it into a
    per-document output folder, and post-processes the extracted
    Markdown.  Processing is concurrent with per-request retry logic
    and live progress reporting.
    """

    def __init__(self, pdf_dir: str = "dataset/pdfs", parallel: int = 3,
                 markdown_dir: str = "dataset/markdowns"):
        """Initialize parser configuration.

        Args:
            pdf_dir (str): Directory containing input PDF files.
            parallel (int): Number of concurrent worker threads (kept low
                to avoid overloading the OCR server).
            markdown_dir (str): Directory receiving the Markdown output.
        """
        self.pdf_dir = Path(pdf_dir)
        self.parallel = parallel
        self.markdown_dir = Path(markdown_dir)
        # OCR API endpoint; download URLs are resolved against this host.
        self.ocr_api_url = "http://100.106.4.14:7861/parse"
        from requests.adapters import HTTPAdapter
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'MedResearcher-PDFParser/1.0'
        })
        # Enlarged connection pool; urllib3's automatic retry is disabled
        # because _make_request_with_retry implements its own policy.
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=0
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        # Configure logging (no-op if the root logger is already set up).
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')

    def _scan_pdf_files(self) -> List[Path]:
        """Collect all PDF files from the input directory.

        Returns:
            List[Path]: Paths of the PDF files found.

        Raises:
            FileNotFoundError: If the PDF directory does not exist.
        """
        if not self.pdf_dir.exists():
            raise FileNotFoundError(f"PDF目录不存在: {self.pdf_dir}")
        pdf_files = [p for p in self.pdf_dir.glob("*.pdf") if p.is_file()]
        logging.info(f"发现 {len(pdf_files)} 个PDF文件待处理")
        return pdf_files

    def _prepare_output_dir(self) -> Path:
        """Create the Markdown output directory if needed.

        Returns:
            Path: The output directory path.
        """
        self.markdown_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Markdown输出目录已准备: {self.markdown_dir}")
        return self.markdown_dir

    def _call_ocr_api(self, pdf_file: Path) -> Optional[Dict]:
        """Upload a PDF to the OCR API and return its JSON response.

        Args:
            pdf_file (Path): Path of the PDF to upload.

        Returns:
            Optional[Dict]: Parsed API response, or None on failure.
        """
        try:
            # BUG FIX: read the file into memory once.  The original passed
            # an open file handle; after the first (failed) attempt the
            # handle was already consumed, so every retry uploaded an
            # empty body.  Passing bytes makes retries resend the payload.
            pdf_bytes = pdf_file.read_bytes()
            files = {
                'file': (pdf_file.name, pdf_bytes, 'application/pdf')
            }
            response = self._make_request_with_retry(
                self.ocr_api_url,
                files=files,
                timeout=1800  # 30 minutes, matching server-side processing time
            )
            if response.status_code == 200:
                response_data = response.json()
                if response_data.get('success', False):
                    logging.debug(f"OCR API调用成功: {pdf_file.name}")
                    return response_data
                logging.warning(f"OCR API处理失败: {pdf_file.name} - {response_data.get('message', 'Unknown error')}")
                return None
            logging.error(f"OCR API请求失败状态码: {response.status_code} - {pdf_file.name}")
            return None
        except Exception as e:
            logging.error(f"调用OCR API时发生错误: {pdf_file.name} - {e}")
            return None

    def _download_and_extract_zip(self, download_url: str, pdf_file: Path) -> bool:
        """Download the result ZIP and extract it into a per-PDF folder.

        Args:
            download_url (str): Fully-qualified download URL.
            pdf_file (Path): Source PDF (its stem names the output folder).

        Returns:
            bool: True if download, extraction (and cleaning) succeeded.
        """
        try:
            # Download the ZIP archive produced by the OCR service.
            response = self._make_request_with_retry(download_url, timeout=60)
            if response.status_code != 200:
                logging.error(f"下载ZIP失败状态码: {response.status_code} - {pdf_file.name}")
                return False
            # Output sub-folder named after the PDF file stem.
            output_subdir = self.markdown_dir / pdf_file.stem
            output_subdir.mkdir(parents=True, exist_ok=True)
            # Spool the ZIP payload through a temporary file.
            with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_zip:
                temp_zip.write(response.content)
                temp_zip_path = temp_zip.name
            try:
                with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
                    zip_ref.extractall(output_subdir)
                logging.debug(f"ZIP文件解压成功: {pdf_file.name} -> {output_subdir}")
                # Cleaning failures are non-fatal: extraction already succeeded.
                if not self._clean_markdown_files(output_subdir):
                    logging.warning(f"Markdown文件清洗失败但解压成功: {pdf_file.name}")
                return True
            finally:
                # Always remove the temporary ZIP file.
                os.unlink(temp_zip_path)
        except zipfile.BadZipFile as e:
            logging.error(f"ZIP文件损坏: {pdf_file.name} - {e}")
            return False
        except Exception as e:
            logging.error(f"下载或解压ZIP时发生错误: {pdf_file.name} - {e}")
            return False

    def _clean_markdown_files(self, output_subdir: Path) -> bool:
        """Strip leading line numbers from extracted Markdown files.

        Removes lines that are only a number (OCR page artifacts) and a
        leading "<digits> " prefix from the remaining lines.

        Args:
            output_subdir (Path): Folder containing the extracted .md files.
            (NOTE(review): only the top level is scanned; nested folders
            from the archive are not cleaned — confirm this is intended.)

        Returns:
            bool: True if every file was cleaned successfully.
        """
        try:
            md_files = list(output_subdir.glob("*.md"))
            if not md_files:
                logging.debug(f"未找到Markdown文件进行清洗: {output_subdir}")
                return True
            for md_file in md_files:
                try:
                    with open(md_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    cleaned_lines = []
                    for line in lines:
                        line_content = line.rstrip('\n\r')
                        # Drop digit-only lines such as "2" or "30 "
                        # (this pattern also covers the bare-digits case).
                        if re.match(r'^\d+\s*$', line_content):
                            continue
                        # Strip a leading "<digits> " prefix, e.g. "1 Title:" -> "Title:"
                        cleaned_line = re.sub(r'^\d+\s+', '', line_content)
                        if cleaned_line.strip():
                            cleaned_lines.append(cleaned_line + '\n')
                        else:
                            # Keep blank lines to preserve document structure.
                            cleaned_lines.append('\n')
                    with open(md_file, 'w', encoding='utf-8') as f:
                        f.writelines(cleaned_lines)
                    logging.debug(f"Markdown文件清洗完成: {md_file.name}")
                except Exception as e:
                    logging.error(f"清洗Markdown文件时发生错误: {md_file.name} - {e}")
                    return False
            logging.info(f"成功清洗 {len(md_files)} 个Markdown文件: {output_subdir}")
            return True
        except Exception as e:
            logging.error(f"清洗Markdown文件时发生错误: {output_subdir} - {e}")
            return False

    def _process_single_pdf(self, pdf_file: Path) -> bool:
        """Run the full pipeline for one PDF (OCR, download, extract).

        Args:
            pdf_file (Path): PDF file to process.

        Returns:
            bool: True on success (or when output already exists).
        """
        try:
            # Skip missing or empty files outright.
            if not pdf_file.exists() or pdf_file.stat().st_size == 0:
                logging.warning(f"PDF文件不存在或为空: {pdf_file}")
                return False
            # Idempotency: a non-empty output folder means already processed.
            output_subdir = self.markdown_dir / pdf_file.stem
            if output_subdir.exists() and any(output_subdir.iterdir()):
                logging.info(f"输出文件夹已存在且非空,跳过处理: {pdf_file.stem}")
                return True
            api_response = self._call_ocr_api(pdf_file)
            if not api_response:
                return False
            download_url = api_response.get('download_url')
            if not download_url:
                logging.error(f"API响应中缺少下载URL: {pdf_file.name}")
                return False
            # Resolve the relative download path against the API host
            # (derived from ocr_api_url instead of duplicating the literal).
            base_url = self.ocr_api_url.rsplit('/', 1)[0]
            full_download_url = f"{base_url}{download_url}"
            logging.debug(f"完整下载URL: {full_download_url}")
            return self._download_and_extract_zip(full_download_url, pdf_file)
        except Exception as e:
            logging.error(f"处理PDF文件时发生错误: {pdf_file.name} - {e}")
            return False

    def _make_request_with_retry(self, url: str, files: Optional[Dict] = None,
                                 max_retries: int = 5, timeout: int = 180) -> requests.Response:
        """Issue an HTTP request with a tiered retry policy.

        POSTs when ``files`` is given, otherwise GETs.  HTTP 500, timeouts,
        connection errors and other request errors each use their own
        back-off schedule.

        Args:
            url (str): Request URL.
            files (Optional[Dict]): Multipart payload for a POST request.
            max_retries (int): Maximum number of attempts.
            timeout (int): Per-request timeout in seconds.

        Returns:
            requests.Response: The HTTP response (a 500 response is
            returned, not raised, after the final attempt).

        Raises:
            requests.RequestException: When all retries are exhausted on a
                transport-level error.
        """
        for attempt in range(max_retries):
            try:
                if files:
                    response = self.session.post(url, files=files, timeout=timeout)
                else:
                    response = self.session.get(url, timeout=timeout)
                # Retry server-side (500) errors with a long, linear back-off.
                if response.status_code == 500:
                    if attempt == max_retries - 1:
                        logging.error(f"服务器内部错误,已达到最大重试次数: HTTP {response.status_code}")
                        return response  # return the error response instead of raising
                    wait_time = min(30, 10 + (attempt * 5))  # 10s, 15s, 20s, 25s, 30s
                    logging.warning(f"服务器内部错误,{wait_time}秒后重试 (第{attempt + 1}次)")
                    time.sleep(wait_time)
                    continue
                return response
            except requests.exceptions.Timeout as e:
                if attempt == max_retries - 1:
                    logging.error(f"请求超时,已达到最大重试次数: {e}")
                    raise
                # Short, linear back-off for timeouts.
                wait_time = min(15, 5 + (attempt * 2))  # 5s, 7s, 9s, 11s, 13s
                logging.warning(f"请求超时,{wait_time}秒后重试 (第{attempt + 1}次): {e}")
                time.sleep(wait_time)
            except requests.exceptions.ConnectionError as e:
                if attempt == max_retries - 1:
                    logging.error(f"连接错误,已达到最大重试次数: {e}")
                    raise
                # Exponential back-off for connection errors.
                wait_time = min(60, 5 * (2 ** attempt))  # 5s, 10s, 20s, 40s, 60s
                logging.warning(f"连接错误,{wait_time}秒后重试 (第{attempt + 1}次): {e}")
                time.sleep(wait_time)
            except requests.RequestException as e:
                if attempt == max_retries - 1:
                    logging.error(f"请求失败,已达到最大重试次数: {e}")
                    raise
                # Standard exponential back-off for anything else.
                wait_time = min(30, 3 * (2 ** attempt))  # 3s, 6s, 12s, 24s, 30s
                logging.warning(f"请求失败,{wait_time}秒后重试 (第{attempt + 1}次): {e}")
                time.sleep(wait_time)

    def parse_all_pdfs(self) -> Dict[str, int]:
        """Process every PDF in the input directory concurrently.

        Returns:
            Dict[str, int]: Statistics ``{'success': n, 'failed': n, 'total': n}``.

        Raises:
            FileNotFoundError: If the PDF directory does not exist.
        """
        try:
            pdf_files = self._scan_pdf_files()
            if not pdf_files:
                logging.warning("未找到PDF文件")
                return {'success': 0, 'failed': 0, 'total': 0}
            self._prepare_output_dir()
            total_files = len(pdf_files)
            success_count = 0
            failed_count = 0
            failed_files = []
            logging.info(f"开始并发处理 {total_files} 个PDF文件")
            logging.info(f"并发数: {self.parallel} (降低并发数以避免服务器过载)")
            logging.info(f"请求超时: 1800秒 (适配服务器处理时间)")
            logging.info(f"重试次数: 5次 (智能重试策略)")
            with ThreadPoolExecutor(max_workers=self.parallel) as executor:
                # Submit one task per PDF.
                future_to_pdf = {
                    executor.submit(self._process_single_pdf, pdf_file): pdf_file
                    for pdf_file in pdf_files
                }
                # Report progress as tasks complete.
                completed_count = 0
                for future in as_completed(future_to_pdf):
                    pdf_file = future_to_pdf[future]
                    # Truncate long names for one-line progress output.
                    filename = pdf_file.name[:50] + '...' if len(pdf_file.name) > 50 else pdf_file.name
                    try:
                        success = future.result()
                        completed_count += 1
                        if success:
                            success_count += 1
                            status = ""
                        else:
                            failed_count += 1
                            failed_files.append({
                                'filename': pdf_file.name,
                                'path': str(pdf_file)
                            })
                            status = ""
                        progress = (completed_count / total_files) * 100
                        # BUG FIX: the original computed `filename` but printed
                        # the literal "(unknown)" instead of using it.
                        print(f"\r[{completed_count:3d}/{total_files}] {progress:5.1f}% {status} {filename}", end='', flush=True)
                    except Exception as e:
                        failed_count += 1
                        completed_count += 1
                        failed_files.append({
                            'filename': pdf_file.name,
                            'path': str(pdf_file),
                            'error': str(e)
                        })
                        progress = (completed_count / total_files) * 100
                        print(f"\r[{completed_count:3d}/{total_files}] {progress:5.1f}% ✗ {filename} (Error: {str(e)[:30]})", end='', flush=True)
            print()  # finish the progress line
            # Log details for every failure.
            if failed_files:
                logging.warning(f"以下 {len(failed_files)} 个PDF文件处理失败:")
                for file_info in failed_files:
                    logging.warning(f"  - {file_info['filename']}")
                    if 'error' in file_info:
                        logging.warning(f"    错误: {file_info['error']}")
            stats = {
                'success': success_count,
                'failed': failed_count,
                'total': total_files
            }
            logging.info(f"PDF解析完成! 成功: {success_count}/{total_files} ({success_count/total_files*100:.1f}%)")
            if failed_count > 0:
                logging.warning(f"失败: {failed_count}/{total_files} ({failed_count/total_files*100:.1f}%)")
            return stats
        except Exception as e:
            logging.error(f"批量处理PDF文件时发生错误: {e}")
            raise