feat: 实现PDF下载功能

- 新增 download_pdfs_from_csv() 方法支持从CSV文件批量下载论文PDF
- 支持ArXiv和MedRxiv两种数据源的PDF链接解析和下载
- 实现并发下载控制、失败重试机制和PDF完整性验证
- 添加实时下载进度显示和详细的错误日志记录
- 更新命令行参数支持PDF下载测试功能
- 清理临时文件和更新.gitignore规则
This commit is contained in:
iomgaa 2025-08-23 19:42:47 +08:00
parent 802fe4b239
commit 41e5fd1543
4 changed files with 447 additions and 303 deletions

3
.gitignore vendored
View File

@ -10,4 +10,5 @@ wheels/
.venv .venv
.claude .claude
dataset/ dataset/
docs/CLAUDE*.DS_Store docs/CLAUDE*
.DS_Store

View File

@ -1,283 +0,0 @@
# AI指导规范构建任务 - 深度分析
## 任务理解
用户希望为MedResearcher项目构建一套完整的AI协作指导规范核心思想是"充分讨论后再修改",以避免代码难以维护。
## 核心设计理念
1. **讨论优先**:任何修改前必须充分讨论,达成共识
2. **上下文明确**:不仅指出需要什么,更要具体到哪个文件的哪个函数
3. **渐进式实施**:通过子任务分解,逐步完成复杂需求
4. **可追溯性**:所有决策和修改都有明确记录
## 详细工作流程设计
### 阶段1需求理解与记录必须执行
**触发条件**:用户提出任何代码修改需求
**具体步骤**
1. **立即**在CLAUDE-temp.md中撰写
```markdown
## 任务理解
原始需求:[用户的原话]
我的理解:[用我的话重述一遍]
## 收集的上下文
### 相关文件和函数
- `papers_crawler.py::line_45-67::fetch_papers()` - 当前的爬取实现
- `config.py::line_12-15::RETRY_CONFIG` - 现有重试配置
### 现有代码分析
[贴出关键代码片段并分析]
### 潜在影响
- 影响文件papers_crawler.py, test_crawler.py
- 影响功能:论文爬取的稳定性
- 风险评估:可能影响爬取速度
## 任务复杂度判断
- [ ] 单一功能修改影响1-2个函数 → 简单任务
- [x] 需要修改3个以上函数或添加新模块 → 复杂任务
```
2. **等待用户反馈**
- "理解正确" → 继续阶段2
- "理解有偏差" → 修正理解,重新记录
- "补充需求" → 更新CLAUDE-temp.md
### 阶段2任务规划根据复杂度差异化
**简单任务处理**
- **判断标准**
- 修改不超过2个函数
- 不需要新建文件
- 逻辑改动在50行以内
- **严格要求**:绝对不允许拆分子任务
- **直接输出**:一个完整的执行计划
**复杂任务处理**
- **判断标准**
- 修改3个以上函数
- 需要新建模块或文件
- 涉及多个功能模块交互
- **拆分原则**
- 必须拆分为3-5个子任务不能少于3个不能多于5个
- 每个子任务可独立验证
- 子任务之间有清晰的依赖关系
- **拆分示例**
```
子任务1创建重试机制基础设施
子任务2集成重试机制到爬虫
子任务3添加重试相关配置
子任务4更新错误处理逻辑
子任务5添加重试日志记录
```
### 阶段3计划确认与正式化用户确认后执行
**创建CLAUDE-plan.md严格按照以下格式**
#### 简单任务格式:
```markdown
## 任务:[具体任务名称]
创建时间2025-08-22 10:30
### 目标
[30-50字描述要实现的功能必须具体且可验证]
### 所需上下文
- `papers_crawler.py::45-67行::fetch_papers()` - 需要添加异常处理
- `papers_crawler.py::120-135行::parse_response()` - 需要理解返回格式
- `config.py::全文` - 了解现有配置结构
### 拟修改内容
1. 修改 `papers_crawler.py::fetch_papers()` 第50-55行 - 添加try-except块
2. 修改 `papers_crawler.py::fetch_papers()` 第65行 - 添加重试逻辑
3. 修改 `config.py` 末尾 - 添加RETRY_TIMES常量
### 测试指令
```bash
# 主功能测试
uv run papers_crawler.py --keyword "machine learning" --limit 5
# 异常情况测试(模拟网络错误)
uv run papers_crawler.py --test-mode --simulate-error
```
### 验收标准
- [ ] 正常爬取功能不受影响
- [ ] 网络异常时能正确重试
- [ ] 日志正确记录重试次数
```
#### 复杂任务格式:
```markdown
## 任务:[具体任务名称]
创建时间2025-08-22 10:30
### 总体目标
[50-100字描述整体要实现的功能]
### 子任务分解
#### 子任务1[名称]
**目标**[20-30字描述]
**所需上下文**
- `papers_crawler.py::45-67行::fetch_papers()` - 理解当前实现
- `utils/__init__.py::全文` - 确认工具模块结构
**拟修改内容**
- 新建 `utils/retry.py` - 创建RetryDecorator类
- 修改 `utils/__init__.py` - 导出retry装饰器
**测试指令**
```bash
# 单元测试
python -c "from utils.retry import retry; print('导入成功')"
```
#### 子任务2[名称]
[格式同上]
### 整体验收标准
- [ ] 所有子任务独立测试通过
- [ ] 集成测试:完整流程测试通过
- [ ] 性能测试:重试不影响正常爬取速度
```
### 阶段4实施与验证严格按计划执行
**执行要求**
1. **开始前确认**
- 再次阅读CLAUDE-plan.md
- 确认所有依赖文件存在
- 确认测试环境就绪
2. **执行中记录**
- 在CLAUDE-activeContext.md实时更新进度
- 遇到计划外情况立即停止并讨论
3. **完成后验证**
- 运行所有测试指令
- 检查验收标准
- 记录任何偏差或问题
## Memory Bank系统设计
### 文件结构
```
/docs/
├── CLAUDE-temp.md # 临时讨论和分析
├── CLAUDE-plan.md # 当前任务的正式计划
├── CLAUDE-activeContext.md # 会话状态和进度跟踪
├── CLAUDE-patterns.md # 项目代码模式记录
├── CLAUDE-decisions.md # 重要决策和理由记录
├── CLAUDE-troubleshooting.md # 问题和解决方案库
└── CLAUDE-config-variables.md # 配置变量参考
```
### 使用原则
1. **docs/CLAUDE-temp.md**
- 每次新任务开始时清空或归档
- 用于快速记录和思考
- 不需要结构化
2. **docs/CLAUDE-plan.md**
- 结构化的任务计划
- 用户确认后才写入
- 作为实施的指导文档
3. **docs/CLAUDE-activeContext.md**
- 记录当前进度
- 标记已完成/进行中/待完成
- 会话恢复时的参考
### Memory Bank更新机制
**使用专门的SubAgent管理**
```
Task: memory-bank-updater
Description: "更新Memory Bank文件"
Prompt: "任务已完成请更新以下Memory Bank文件
1. CLAUDE-activeContext.md - 标记任务完成,记录最终状态
2. CLAUDE-patterns.md - 如有新的代码模式,记录下来
3. CLAUDE-decisions.md - 记录本次任务的关键决策
4. CLAUDE-troubleshooting.md - 如遇到问题,记录解决方案
5. CLAUDE-config-variables.md - 如有新配置,更新文档
具体完成内容:[任务摘要]
遇到的问题:[如有]
采用的解决方案:[如有]"
```
**调用时机**
- 每个任务完成后必须调用
- 遇到重要决策时调用
- 发现新的最佳实践时调用
## 工具使用优化原则
### 1. 批量操作原则
**场景**:需要读取多个文件或执行多个独立搜索时
**做法**
```python
# 同时执行多个工具调用
parallel_calls = [
Read("papers_crawler.py"),
Read("pdf_parser.py"),
Grep("retry", "*.py"),
LS("./utils/")
]
```
**禁止**:顺序执行可并行的操作
### 2. 上下文管理策略
**主上下文保留**
- 用户对话
- 关键决策点
- 当前任务计划
**委托给subagent**
- 大规模代码搜索:"搜索所有使用requests库的地方"
- 代码模式分析:"分析项目中的错误处理模式"
- 依赖关系梳理:"找出papers_crawler.py的所有依赖"
**subagent使用示例**
```
Task: code-searcher
Prompt: "在整个项目中搜索所有异常处理相关的代码,
重点关注papers_crawler.py和pdf_parser.py
总结当前的错误处理模式和改进建议"
```
### 3. 文件操作最佳实践
**读取顺序**
1. 先读CLAUDE-activeContext.md如果存在了解当前状态
2. 读取主文件了解整体结构
3. 读取相关依赖文件
**修改原则**
- 优先使用Edit而非Write
- 使用MultiEdit处理同文件多处修改
- 新文件创建需明确理由
## 与现有编程规范的协同
### 层次关系
1. **编程规范**已在CLAUDE.md中定义
- 定义"怎么写代码"
- 包括:命名、注释、代码风格等
2. **AI指导规范**(本规范):
- 定义"怎么理解和修改代码"
- 包括:工作流程、沟通方式、工具使用等
### 执行优先级
1. 遵守编程规范的硬性要求(如单次修改限制)
2. 按AI指导流程进行任务
3. 发生冲突时,编程规范优先
## 规范更新机制
- 每次遇到新的最佳实践记录到CLAUDE-patterns.md
- 定期回顾CLAUDE-troubleshooting.md提炼通用规则
- 用户可随时提出规范优化建议

