diff --git a/.gitignore b/.gitignore index 578f33c..dfed88b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ wheels/ # Virtual environments .venv .claude +dataset/ \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md index ff5b8e8..1ad1cf7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -16,6 +16,27 @@ MedResearcher 是一个给予用户输入的自动实验平台,其会给予用 2. pdf解析主文件: pdf_parser.py 3. 实验运行主文件: experiment_runner.py +## 文件结构 +``` +/ +├── dataset/ # 数据存放目录 +│ └── mimic.csv # 存放所有需要处理的与mimic相关论文的基础信息 +├── papers_crawler.py # 论文爬取主文件 +├── pdf_parser.py # pdf解析主文件 +├── experiment_runner.py # 实验运行主文件 +├── src/ # 源代码目录 +│ └── utils/ # 工具函数目录 +│ └── csv_utils.py # CSV文件操作工具函数 +└── docs/ # Memory Bank系统文件 + ├── CLAUDE-temp.md # 临时讨论和分析 + ├── CLAUDE-plan.md # 当前任务计划 + ├── CLAUDE-activeContext.md # 会话状态跟踪 + ├── CLAUDE-patterns.md # 代码模式记录 + ├── CLAUDE-decisions.md # 决策记录 + ├── CLAUDE-troubleshooting.md # 问题解决方案 + └── CLAUDE-config-variables.md # 配置变量参考 +``` + ## 开发注意事项 ### 必须遵循的编程规范 @@ -71,6 +92,7 @@ MedResearcher 是一个给予用户输入的自动实验平台,其会给予用 - 不允许撰写独立的测试用例文件或测试代码 - 通过直接运行对应的主文件(papers_crawler.py、pdf_parser.py 或 experiment_runner.py)进行功能测试 - 主文件的 `if __name__ == "__main__":` 块中应包含基本的功能调用示例 + - **严格限制main函数使用**:只有主文件(papers_crawler.py、pdf_parser.py、experiment_runner.py)允许包含 `if __name__ == "__main__":` 块,其他所有工具模块、库文件绝对不允许包含此类测试代码 - 函数可以被其他模块调用,不一定要在主文件中直接调用 - 作为MVP项目,重点关注理想情况下的功能实现,而非鲁棒性 @@ -120,6 +142,7 @@ MedResearcher 是一个给予用户输入的自动实验平台,其会给予用 - **上下文明确**:具体到文件、行号和函数名 - **渐进式实施**:通过子任务分解,逐步完成复杂需求 - **可追溯性**:所有决策和修改都有明确记录 +- **深度思考**:除非任务不涉及代码的修改,不然默认使用ultrathink模式 ### 工作流程 @@ -157,18 +180,6 @@ MedResearcher 是一个给予用户输入的自动实验平台,其会给予用 ### Memory Bank系统 -#### 文件结构 -``` -/docs/ -├── CLAUDE-temp.md # 临时讨论和分析 -├── CLAUDE-plan.md # 当前任务计划 -├── CLAUDE-activeContext.md # 会话状态跟踪 -├── CLAUDE-patterns.md # 代码模式记录 -├── CLAUDE-decisions.md # 决策记录 -├── CLAUDE-troubleshooting.md # 问题解决方案 -└── CLAUDE-config-variables.md # 配置变量参考 -``` - #### 更新机制 每个任务完成后,使用专门的SubAgent更新Memory Bank, SubAgent为memory-bank-synchronizer: ``` @@ -176,16 +187,29 @@ Task: memory-bank-synchronizer 功能: 更新所有Memory Bank文件,记录任务成果、决策和经验 ``` -### 工具使用原则 +### 工具使用原则与智能协作 -1. **批量操作**:多个独立操作必须同时执行 -2. **上下文管理**: - - 主上下文:用户对话、决策、当前计划 - - SubAgent:大规模搜索、模式分析、依赖梳理 -3. **文件操作**: - - 优先Edit而非Write - - 同文件多处修改用MultiEdit - - 新文件需明确理由 +#### 1. 基本操作策略 +- **批量操作**:多个独立操作必须同时执行,提高效率 +- **文件操作**:优先Edit而非Write,同文件多处修改用MultiEdit,新文件需明确理由 +- **上下文管理**:合理分配主上下文和SubAgent的职责 + +#### 2. 上下文分层管理 +- **主上下文**:负责用户对话、核心决策、当前计划制定 +- **SubAgent**:承担大规模搜索、模式分析、依赖梳理等专门任务 +- **协作原则**:主上下文保持聚焦,复杂分析任务交给专门SubAgent + +#### 3. MCP工具集成策略 +- **Context7 MCP**:遇到不熟悉或新版本第三方库时的首选工具 + - 使用场景:库文档查询、API使用示例、版本兼容性确认 + - 触发条件:代码中涉及未知库调用或需要确认具体用法 +- **其他MCP工具**:根据具体需求选择合适的MCP服务 + - 数据集访问、外部API调用、特定领域知识查询 + +#### 4. SubAgent选择策略 +- **代码搜索类**:使用code-searcher进行深度代码分析 +- **记忆银行更新**:任务完成后使用memory-bank-synchronizer +- **通用复杂任务**:使用general-purpose处理多步骤综合任务 ### TodoWrite工具使用 **强制使用场景**: diff --git a/src/utils/csv_utils.py b/src/utils/csv_utils.py new file mode 100644 index 0000000..8b397ec --- /dev/null +++ b/src/utils/csv_utils.py @@ -0,0 +1,245 @@ +"""CSV工具模块 + +该模块提供与CSV文件操作相关的工具函数,包括读取、写入、数据处理等功能。 +用于支持MedResearcher项目中的数据集处理需求。 +""" + +import csv +import logging +from pathlib import Path +from typing import List, Dict, Any, Optional, Union + + +def read_csv_to_dict(file_path: Union[str, Path], encoding: str = 'utf-8') -> List[Dict[str, str]]: + """读取CSV文件并转换为字典列表 + + Args: + file_path (Union[str, Path]): CSV文件路径 + encoding (str): 文件编码,默认为utf-8 + + Returns: + List[Dict[str, str]]: 包含所有行数据的字典列表,每个字典代表一行数据 + + Raises: + FileNotFoundError: 当文件不存在时抛出 + PermissionError: 当没有文件读取权限时抛出 + UnicodeDecodeError: 当文件编码错误时抛出 + """ + try: + file_path = Path(file_path) + if not file_path.exists(): + raise FileNotFoundError(f"CSV文件不存在: {file_path}") + + data = [] + with open(file_path, 'r', encoding=encoding, newline='') as csvfile: + # 自动检测CSV方言(分隔符、引号等) + dialect = csv.Sniffer().sniff(csvfile.read(1024)) + csvfile.seek(0) + + reader = csv.DictReader(csvfile, dialect=dialect) + for row in reader: + data.append(row) + + logging.info(f"成功读取CSV文件: {file_path}, 共{len(data)}行数据") + return data + + except FileNotFoundError as e: + logging.error(f"文件未找到: {e}") + raise + except PermissionError as e: + logging.error(f"文件权限错误: {e}") + raise + except UnicodeDecodeError as e: + logging.error(f"文件编码错误: {e}") + raise + except Exception as e: + logging.error(f"读取CSV文件时发生未知错误: {e}") + raise + + +def write_dict_to_csv(data: List[Dict[str, Any]], file_path: Union[str, Path], + encoding: str = 'utf-8', fieldnames: Optional[List[str]] = None) -> None: + """将字典列表写入CSV文件 + + Args: + data (List[Dict[str, Any]]): 要写入的数据列表 + file_path (Union[str, Path]): 输出CSV文件路径 + encoding (str): 文件编码,默认为utf-8 + fieldnames (Optional[List[str]]): 字段名列表,如果为None则从第一行数据中提取 + + Raises: + ValueError: 当数据为空或字段名无效时抛出 + PermissionError: 当没有文件写入权限时抛出 + """ + try: + if not data: + raise ValueError("数据列表不能为空") + + file_path = Path(file_path) + + # 如果未指定字段名,从第一行数据中提取 + if fieldnames is None: + fieldnames = list(data[0].keys()) if data else [] + + if not fieldnames: + raise ValueError("无法确定CSV字段名") + + # 确保目录存在 + file_path.parent.mkdir(parents=True, exist_ok=True) + + with open(file_path, 'w', encoding=encoding, newline='') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() # 写入表头 + writer.writerows(data) # 写入数据行 + + logging.info(f"成功写入CSV文件: {file_path}, 共{len(data)}行数据") + + except ValueError as e: + logging.error(f"数据验证错误: {e}") + raise + except PermissionError as e: + logging.error(f"文件权限错误: {e}") + raise + except Exception as e: + logging.error(f"写入CSV文件时发生未知错误: {e}") + raise + + +def filter_csv_data(data: List[Dict[str, str]], filter_func) -> List[Dict[str, str]]: + """根据过滤函数筛选CSV数据 + + Args: + data (List[Dict[str, str]]): 原始数据列表 + filter_func: 过滤函数,接收一行数据(Dict)作为参数,返回布尔值 + + Returns: + List[Dict[str, str]]: 筛选后的数据列表 + + Raises: + ValueError: 当数据为空时抛出 + TypeError: 当过滤函数不可调用时抛出 + """ + try: + if not data: + raise ValueError("数据列表不能为空") + + if not callable(filter_func): + raise TypeError("filter_func必须是可调用对象") + + filtered_data = [row for row in data if filter_func(row)] + + logging.info(f"数据筛选完成: 原始{len(data)}行 -> 筛选后{len(filtered_data)}行") + return filtered_data + + except ValueError as e: + logging.error(f"数据验证错误: {e}") + raise + except TypeError as e: + logging.error(f"类型错误: {e}") + raise + except Exception as e: + logging.error(f"筛选数据时发生未知错误: {e}") + raise + + +def get_csv_info(file_path: Union[str, Path], encoding: str = 'utf-8') -> Dict[str, Any]: + """获取CSV文件基本信息 + + Args: + file_path (Union[str, Path]): CSV文件路径 + encoding (str): 文件编码,默认为utf-8 + + Returns: + Dict[str, Any]: 包含文件信息的字典,包括行数、列数、字段名等 + + Raises: + FileNotFoundError: 当文件不存在时抛出 + """ + try: + file_path = Path(file_path) + if not file_path.exists(): + raise FileNotFoundError(f"CSV文件不存在: {file_path}") + + with open(file_path, 'r', encoding=encoding, newline='') as csvfile: + # 检测CSV方言 + sample = csvfile.read(1024) + dialect = csv.Sniffer().sniff(sample) + csvfile.seek(0) + + reader = csv.DictReader(csvfile, dialect=dialect) + fieldnames = reader.fieldnames or [] + + # 计算行数 + row_count = sum(1 for _ in reader) + + info = { + 'file_path': str(file_path), + 'file_size': file_path.stat().st_size, + 'row_count': row_count, + 'column_count': len(fieldnames), + 'fieldnames': fieldnames, + 'delimiter': dialect.delimiter, + 'quotechar': dialect.quotechar + } + + logging.info(f"CSV文件信息获取完成: {file_path}") + return info + + except FileNotFoundError as e: + logging.error(f"文件未找到: {e}") + raise + except Exception as e: + logging.error(f"获取CSV文件信息时发生未知错误: {e}") + raise + + +def merge_csv_files(file_paths: List[Union[str, Path]], output_path: Union[str, Path], + encoding: str = 'utf-8') -> None: + """合并多个CSV文件为单个文件 + + Args: + file_paths (List[Union[str, Path]]): 要合并的CSV文件路径列表 + output_path (Union[str, Path]): 输出文件路径 + encoding (str): 文件编码,默认为utf-8 + + Raises: + ValueError: 当文件列表为空时抛出 + FileNotFoundError: 当某个文件不存在时抛出 + """ + try: + if not file_paths: + raise ValueError("文件路径列表不能为空") + + merged_data = [] + fieldnames = None + + for file_path in file_paths: + file_path = Path(file_path) + if not file_path.exists(): + raise FileNotFoundError(f"CSV文件不存在: {file_path}") + + # 读取文件数据 + file_data = read_csv_to_dict(file_path, encoding) + + # 设置字段名(以第一个文件的字段名为准) + if fieldnames is None and file_data: + fieldnames = list(file_data[0].keys()) + + merged_data.extend(file_data) + + # 写入合并后的数据 + if merged_data: + write_dict_to_csv(merged_data, output_path, encoding, fieldnames) + logging.info(f"成功合并{len(file_paths)}个CSV文件到: {output_path}") + else: + logging.warning("没有数据可以合并") + + except ValueError as e: + logging.error(f"参数错误: {e}") + raise + except FileNotFoundError as e: + logging.error(f"文件未找到: {e}") + raise + except Exception as e: + logging.error(f"合并CSV文件时发生未知错误: {e}") + raise