From 6932e5fa8e1926d959ff6f316f212dc08a3437f1 Mon Sep 17 00:00:00 2001 From: iomgaa Date: Thu, 29 May 2025 19:30:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8TREx=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=9B=86=E7=94=9F=E6=88=90=E6=95=B0=E6=8D=AE=E5=BA=93=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- preprocessing/README_trex_processor.md | 79 +- preprocessing/merge_output_json.py | 225 +++++ preprocessing/trex_to_sentences_simple.py | 1076 +++++++++++---------- 3 files changed, 841 insertions(+), 539 deletions(-) create mode 100644 preprocessing/merge_output_json.py diff --git a/preprocessing/README_trex_processor.md b/preprocessing/README_trex_processor.md index 4947e30..bcd8be0 100644 --- a/preprocessing/README_trex_processor.md +++ b/preprocessing/README_trex_processor.md @@ -4,10 +4,35 @@ 1. **句子提取**:从 TREx 数据集提取三元组并转换为自然语言句子 2. **LLM 处理**:使用 ollama qwen3:4b 模型进行句子修正和重要性评分 +## 🆕 防卡死机制 + +为了解决LLM处理时可能出现的卡死问题,新增了以下功能: + +### 超时和重试机制 +- **超时时间**:每个LLM请求60秒超时 +- **重试机制**:失败后最多重试2次,采用指数退避策略 +- **并发控制**:降低并发数至4个,减少服务器压力 + +### 心跳监控系统 +- **实时监控**:每30秒检查一次LLM响应状态 +- **异常警告**:超过30秒无成功响应时发出警告 +- **服务检测**:自动检查ollama服务状态 +- **详细统计**:实时显示成功率、超时率等统计信息 + +### 日志系统 +- **详细日志**:所有操作都记录在 `logs/` 目录下 +- **双重输出**:同时输出到日志文件和控制台 +- **时间戳标记**:日志文件包含启动时间戳 + +### 改进的错误处理 +- **异常恢复**:LLM处理失败时使用原句子和默认评分 +- **状态监控**:处理前检查ollama服务状态 +- **批次间休息**:批次之间休息5秒,避免过度压力 + ## 安装依赖 ```bash -pip install agno asyncio pydantic +pip install agno asyncio pydantic requests ``` 确保已安装并启动 ollama,并下载 qwen3:4b 模型: @@ -50,24 +75,52 @@ python trex_to_sentences_simple.py --step llm --sentences_json my_sentences.json ## 输出文件 -**注意:所有输出文件都会自动保存在 `./output/` 目录中** +**注意:所有输出文件都会自动保存在相应目录中** -### 步骤1输出 +### 句子提取输出 - `output/extracted_sentences.json`: 提取的原始句子,包含元数据 -### 步骤2输出 +### LLM处理输出 - `output/{output_file}.txt`: 修正后的句子文本文件 - `output/{output_file}.json`: 完整的处理结果(包含原句、修正句、评分) - `output/{output_file}_sorted_by_importance.txt`: 按重要性评分排序的句子 ### 检查点文件 -- `output/{output_file}_checkpoint_{数量}.json`: 每2000条句子自动保存的检查点 +- `output/{output_file}_checkpoint_{数量}.json`: 每1000条句子自动保存的检查点 + +### 日志文件 +- `logs/trex_processor_{时间戳}.log`: 详细的处理日志 + +## 🆕 故障诊断 + +### 如果遇到卡死问题: + +1. **检查日志文件**:查看 `logs/` 目录下的最新日志 +2. **观察心跳监控**:注意控制台的心跳警告信息 +3. **检查ollama服务**: + ```bash + ps aux | grep ollama + curl http://localhost:11434/api/tags + ``` +4. **重启ollama服务**(如果需要): + ```bash + pkill ollama + ollama serve & + ``` + +### 常见警告信息: + +- `⚠️ 心跳检测`: 30秒无成功响应(正常情况下会自动恢复) +- `❌ 严重警告`: 90秒无成功响应(可能需要检查服务) +- `💀 Ollama服务异常`: ollama服务可能已停止 +- `💀 致命错误`: 连续多次警告(建议重启程序) ## 检查点恢复机制 - 步骤2会自动检测已有的检查点文件(在 `output/` 目录中) - 只处理尚未处理的句子,避免重复工作 - 如果所有句子都已处理,会直接生成最终输出文件 +- 中断后重新运行会自动从最新检查点继续 ## 示例工作流 @@ -84,14 +137,18 @@ python trex_to_sentences_simple.py --step llm ## 性能特点 -- **并发处理**: 最大54个并发LLM请求 -- **检查点保存**: 每2000条句子自动保存,支持断点续传 -- **进度显示**: 详细的处理进度和时间预估 -- **错误处理**: LLM请求失败时使用原句子和默认评分 +- **保守的并发**: 最大4个并发LLM请求(降低卡死风险) +- **检查点保存**: 每1000条句子自动保存,支持断点续传 +- **智能监控**: 详细的处理进度和时间预估 +- **健壮的错误处理**: LLM请求失败时使用原句子和默认评分 +- **服务监控**: 自动检测ollama服务状态 ## 注意事项 1. 首次运行步骤2前,必须先完成步骤1 -2. 检查点文件会占用额外磁盘空间(每个都包含所有已处理数据) +2. 检查点文件会占用额外磁盘空间(每个都包含所有已处理数据) 3. LLM处理速度取决于模型性能和网络状况 -4. 建议先用`--max_files`参数测试小批量数据 \ No newline at end of file +4. 建议先用`--max_files`参数测试小批量数据 +5. **新增**:如果遇到卡死,查看日志文件和心跳监控信息 +6. **新增**:程序会自动检测并报告ollama服务状态 +7. 
**新增**:所有处理过程都有详细日志记录,便于问题诊断 \ No newline at end of file diff --git a/preprocessing/merge_output_json.py b/preprocessing/merge_output_json.py new file mode 100644 index 0000000..bfa26e7 --- /dev/null +++ b/preprocessing/merge_output_json.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 +""" +JSON文件合并脚本 +读取多个JSON文件并合并为一个JSON文件 +""" + +import json +import os +from typing import Dict, List, Any, Union + +# 需要合并的JSON文件列表 +JSON_FILES_TO_MERGE = [ + "output/trex_sentences_enhanced_checkpoint_360000.json" +] +for i in range(1, 1010): + JSON_FILES_TO_MERGE.append(f"output/trex_sentences_enhanced_batch_{i}.json") + +def load_json_file(file_path: str) -> Union[Dict, List, None]: + """加载JSON文件""" + if not os.path.exists(file_path): + print(f"警告: 文件 {file_path} 不存在") + return None + + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f"成功加载: {file_path}") + return data + except json.JSONDecodeError as e: + print(f"错误: 无法解析JSON文件 {file_path} - {e}") + return None + except Exception as e: + print(f"错误: 读取文件 {file_path} 失败 - {e}") + return None + +def merge_json_data(data1: Union[Dict, List], data2: Union[Dict, List]) -> Union[Dict, List]: + """合并两个JSON数据结构""" + + # 如果两个都是列表,直接合并 + if isinstance(data1, list) and isinstance(data2, list): + print(f"合并两个列表: {len(data1)} + {len(data2)} = {len(data1) + len(data2)} 项") + return data1 + data2 + + # 如果两个都是字典 + elif isinstance(data1, dict) and isinstance(data2, dict): + print("合并两个字典结构") + merged = data1.copy() + + # 特殊处理:如果都有'sentences'字段且为列表,合并sentences + if 'sentences' in data1 and 'sentences' in data2: + if isinstance(data1['sentences'], list) and isinstance(data2['sentences'], list): + print(f"合并sentences字段: {len(data1['sentences'])} + {len(data2['sentences'])} = {len(data1['sentences']) + len(data2['sentences'])} 项") + merged['sentences'] = data1['sentences'] + data2['sentences'] + + # 更新metadata if exists + if 'metadata' in merged: + if isinstance(merged['metadata'], dict): + merged['metadata']['total_sentences'] = len(merged['sentences']) + merged['metadata']['merged_from'] = [os.path.basename(f) for f in JSON_FILES_TO_MERGE if os.path.exists(f)] + + # 合并其他字段 + for key, value in data2.items(): + if key != 'sentences' and key not in merged: + merged[key] = value + + return merged + + # 普通字典合并 + for key, value in data2.items(): + if key in merged: + # 如果key重复且都是列表,合并列表 + if isinstance(merged[key], list) and isinstance(value, list): + merged[key] = merged[key] + value + # 如果key重复且都是字典,递归合并 + elif isinstance(merged[key], dict) and isinstance(value, dict): + merged[key] = merge_json_data(merged[key], value) + else: + # 其他情况保留第二个文件的值 + merged[key] = value + print(f"字段 '{key}' 被覆盖") + else: + merged[key] = value + + return merged + + # 类型不匹配的情况,创建一个包含两者的新结构 + else: + print("数据类型不匹配,创建包含两者的新结构") + return { + "data_from_save.json": data1, + "data_from_save2.json": data2, + "merged_at": "test.py" + } + +def save_merged_json(data: Union[Dict, List], output_path: str): + """保存合并后的JSON数据""" + try: + # 确保输出目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + print(f"合并结果已保存到: {output_path}") + + # 显示统计信息 + if isinstance(data, dict): + if 'sentences' in data and isinstance(data['sentences'], list): + print(f"总计句子数: {len(data['sentences'])}") + print(f"总计字段数: {len(data)}") + elif isinstance(data, list): + print(f"总计列表项数: {len(data)}") + + except Exception as e: + print(f"错误: 保存文件失败 - {e}") + +def 
remove_duplicates_from_sentences(data: Union[Dict, List]) -> Union[Dict, List]: + """从合并结果中移除重复的句子(基于句子内容)""" + if isinstance(data, dict) and 'sentences' in data: + if isinstance(data['sentences'], list): + original_count = len(data['sentences']) + seen_sentences = set() + unique_sentences = [] + + for item in data['sentences']: + if isinstance(item, dict): + # 如果是字典,使用sentence字段或corrected_sentence字段作为唯一标识 + sentence_key = item.get('sentence') or item.get('corrected_sentence') or item.get('original_sentence') + elif isinstance(item, str): + sentence_key = item + else: + sentence_key = str(item) + + if sentence_key and sentence_key not in seen_sentences: + seen_sentences.add(sentence_key) + unique_sentences.append(item) + + data['sentences'] = unique_sentences + + # 更新metadata + if 'metadata' in data and isinstance(data['metadata'], dict): + data['metadata']['total_sentences'] = len(unique_sentences) + data['metadata']['duplicates_removed'] = original_count - len(unique_sentences) + + print(f"去重完成: {original_count} -> {len(unique_sentences)} (移除了 {original_count - len(unique_sentences)} 个重复项)") + + return data + +def merge_multiple_json_data(data_list: List[Union[Dict, List]]) -> Union[Dict, List]: + """合并多个JSON数据结构""" + if not data_list: + return {} + + if len(data_list) == 1: + return data_list[0] + + print(f"准备合并 {len(data_list)} 个JSON数据结构") + + # 从第一个数据开始,逐步合并其他数据 + merged_data = data_list[0] + + for i, data in enumerate(data_list[1:], 1): + print(f"正在合并第 {i+1} 个数据结构...") + merged_data = merge_json_data(merged_data, data) + + return merged_data + +def main(): + """主函数""" + print("=== JSON文件合并脚本 ===") + + # 输出路径 + output_path = "output/merged.json" + + print(f"准备合并以下文件:") + for i, file_path in enumerate(JSON_FILES_TO_MERGE, 1): + print(f" {i}. {file_path}") + print(f"输出文件: {output_path}") + print() + + # 加载所有文件 + loaded_data = [] + successfully_loaded = [] + + for file_path in JSON_FILES_TO_MERGE: + data = load_json_file(file_path) + if data is not None: + loaded_data.append(data) + successfully_loaded.append(file_path) + + # 检查是否至少有一个文件加载成功 + if not loaded_data: + print("错误: 没有文件能够成功加载,退出") + return + + print(f"成功加载了 {len(loaded_data)} 个文件:") + for file_path in successfully_loaded: + print(f" ✓ {file_path}") + + if len(loaded_data) < len(JSON_FILES_TO_MERGE): + failed_count = len(JSON_FILES_TO_MERGE) - len(loaded_data) + print(f"警告: {failed_count} 个文件加载失败") + print() + + # 合并所有数据 + if len(loaded_data) == 1: + print("只有一个文件可用,直接使用...") + merged_data = loaded_data[0] + else: + print("开始合并所有文件...") + merged_data = merge_multiple_json_data(loaded_data) + + # 去重处理 + print("\n检查并去除重复项...") + merged_data = remove_duplicates_from_sentences(merged_data) + + # 保存合并结果 + print("\n保存合并结果...") + save_merged_json(merged_data, output_path) + + print("\n=== 合并完成 ===") + print(f"合并了 {len(successfully_loaded)} 个文件的数据") + +if __name__ == "__main__": + main() diff --git a/preprocessing/trex_to_sentences_simple.py b/preprocessing/trex_to_sentences_simple.py index bbb415c..e5167b9 100644 --- a/preprocessing/trex_to_sentences_simple.py +++ b/preprocessing/trex_to_sentences_simple.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ TREx数据集增强预处理脚本 -使用agno框架和ollama qwen3:4b进行句子后处理和重要性评分 +使用vLLM OpenAI兼容API进行句子后处理和重要性评分 支持两个独立步骤: 1. 
句子提取:从TREx数据集提取句子并保存为JSON @@ -17,11 +17,10 @@ import asyncio import time import logging from datetime import datetime -import subprocess import requests from pydantic import BaseModel, Field -from agno.agent import Agent -from agno.models.ollama import Ollama +import aiohttp +import concurrent.futures # 设置日志系统 def setup_logging(): @@ -96,25 +95,20 @@ class EnhancedTRExProcessor: self.max_files = max_files self.enable_llm_processing = enable_llm_processing - # LLM处理配置 - self.llm_timeout = 60 # 增加每个请求的超时时间到60秒 - self.max_concurrent = 8 # 进一步降低并发数到4 - self.max_retries = 2 # 减少重试次数避免过长等待 - self.heartbeat_interval = 30 # 缩短心跳检测间隔到30秒 + # Ollama API配置 + self.model_name = "gemma3:latest" # Ollama模型名称 + self.ollama_base_url = "http://localhost:11434" # Ollama服务器地址 + self.batch_size_per_request = 8 # 每个API请求处理的句子数量(Ollama建议较小批次) + self.max_concurrent_requests = 2 # 最大并发请求数(Ollama建议较低并发) + self.request_timeout = 180 # 请求超时时间(秒) + self.retry_attempts = 3 # 重试次数 # 统计信息 self.total_requests = 0 self.successful_requests = 0 self.failed_requests = 0 - self.timeout_requests = 0 - self.last_successful_time = time.time() - self.last_activity_time = time.time() # 新增:最后活动时间 - # 初始化agno agent(仅在需要LLM处理时) - if self.enable_llm_processing: - self.setup_agent() - - logger.info(f"处理器初始化完成 - 并发数: {self.max_concurrent}, 超时时间: {self.llm_timeout}秒") + logger.info(f"处理器初始化完成 - 模型: {self.model_name}, 批次大小: {self.batch_size_per_request}, 并发数: {self.max_concurrent_requests}") # 扩展的Wikidata属性映射 self.property_mappings = { @@ -157,165 +151,431 @@ class EnhancedTRExProcessor: "http://www.wikidata.org/prop/direct/P138": "is named after" } - def setup_agent(self): - """设置agno agent""" - try: - self.agent = Agent( - model=Ollama( - id="gemma3:latest", - # 使用options设置temperature和其他参数 - options={ - "temperature": 0.2, - "top_p": 0.8, - "top_k": 20, - "num_ctx": 4096, - } - ), - response_model=ProcessedSentence, - instructions=[ - "You are a professional text processing assistant responsible for correcting errors in sentences and evaluating the importance of knowledge.", - "", - "### Sentence Correction Rules:", - "1. Remove Wikipedia-specific markers: such as (disambiguation), (film), (band), etc. in parentheses", - "2. Ensure grammatical completeness: complete subject+predicate+object structure, avoid dangling 'and is', 'or', etc.", - "3. Fix obvious grammatical errors: tense consistency, singular/plural consistency, correct preposition usage", - "4. Clean up garbled text and special characters: such as â, €, ™ and other encoding issues", - "5. Ensure semantic fluency: if the original sentence cannot be fixed, reorganize the language to make it coherent", - "6. 
Do not add information not present in the original text, only correct errors", - "", - "### Correction Examples:", - "- Error: 'Argument (disambiguation) is related to philosophy, logic, and is an.'", - "- Corrected: 'Argument is related to philosophy and logic.'", - "", - "- Error: 'Beijing is a capital city and are.'", - "- Corrected: 'Beijing is a capital city.'", - "", - "Importance scoring criteria (0.0-10.0, in increments of 0.1):", - "", - "0.0 points - Completely incorrect or meaningless information", - "Examples: 'Apple is a metal', 'The sun rises from the west', '1+1=3'", - "", - "0.5 points - Almost worthless information", - "Examples: 'Color of a fictional character's socks', 'Third line of dialogue from a game NPC', 'What someone had for breakfast yesterday'", - "", - "1.0 points - Extremely rare, non-practical knowledge", - "Examples: 'Pet name of a minor novel character', 'Content of the 15th line in movie end credits', 'Nickname of website user ID 123456'", - "", - "1.5 points - Very niche detailed information", - "Examples: 'Outfit of a passerby at minute 37 in a movie', 'Duration of background music in a game's hidden level', 'Content of the 3rd dialogue box on page 200 of a manga'", - "", - "2.0 points - Details in niche professional fields", - "Examples: 'Color change of rare minerals at specific temperatures', 'Length of an insect's third antenna', 'Molecular formula of chemical reaction byproducts'", - "", - "2.5 points - Technical details only professionals care about", - "Examples: 'Release date of specific software library version', 'Time complexity coefficient of an algorithm', 'Thermal expansion coefficient of a material'", - "", - "3.0 points - Professional knowledge in specific fields", - "Examples: 'Programming language syntax features', 'Gene sequence of a virus', 'Official system of ancient dynasties'", - "", - "3.5 points - Professional information with some value", - "Examples: 'Specific system of historical dynasty', 'Mechanism of action of a drug', 'Development time of a technical standard'", - "", - "4.0 points - Meaningful knowledge known by few", - "Examples: 'Unique cultural traditions of a country', 'Important discoveries by a scientist', 'Detailed process of historical events'", - "", - "4.5 points - Knowledge of interest to some groups", - "Examples: 'Author's creative background', 'Characteristics of an art movement', 'Detailed rules of a sport'", - "", - "5.0 points - General knowledge of moderate importance", - "Examples: 'Famous attractions in cities', 'Development history of a company', 'Living habits of animals'", - "", - "5.5 points - Fairly useful common sense", - "Examples: 'Plant growth environment', 'Healthy eating common sense', 'Basic first aid knowledge'", - "", - "6.0 points - Knowledge most educated people should know", - "Examples: 'Shakespeare's representative works', 'Basic geometric theorems', 'Major world currencies'", - "", - "6.5 points - Important cultural or scientific common sense", - "Examples: 'Basic structure of DNA', 'Newton's three laws', 'Major world religions'", - "", - "7.0 points - Important foundational knowledge", - "Examples: 'Time period of World War II', 'Functions of major human organs', 'Basic mathematical operation rules'", - "", - "7.5 points - Very important common sense", - "Examples: 'Light speed is the fastest in the universe', 'Earth is round', 'Basic principles of blood circulation'", - "", - "8.0 points - Core knowledge in basic education", - "Examples: 'Earth orbits the sun', 'Principle of 
seasonal formation', 'Basic grammar rules'", - "", - "8.5 points - Important knowledge everyone should master", - "Examples: 'Chemical formula of water H2O', 'Basic safety common sense', 'Simple mathematical calculations'", - "", - "9.0 points - Extremely important basic concepts", - "Examples: 'Humans need oxygen to survive', 'Fire is hot', 'Basic directional concepts'", - "", - "9.5 points - Core knowledge everyone must know", - "Examples: 'A day has 24 hours', 'A year has 12 months', 'Basic number concepts'", - "", - "10.0 points - Most basic and important common sense", - "Examples: 'Humans need food and water to survive', 'The sky is blue', 'Stones are heavier than feathers'", - "", - "When scoring, please consider:", - "1. Popularity of knowledge - How many people know this knowledge", - "2. Practical value - How useful this knowledge is in daily life", - "3. Educational importance - The position of this knowledge in the education system", - "4. Cultural significance - The importance of this knowledge for understanding the world", - "", - "Please output structured results directly without showing the thinking process." - ], - markdown=False - ) - logger.info("LLM处理器初始化成功") - except Exception as e: - logger.error(f"LLM处理器初始化失败: {e}") - print(f"LLM处理器初始化失败: {e}") - print("将使用基础模式(不使用LLM后处理)") - self.enable_llm_processing = False + def get_system_prompt(self) -> str: + """获取系统提示""" + return """You are a professional text processing assistant responsible for correcting errors in sentences and evaluating the importance of knowledge. - async def process_sentence_with_llm(self, sentence: str) -> ProcessedSentence: - """使用LLM处理单个句子(保留用于单独调用)""" - for attempt in range(self.max_retries): +### Sentence Correction Rules: +1. Remove Wikipedia-specific markers: such as (disambiguation), (film), (band), etc. in parentheses +2. Ensure grammatical completeness: complete subject+predicate+object structure, avoid dangling 'and is', 'or', etc. +3. Fix obvious grammatical errors: tense consistency, singular/plural consistency, correct preposition usage +4. Clean up garbled text and special characters: such as â, €, ™ and other encoding issues +5. Ensure semantic fluency: if the original sentence cannot be fixed, reorganize the language to make it coherent +6. Do not add information not present in the original text, only correct errors + +### Correction Examples: +- Error: 'Argument (disambiguation) is related to philosophy, logic, and is an.' +- Corrected: 'Argument is related to philosophy and logic.' + +- Error: 'Beijing is a capital city and are.' +- Corrected: 'Beijing is a capital city.' 
+ +Importance scoring criteria (0.0-10.0, in increments of 0.1): + +0.0 points - Completely incorrect or meaningless information +Examples: 'Apple is a metal', 'The sun rises from the west', '1+1=3' + +0.5 points - Almost worthless information +Examples: 'Color of a fictional character's socks', 'Third line of dialogue from a game NPC', 'What someone had for breakfast yesterday' + +1.0 points - Extremely rare, non-practical knowledge +Examples: 'Pet name of a minor novel character', 'Content of the 15th line in movie end credits', 'Nickname of website user ID 123456' + +1.5 points - Very niche detailed information +Examples: 'Outfit of a passerby at minute 37 in a movie', 'Duration of background music in a game's hidden level', 'Content of the 3rd dialogue box on page 200 of a manga' + +2.0 points - Details in niche professional fields +Examples: 'Color change of rare minerals at specific temperatures', 'Length of an insect's third antenna', 'Molecular formula of chemical reaction byproducts' + +2.5 points - Technical details only professionals care about +Examples: 'Release date of specific software library version', 'Time complexity coefficient of an algorithm', 'Thermal expansion coefficient of a material' + +3.0 points - Professional knowledge in specific fields +Examples: 'Programming language syntax features', 'Gene sequence of a virus', 'Official system of ancient dynasties' + +3.5 points - Professional information with some value +Examples: 'Specific system of historical dynasty', 'Mechanism of action of a drug', 'Development time of a technical standard' + +4.0 points - Meaningful knowledge known by few +Examples: 'Unique cultural traditions of a country', 'Important discoveries by a scientist', 'Detailed process of historical events' + +4.5 points - Knowledge of interest to some groups +Examples: 'Author's creative background', 'Characteristics of an art movement', 'Detailed rules of a sport' + +5.0 points - General knowledge of moderate importance +Examples: 'Famous attractions in cities', 'Development history of a company', 'Living habits of animals' + +5.5 points - Fairly useful common sense +Examples: 'Plant growth environment', 'Healthy eating common sense', 'Basic first aid knowledge' + +6.0 points - Knowledge most educated people should know +Examples: 'Shakespeare's representative works', 'Basic geometric theorems', 'Major world currencies' + +6.5 points - Important cultural or scientific common sense +Examples: 'Basic structure of DNA', 'Newton's three laws', 'Major world religions' + +7.0 points - Important foundational knowledge +Examples: 'Time period of World War II', 'Functions of major human organs', 'Basic mathematical operation rules' + +7.5 points - Very important common sense +Examples: 'Light speed is the fastest in the universe', 'Earth is round', 'Basic principles of blood circulation' + +8.0 points - Core knowledge in basic education +Examples: 'Earth orbits the sun', 'Principle of seasonal formation', 'Basic grammar rules' + +8.5 points - Important knowledge everyone should master +Examples: 'Chemical formula of water H2O', 'Basic safety common sense', 'Simple mathematical calculations' + +9.0 points - Extremely important basic concepts +Examples: 'Humans need oxygen to survive', 'Fire is hot', 'Basic directional concepts' + +9.5 points - Core knowledge everyone must know +Examples: 'A day has 24 hours', 'A year has 12 months', 'Basic number concepts' + +10.0 points - Most basic and important common sense +Examples: 'Humans need food and water to survive', 
'The sky is blue', 'Stones are heavier than feathers' + +When scoring, please consider: +1. Popularity of knowledge - How many people know this knowledge +2. Practical value - How useful this knowledge is in daily life +3. Educational importance - The position of this knowledge in the education system +4. Cultural significance - The importance of this knowledge for understanding world + +Please respond with valid JSON in the following format: +{ + "corrected_sentence": "corrected sentence here", + "importance_score": evaluation score +}""" + + async def process_batch_with_vllm_api(self, sentences: List[str]) -> List[Dict[str, Any]]: + """使用vLLM OpenAI兼容API处理一批句子""" + processed_sentences = [] + + async with aiohttp.ClientSession() as session: + # 创建并发任务 + semaphore = asyncio.Semaphore(self.max_concurrent_requests) + tasks = [] + + # 将句子分成小批次 + for i in range(0, len(sentences), self.batch_size_per_request): + batch_sentences = sentences[i:i + self.batch_size_per_request] + task = self.process_single_batch_request(session, semaphore, batch_sentences, i) + tasks.append(task) + + # 等待所有任务完成 + batch_results = await asyncio.gather(*tasks, return_exceptions=True) + + # 收集结果 + for result in batch_results: + if isinstance(result, Exception): + logger.error(f"批次处理出错: {result}") + continue + if result: + processed_sentences.extend(result) + + return processed_sentences + + async def process_single_batch_request(self, session: aiohttp.ClientSession, semaphore: asyncio.Semaphore, + sentences: List[str], batch_index: int) -> List[Dict[str, Any]]: + """处理单个批次的API请求""" + async with semaphore: + for attempt in range(self.retry_attempts): + try: + # 为每个句子创建单独的消息 + messages = [] + for sentence in sentences: + messages.append({ + "role": "user", + "content": f"Please correct the errors in the following sentence and evaluate its importance: {sentence}" + }) + + # 构建Ollama请求数据 + request_data = { + "model": self.model_name, + "messages": [ + {"role": "system", "content": self.get_system_prompt()} + ] + messages, + "stream": False, + "options": { + "temperature": 0.2, + "num_predict": 500 * len(sentences) # 为每个句子分配足够的token + }, + "format": "json" # Ollama的JSON格式参数 + } + + # 发送请求到Ollama + async with session.post( + f'{self.ollama_base_url}/api/chat', + json=request_data, + timeout=aiohttp.ClientTimeout(total=self.request_timeout) + ) as response: + + if response.status == 200: + result = await response.json() + return self.parse_ollama_response(result, sentences, batch_index) + else: + error_text = await response.text() + logger.warning(f"API请求失败 (批次 {batch_index}, 尝试 {attempt + 1}/{self.retry_attempts}): {response.status} - {error_text}") + + if attempt == self.retry_attempts - 1: # 最后一次尝试 + logger.error(f"批次 {batch_index} 在 {self.retry_attempts} 次尝试后仍然失败") + self.failed_requests += len(sentences) + return self.create_default_responses(sentences) + else: + # 等待后重试 + await asyncio.sleep(2 ** attempt) # 指数退避 + continue + + except asyncio.TimeoutError: + logger.warning(f"批次 {batch_index} 请求超时 (尝试 {attempt + 1}/{self.retry_attempts})") + if attempt == self.retry_attempts - 1: + logger.error(f"批次 {batch_index} 在 {self.retry_attempts} 次尝试后仍然超时") + self.failed_requests += len(sentences) + return self.create_default_responses(sentences) + else: + await asyncio.sleep(2 ** attempt) + continue + + except Exception as e: + logger.warning(f"处理批次 {batch_index} 时出错 (尝试 {attempt + 1}/{self.retry_attempts}): {e}") + if attempt == self.retry_attempts - 1: + logger.error(f"批次 {batch_index} 在 {self.retry_attempts} 次尝试后仍然失败") + 
self.failed_requests += len(sentences) + return self.create_default_responses(sentences) + else: + await asyncio.sleep(2 ** attempt) + continue + + # 如果所有重试都失败了 + return self.create_default_responses(sentences) + + def parse_ollama_response(self, response: Dict[str, Any], original_sentences: List[str], batch_index: int) -> List[Dict[str, Any]]: + """解析Ollama响应""" + processed_sentences = [] + + try: + # Ollama的响应格式 + message = response.get('message', {}) + content = message.get('content', '') + + if not content: + logger.warning(f"批次 {batch_index} 没有返回内容") + return self.create_default_responses(original_sentences) + + # 尝试解析JSON响应 try: - prompt = f"Please correct the errors in the following sentence and evaluate its importance: {sentence}" - - # 使用asyncio.wait_for添加超时机制 - response = await asyncio.wait_for( - self.agent.arun(prompt), - timeout=self.llm_timeout - ) - - # 根据agno文档,response应该直接是ProcessedSentence类型 - if isinstance(response, ProcessedSentence): - return response - else: - message = response.messages[-1].content - message = message.replace("```json", "").replace("```", "") - message = json.loads(message) - return ProcessedSentence( - corrected_sentence=message['corrected_sentence'], - importance_score=message['importance_score'] + # 如果返回的是单个JSON对象 + if content.strip().startswith('{') and content.strip().endswith('}'): + response_data = json.loads(content) + processed_sentence = ProcessedSentence( + corrected_sentence=response_data.get('corrected_sentence', original_sentences[0] if original_sentences else ""), + importance_score=float(response_data.get('importance_score', 5.0)) ) - except asyncio.TimeoutError: - logger.warning(f"LLM请求超时 (尝试 {attempt + 1}/{self.max_retries}): {sentence[:50]}...") - if attempt == self.max_retries - 1: - logger.error(f"LLM请求最终超时,使用默认处理: {sentence[:50]}...") - break - # 等待一段时间后重试 - await asyncio.sleep(2 ** attempt) # 指数退避 + processed_sentences.append({ + "original_sentence": original_sentences[0] if original_sentences else "", + "corrected_sentence": processed_sentence.corrected_sentence, + "importance_score": processed_sentence.importance_score + }) + self.successful_requests += 1 + + # 如果有多个句子但只返回一个结果,为其他句子创建默认响应 + for i in range(1, len(original_sentences)): + processed_sentences.append({ + "original_sentence": original_sentences[i], + "corrected_sentence": original_sentences[i], + "importance_score": 5.0 + }) + self.failed_requests += 1 + + else: + # 尝试解析多个JSON对象 + json_objects = [] + for line in content.split('\n'): + line = line.strip() + if line.startswith('{') and line.endswith('}'): + try: + json_objects.append(json.loads(line)) + except: + continue + + if json_objects: + for i, (sentence, json_obj) in enumerate(zip(original_sentences, json_objects)): + try: + processed_sentence = ProcessedSentence( + corrected_sentence=json_obj.get('corrected_sentence', sentence), + importance_score=float(json_obj.get('importance_score', 5.0)) + ) + + processed_sentences.append({ + "original_sentence": sentence, + "corrected_sentence": processed_sentence.corrected_sentence, + "importance_score": processed_sentence.importance_score + }) + self.successful_requests += 1 + except Exception as e: + logger.warning(f"解析JSON对象失败: {e}") + processed_sentences.append({ + "original_sentence": sentence, + "corrected_sentence": sentence, + "importance_score": 5.0 + }) + self.failed_requests += 1 + + # 为剩余句子创建默认响应 + for i in range(len(json_objects), len(original_sentences)): + processed_sentences.append({ + "original_sentence": original_sentences[i], + "corrected_sentence": 
original_sentences[i], + "importance_score": 5.0 + }) + self.failed_requests += 1 + else: + logger.warning(f"批次 {batch_index} 无法解析JSON响应: {content}") + return self.create_default_responses(original_sentences) + + except (json.JSONDecodeError, ValueError) as e: + logger.warning(f"批次 {batch_index} 解析响应JSON失败: {e}") + logger.warning(f"原始内容: {content}") + return self.create_default_responses(original_sentences) - except Exception as e: - logger.error(f"LLM处理句子时出错 (尝试 {attempt + 1}/{self.max_retries}): {e}") - if attempt == self.max_retries - 1: - break - await asyncio.sleep(1) + except Exception as e: + logger.error(f"解析批次 {batch_index} 响应时出错: {e}") + return self.create_default_responses(original_sentences) - # 所有重试都失败,返回原句子和中等评分 - logger.warning(f"使用默认处理: {sentence[:50]}...") - return ProcessedSentence( - corrected_sentence=sentence, - importance_score=5.0 - ) + return processed_sentences + + def create_default_responses(self, sentences: List[str]) -> List[Dict[str, Any]]: + """为失败的请求创建默认响应""" + default_responses = [] + for sentence in sentences: + default_responses.append({ + "original_sentence": sentence, + "corrected_sentence": sentence, + "importance_score": 5.0 + }) + return default_responses + + async def process_sentences_with_vllm_api(self, sentences: List[str]) -> List[Dict[str, Any]]: + """使用Ollama API处理句子""" + logger.info(f"开始使用Ollama API处理 {len(sentences)} 个句子...") + print(f"开始使用Ollama API处理 {len(sentences)} 个句子...") + + start_time = time.time() + total_sentences = len(sentences) + total_processed_count = 0 + + # 检查Ollama服务状态 + if not self.check_ollama_status(): + logger.error("Ollama服务状态异常,无法继续处理") + print("错误:Ollama服务状态异常,请检查服务是否正常运行") + return [] + + # 分大批次处理(用于检查点保存) + large_batch_size = 1000 # 每1000个句子保存一次检查点 + all_processed_sentences = [] + + for large_batch_start in range(0, total_sentences, large_batch_size): + large_batch_end = min(large_batch_start + large_batch_size, total_sentences) + large_batch_sentences = sentences[large_batch_start:large_batch_end] + large_batch_number = large_batch_start // large_batch_size + 1 + + logger.info(f"=== 处理大批次 {large_batch_number} ({large_batch_start + 1}-{large_batch_end}/{total_sentences}) ===") + print(f"\n=== 处理大批次 {large_batch_number} ({large_batch_start + 1}-{large_batch_end}/{total_sentences}) ===") + + large_batch_start_time = time.time() + + # 处理当前大批次 + batch_processed = await self.process_batch_with_vllm_api(large_batch_sentences) + all_processed_sentences.extend(batch_processed) + total_processed_count += len(batch_processed) + + # 保存当前大批次的检查点 + checkpoint_filename = self.save_batch_checkpoint(batch_processed, large_batch_number, total_processed_count) + + # 打印进度 + large_batch_time = time.time() - large_batch_start_time + elapsed_time = time.time() - start_time + + logger.info(f"大批次 {large_batch_number} 处理完成!") + logger.info(f" - 当前批次:成功 {len(batch_processed)},用时 {large_batch_time/60:.1f}分钟") + logger.info(f" - 总体进度:{total_processed_count}/{total_sentences} ({total_processed_count/total_sentences*100:.1f}%)") + logger.info(f" - 已用时间:{elapsed_time/60:.1f}分钟") + logger.info(f" - 批次检查点已保存:{checkpoint_filename}") + + print(f"大批次 {large_batch_number} 处理完成!") + print(f" - 当前批次:成功 {len(batch_processed)},用时 {large_batch_time/60:.1f}分钟") + print(f" - 总体进度:{total_processed_count}/{total_sentences} ({total_processed_count/total_sentences*100:.1f}%)") + print(f" - 已用时间:{elapsed_time/60:.1f}分钟") + print(f" - 批次检查点已保存:{checkpoint_filename}") + + if large_batch_end < total_sentences: + remaining_sentences = total_sentences - 
total_processed_count + avg_time_per_sentence = elapsed_time / total_processed_count + estimated_remaining_time = avg_time_per_sentence * remaining_sentences + logger.info(f" - 预估剩余时间:{estimated_remaining_time/60:.1f}分钟") + print(f" - 预估剩余时间:{estimated_remaining_time/60:.1f}分钟") + + # 打印最终统计 + total_time = time.time() - start_time + logger.info(f"=== 全部处理完成!===") + logger.info(f" - 总成功:{self.successful_requests}") + logger.info(f" - 总失败:{self.failed_requests}") + logger.info(f" - 总用时:{total_time/60:.1f}分钟") + logger.info(f" - 平均处理速度:{total_processed_count/total_time:.2f}句/秒") + + print(f"\n=== 全部处理完成!===") + print(f" - 总成功:{self.successful_requests}") + print(f" - 总失败:{self.failed_requests}") + print(f" - 总用时:{total_time/60:.1f}分钟") + print(f" - 平均处理速度:{total_processed_count/total_time:.2f}句/秒") + + return all_processed_sentences + + def check_ollama_status(self) -> bool: + """检查Ollama服务是否正常运行""" + try: + # 检查Ollama API是否响应 + response = requests.get(f'{self.ollama_base_url}/api/tags', timeout=10) + + if response.status_code == 200: + models = response.json() + model_names = [model.get('name', 'unknown') for model in models.get('models', [])] + logger.info(f"Ollama服务状态正常,可用模型: {model_names}") + + # 检查目标模型是否可用 + if self.model_name in model_names: + logger.info(f"目标模型 {self.model_name} 可用") + return True + else: + logger.warning(f"目标模型 {self.model_name} 不在可用模型列表中: {model_names}") + logger.info("尝试拉取模型...") + # 尝试拉取模型 + try: + pull_response = requests.post( + f'{self.ollama_base_url}/api/pull', + json={"name": self.model_name}, + timeout=300 # 5分钟超时 + ) + if pull_response.status_code == 200: + logger.info(f"成功拉取模型 {self.model_name}") + return True + else: + logger.error(f"拉取模型失败: {pull_response.status_code}") + return False + except Exception as e: + logger.error(f"拉取模型时出错: {e}") + return False + else: + logger.error(f"Ollama API响应异常,状态码: {response.status_code}") + return False + + except requests.exceptions.RequestException as e: + logger.error(f"无法连接到Ollama API: {e}") + return False + except Exception as e: + logger.error(f"检查Ollama状态时出错: {e}") + return False def clean_text(self, text: str) -> str: """清理文本,处理特殊字符""" @@ -459,300 +719,20 @@ class EnhancedTRExProcessor: print(f"Error converting triple to sentence: {e}") return "" - async def process_sentence_with_llm_concurrent(self, semaphore: asyncio.Semaphore, sentence: str, index: int, total_sentences: int, start_time: float) -> Dict[str, Any]: - """使用信号量控制并发的LLM处理""" - async with semaphore: - self.total_requests += 1 - self.last_activity_time = time.time() # 更新活动时间 - success = False - - for attempt in range(self.max_retries): - try: - prompt = f"Please correct the errors in the following sentence and evaluate its importance: {sentence}" - - # 使用asyncio.wait_for添加超时机制 - response = await asyncio.wait_for( - self.agent.arun(prompt), - timeout=self.llm_timeout - ) - - # 根据agno文档,response应该直接是ProcessedSentence类型 - if isinstance(response, ProcessedSentence): - result = { - "index": index, - "original_sentence": sentence, - "corrected_sentence": response.corrected_sentence, - "importance_score": response.importance_score - } - else: - message = response.messages[-1].content - message = message.replace("```json", "").replace("```", "") - message = json.loads(message) - result = { - "index": index, - "original_sentence": sentence, - "corrected_sentence": message['corrected_sentence'], - "importance_score": message['importance_score'] - } - - # 成功处理 - self.successful_requests += 1 - self.last_successful_time = time.time() - self.last_activity_time = 
time.time() # 更新活动时间 - success = True - - # 打印详细进度信息 - 降低频率到每50个 - if index % 50 == 0: - current_time = time.time() - elapsed_time = current_time - start_time - avg_time_per_sentence = elapsed_time / (index + 1) if index > 0 else elapsed_time - remaining_sentences = total_sentences - (index + 1) - estimated_remaining_time = avg_time_per_sentence * remaining_sentences - success_rate = (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0 - - # 格式化时间显示 - def format_time(seconds): - if seconds < 60: - return f"{seconds:.1f}秒" - elif seconds < 3600: - minutes = seconds / 60 - return f"{minutes:.1f}分钟" - else: - hours = seconds / 3600 - return f"{hours:.1f}小时" - - logger.info(f"已完成第 {index + 1} 个句子的处理") - logger.info(f" - 剩余句子数: {remaining_sentences}") - logger.info(f" - 平均处理时间: {avg_time_per_sentence:.2f}秒/句") - logger.info(f" - 预估剩余时间: {format_time(estimated_remaining_time)}") - logger.info(f" - 已用时间: {format_time(elapsed_time)}") - logger.info(f" - 成功率: {success_rate:.1f}% ({self.successful_requests}/{self.total_requests})") - - print(f"已完成第 {index + 1} 个句子的处理") - print(f" - 剩余句子数: {remaining_sentences}") - print(f" - 平均处理时间: {avg_time_per_sentence:.2f}秒/句") - print(f" - 预估剩余时间: {format_time(estimated_remaining_time)}") - print(f" - 已用时间: {format_time(elapsed_time)}") - print(f" - 成功率: {success_rate:.1f}% ({self.successful_requests}/{self.total_requests})") - - return result - - except asyncio.TimeoutError: - self.timeout_requests += 1 - self.last_activity_time = time.time() # 更新活动时间 - logger.warning(f"第 {index} 个句子处理超时 (尝试 {attempt + 1}/{self.max_retries}): {sentence[:50]}...") - if attempt == self.max_retries - 1: - logger.error(f"第 {index} 个句子最终超时,使用默认处理") - break - # 指数退避 - await asyncio.sleep(2 ** attempt) - - except Exception as e: - self.last_activity_time = time.time() # 更新活动时间 - logger.error(f"处理第 {index} 个句子时出错 (尝试 {attempt + 1}/{self.max_retries}): {e}") - if attempt == self.max_retries - 1: - break - await asyncio.sleep(1) - - # 所有重试都失败,使用默认处理 - if not success: - self.failed_requests += 1 - logger.warning(f"第 {index} 个句子使用默认处理: {sentence[:50]}...") - - return { - "index": index, - "original_sentence": sentence, - "corrected_sentence": sentence, - "importance_score": 5.0 - } - - async def heartbeat_monitor(self, total_sentences: int): - """心跳监控,检测是否有长时间无响应""" - consecutive_warnings = 0 - - while True: - await asyncio.sleep(self.heartbeat_interval) - - current_time = time.time() - time_since_last_success = current_time - self.last_successful_time - time_since_last_activity = current_time - self.last_activity_time - - # 检查最后成功时间 - if time_since_last_success > self.heartbeat_interval: - consecutive_warnings += 1 - logger.warning(f"⚠️ 心跳检测 #{consecutive_warnings}:已有 {time_since_last_success:.1f} 秒没有成功的LLM响应") - print(f"⚠️ 心跳检测 #{consecutive_warnings}:已有 {time_since_last_success:.1f} 秒没有成功的LLM响应") - - # 打印当前统计信息 - if self.total_requests > 0: - success_rate = self.successful_requests / self.total_requests * 100 - logger.warning(f" 当前统计:总请求 {self.total_requests},成功 {self.successful_requests} ({success_rate:.1f}%),超时 {self.timeout_requests}") - print(f" 当前统计:总请求 {self.total_requests},成功 {self.successful_requests} ({success_rate:.1f}%),超时 {self.timeout_requests}") - - if time_since_last_success > self.heartbeat_interval * 3: - logger.error(f"❌ 严重警告:LLM可能已卡死,超过 {time_since_last_success:.1f} 秒无成功响应!") - print(f"❌ 严重警告:LLM可能已卡死,超过 {time_since_last_success:.1f} 秒无成功响应!") - print(f" 建议:检查ollama服务状态,或考虑重启程序") - - # 检查ollama服务状态 - if not 
self.check_ollama_status(): - logger.critical("💀 Ollama服务异常,这可能是卡死的原因!") - print("💀 Ollama服务异常,这可能是卡死的原因!") - - if consecutive_warnings >= 5: - logger.critical(f"💀 致命错误:连续 {consecutive_warnings} 次心跳警告,可能需要人工干预") - print(f"💀 致命错误:连续 {consecutive_warnings} 次心跳警告,可能需要人工干预") - else: - if consecutive_warnings > 0: - logger.info(f"✅ 心跳恢复正常:最后成功时间 {time_since_last_success:.1f} 秒前") - print(f"✅ 心跳恢复正常:最后成功时间 {time_since_last_success:.1f} 秒前") - consecutive_warnings = 0 - logger.debug(f"💓 心跳正常:最后成功时间 {time_since_last_success:.1f} 秒前") - - async def process_sentences_with_llm(self, sentences: List[str]) -> List[Dict[str, Any]]: - """批量并发处理句子,每2000条保存一次检查点""" - logger.info(f"开始使用LLM并发处理 {len(sentences)} 个句子(最大并发数:{self.max_concurrent})...") - print(f"开始使用LLM并发处理 {len(sentences)} 个句子(最大并发数:{self.max_concurrent})...") - - # 记录开始时间 - start_time = time.time() - total_sentences = len(sentences) - - # 分批处理,每批1000个句子(减少批次大小) - batch_size = 1000 - all_processed_sentences = [] - - # 启动心跳监控 - heartbeat_task = asyncio.create_task(self.heartbeat_monitor(total_sentences)) - - try: - for batch_start in range(0, total_sentences, batch_size): - batch_end = min(batch_start + batch_size, total_sentences) - batch_sentences = sentences[batch_start:batch_end] - - logger.info(f"=== 处理第 {batch_start//batch_size + 1} 批 ({batch_start + 1}-{batch_end}/{total_sentences}) ===") - print(f"\n=== 处理第 {batch_start//batch_size + 1} 批 ({batch_start + 1}-{batch_end}/{total_sentences}) ===") - - # 创建信号量限制并发数(降低到8) - semaphore = asyncio.Semaphore(self.max_concurrent) - - # 重置批次统计 - batch_start_time = time.time() - self.total_requests = 0 - self.successful_requests = 0 - self.failed_requests = 0 - self.timeout_requests = 0 - - # 创建当前批次的任务 - tasks = [] - for i, sentence in enumerate(batch_sentences): - global_index = batch_start + i - task = self.process_sentence_with_llm_concurrent(semaphore, sentence, global_index, total_sentences, start_time) - tasks.append(task) - - # 并发执行当前批次的任务 - logger.info(f"正在并发处理第 {batch_start//batch_size + 1} 批的 {len(batch_sentences)} 个句子...") - print(f"正在并发处理第 {batch_start//batch_size + 1} 批的 {len(batch_sentences)} 个句子...") - - batch_results = await asyncio.gather(*tasks, return_exceptions=True) - - # 处理当前批次的结果,过滤异常 - batch_processed_sentences = [] - batch_error_count = 0 - - for result in batch_results: - if isinstance(result, Exception): - logger.error(f"任务执行异常: {result}") - print(f"任务执行异常: {result}") - batch_error_count += 1 - elif isinstance(result, dict): - batch_processed_sentences.append(result) - else: - batch_error_count += 1 - - # 按原始顺序排序(因为并发执行可能改变顺序) - batch_processed_sentences.sort(key=lambda x: x['index']) - - # 移除index字段 - for item in batch_processed_sentences: - del item['index'] - - # 添加到总结果中 - all_processed_sentences.extend(batch_processed_sentences) - - # 保存检查点 - checkpoint_filename = self.save_checkpoint(all_processed_sentences, batch_end) - - # 打印当前批次统计信息 - elapsed_time = time.time() - start_time - batch_time = time.time() - batch_start_time - completed_sentences = len(all_processed_sentences) - - logger.info(f"第 {batch_start//batch_size + 1} 批处理完成!") - logger.info(f" - 当前批次:成功 {len(batch_processed_sentences)},失败 {batch_error_count}") - logger.info(f" - 批次用时:{batch_time/60:.1f}分钟") - logger.info(f" - LLM统计:成功 {self.successful_requests},失败 {self.failed_requests},超时 {self.timeout_requests}") - logger.info(f" - 总体进度:{completed_sentences}/{total_sentences} ({completed_sentences/total_sentences*100:.1f}%)") - logger.info(f" - 已用时间:{elapsed_time/60:.1f}分钟") - logger.info(f" - 
平均速度:{completed_sentences/elapsed_time:.2f}句/秒") - logger.info(f" - 检查点已保存:{checkpoint_filename}") - - print(f"第 {batch_start//batch_size + 1} 批处理完成!") - print(f" - 当前批次:成功 {len(batch_processed_sentences)},失败 {batch_error_count}") - print(f" - 批次用时:{batch_time/60:.1f}分钟") - print(f" - LLM统计:成功 {self.successful_requests},失败 {self.failed_requests},超时 {self.timeout_requests}") - print(f" - 总体进度:{completed_sentences}/{total_sentences} ({completed_sentences/total_sentences*100:.1f}%)") - print(f" - 已用时间:{elapsed_time/60:.1f}分钟") - print(f" - 平均速度:{completed_sentences/elapsed_time:.2f}句/秒") - print(f" - 检查点已保存:{checkpoint_filename}") - - if batch_end < total_sentences: - remaining_sentences = total_sentences - completed_sentences - avg_time_per_sentence = elapsed_time / completed_sentences - estimated_remaining_time = avg_time_per_sentence * remaining_sentences - logger.info(f" - 预估剩余时间:{estimated_remaining_time/60:.1f}分钟") - print(f" - 预估剩余时间:{estimated_remaining_time/60:.1f}分钟") - - # 在批次之间稍作休息,避免过度压力 - if batch_end < total_sentences: - logger.info("批次间休息5秒...") - await asyncio.sleep(5) - - finally: - # 取消心跳监控 - heartbeat_task.cancel() - try: - await heartbeat_task - except asyncio.CancelledError: - pass - - # 打印最终统计信息 - total_time = time.time() - start_time - logger.info(f"=== 全部处理完成!===") - logger.info(f" - 总成功:{len(all_processed_sentences)}") - logger.info(f" - 总用时:{total_time/60:.1f}分钟") - logger.info(f" - 平均处理速度:{len(all_processed_sentences)/total_time:.2f}句/秒") - - print(f"\n=== 全部处理完成!===") - print(f" - 总成功:{len(all_processed_sentences)}") - print(f" - 总用时:{total_time/60:.1f}分钟") - print(f" - 平均处理速度:{len(all_processed_sentences)/total_time:.2f}句/秒") - - return all_processed_sentences - - def save_checkpoint(self, processed_sentences: List[Dict[str, Any]], current_count: int) -> str: - """保存检查点文件""" + def save_batch_checkpoint(self, processed_sentences: List[Dict[str, Any]], batch_number: int, total_processed_count: int) -> str: + """保存当前批次的检查点文件""" # 生成检查点文件名,确保在output目录中 base_name = os.path.splitext(os.path.basename(self.output_file))[0] - checkpoint_filename = os.path.join('output', f"{base_name}_checkpoint_{current_count}.json") + checkpoint_filename = os.path.join('output', f"{base_name}_batch_{batch_number}.json") # 保存检查点 with open(checkpoint_filename, 'w', encoding='utf-8') as f: json.dump({ "metadata": { - "total_processed": len(processed_sentences), - "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), - "checkpoint_number": current_count + "batch_number": batch_number, + "batch_size": len(processed_sentences), + "total_processed_count": total_processed_count, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") }, "sentences": processed_sentences }, f, ensure_ascii=False, indent=2) @@ -913,11 +893,41 @@ class EnhancedTRExProcessor: return set() processed_sentences = set() - - # 查找所有检查点文件 base_name = os.path.splitext(os.path.basename(self.output_file))[0] - pattern = os.path.join('output', f"{base_name}_checkpoint_*.json") - checkpoint_files = glob.glob(pattern) + + # 首先查找新格式的批次文件 + batch_pattern = os.path.join('output', f"{base_name}_batch_*.json") + batch_files = glob.glob(batch_pattern) + + if batch_files: + print(f"找到 {len(batch_files)} 个批次检查点文件") + batch_files.sort() # 确保按顺序处理 + + for batch_file in batch_files: + try: + with open(batch_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + sentences_data = data.get('sentences', []) + for item in sentences_data: + original_sentence = item.get('original_sentence', '') + if original_sentence: + 
processed_sentences.add(original_sentence) + + batch_number = data.get('metadata', {}).get('batch_number', 'unknown') + print(f" - 从批次 {batch_number} 加载了 {len(sentences_data)} 个句子") + + except Exception as e: + print(f"读取批次文件 {batch_file} 失败: {e}") + continue + + print(f"从批次文件总计加载了 {len(processed_sentences)} 个已处理的句子") + logger.info(f"从批次文件总计加载了 {len(processed_sentences)} 个已处理的句子") + return processed_sentences + + # 如果没有批次文件,尝试查找旧格式的检查点文件 + old_pattern = os.path.join('output', f"{base_name}_checkpoint_*.json") + checkpoint_files = glob.glob(old_pattern) if not checkpoint_files: print("未找到检查点文件,将从头开始处理") @@ -939,8 +949,8 @@ class EnhancedTRExProcessor: continue if latest_file: - print(f"找到最新检查点: {latest_file} (包含 {latest_count} 条记录)") - logger.info(f"找到最新检查点: {latest_file} (包含 {latest_count} 条记录)") + print(f"找到旧格式检查点: {latest_file} (包含 {latest_count} 条记录)") + logger.info(f"找到旧格式检查点: {latest_file} (包含 {latest_count} 条记录)") try: with open(latest_file, 'r', encoding='utf-8') as f: data = json.load(f) @@ -951,8 +961,8 @@ class EnhancedTRExProcessor: if original_sentence: processed_sentences.add(original_sentence) - print(f"从检查点加载了 {len(processed_sentences)} 个已处理的句子") - logger.info(f"从检查点加载了 {len(processed_sentences)} 个已处理的句子") + print(f"从旧格式检查点加载了 {len(processed_sentences)} 个已处理的句子") + logger.info(f"从旧格式检查点加载了 {len(processed_sentences)} 个已处理的句子") except Exception as e: print(f"读取检查点文件失败: {e}") @@ -961,7 +971,11 @@ class EnhancedTRExProcessor: return processed_sentences async def process_with_llm(self): - """步骤2:从JSON文件读取句子并进行LLM处理""" + """步骤2:从JSON文件读取句子并进行vLLM处理(保持兼容性)""" + await self.process_with_vllm_api() + + async def process_with_vllm_api(self): + """步骤2:从JSON文件读取句子并进行vLLM处理""" if not self.enable_llm_processing: print("Error: LLM processing is disabled!") return @@ -970,7 +984,7 @@ class EnhancedTRExProcessor: print("Error: output_file is required for LLM processing!") return - print("=== 步骤2:LLM处理 ===") + print("=== 步骤2:vLLM处理 ===") # 读取句子JSON文件 if not os.path.exists(self.sentences_json): @@ -1017,37 +1031,68 @@ class EnhancedTRExProcessor: return # 处理未处理的句子 - print("开始LLM处理...") + print("开始vLLM处理...") - # 检查ollama服务状态 - logger.info("检查Ollama服务状态...") - if not self.check_ollama_status(): - logger.error("Ollama服务状态异常,无法继续处理") - print("错误:Ollama服务状态异常,请检查服务是否正常运行") - return + # 处理新句子(现在返回空列表,数据保存在批次检查点中) + await self.process_sentences_with_vllm_api(unprocessed_sentences) - new_processed_sentences = await self.process_sentences_with_llm(unprocessed_sentences) + # 处理完成后,合并所有批次检查点生成最终文件 + print("合并所有批次检查点生成最终文件...") + all_processed_sentences = self.merge_all_batch_checkpoints() - # 如果有之前的处理结果,合并它们 - if processed_sentences_set: - latest_checkpoint = self.find_latest_checkpoint() - if latest_checkpoint: - checkpoint_file, _ = latest_checkpoint - previous_processed = self.load_checkpoint(checkpoint_file) - - # 合并结果 - all_processed_sentences = previous_processed + new_processed_sentences - print(f"合并了之前的 {len(previous_processed)} 条和新处理的 {len(new_processed_sentences)} 条记录") - else: - all_processed_sentences = new_processed_sentences + if all_processed_sentences: + # 保存最终结果 + self.save_sentences(all_processed_sentences) + print("vLLM处理完成!") else: - all_processed_sentences = new_processed_sentences - - # 保存最终结果 - self.save_sentences(all_processed_sentences) - print("LLM处理完成!") + print("警告:没有找到任何处理结果") - # ==================== 新增:句子提取功能 ==================== + def merge_all_batch_checkpoints(self) -> List[Dict[str, Any]]: + """合并所有批次检查点文件""" + if not self.output_file: + return [] + + base_name = 
os.path.splitext(os.path.basename(self.output_file))[0] + + # 查找所有批次检查点文件 + batch_pattern = os.path.join('output', f"{base_name}_batch_*.json") + batch_files = glob.glob(batch_pattern) + + if not batch_files: + # 如果没有批次文件,尝试查找旧格式的检查点文件 + old_pattern = os.path.join('output', f"{base_name}_checkpoint_*.json") + old_files = glob.glob(old_pattern) + if old_files: + print("找到旧格式检查点文件,尝试读取最新的...") + latest_checkpoint = self.find_latest_checkpoint() + if latest_checkpoint: + checkpoint_file, _ = latest_checkpoint + return self.load_checkpoint(checkpoint_file) + return [] + + print(f"找到 {len(batch_files)} 个批次检查点文件") + + all_sentences = [] + batch_files.sort() # 确保按顺序处理 + + for batch_file in batch_files: + try: + with open(batch_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + batch_sentences = data.get('sentences', []) + all_sentences.extend(batch_sentences) + + batch_number = data.get('metadata', {}).get('batch_number', 'unknown') + batch_size = len(batch_sentences) + print(f" - 批次 {batch_number}: {batch_size} 个句子") + + except Exception as e: + print(f"读取批次文件 {batch_file} 失败: {e}") + continue + + print(f"总计合并了 {len(all_sentences)} 个句子") + return all_sentences def extract_sentences(self): """步骤1:从TREx数据集提取句子并保存为JSON""" @@ -1118,41 +1163,16 @@ class EnhancedTRExProcessor: return unique_sentences - def check_ollama_status(self) -> bool: - """检查ollama服务是否正常运行""" - try: - # 检查ollama进程是否运行 - result = subprocess.run(['pgrep', 'ollama'], capture_output=True, text=True) - if result.returncode != 0: - logger.error("Ollama进程未运行") - return False - - # 检查ollama API是否响应 - response = requests.get('http://localhost:11434/api/tags', timeout=5) - if response.status_code == 200: - logger.info("Ollama服务状态正常") - return True - else: - logger.error(f"Ollama API响应异常,状态码: {response.status_code}") - return False - - except requests.exceptions.RequestException as e: - logger.error(f"无法连接到Ollama API: {e}") - return False - except Exception as e: - logger.error(f"检查Ollama状态时出错: {e}") - return False - def main(): """主函数""" import argparse - parser = argparse.ArgumentParser(description='Convert TREx dataset to enhanced sentences with LLM processing') + parser = argparse.ArgumentParser(description='Convert TREx dataset to enhanced sentences with vLLM processing') # 选择运行模式 parser.add_argument('--step', choices=['extract', 'llm', 'all'], default='llm', - help='运行步骤: extract=仅提取句子, llm=仅LLM处理, all=完整流程') + help='运行步骤: extract=仅提取句子, llm=仅vLLM处理, all=完整流程') # 文件路径参数 parser.add_argument('--input_dir', default='dataset/TREx', help='Input directory containing TREx JSON files') @@ -1161,7 +1181,7 @@ def main(): # 处理参数 parser.add_argument('--max_files', type=int, help='Maximum number of files to process (for testing)') - parser.add_argument('--no_llm', action='store_true', help='Disable LLM processing (basic mode)') + parser.add_argument('--no_llm', action='store_true', help='Disable vLLM processing (basic mode)') args = parser.parse_args() @@ -1173,7 +1193,7 @@ def main(): if args.step in ['llm', 'all']: if args.no_llm: - print("Error: Cannot run LLM step with --no_llm flag!") + print("Error: Cannot run vLLM step with --no_llm flag!") return # 创建处理器 @@ -1191,8 +1211,8 @@ def main(): processor.extract_sentences() elif args.step == 'llm': - print("=== 运行模式:仅LLM处理 ===") - asyncio.run(processor.process_with_llm()) + print("=== 运行模式:仅vLLM处理 ===") + asyncio.run(processor.process_with_vllm_api()) elif args.step == 'all': print("=== 运行模式:完整流程 ===") @@ -1206,12 +1226,12 @@ def main(): return if args.no_llm: - print("LLM处理已禁用,流程结束") 
+ print("vLLM处理已禁用,流程结束") return - # 步骤2:LLM处理 - print("\n--- 开始步骤2:LLM处理 ---") - asyncio.run(processor.process_with_llm()) + # 步骤2:vLLM处理 + print("\n--- 开始步骤2:vLLM处理 ---") + asyncio.run(processor.process_with_vllm_api()) if __name__ == "__main__":
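
The core change in this patch is swapping the agno `Agent` for direct calls to Ollama's `/api/chat` endpoint with `"stream": false` and `"format": "json"`. Below is a minimal standalone sketch of that request/parse cycle, handy for sanity-checking the endpoint before running the full pipeline. The base URL `http://localhost:11434`, model `gemma3:latest`, timeout, and the "keep the original sentence with score 5.0" fallback are taken from the patch; the shortened system prompt, the helper name, and the sample sentence are illustrative assumptions, not code from the repository.

```python
# Minimal sketch (assumes Ollama is running locally and gemma3:latest has been pulled;
# mirrors the request shape used in process_single_batch_request and the fallback in
# parse_ollama_response, but for a single sentence only).
import asyncio
import json

import aiohttp

OLLAMA_BASE_URL = "http://localhost:11434"   # same default as the patch
MODEL_NAME = "gemma3:latest"                 # same model as the patch
REQUEST_TIMEOUT = 180                        # seconds, as in the patch

# Abbreviated stand-in for get_system_prompt(); the real prompt is much longer.
SYSTEM_PROMPT = (
    "You are a text processing assistant. Correct the sentence and rate its importance. "
    'Respond with JSON: {"corrected_sentence": "...", "importance_score": 0.0}'
)


async def correct_one_sentence(sentence: str) -> dict:
    """Send one sentence through Ollama's /api/chat and parse the JSON reply."""
    request_data = {
        "model": MODEL_NAME,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Please correct and evaluate: {sentence}"},
        ],
        "stream": False,
        "format": "json",                    # ask Ollama to emit valid JSON
        "options": {"temperature": 0.2},
    }
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(f"{OLLAMA_BASE_URL}/api/chat", json=request_data) as resp:
            resp.raise_for_status()
            body = await resp.json()

    content = body.get("message", {}).get("content", "")
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Same default behaviour as the patch: keep the sentence, score 5.0
        return {"corrected_sentence": sentence, "importance_score": 5.0}


if __name__ == "__main__":
    print(asyncio.run(correct_one_sentence("Beijing is a capital city and are.")))
```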