"""PDF解析模块
该模块提供PDFParser类用于将PDF文件通过OCR API转换为Markdown格式
支持并发处理进度显示错误处理等功能
"""
import requests
import logging
import os
import time
import zipfile
import tempfile
import re
import json
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional, Tuple


class PDFParser:
    """PDF parser - converts PDF files to Markdown and filters them by task type.

    Supported task types:
    - prediction: prediction tasks (PRED_)
    - classification: classification tasks (CLAS_)
    - time_series: time-series analysis (TIME_)
    - correlation: correlation analysis (CORR_)
    """
def __init__(self, pdf_dir: str = "dataset/pdfs", parallel: int = 3,
markdown_dir: str = "dataset/markdowns"):
"""初始化解析器配置
Args:
pdf_dir (str): PDF文件目录默认dataset/pdfs
parallel (int): 并发处理数默认3降低并发以避免服务器过载
markdown_dir (str): Markdown输出目录默认dataset/markdowns
"""
self.pdf_dir = Path(pdf_dir)
self.parallel = parallel
self.markdown_dir = Path(markdown_dir)
# OCR API配置
self.ocr_api_url = "http://100.106.4.14:7861/parse"
# AI模型API配置用于四类任务识别prediction/classification/time_series/correlation
self.ai_api_url = "http://100.82.33.121:11001/v1/chat/completions"
self.ai_model = "gpt-oss-20b"
# MIMIC-IV关键词配置用于内容筛选
self.mimic_keywords = [
"MIMIC-IV", "MIMIC 4", "MIMIC IV", "MIMIC-4",
"Medical Information Mart Intensive Care IV",
"MIMIC-IV dataset", "MIMIC-IV database"
]
# 任务类型到前缀的映射配置
self.task_type_prefixes = {
"prediction": "PRED_",
"classification": "CLAS_",
"time_series": "TIME_",
"correlation": "CORR_",
"none": None # 不符合任何类型,不标记
}
# HTTP会话配置增加连接池大小和超时时间
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'MedResearcher-PDFParser/1.0'
})
# 配置连接池适配器(增加连接池大小)
adapter = HTTPAdapter(
pool_connections=10, # 连接池数量
pool_maxsize=20, # 最大连接数
max_retries=0 # 禁用自动重试,使用自定义重试逻辑
)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')

    def _scan_pdf_files(self) -> List[Path]:
        """Scan the PDF directory and collect all PDF files.

        Returns:
            List[Path]: List of PDF file paths.

        Raises:
            FileNotFoundError: If the PDF directory does not exist.
        """
        if not self.pdf_dir.exists():
            raise FileNotFoundError(f"PDF directory does not exist: {self.pdf_dir}")
        pdf_files = []
        for pdf_file in self.pdf_dir.glob("*.pdf"):
            if pdf_file.is_file():
                pdf_files.append(pdf_file)
        logging.info(f"Found {len(pdf_files)} PDF files to process")
        return pdf_files

    def _check_mimic_keywords(self, output_subdir: Path) -> bool:
        """Check whether the Markdown files contain any MIMIC-IV keyword.

        Args:
            output_subdir (Path): Output subdirectory containing Markdown files.

        Returns:
            bool: True if any MIMIC-IV keyword is found.
        """
        try:
            # Find all .md files
            md_files = list(output_subdir.glob("*.md"))
            if not md_files:
                logging.warning(f"No Markdown files found for MIMIC keyword check: {output_subdir}")
                return False
            # Check the content of each Markdown file
            for md_file in md_files:
                try:
                    with open(md_file, 'r', encoding='utf-8') as f:
                        content = f.read().lower()  # lowercase for case-insensitive matching
                    # Check for any MIMIC-IV keyword
                    for keyword in self.mimic_keywords:
                        if keyword.lower() in content:
                            logging.info(f"Found MIMIC-IV keyword '{keyword}' in file {md_file.name}")
                            return True
                except Exception as e:
                    logging.error(f"Error reading Markdown file: {md_file.name} - {e}")
                    continue
            logging.info(f"No MIMIC-IV keywords found: {output_subdir.name}")
            return False
        except Exception as e:
            logging.error(f"Error during MIMIC keyword check: {output_subdir} - {e}")
            return False

    def _extract_introduction(self, output_subdir: Path) -> Optional[str]:
        """Extract the Introduction section from the Markdown files.

        Args:
            output_subdir (Path): Output subdirectory containing Markdown files.

        Returns:
            Optional[str]: The extracted Introduction, or None on failure.
        """
        try:
            # Find all .md files
            md_files = list(output_subdir.glob("*.md"))
            if not md_files:
                logging.warning(f"No Markdown files found for Introduction extraction: {output_subdir}")
                return None
            # Normally use the first .md file
            md_file = md_files[0]
            try:
                with open(md_file, 'r', encoding='utf-8') as f:
                    content = f.read()
                # Extract the Introduction section with regular expressions,
                # matching the common heading formats
                patterns = [
                    r'(?i)#\s*Introduction\s*\n(.*?)(?=\n#|\n\n#|$)',
                    r'(?i)##\s*Introduction\s*\n(.*?)(?=\n##|\n\n##|$)',
                    r'(?i)###\s*Introduction\s*\n(.*?)(?=\n###|\n\n###|$)',
                    r'(?i)\*\*Introduction\*\*\s*\n(.*?)(?=\n\*\*|\n\n\*\*|$)',
                    r'(?i)Introduction\s*\n(.*?)(?=\n[A-Z][a-z]+\s*\n|$)'
                ]
                for pattern in patterns:
                    match = re.search(pattern, content, re.DOTALL)
                    if match:
                        introduction = match.group(1).strip()
                        if len(introduction) > 100:  # ensure there is enough content to analyze
                            logging.info(f"Extracted Introduction section ({len(introduction)} chars): {md_file.name}")
                            return introduction
                # Without an explicit Introduction heading, approximate it
                # with the first few paragraphs
                paragraphs = content.split('\n\n')
                introduction_candidates = []
                for para in paragraphs[:5]:  # take the first 5 paragraphs
                    para = para.strip()
                    if len(para) > 50 and not para.startswith('#'):  # skip headings and very short paragraphs
                        introduction_candidates.append(para)
                if introduction_candidates:
                    introduction = '\n\n'.join(introduction_candidates[:3])  # keep at most 3 paragraphs
                    if len(introduction) > 200:
                        logging.info(f"Extracted approximate Introduction ({len(introduction)} chars): {md_file.name}")
                        return introduction
                logging.warning(f"Could not extract a usable Introduction: {md_file.name}")
                return None
            except Exception as e:
                logging.error(f"Error reading Markdown file: {md_file.name} - {e}")
                return None
        except Exception as e:
            logging.error(f"Error extracting Introduction: {output_subdir} - {e}")
            return None

    def _analyze_research_task(self, introduction: str) -> str:
        """Use the AI model to classify the paper's research task type.

        Args:
            introduction (str): The paper's Introduction content.

        Returns:
            str: Task type ('prediction', 'classification', 'time_series',
                'correlation', or 'none').
        """
        try:
            # Build the analysis prompt
            system_prompt = """You are a medical research expert. Analyze the given Introduction section of a paper and decide which of the following task types the study belongs to:
1. prediction - prediction tasks: predicting future events, outcomes, or values (e.g., mortality prediction, length-of-stay prediction, disease progression prediction)
2. classification - classification tasks: assigning patients or cases to categories (e.g., diagnosis classification, risk stratification, drug response classification)
3. time_series - time-series analysis: analyzing medical data that changes over time (e.g., vital sign trend analysis, disease course analysis, longitudinal cohort studies)
4. correlation - correlation analysis: studying relationships or associations between variables (e.g., disease vs. demographics, drug vs. side effects, risk factor identification)
5. none - none of the above

Answer in JSON with the task type and a confidence value:
{"task_type": "prediction", "confidence": 0.85}
task_type must be one of: prediction, classification, time_series, correlation, none.
confidence is a number between 0 and 1 expressing how certain the judgment is.
Return only the JSON, with no extra text."""
            user_prompt = f"Analyze the following paper Introduction and decide which task type it belongs to:\n\n{introduction[:2000]}"  # truncate to limit token usage
            # Build the API request payload
            api_data = {
                "model": self.ai_model,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                "max_tokens": 50,   # enough for the JSON response
                "temperature": 0.1  # reduce randomness
            }
            # Call the AI API
            response = self.session.post(
                self.ai_api_url,
                json=api_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            if response.status_code == 200:
                result = response.json()
                ai_response = result['choices'][0]['message']['content'].strip()
                try:
                    # Parse the JSON response
                    parsed_response = json.loads(ai_response)
                    task_type = parsed_response.get('task_type', 'none').lower()
                    confidence = parsed_response.get('confidence', 0.0)
                    # Validate the task type
                    valid_types = ['prediction', 'classification', 'time_series', 'correlation', 'none']
                    if task_type not in valid_types:
                        logging.warning(f"AI returned an invalid task type: {task_type}, falling back to 'none'")
                        task_type = "none"
                        confidence = 0.0
                    # Only accept high-confidence results
                    if confidence < 0.7:
                        logging.info(f"AI confidence too low ({confidence:.2f}), classifying as 'none'")
                        task_type = "none"
                    logging.info(f"AI analysis result: task_type={task_type}, confidence={confidence:.2f}")
                    return task_type
                except json.JSONDecodeError as e:
                    logging.error(f"Failed to parse AI JSON response: {ai_response} - error: {e}")
                    return "none"
            else:
                logging.error(f"AI API call failed with status code: {response.status_code}")
                return "none"
        except Exception as e:
            logging.error(f"Error during AI research task analysis: {e}")
            return "none"

    def _mark_valid_folder(self, output_subdir: Path, task_type: str) -> bool:
        """Prefix the folder name with the task type marker after it passes filtering.

        Args:
            output_subdir (Path): Output subdirectory to mark.
            task_type (str): Task type ('prediction', 'classification',
                'time_series', 'correlation').

        Returns:
            bool: True if marking succeeded.
        """
        try:
            # Look up the prefix for this task type
            prefix = self.task_type_prefixes.get(task_type)
            if not prefix:
                logging.info(f"Task type '{task_type}' does not require a folder marker")
                return True  # nothing to mark, treated as success
            # Check whether the folder already carries this task type prefix
            if output_subdir.name.startswith(prefix):
                logging.info(f"Folder already marked as a {task_type} task: {output_subdir.name}")
                return True
            # Check whether the folder already carries another task type prefix
            for existing_type, existing_prefix in self.task_type_prefixes.items():
                if existing_prefix and output_subdir.name.startswith(existing_prefix):
                    logging.info(f"Folder already marked as a {existing_type} task, no re-marking needed: {output_subdir.name}")
                    return True
            # Build the new folder name
            new_folder_name = prefix + output_subdir.name
            new_folder_path = output_subdir.parent / new_folder_name
            # Rename the folder
            output_subdir.rename(new_folder_path)
            logging.info(f"Folder marked successfully: {output_subdir.name} -> {new_folder_name} (task type: {task_type})")
            return True
        except Exception as e:
            logging.error(f"Error marking folder: {output_subdir} - {e}")
            return False

    def _prepare_output_dir(self) -> Path:
        """Prepare the Markdown output directory.

        Returns:
            Path: The Markdown output directory path.
        """
        self.markdown_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Markdown output directory ready: {self.markdown_dir}")
        return self.markdown_dir

    def _call_ocr_api(self, pdf_file: Path) -> Optional[Dict]:
        """Call the OCR API to parse a PDF file.

        Args:
            pdf_file (Path): PDF file path.

        Returns:
            Optional[Dict]: API response data, or None on failure.
        """
        try:
            with open(pdf_file, 'rb') as f:
                files = {
                    'file': (pdf_file.name, f, 'application/pdf')
                }
                response = self._make_request_with_retry(
                    self.ocr_api_url,
                    files=files,
                    timeout=1800  # 30 minutes, to match server-side processing time
                )
            if response.status_code == 200:
                response_data = response.json()
                if response_data.get('success', False):
                    logging.debug(f"OCR API call succeeded: {pdf_file.name}")
                    return response_data
                else:
                    logging.warning(f"OCR API processing failed: {pdf_file.name} - {response_data.get('message', 'Unknown error')}")
                    return None
            else:
                logging.error(f"OCR API request failed with status code: {response.status_code} - {pdf_file.name}")
                return None
        except Exception as e:
            logging.error(f"Error calling OCR API: {pdf_file.name} - {e}")
            return None

    def _download_and_extract_zip(self, download_url: str, pdf_file: Path) -> bool:
        """Download the result ZIP from the API response and extract it into a subfolder.

        Args:
            download_url (str): Full download URL.
            pdf_file (Path): Original PDF file path, used to name the output folder.

        Returns:
            bool: True if download and extraction succeeded.
        """
        try:
            # Download the ZIP file
            response = self._make_request_with_retry(download_url, timeout=60)
            if response.status_code != 200:
                logging.error(f"ZIP download failed with status code: {response.status_code} - {pdf_file.name}")
                return False
            # Create the output subfolder named after the PDF file
            output_subdir = self.markdown_dir / pdf_file.stem
            output_subdir.mkdir(parents=True, exist_ok=True)
            # Save the ZIP content to a temporary file
            with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_zip:
                temp_zip.write(response.content)
                temp_zip_path = temp_zip.name
            try:
                # Extract the ZIP file into the output subfolder
                with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
                    zip_ref.extractall(output_subdir)
                logging.debug(f"ZIP extracted successfully: {pdf_file.name} -> {output_subdir}")
                # Clean the extracted Markdown files
                if not self._clean_markdown_files(output_subdir):
                    logging.warning(f"Markdown cleaning failed, but extraction succeeded: {pdf_file.name}")
                return True
            finally:
                # Remove the temporary ZIP file
                os.unlink(temp_zip_path)
        except zipfile.BadZipFile as e:
            logging.error(f"Corrupted ZIP file: {pdf_file.name} - {e}")
            return False
        except Exception as e:
            logging.error(f"Error downloading or extracting ZIP: {pdf_file.name} - {e}")
            return False

    def _clean_markdown_files(self, output_subdir: Path) -> bool:
        """Clean the Markdown files in the output directory, removing numeric line markers.

        Args:
            output_subdir (Path): Output subdirectory containing Markdown files.

        Returns:
            bool: True if cleaning succeeded.
        """
        try:
            # Find all .md files
            md_files = list(output_subdir.glob("*.md"))
            if not md_files:
                logging.debug(f"No Markdown files found to clean: {output_subdir}")
                return True
            for md_file in md_files:
                try:
                    # Read the original file content
                    with open(md_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    # Clean each line
                    cleaned_lines = []
                    for line in lines:
                        # Strip trailing newline characters
                        line_content = line.rstrip('\n\r')
                        # Skip digit-only lines (e.g. "2", "30")
                        if re.match(r'^\d+$', line_content):
                            continue
                        # Skip digits-plus-whitespace lines (e.g. "30 ")
                        if re.match(r'^\d+\s*$', line_content):
                            continue
                        # Strip a leading digits-plus-space pattern (e.g. "1 Title:" -> "Title:")
                        cleaned_line = re.sub(r'^\d+\s+', '', line_content)
                        # Keep the line if it is non-empty after cleaning
                        if cleaned_line.strip():
                            cleaned_lines.append(cleaned_line + '\n')
                        else:
                            # Keep blank lines to preserve document structure
                            cleaned_lines.append('\n')
                    # Write the cleaned content back
                    with open(md_file, 'w', encoding='utf-8') as f:
                        f.writelines(cleaned_lines)
                    logging.debug(f"Markdown file cleaned: {md_file.name}")
                except Exception as e:
                    logging.error(f"Error cleaning Markdown file: {md_file.name} - {e}")
                    return False
            logging.info(f"Cleaned {len(md_files)} Markdown files: {output_subdir}")
            return True
        except Exception as e:
            logging.error(f"Error cleaning Markdown files: {output_subdir} - {e}")
            return False

    def _process_single_pdf(self, pdf_file: Path) -> bool:
        """Run the full processing pipeline for a single PDF file.

        Args:
            pdf_file (Path): PDF file path.

        Returns:
            bool: True if processing succeeded.
        """
        try:
            # Check that the PDF file exists and is non-empty
            if not pdf_file.exists() or pdf_file.stat().st_size == 0:
                logging.warning(f"PDF file missing or empty: {pdf_file}")
                return False
            # Check whether a corresponding output subfolder already exists
            output_subdir = self.markdown_dir / pdf_file.stem
            if output_subdir.exists() and any(output_subdir.iterdir()):
                logging.info(f"Output folder already exists and is non-empty, skipping: {pdf_file.stem}")
                return True
            # Call the OCR API
            api_response = self._call_ocr_api(pdf_file)
            if not api_response:
                return False
            # Get the download URL from the response
            download_url = api_response.get('download_url')
            if not download_url:
                logging.error(f"API response is missing the download URL: {pdf_file.name}")
                return False
            # Build the full download URL
            full_download_url = f"http://100.106.4.14:7861{download_url}"
            logging.debug(f"Full download URL: {full_download_url}")
            # Download and extract the ZIP file
            success = self._download_and_extract_zip(full_download_url, pdf_file)
            if not success:
                return False
            # Path of the extracted folder
            output_subdir = self.markdown_dir / pdf_file.stem
            # First filtering stage: check for MIMIC-IV keywords
            logging.info(f"Starting MIMIC-IV keyword filtering: {pdf_file.stem}")
            if not self._check_mimic_keywords(output_subdir):
                logging.info(f"Did not pass MIMIC-IV keyword filtering, skipping: {pdf_file.stem}")
                return True  # processed successfully but filtered out
            # Second filtering stage: AI analysis of the research task
            logging.info(f"Starting AI research task analysis: {pdf_file.stem}")
            introduction = self._extract_introduction(output_subdir)
            if not introduction:
                logging.warning(f"Could not extract Introduction, skipping AI analysis: {pdf_file.stem}")
                return True  # processed successfully but task analysis was not possible
            task_type = self._analyze_research_task(introduction)
            if task_type == "none":
                logging.info(f"Did not pass research task filtering (task_type=none), skipping: {pdf_file.stem}")
                return True  # processed successfully but filtered out
            # Both filtering stages passed; mark the folder by task type
            logging.info(f"Passed all filters, marking as a {task_type} task paper: {pdf_file.stem}")
            if self._mark_valid_folder(output_subdir, task_type):
                logging.info(f"Paper filtering complete, marked as a {task_type} task: {pdf_file.stem}")
            else:
                logging.warning(f"Folder marking failed: {pdf_file.stem}")
            return True
        except Exception as e:
            logging.error(f"Error processing PDF file: {pdf_file.name} - {e}")
            return False

    def _make_request_with_retry(self, url: str, files: Optional[Dict] = None,
                                 max_retries: int = 5, timeout: int = 180) -> requests.Response:
        """HTTP request with an adaptive retry strategy.

        Args:
            url (str): Request URL.
            files (Optional[Dict]): File payload for POST requests.
            max_retries (int): Maximum number of attempts (raised to 5).
            timeout (int): Request timeout in seconds.

        Returns:
            requests.Response: The HTTP response.

        Raises:
            requests.RequestException: If all retries fail.
        """
        for attempt in range(max_retries):
            try:
                if files:
                    # Rewind any file objects so a retry does not send an empty
                    # body after the stream was consumed by a previous attempt
                    for value in files.values():
                        fileobj = value[1] if isinstance(value, tuple) else value
                        if hasattr(fileobj, 'seek'):
                            fileobj.seek(0)
                    response = self.session.post(url, files=files, timeout=timeout)
                else:
                    response = self.session.get(url, timeout=timeout)
                # Retry on HTTP 500 responses
                if response.status_code == 500:
                    if attempt == max_retries - 1:
                        logging.error(f"Server internal error, maximum retries reached: HTTP {response.status_code}")
                        return response  # return the error response instead of raising
                    # Use longer waits for 500 errors
                    wait_time = min(30, 10 + (attempt * 5))  # 10s, 15s, 20s, 25s, 30s
                    logging.warning(f"Server internal error, retrying in {wait_time}s (attempt {attempt + 1})")
                    time.sleep(wait_time)
                    continue
                return response
            except requests.exceptions.Timeout as e:
                if attempt == max_retries - 1:
                    logging.error(f"Request timed out, maximum retries reached: {e}")
                    raise
                # Use shorter waits for timeouts
                wait_time = min(15, 5 + (attempt * 2))  # 5s, 7s, 9s, 11s, 13s
                logging.warning(f"Request timed out, retrying in {wait_time}s (attempt {attempt + 1}): {e}")
                time.sleep(wait_time)
            except requests.exceptions.ConnectionError as e:
                if attempt == max_retries - 1:
                    logging.error(f"Connection error, maximum retries reached: {e}")
                    raise
                # Use exponential backoff for connection errors
                wait_time = min(60, 5 * (2 ** attempt))  # 5s, 10s, 20s, 40s, 60s
                logging.warning(f"Connection error, retrying in {wait_time}s (attempt {attempt + 1}): {e}")
                time.sleep(wait_time)
            except requests.RequestException as e:
                if attempt == max_retries - 1:
                    logging.error(f"Request failed, maximum retries reached: {e}")
                    raise
                # Use standard exponential backoff for other errors
                wait_time = min(30, 3 * (2 ** attempt))  # 3s, 6s, 12s, 24s, 30s
                logging.warning(f"Request failed, retrying in {wait_time}s (attempt {attempt + 1}): {e}")
                time.sleep(wait_time)

    def parse_all_pdfs(self) -> Dict[str, int]:
        """Process all PDF files in batch, converting them to Markdown.

        Returns:
            Dict[str, int]: Processing statistics
                {'success': success count, 'failed': failure count, 'total': total count}

        Raises:
            FileNotFoundError: If the PDF directory does not exist.
        """
        try:
            # Scan for PDF files
            pdf_files = self._scan_pdf_files()
            if not pdf_files:
                logging.warning("No PDF files found")
                return {'success': 0, 'failed': 0, 'total': 0}
            # Prepare the output directory
            self._prepare_output_dir()
            # Initialize statistics
            total_files = len(pdf_files)
            success_count = 0
            failed_count = 0
            failed_files = []
            logging.info(f"Starting concurrent processing of {total_files} PDF files")
            logging.info(f"Concurrency: {self.parallel} (kept low to avoid overloading the server)")
            logging.info("Request timeout: 1800s (matched to server processing time)")
            logging.info("Retries: 5 (adaptive retry strategy)")
            # Process the PDFs with a thread pool
            with ThreadPoolExecutor(max_workers=self.parallel) as executor:
                # Submit all processing tasks
                future_to_pdf = {
                    executor.submit(self._process_single_pdf, pdf_file): pdf_file
                    for pdf_file in pdf_files
                }
                # Handle completed tasks and show live progress
                completed_count = 0
                for future in as_completed(future_to_pdf):
                    pdf_file = future_to_pdf[future]
                    filename = pdf_file.name[:50] + '...' if len(pdf_file.name) > 50 else pdf_file.name
                    try:
                        success = future.result()
                        completed_count += 1
                        if success:
                            success_count += 1
                            status = "✓"
                        else:
                            failed_count += 1
                            failed_files.append({
                                'filename': pdf_file.name,
                                'path': str(pdf_file)
                            })
                            status = "✗"
                        # Show progress
                        progress = (completed_count / total_files) * 100
                        print(f"\r[{completed_count:3d}/{total_files}] {progress:5.1f}% {status} {filename}", end='', flush=True)
                    except Exception as e:
                        failed_count += 1
                        completed_count += 1
                        failed_files.append({
                            'filename': pdf_file.name,
                            'path': str(pdf_file),
                            'error': str(e)
                        })
                        progress = (completed_count / total_files) * 100
                        print(f"\r[{completed_count:3d}/{total_files}] {progress:5.1f}% ✗ {filename} (Error: {str(e)[:30]})", end='', flush=True)
            print()  # newline after the progress line
            # Log failure details
            if failed_files:
                logging.warning(f"The following {len(failed_files)} PDF files failed to process:")
                for file_info in failed_files:
                    logging.warning(f"  - {file_info['filename']}")
                    if 'error' in file_info:
                        logging.warning(f"    error: {file_info['error']}")
            # Build the processing report
            stats = {
                'success': success_count,
                'failed': failed_count,
                'total': total_files
            }
            logging.info(f"PDF parsing complete! Success: {success_count}/{total_files} ({success_count/total_files*100:.1f}%)")
            if failed_count > 0:
                logging.warning(f"Failed: {failed_count}/{total_files} ({failed_count/total_files*100:.1f}%)")
            return stats
        except Exception as e:
            logging.error(f"Error during batch PDF processing: {e}")
            raise
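

# Minimal usage sketch: runs the full parse-and-filter pipeline and prints the
# summary statistics. The directory paths are illustrative assumptions taken
# from the constructor defaults; adjust them to your own layout.
if __name__ == "__main__":
    parser = PDFParser(pdf_dir="dataset/pdfs", parallel=3,
                       markdown_dir="dataset/markdowns")
    stats = parser.parse_all_pdfs()
    print(f"Processed {stats['total']} PDFs: "
          f"{stats['success']} succeeded, {stats['failed']} failed")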