View File

@ -35,6 +35,20 @@ def setup_args():
help='并行处理线程数 (默认: 20)' help='并行处理线程数 (默认: 20)'
) )
parser.add_argument(
'--csv-download',
type=str,
default=None,
help='指定CSV文件路径'
)
parser.add_argument(
'--pdf_download_list',
type=str,
default='dataset/mimic_papers_20250823.csv',
help='指定PDF下载目录'
)
return parser.parse_args() return parser.parse_args()
@ -45,18 +59,20 @@ def main():
# 解析命令行参数 # 解析命令行参数
args = setup_args() args = setup_args()
print(f"=== 论文爬取工具启动 ===")
print(f"论文数据源: {args.paper_website}")
print(f"并行处理数: {args.parallel}")
print(f"========================")
# 初始化论文爬取器 # 初始化论文爬取器
crawler = PaperCrawler( crawler = PaperCrawler(
websites=args.paper_website, websites=args.paper_website,
parallel=args.parallel parallel=args.parallel
) )
print(f"=== 论文爬取工具启动 ===")
print(f"论文数据源: {args.paper_website}")
print(f"并行处理数: {args.parallel}")
print(f"========================")
# 执行论文爬取 # 执行论文爬取
if args.csv_download:
print("开始爬取MIMIC-4相关论文...") print("开始爬取MIMIC-4相关论文...")
papers = crawler.crawl_papers() papers = crawler.crawl_papers()
@ -70,6 +86,23 @@ def main():
else: else:
print("未找到相关论文,请检查网络连接或关键词设置") print("未找到相关论文,请检查网络连接或关键词设置")
# 如果指定了PDF下载测试执行测试
if args.pdf_download_list:
print(f"=== PDF下载功能测试 ===")
print(f"CSV文件: {args.pdf_download_list}")
print(f"并发数: {args.parallel}")
print(f"========================")
# 执行PDF下载
stats = crawler.download_pdfs_from_csv(args.pdf_download_list)
print(f"\n=== PDF下载测试完成 ===")
print(f"总数: {stats['total']} 篇论文")
print(f"成功: {stats['success']} 篇 ({stats['success']/stats['total']*100:.1f}%)")
print(f"失败: {stats['failed']} 篇 ({stats['failed']/stats['total']*100:.1f}%)")
print(f"========================")
return 0
except FileNotFoundError as e: except FileNotFoundError as e:
print(f"错误: 找不到指定的文件 - {e}") print(f"错误: 找不到指定的文件 - {e}")
return 1 return 1

