triage/analysis/failed_tasks_analyzer.py
iomgaa 7c723fbc4b 删除废弃的disease_analyst智能体模块
删除了不再使用的disease_analyst模块的所有相关文件:
- agent.py: 疾病分析智能体主逻辑
- prompt.py: 疾病分析提示模板
- response_model.py: 响应数据模型
- __init__.py: 模块初始化文件

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-03 21:44:01 +08:00

380 lines
14 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
失败任务分析器
根据success=false的案例提取最后step_number中new_scores小于0.85的任务
"""
import json
import os
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional

from file_filter_utils import filter_complete_files, print_filter_summary
class FailedTasksAnalyzer:
    """Failed-task analyzer.

    Scans the ``*.jsonl`` result files of a run, keeps the cases whose
    recorded outcome is ``success == False``, and for each such case collects
    the tasks whose latest score is below 0.85.
    """

    # Case files are expected to contain ``case_<index>.jsonl`` in their name.
    _CASE_INDEX_RE = re.compile(r'case_(\d+)\.jsonl')

    # (exclusive upper bound, label) pairs for the score histogram. A score
    # lands in the first bucket whose bound it is below, so negative scores
    # go to '0.0-0.1' and scores >= 0.85 are not counted at all.
    _SCORE_BUCKETS = (
        (0.1, '0.0-0.1'),
        (0.2, '0.1-0.2'),
        (0.3, '0.2-0.3'),
        (0.4, '0.3-0.4'),
        (0.5, '0.4-0.5'),
        (0.6, '0.5-0.6'),
        (0.7, '0.6-0.7'),
        (0.8, '0.7-0.8'),
        (0.85, '0.8-0.85'),
    )

    def __init__(self, results_dir: str = "results", output_dir: str = "analysis"):
        """
        Initialize the analyzer.

        Args:
            results_dir: Directory containing the ``*.jsonl`` result files.
            output_dir: Directory the reports are written to; it is also
                passed to ``filter_complete_files`` / ``print_filter_summary``.
        """
        self.results_dir = results_dir
        self.output_dir = output_dir
        # One entry per failed case that has at least one low-scoring task.
        self.failed_cases: List[Dict[str, Any]] = []

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool (a subclass of int)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return *value* if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def find_final_step_data(self, case_data: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """
        Find the entry with the highest ``step_number``.

        Only dict entries with a numeric (non-bool) ``step_number`` greater
        than -1 are considered; on ties the earliest entry wins.

        Args:
            case_data: Parsed JSONL records of one case.

        Returns:
            The entry of the final step, or None if no entry qualifies.
        """
        final_step: Optional[Dict[str, Any]] = None
        max_step: float = -1
        for entry in case_data:
            if not isinstance(entry, dict):
                continue
            step_number = entry.get('step_number')
            # Skip missing / non-numeric step numbers: comparing e.g. a
            # string with an int would raise TypeError.
            if not self._is_number(step_number):
                continue
            # Strict '>' keeps the first entry among equal step numbers.
            if step_number > max_step:
                max_step = step_number
                final_step = entry
        return final_step

    def _latest_scores(self, case_data: List[Dict[str, Any]],
                       final_step: Dict[str, Any]) -> Dict[str, Any]:
        """Return the most relevant task-score mapping of a case.

        Lookup order (the first non-empty dict wins):
        1. ``new_scores`` of the latest ``task_scores_update`` event;
        2. ``new_scores`` of the final step;
        3. ``output_data.new_scores`` of the final step;
        4. ``output_data.phase_scores`` of the final step.
        Containers that are not dicts (None, lists, ...) are treated as empty.
        """
        for entry in reversed(case_data):
            if isinstance(entry, dict) and entry.get('event_type') == 'task_scores_update':
                scores = self._as_dict(entry.get('new_scores'))
                if scores:
                    return scores
        scores = self._as_dict(final_step.get('new_scores'))
        if scores:
            return scores
        output_data = self._as_dict(final_step.get('output_data'))
        return (self._as_dict(output_data.get('new_scores'))
                or self._as_dict(output_data.get('phase_scores')))

    def extract_failed_tasks(self, case_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Extract the low-scoring tasks of one case.

        Args:
            case_data: Parsed JSONL records of one case.

        Returns:
            One dict per task whose numeric score is below 0.85, with keys
            ``task_name``, ``score`` (as float) and ``step_number`` (taken
            from the final step). Empty if no final step can be found.
        """
        final_step = self.find_final_step_data(case_data)
        if not final_step:
            return []
        new_scores = self._latest_scores(case_data, final_step)
        return [
            {
                'task_name': task_name,
                'score': float(score),
                'step_number': final_step.get('step_number', 0),
            }
            for task_name, score in new_scores.items()
            # Booleans are ints in Python; they are flags, not scores.
            if self._is_number(score) and score < 0.85
        ]

    @staticmethod
    def _load_jsonl(filepath: str) -> List[Dict[str, Any]]:
        """Read a JSONL file and return its JSON-object lines.

        Blank lines, invalid JSON, and lines whose JSON value is not an
        object are skipped.
        """
        records: List[Dict[str, Any]] = []
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Non-object lines have no fields to inspect and would make
                # the later ``.get`` lookups fail for the whole file.
                if isinstance(data, dict):
                    records.append(data)
        return records

    @staticmethod
    def _final_success_flag(case_data: List[Dict[str, Any]]) -> Any:
        """Return the last non-None ``success`` value of the case, or None."""
        for entry in reversed(case_data):
            success = entry.get('success')
            if success is not None:
                return success
        return None

    @classmethod
    def _case_index(cls, filename: str) -> int:
        """Extract ``<n>`` from ``...case_<n>.jsonl``; 0 if the name does not match."""
        match = cls._CASE_INDEX_RE.search(filename)
        return int(match.group(1)) if match else 0

    def _analyze_case_file(self, filepath: str, filename: str) -> None:
        """Append the case to ``self.failed_cases`` if it failed with low-scoring tasks."""
        case_data = self._load_jsonl(filepath)
        if not case_data:
            return
        # Only an explicit ``False`` marks a failed case.
        if self._final_success_flag(case_data) is not False:
            return
        failed_tasks = self.extract_failed_tasks(case_data)
        if failed_tasks:
            self.failed_cases.append({
                'case_index': self._case_index(filename),
                'case_filename': filename,
                'failed_tasks': failed_tasks,
                'total_failed_tasks': len(failed_tasks),
            })

    def analyze_failed_cases(self) -> None:
        """
        Scan ``results_dir`` and collect the failed cases.

        ``self.failed_cases`` is reset first, so repeated calls do not
        duplicate entries. Only ``*.jsonl`` files accepted by
        ``filter_complete_files`` are processed; a file that raises while
        being processed is reported and skipped.
        """
        self.failed_cases = []
        if not os.path.isdir(self.results_dir):
            print(f"Results directory not found: {self.results_dir}")
            return

        all_files = [os.path.join(self.results_dir, name)
                     for name in os.listdir(self.results_dir)
                     if name.endswith('.jsonl')]
        # Keep only the completed runs.
        filtered_files = filter_complete_files(all_files, self.output_dir)
        print_filter_summary(self.output_dir)
        print(f"Found {len(all_files)} data files, processing {len(filtered_files)} completed files")

        for filepath in sorted(filtered_files):
            filename = os.path.basename(filepath)
            try:
                self._analyze_case_file(filepath, filename)
            except Exception as e:  # best effort: one bad file must not abort the run
                print(f"Error processing {filename}: {e}")

        print(f"Found {len(self.failed_cases)} failed cases with tasks scoring < 0.85")

    def generate_report(self) -> Dict[str, Any]:
        """
        Build the failed-task report from ``self.failed_cases``.

        Returns:
            A dict with ``total_failed_cases``, ``total_failed_tasks``,
            ``task_distribution`` (task name -> number of occurrences across
            failed cases), ``score_statistics`` (mean/min/max rounded to
            three decimals plus ``score_ranges``), and the raw
            ``failed_cases`` list. With no failed cases, the counts are 0 and
            ``score_statistics`` is an empty dict.
        """
        if not self.failed_cases:
            return {
                'total_failed_cases': 0,
                'total_failed_tasks': 0,
                'task_distribution': {},
                'score_statistics': {},
                'failed_cases': []
            }

        total_failed_cases = len(self.failed_cases)
        total_failed_tasks = sum(case['total_failed_tasks'] for case in self.failed_cases)

        # Per-task occurrence counts and the flat list of all scores.
        task_distribution = defaultdict(int)
        all_scores = []
        for case in self.failed_cases:
            for task in case['failed_tasks']:
                task_distribution[task['task_name']] += 1
                all_scores.append(task['score'])

        if all_scores:
            avg_score = sum(all_scores) / len(all_scores)
            min_score = min(all_scores)
            max_score = max(all_scores)
            score_ranges = self._calculate_score_ranges(all_scores)
        else:
            avg_score = min_score = max_score = 0.0
            score_ranges = {}

        return {
            'total_failed_cases': total_failed_cases,
            'total_failed_tasks': total_failed_tasks,
            'task_distribution': dict(task_distribution),
            'score_statistics': {
                'mean_score': round(avg_score, 3),
                'min_score': round(min_score, 3),
                'max_score': round(max_score, 3),
                'score_ranges': score_ranges
            },
            'failed_cases': self.failed_cases
        }

    def _calculate_score_ranges(self, scores: List[float]) -> Dict[str, int]:
        """
        Compute the score histogram.

        Args:
            scores: Scores to bucket.

        Returns:
            Bucket label -> count, for every bucket in ``_SCORE_BUCKETS``
            (in order). Scores >= 0.85 are not counted.
        """
        ranges = {label: 0 for _, label in self._SCORE_BUCKETS}
        for score in scores:
            for upper, label in self._SCORE_BUCKETS:
                if score < upper:
                    ranges[label] += 1
                    break
        return ranges

    def save_reports(self, report_data: Dict[str, Any]) -> None:
        """
        Write the report to ``output_dir`` (created if missing).

        Files written:
        - ``failed_tasks_report.json``: the full ``report_data``;
        - ``failed_tasks_summary.json``: per-case index, filename and tasks;
        - ``failed_tasks_analysis.txt``: a human-readable summary.

        Args:
            report_data: Report dict as produced by ``generate_report``.
        """
        os.makedirs(self.output_dir, exist_ok=True)

        # Full JSON report.
        report_file = os.path.join(self.output_dir, 'failed_tasks_report.json')
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, ensure_ascii=False, indent=2)

        # Simplified JSON report (drops the per-case task count).
        simplified_report = [
            {
                'case_index': case['case_index'],
                'case_filename': case['case_filename'],
                'failed_tasks': case['failed_tasks']
            }
            for case in report_data['failed_cases']
        ]
        simplified_file = os.path.join(self.output_dir, 'failed_tasks_summary.json')
        with open(simplified_file, 'w', encoding='utf-8') as f:
            json.dump(simplified_report, f, ensure_ascii=False, indent=2)

        # Plain-text report.
        text_file = os.path.join(self.output_dir, 'failed_tasks_analysis.txt')
        with open(text_file, 'w', encoding='utf-8') as f:
            f.write("=== 失败任务分析报告 ===\n\n")
            f.write(f"失败案例总数: {report_data['total_failed_cases']}\n")
            f.write(f"失败任务总数: {report_data['total_failed_tasks']}\n\n")
            if report_data['task_distribution']:
                f.write("=== 任务分布 ===\n")
                for task_name, count in sorted(
                    report_data['task_distribution'].items(),
                    key=lambda x: x[1],
                    reverse=True
                ):
                    f.write(f"{task_name}: {count} 个案例\n")
                f.write("\n=== 分数统计 ===\n")
                stats = report_data['score_statistics']
                f.write(f"平均分数: {stats['mean_score']}\n")
                f.write(f"最低分数: {stats['min_score']}\n")
                f.write(f"最高分数: {stats['max_score']}\n\n")
                f.write("=== 分数区间分布 ===\n")
                for range_name, count in stats['score_ranges'].items():
                    if count > 0:
                        f.write(f"{range_name}: {count} 个任务\n")
                f.write("\n=== 详细案例 ===\n")
                for case in report_data['failed_cases']:
                    f.write(f"\n案例 {case['case_index']} ({case['case_filename']}):\n")
                    for task in case['failed_tasks']:
                        f.write(f" - {task['task_name']}: {task['score']:.3f} (步骤 {task['step_number']})\n")
            else:
                f.write("没有检测到失败的案例或任务。\n")

        print("报告已保存到:")
        print(f" - {report_file}")
        print(f" - {simplified_file}")
        print(f" - {text_file}")

    def run_analysis(self) -> None:
        """Run the whole pipeline: scan, build the report, save it, print a summary."""
        print("开始分析失败任务...")
        # 1. Collect the failed cases.
        self.analyze_failed_cases()
        if not self.failed_cases:
            print("没有找到失败的案例或分数低于0.85的任务")
            return
        # 2. Build the report.
        report_data = self.generate_report()
        # 3. Persist it.
        self.save_reports(report_data)
        # 4. Print a short summary (top 10 tasks by occurrence).
        print("\n=== 汇总 ===")
        print(f"失败案例数: {report_data['total_failed_cases']}")
        print(f"失败任务数: {report_data['total_failed_tasks']}")
        if report_data['task_distribution']:
            print("\n主要失败任务:")
            for task_name, count in sorted(
                report_data['task_distribution'].items(),
                key=lambda x: x[1],
                reverse=True
            )[:10]:
                print(f" {task_name}: {count} 个案例")
        print("分析完成!")
def main():
    """Command-line entry point.

    Usage: ``failed_tasks_analyzer.py [results_dir [output_dir]]``.
    Missing arguments fall back to ``results/results0901`` and
    ``analysis/0901`` respectively.
    """
    import sys
    args = sys.argv[1:]
    # Honour every positional argument that was given and default the rest,
    # so a lone results_dir argument is used instead of silently ignored.
    results_dir = args[0] if len(args) >= 1 else "results/results0901"
    output_dir = args[1] if len(args) >= 2 else "analysis/0901"
    analyzer = FailedTasksAnalyzer(results_dir=results_dir, output_dir=output_dir)
    analyzer.run_analysis()


if __name__ == "__main__":
    main()