使用TREx数据集生成数据库初始化问题

This commit is contained in:
iomgaa 2025-05-29 19:30:19 +08:00
parent c5d0a3aba3
commit 6932e5fa8e
3 changed files with 841 additions and 539 deletions

View File

@ -4,10 +4,35 @@
1. **句子提取**:从 TREx 数据集提取三元组并转换为自然语言句子 1. **句子提取**:从 TREx 数据集提取三元组并转换为自然语言句子
2. **LLM 处理**:使用 ollama qwen3:4b 模型进行句子修正和重要性评分 2. **LLM 处理**:使用 ollama qwen3:4b 模型进行句子修正和重要性评分
## 🆕 防卡死机制
为了解决LLM处理时可能出现的卡死问题新增了以下功能
### 超时和重试机制
- **超时时间**每个LLM请求60秒超时
- **重试机制**失败后最多重试2次采用指数退避策略
- **并发控制**降低并发数至4个减少服务器压力
### 心跳监控系统
- **实时监控**每30秒检查一次LLM响应状态
- **异常警告**超过30秒无成功响应时发出警告
- **服务检测**自动检查ollama服务状态
- **详细统计**:实时显示成功率、超时率等统计信息
### 日志系统
- **详细日志**:所有操作都记录在 `logs/` 目录下
- **双重输出**:同时输出到日志文件和控制台
- **时间戳标记**:日志文件包含启动时间戳
### 改进的错误处理
- **异常恢复**LLM处理失败时使用原句子和默认评分
- **状态监控**处理前检查ollama服务状态
- **批次间休息**批次之间休息5秒避免过度压力
## 安装依赖 ## 安装依赖
```bash ```bash
pip install agno asyncio pydantic pip install agno asyncio pydantic requests
``` ```
确保已安装并启动 ollama并下载 qwen3:4b 模型: 确保已安装并启动 ollama并下载 qwen3:4b 模型:
@ -50,24 +75,52 @@ python trex_to_sentences_simple.py --step llm --sentences_json my_sentences.json
## 输出文件 ## 输出文件
**注意:所有输出文件都会自动保存在 `./output/` 目录中** **注意:所有输出文件都会自动保存在相应目录中**
### 步骤1输出 ### 句子提取输出
- `output/extracted_sentences.json`: 提取的原始句子,包含元数据 - `output/extracted_sentences.json`: 提取的原始句子,包含元数据
### 步骤2输出 ### LLM处理输出
- `output/{output_file}.txt`: 修正后的句子文本文件 - `output/{output_file}.txt`: 修正后的句子文本文件
- `output/{output_file}.json`: 完整的处理结果(包含原句、修正句、评分) - `output/{output_file}.json`: 完整的处理结果(包含原句、修正句、评分)
- `output/{output_file}_sorted_by_importance.txt`: 按重要性评分排序的句子 - `output/{output_file}_sorted_by_importance.txt`: 按重要性评分排序的句子
### 检查点文件 ### 检查点文件
- `output/{output_file}_checkpoint_{数量}.json`: 每2000条句子自动保存的检查点 - `output/{output_file}_checkpoint_{数量}.json`: 每1000条句子自动保存的检查点
### 日志文件
- `logs/trex_processor_{时间戳}.log`: 详细的处理日志
## 🆕 故障诊断
### 如果遇到卡死问题:
1. **检查日志文件**:查看 `logs/` 目录下的最新日志
2. **观察心跳监控**:注意控制台的心跳警告信息
3. **检查ollama服务**
```bash
ps aux | grep ollama
curl http://localhost:11434/api/tags
```
4. **重启ollama服务**(如果需要):
```bash
pkill ollama
ollama serve &
```
### 常见警告信息:
- `⚠️ 心跳检测`: 30秒无成功响应正常情况下会自动恢复
- `❌ 严重警告`: 90秒无成功响应可能需要检查服务
- `💀 Ollama服务异常`: ollama服务可能已停止
- `💀 致命错误`: 连续多次警告(建议重启程序)
## 检查点恢复机制 ## 检查点恢复机制
- 步骤2会自动检测已有的检查点文件`output/` 目录中) - 步骤2会自动检测已有的检查点文件`output/` 目录中)
- 只处理尚未处理的句子,避免重复工作 - 只处理尚未处理的句子,避免重复工作
- 如果所有句子都已处理,会直接生成最终输出文件 - 如果所有句子都已处理,会直接生成最终输出文件
- 中断后重新运行会自动从最新检查点继续
## 示例工作流 ## 示例工作流
@ -84,14 +137,18 @@ python trex_to_sentences_simple.py --step llm
## 性能特点 ## 性能特点
- **并发处理**: 最大54个并发LLM请求 - **保守的并发**: 最大4个并发LLM请求降低卡死风险
- **检查点保存**: 每2000条句子自动保存支持断点续传 - **检查点保存**: 每1000条句子自动保存支持断点续传
- **进度显示**: 详细的处理进度和时间预估 - **智能监控**: 详细的处理进度和时间预估
- **错误处理**: LLM请求失败时使用原句子和默认评分 - **健壮的错误处理**: LLM请求失败时使用原句子和默认评分
- **服务监控**: 自动检测ollama服务状态
## 注意事项 ## 注意事项
1. 首次运行步骤2前必须先完成步骤1 1. 首次运行步骤2前必须先完成步骤1
2. 检查点文件会占用额外磁盘空间(每个都包含所有已处理数据) 2. 检查点文件会占用额外磁盘空间(每个都包含所有已处理数据)
3. LLM处理速度取决于模型性能和网络状况 3. LLM处理速度取决于模型性能和网络状况
4. 建议先用`--max_files`参数测试小批量数据 4. 建议先用`--max_files`参数测试小批量数据
5. **新增**:如果遇到卡死,查看日志文件和心跳监控信息
6. **新增**程序会自动检测并报告ollama服务状态
7. **新增**:所有处理过程都有详细日志记录,便于问题诊断