View File

@ -8,12 +8,13 @@ import requests
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import logging import logging
import time import time
import re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional from typing import List, Dict, Optional
from pathlib import Path from pathlib import Path
from src.utils.csv_utils import write_dict_to_csv from src.utils.csv_utils import write_dict_to_csv, read_csv_to_dict
class PaperCrawler: class PaperCrawler:
@ -419,3 +420,395 @@ class PaperCrawler:
except Exception as e: except Exception as e:
logging.error(f"保存CSV文件时出错: {e}") logging.error(f"保存CSV文件时出错: {e}")
raise raise
def download_pdfs_from_csv(self, csv_file_path: str) -> Dict[str, int]:
"""从CSV文件下载论文PDF
Args:
csv_file_path (str): 包含论文信息的CSV文件路径
Returns:
Dict[str, int]: 下载统计信息 {'success': 成功数, 'failed': 失败数, 'total': 总数}
Raises:
FileNotFoundError: CSV文件不存在
ValueError: CSV文件格式错误
"""
try:
# 读取CSV文件中的论文信息
papers_data = self._read_papers_csv(csv_file_path)
if not papers_data:
logging.warning("CSV文件中没有论文数据")
return {'success': 0, 'failed': 0, 'total': 0}
# 准备PDF存储目录
pdf_dir = self._prepare_pdf_storage()
# 初始化统计
total_papers = len(papers_data)
success_count = 0
failed_count = 0
failed_papers = []
logging.info(f"开始并发下载 {total_papers} 篇论文的PDF文件并发数: {self.parallel}")
# 使用并发执行器下载PDF
with ThreadPoolExecutor(max_workers=self.parallel) as executor:
# 提交所有下载任务
future_to_paper = {
executor.submit(self._download_single_pdf, paper_data, pdf_dir): paper_data
for paper_data in papers_data
}
# 处理完成的任务,实时显示进度
completed_count = 0
for future in as_completed(future_to_paper):
paper_data = future_to_paper[future]
title = paper_data.get('title', 'Unknown')[:50] + '...' if len(paper_data.get('title', '')) > 50 else paper_data.get('title', 'Unknown')
try:
success = future.result()
completed_count += 1
if success:
success_count += 1
status = ""
else:
failed_count += 1
failed_papers.append({
'title': paper_data.get('title', ''),
'source': paper_data.get('source', ''),
'url': paper_data.get('url', ''),
'doi': paper_data.get('doi', '')
})
status = ""
# 显示进度
progress = (completed_count / total_papers) * 100
print(f"\r[{completed_count:3d}/{total_papers}] {progress:5.1f}% {status} {title}", end='', flush=True)
except Exception as e:
failed_count += 1
completed_count += 1
failed_papers.append({
'title': paper_data.get('title', ''),
'source': paper_data.get('source', ''),
'error': str(e)
})
progress = (completed_count / total_papers) * 100
print(f"\r[{completed_count:3d}/{total_papers}] {progress:5.1f}% ✗ {title} (Error: {str(e)[:30]})", end='', flush=True)
print() # 换行
# 记录失败详情
if failed_papers:
logging.warning(f"以下 {len(failed_papers)} 篇论文下载失败:")
for paper in failed_papers:
logging.warning(f" - {paper.get('title', 'Unknown')} [{paper.get('source', 'unknown')}]")
if 'error' in paper:
logging.warning(f" 错误: {paper['error']}")
# 生成下载报告
stats = {
'success': success_count,
'failed': failed_count,
'total': total_papers
}
logging.info(f"PDF下载完成! 成功: {success_count}/{total_papers} ({success_count/total_papers*100:.1f}%)")
if failed_count > 0:
logging.warning(f"失败: {failed_count}/{total_papers} ({failed_count/total_papers*100:.1f}%)")
return stats
except Exception as e:
logging.error(f"下载PDF文件时发生错误: {e}")
raise
def _prepare_pdf_storage(self) -> Path:
"""准备PDF存储目录
Returns:
Path: PDF存储目录路径
"""
pdf_dir = Path("dataset") / "pdfs"
pdf_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"PDF存储目录已准备: {pdf_dir}")
return pdf_dir
def _read_papers_csv(self, csv_file_path: str) -> List[Dict[str, str]]:
"""读取论文CSV文件
Args:
csv_file_path (str): CSV文件路径
Returns:
List[Dict[str, str]]: 论文数据列表
Raises:
FileNotFoundError: 文件不存在
ValueError: 文件格式错误
"""
try:
papers_data = read_csv_to_dict(csv_file_path)
# 验证必要字段
required_fields = ['title', 'url', 'source', 'doi']
if papers_data:
missing_fields = [field for field in required_fields
if field not in papers_data[0]]
if missing_fields:
raise ValueError(f"CSV文件缺少必要字段: {missing_fields}")
logging.info(f"成功读取CSV文件{len(papers_data)} 篇论文")
return papers_data
except Exception as e:
logging.error(f"读取CSV文件失败: {e}")
raise
def _get_pdf_url(self, paper_data: Dict[str, str]) -> Optional[str]:
"""获取论文PDF下载链接
Args:
paper_data (Dict[str, str]): 论文数据
Returns:
Optional[str]: PDF下载链接如果无法获取返回None
"""
try:
source = paper_data.get('source', '')
if not source:
logging.warning("论文缺少source字段")
return None
source = source.lower()
url = paper_data.get('url', '')
doi = paper_data.get('doi', '')
if source == 'arxiv':
return self._get_arxiv_pdf_url(url)
elif source == 'medrxiv':
return self._get_medrxiv_pdf_url(doi, url)
else:
logging.warning(f"不支持的数据源: {source}")
return None
except Exception as e:
logging.error(f"获取PDF链接失败: {e}")
return None
def _get_arxiv_pdf_url(self, url: str) -> Optional[str]:
"""获取ArXiv论文PDF链接
Args:
url (str): ArXiv论文页面URL
Returns:
Optional[str]: PDF下载链接
"""
try:
if not url:
return None
# 从URL中提取论文ID
# 格式: http://arxiv.org/abs/2301.12345 -> 2301.12345
if '/abs/' in url:
paper_id = url.split('/abs/')[-1]
pdf_url = f"http://arxiv.org/pdf/{paper_id}.pdf"
logging.debug(f"ArXiv PDF链接: {pdf_url}")
return pdf_url
else:
logging.warning(f"无法解析ArXiv URL: {url}")
return None
except Exception as e:
logging.error(f"获取ArXiv PDF链接失败: {e}")
return None
def _get_medrxiv_pdf_url(self, doi: str, url: str) -> Optional[str]:
"""获取MedRxiv论文PDF链接
Args:
doi (str): 论文DOI
url (str): DOI链接备用
Returns:
Optional[str]: PDF下载链接
"""
try:
if not doi:
logging.warning("MedRxiv论文缺少DOI")
return None
# 主策略基于DOI构造PDF链接
# DOI格式: 10.1101/yyyy.mm.dd.xxxxxxx
# PDF格式: https://www.medrxiv.org/content/medrxiv/early/yyyy/mm/dd/yyyy.mm.dd.xxxxxxx.full.pdf
if doi.startswith('10.1101/'):
# 提取日期和论文ID部分
paper_part = doi.replace('10.1101/', '')
# 解析日期部分 yyyy.mm.dd.xxxxxxx
parts = paper_part.split('.')
if len(parts) >= 4:
year = parts[0]
month = parts[1].zfill(2) # 确保两位数
day = parts[2].zfill(2) # 确保两位数
# 构造PDF URL
pdf_url = f"https://www.medrxiv.org/content/medrxiv/early/{year}/{month}/{day}/{paper_part}.full.pdf"
logging.debug(f"MedRxiv PDF链接: {pdf_url}")
return pdf_url
else:
logging.warning(f"无法解析MedRxiv DOI格式: {doi}")
return None
else:
logging.warning(f"不支持的MedRxiv DOI格式: {doi}")
return None
except Exception as e:
logging.error(f"获取MedRxiv PDF链接失败: {e}")
return None
def _download_single_pdf(self, paper_data: Dict[str, str], pdf_dir: Path) -> bool:
"""下载单个论文PDF
Args:
paper_data (Dict[str, str]): 论文数据
pdf_dir (Path): PDF存储目录
Returns:
bool: 下载是否成功
"""
try:
# 获取PDF下载链接
pdf_url = self._get_pdf_url(paper_data)
if not pdf_url:
logging.warning(f"无法获取PDF链接: {paper_data.get('title', 'Unknown')}")
return False
# 生成安全的文件名
filename = self._generate_safe_filename(paper_data)
file_path = pdf_dir / filename
# 如果文件已存在且有效,跳过下载
if file_path.exists() and self._validate_pdf_file(file_path):
logging.info(f"PDF文件已存在且有效跳过下载: {filename}")
return True
# 下载PDF文件最多重试3次
for attempt in range(3):
try:
response = self._make_request_with_retry(pdf_url, max_retries=1)
if response.status_code == 200:
# 写入文件
with open(file_path, 'wb') as f:
f.write(response.content)
# 验证PDF完整性
if self._validate_pdf_file(file_path):
logging.info(f"成功下载PDF: {filename}")
return True
else:
logging.warning(f"PDF文件损坏删除并重试: {filename}")
file_path.unlink(missing_ok=True)
else:
logging.warning(f"PDF下载失败状态码 {response.status_code}: {pdf_url}")
except Exception as e:
logging.warning(f"PDF下载第{attempt + 1}次尝试失败: {e}")
# 重试前等待
if attempt < 2:
time.sleep(2 ** attempt)
logging.error(f"PDF下载最终失败: {paper_data.get('title', 'Unknown')}")
return False
except Exception as e:
logging.error(f"下载PDF时发生错误: {e}")
return False
def _validate_pdf_file(self, file_path: Path) -> bool:
"""验证PDF文件完整性
Args:
file_path (Path): PDF文件路径
Returns:
bool: PDF文件是否有效
"""
try:
if not file_path.exists():
return False
# 检查文件大小
if file_path.stat().st_size < 1024: # 至少1KB
logging.warning(f"PDF文件太小可能无效: {file_path.name}")
return False
# 检查PDF文件头和结构
with open(file_path, 'rb') as f:
# 读取文件头
header = f.read(8)
if not header.startswith(b'%PDF-'):
logging.warning(f"文件不是有效的PDF格式: {file_path.name}")
return False
# 检查文件尾部读取最后1KB
f.seek(-min(1024, file_path.stat().st_size), 2)
trailer = f.read()
if b'%%EOF' not in trailer and b'endobj' not in trailer:
logging.warning(f"PDF文件可能不完整: {file_path.name}")
return False
logging.debug(f"PDF文件验证通过: {file_path.name}")
return True
except Exception as e:
logging.error(f"验证PDF文件时发生错误: {e}")
return False
def _generate_safe_filename(self, paper_data: Dict[str, str]) -> str:
"""生成安全的PDF文件名
Args:
paper_data (Dict[str, str]): 论文数据
Returns:
str: 安全的文件名
"""
try:
source = paper_data.get('source', 'unknown').lower()
title = paper_data.get('title', 'untitled')
url = paper_data.get('url', '')
doi = paper_data.get('doi', '')
# 提取paper_id
paper_id = 'unknown'
if source == 'arxiv' and '/abs/' in url:
paper_id = url.split('/abs/')[-1]
elif source == 'medrxiv' and doi:
paper_id = doi.split('/')[-1] if '/' in doi else doi
# 清理标题,保留主要信息
safe_title = re.sub(r'[^\w\s-]', '', title) # 移除特殊字符
safe_title = re.sub(r'\s+', '_', safe_title.strip()) # 空格转下划线
safe_title = safe_title.lower()[:50] # 限制长度并转小写
# 构造文件名: source_paperid_title.pdf
filename = f"{source}_{paper_id}_{safe_title}.pdf"
# 确保文件名长度合理
if len(filename) > 255: # 大多数文件系统的限制
filename = f"{source}_{paper_id}.pdf"
return filename
except Exception as e:
logging.error(f"生成文件名时发生错误: {e}")
# 回退方案
timestamp = int(time.time())
return f"paper_{timestamp}.pdf"