View File

@ -0,0 +1,225 @@
#!/usr/bin/env python3
"""
JSON文件合并脚本
读取多个JSON文件并合并为一个JSON文件
"""
import json
import os
from typing import Dict, List, Any, Union
# 需要合并的JSON文件列表
JSON_FILES_TO_MERGE = [
"output/trex_sentences_enhanced_checkpoint_360000.json"
]
for i in range(1, 1010):
JSON_FILES_TO_MERGE.append(f"output/trex_sentences_enhanced_batch_{i}.json")
def load_json_file(file_path: str) -> Union[Dict, List, None]:
"""加载JSON文件"""
if not os.path.exists(file_path):
print(f"警告: 文件 {file_path} 不存在")
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"成功加载: {file_path}")
return data
except json.JSONDecodeError as e:
print(f"错误: 无法解析JSON文件 {file_path} - {e}")
return None
except Exception as e:
print(f"错误: 读取文件 {file_path} 失败 - {e}")
return None
def merge_json_data(data1: Union[Dict, List], data2: Union[Dict, List]) -> Union[Dict, List]:
"""合并两个JSON数据结构"""
# 如果两个都是列表,直接合并
if isinstance(data1, list) and isinstance(data2, list):
print(f"合并两个列表: {len(data1)} + {len(data2)} = {len(data1) + len(data2)}")
return data1 + data2
# 如果两个都是字典
elif isinstance(data1, dict) and isinstance(data2, dict):
print("合并两个字典结构")
merged = data1.copy()
# 特殊处理:如果都有'sentences'字段且为列表合并sentences
if 'sentences' in data1 and 'sentences' in data2:
if isinstance(data1['sentences'], list) and isinstance(data2['sentences'], list):
print(f"合并sentences字段: {len(data1['sentences'])} + {len(data2['sentences'])} = {len(data1['sentences']) + len(data2['sentences'])}")
merged['sentences'] = data1['sentences'] + data2['sentences']
# 更新metadata if exists
if 'metadata' in merged:
if isinstance(merged['metadata'], dict):
merged['metadata']['total_sentences'] = len(merged['sentences'])
merged['metadata']['merged_from'] = [os.path.basename(f) for f in JSON_FILES_TO_MERGE if os.path.exists(f)]
# 合并其他字段
for key, value in data2.items():
if key != 'sentences' and key not in merged:
merged[key] = value
return merged
# 普通字典合并
for key, value in data2.items():
if key in merged:
# 如果key重复且都是列表合并列表
if isinstance(merged[key], list) and isinstance(value, list):
merged[key] = merged[key] + value
# 如果key重复且都是字典递归合并
elif isinstance(merged[key], dict) and isinstance(value, dict):
merged[key] = merge_json_data(merged[key], value)
else:
# 其他情况保留第二个文件的值
merged[key] = value
print(f"字段 '{key}' 被覆盖")
else:
merged[key] = value
return merged
# 类型不匹配的情况,创建一个包含两者的新结构
else:
print("数据类型不匹配,创建包含两者的新结构")
return {
"data_from_save.json": data1,
"data_from_save2.json": data2,
"merged_at": "test.py"
}
def save_merged_json(data: Union[Dict, List], output_path: str):
"""保存合并后的JSON数据"""
try:
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"合并结果已保存到: {output_path}")
# 显示统计信息
if isinstance(data, dict):
if 'sentences' in data and isinstance(data['sentences'], list):
print(f"总计句子数: {len(data['sentences'])}")
print(f"总计字段数: {len(data)}")
elif isinstance(data, list):
print(f"总计列表项数: {len(data)}")
except Exception as e:
print(f"错误: 保存文件失败 - {e}")
def remove_duplicates_from_sentences(data: Union[Dict, List]) -> Union[Dict, List]:
"""从合并结果中移除重复的句子(基于句子内容)"""
if isinstance(data, dict) and 'sentences' in data:
if isinstance(data['sentences'], list):
original_count = len(data['sentences'])
seen_sentences = set()
unique_sentences = []
for item in data['sentences']:
if isinstance(item, dict):
# 如果是字典使用sentence字段或corrected_sentence字段作为唯一标识
sentence_key = item.get('sentence') or item.get('corrected_sentence') or item.get('original_sentence')
elif isinstance(item, str):
sentence_key = item
else:
sentence_key = str(item)
if sentence_key and sentence_key not in seen_sentences:
seen_sentences.add(sentence_key)
unique_sentences.append(item)
data['sentences'] = unique_sentences
# 更新metadata
if 'metadata' in data and isinstance(data['metadata'], dict):
data['metadata']['total_sentences'] = len(unique_sentences)
data['metadata']['duplicates_removed'] = original_count - len(unique_sentences)
print(f"去重完成: {original_count} -> {len(unique_sentences)} (移除了 {original_count - len(unique_sentences)} 个重复项)")
return data
def merge_multiple_json_data(data_list: List[Union[Dict, List]]) -> Union[Dict, List]:
"""合并多个JSON数据结构"""
if not data_list:
return {}
if len(data_list) == 1:
return data_list[0]
print(f"准备合并 {len(data_list)} 个JSON数据结构")
# 从第一个数据开始,逐步合并其他数据
merged_data = data_list[0]
for i, data in enumerate(data_list[1:], 1):
print(f"正在合并第 {i+1} 个数据结构...")
merged_data = merge_json_data(merged_data, data)
return merged_data
def main():
"""主函数"""
print("=== JSON文件合并脚本 ===")
# 输出路径
output_path = "output/merged.json"
print(f"准备合并以下文件:")
for i, file_path in enumerate(JSON_FILES_TO_MERGE, 1):
print(f" {i}. {file_path}")
print(f"输出文件: {output_path}")
print()
# 加载所有文件
loaded_data = []
successfully_loaded = []
for file_path in JSON_FILES_TO_MERGE:
data = load_json_file(file_path)
if data is not None:
loaded_data.append(data)
successfully_loaded.append(file_path)
# 检查是否至少有一个文件加载成功
if not loaded_data:
print("错误: 没有文件能够成功加载,退出")
return
print(f"成功加载了 {len(loaded_data)} 个文件:")
for file_path in successfully_loaded:
print(f"{file_path}")
if len(loaded_data) < len(JSON_FILES_TO_MERGE):
failed_count = len(JSON_FILES_TO_MERGE) - len(loaded_data)
print(f"警告: {failed_count} 个文件加载失败")
print()
# 合并所有数据
if len(loaded_data) == 1:
print("只有一个文件可用,直接使用...")
merged_data = loaded_data[0]
else:
print("开始合并所有文件...")
merged_data = merge_multiple_json_data(loaded_data)
# 去重处理
print("\n检查并去除重复项...")
merged_data = remove_duplicates_from_sentences(merged_data)
# 保存合并结果
print("\n保存合并结果...")
save_merged_json(merged_data, output_path)
print("\n=== 合并完成 ===")
print(f"合并了 {len(successfully_loaded)} 个文件的数据")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff