diff --git a/analysis/data_comparison.py b/analysis/data_comparison.py index 88cc0f0..99df13d 100644 --- a/analysis/data_comparison.py +++ b/analysis/data_comparison.py @@ -45,7 +45,7 @@ COLORS = { QUALITY_DIMENSIONS = [ 'clinical_inquiry', 'communication_quality', - 'multi_round_consistency', + 'information_completeness', 'overall_professionalism' ] @@ -64,7 +64,7 @@ DIMENSION_NAMES = { 'clinical_inquiry': 'CI', 'diagnostic_reasoning': 'DR', 'communication_quality': 'CQ', - 'multi_round_consistency': 'MRC', + 'information_completeness': 'IC', 'overall_professionalism': 'OP', 'present_illness_similarity': 'PHI Similarity', 'past_history_similarity': 'HP Similarity', @@ -134,8 +134,13 @@ class DataQualityComparisonAnalyzer: # 处理评估分数 for dimension in EVALUATION_DIMENSIONS: - if dimension in evaluation_scores: - score_info = evaluation_scores[dimension] + # 向后兼容性处理:将旧的 multi_round_consistency 映射到新的 information_completeness + actual_dimension = dimension + if dimension == 'information_completeness' and dimension not in evaluation_scores and 'multi_round_consistency' in evaluation_scores: + actual_dimension = 'multi_round_consistency' + + if actual_dimension in evaluation_scores: + score_info = evaluation_scores[actual_dimension] if isinstance(score_info, dict) and 'score' in score_info: score = score_info['score'] elif isinstance(score_info, (int, float)): @@ -536,7 +541,7 @@ def main(): has_significant = False # 定义需要显示的维度顺序(四个质量指标 + 三个相似度指标) - target_dimensions = ['clinical_inquiry', 'multi_round_consistency', 'present_illness_similarity', 'past_history_similarity', 'chief_complaint_similarity'] + target_dimensions = ['clinical_inquiry', 'information_completeness', 'present_illness_similarity', 'past_history_similarity', 'chief_complaint_similarity'] for dimension in target_dimensions: if dimension in statistics['quality_statistics']['statistical_tests']: diff --git a/analysis/evaluate_metrics_analysis.py b/analysis/evaluate_metrics_analysis.py index 8d913bc..5db5459 100755 --- a/analysis/evaluate_metrics_analysis.py +++ b/analysis/evaluate_metrics_analysis.py @@ -81,7 +81,7 @@ def extract_evaluate_scores(workflow: List[Dict]) -> List[Dict]: # 检查是否包含评估分数 if any(key in output_data for key in [ 'clinical_inquiry', 'communication_quality', - 'multi_round_consistency', 'overall_professionalism', + 'information_completeness', 'overall_professionalism', 'present_illness_similarity', 'past_history_similarity', 'chief_complaint_similarity' ]): @@ -110,7 +110,7 @@ def calculate_metrics_by_step(workflow_data: List[List[Dict]]) -> Dict[str, List metrics_data = { 'clinical_inquiry': [[] for _ in range(max_steps)], 'communication_quality': [[] for _ in range(max_steps)], - 'multi_round_consistency': [[] for _ in range(max_steps)], + 'information_completeness': [[] for _ in range(max_steps)], 'overall_professionalism': [[] for _ in range(max_steps)], 'present_illness_similarity': [[] for _ in range(max_steps)], 'past_history_similarity': [[] for _ in range(max_steps)], @@ -124,8 +124,13 @@ def calculate_metrics_by_step(workflow_data: List[List[Dict]]) -> Dict[str, List for step_idx, score_data in enumerate(evaluate_scores): # 提取各维度分数 for metric in metrics_data.keys(): - if metric in score_data and isinstance(score_data[metric], dict): - score = score_data[metric].get('score', 0.0) + # 向后兼容性处理:将旧的 multi_round_consistency 映射到新的 information_completeness + actual_metric = metric + if metric == 'information_completeness' and metric not in score_data and 'multi_round_consistency' in score_data: + actual_metric = 'multi_round_consistency' + + if actual_metric in score_data and isinstance(score_data[actual_metric], dict): + score = score_data[actual_metric].get('score', 0.0) metrics_data[metric][step_idx].append(score) # 计算平均值 diff --git a/analysis/medical_workflow_analysis.py b/analysis/medical_workflow_analysis.py index b70bf52..60c6b1c 100755 --- a/analysis/medical_workflow_analysis.py +++ b/analysis/medical_workflow_analysis.py @@ -10,7 +10,7 @@ import os from collections import defaultdict import matplotlib.pyplot as plt from typing import Dict, List -from file_filter_utils import filter_complete_files, print_filter_summary +from file_filter_utils import load_incomplete_files class MedicalWorkflowAnalyzer: @@ -30,23 +30,20 @@ class MedicalWorkflowAnalyzer: self.step_statistics = defaultdict(int) def load_workflow_data(self) -> None: - """加载所有工作流数据文件""" + """加载所有工作流数据文件(包括完成和未完成的)""" if not os.path.exists(self.results_dir): print(f"结果目录不存在: {self.results_dir}") return # 获取所有jsonl文件 - all_files = [os.path.join(self.results_dir, f) for f in os.listdir(self.results_dir) - if f.endswith('.jsonl')] + all_files = [f for f in os.listdir(self.results_dir) if f.endswith('.jsonl')] - # 过滤出完成的文件 - filtered_files = filter_complete_files(all_files, self.output_dir) - print_filter_summary(self.output_dir) + # 获取未完成文件列表 + incomplete_files = load_incomplete_files(self.output_dir) - print(f"找到 {len(all_files)} 个数据文件,将处理 {len(filtered_files)} 个完成的文件") + print(f"找到 {len(all_files)} 个数据文件,将处理所有文件(包括未完成的)") - for filepath in sorted(filtered_files): - filename = os.path.basename(filepath) + for filename in sorted(all_files): filepath = os.path.join(self.results_dir, filename) try: with open(filepath, 'r', encoding='utf-8') as f: @@ -62,19 +59,28 @@ class MedicalWorkflowAnalyzer: continue if case_data: + # 检查是否为未完成的文件 + is_incomplete = filename in incomplete_files self.workflow_data.append({ 'filename': filename, - 'data': case_data + 'data': case_data, + 'is_incomplete': is_incomplete }) except Exception as e: print(f"读取文件 {filename} 失败: {e}") - + + complete_count = len([case for case in self.workflow_data if not case.get('is_incomplete', False)]) + incomplete_count = len([case for case in self.workflow_data if case.get('is_incomplete', False)]) + print(f"成功加载 {len(self.workflow_data)} 个病例的数据") + print(f" - 完成的病例: {complete_count} 个") + print(f" - 未完成的病例: {incomplete_count} 个") def analyze_workflow_steps(self) -> Dict[str, List[int]]: """ 分析每个病例完成triage、hpi、ph三个阶段所需的step数量 + 包括未完成的样本(用-1表示未完成状态) Returns: Dict包含每个阶段所需的step数量列表 @@ -90,6 +96,7 @@ class MedicalWorkflowAnalyzer: for case_info in self.workflow_data: case_data = case_info['data'] + is_incomplete = case_info.get('is_incomplete', False) # 按阶段分组step triage_steps = set() @@ -97,6 +104,19 @@ class MedicalWorkflowAnalyzer: ph_steps = set() all_steps = set() + # 如果是未完成的样本,检查任务完成状态 + incomplete_phases = set() + if is_incomplete: + # 查找倒数第二行的task_completion_summary + for entry in reversed(case_data): + if 'task_completion_summary' in entry: + phases = entry.get('task_completion_summary', {}).get('phases', {}) + for phase_name in ['triage', 'hpi', 'ph']: + phase_info = phases.get(phase_name, {}) + if not phase_info.get('is_completed', False): + incomplete_phases.add(phase_name) + break + for entry in case_data: if entry.get('event_type') == 'step_start' and 'current_phase' in entry: step_num = entry.get('step_number', 0) @@ -111,18 +131,18 @@ class MedicalWorkflowAnalyzer: elif phase == 'ph': ph_steps.add(step_num) - # 计算每个阶段的step数量 - triage_count = len(triage_steps) - hpi_count = len(hpi_steps) - ph_count = len(ph_steps) + # 计算每个阶段的step数量,对于未完成的阶段使用-1 + triage_count = -1 if 'triage' in incomplete_phases else len(triage_steps) + hpi_count = -1 if 'hpi' in incomplete_phases else len(hpi_steps) + ph_count = -1 if 'ph' in incomplete_phases else len(ph_steps) final_step = max(all_steps) if all_steps else 0 - # 只添加有数据的阶段 - if triage_count > 0: + # 添加数据(包括-1表示的未完成状态) + if triage_count != 0: # 包括-1和正数 stage_steps['triage'].append(triage_count) - if hpi_count > 0: + if hpi_count != 0: # 包括-1和正数 stage_steps['hpi'].append(hpi_count) - if ph_count > 0: + if ph_count != 0: # 包括-1和正数 stage_steps['ph'].append(ph_count) if final_step > 0: stage_steps['final_step'].append(final_step) @@ -156,7 +176,7 @@ class MedicalWorkflowAnalyzer: def plot_step_distribution_subplots(self, stage_stats: Dict[str, Dict[int, int]], output_file: str = "step_distribution_subplots.png") -> None: """ - 绘制四个子图的step数量分布柱形图 + 绘制四个子图的step数量分布柱形图(包括未完成的数据) Args: stage_stats: 各阶段的step数量统计数据 @@ -166,9 +186,10 @@ class MedicalWorkflowAnalyzer: print("没有数据可供绘制") return - # 设置英文显示 - plt.rcParams['font.family'] = 'DejaVu Sans' - plt.rcParams['axes.unicode_minus'] = False + # 设置字体支持中文 + import matplotlib + matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'WenQuanYi Micro Hei', 'sans-serif'] + matplotlib.rcParams['axes.unicode_minus'] = False # 创建四个子图 fig, axes = plt.subplots(2, 2, figsize=(16, 12)) @@ -190,39 +211,70 @@ class MedicalWorkflowAnalyzer: ax = axes[row, col] if stage in stage_stats and stage_stats[stage]: - steps = sorted(stage_stats[stage].keys()) - counts = [stage_stats[stage][step] for step in steps] + # 分离完成和未完成的数据 + completed_data = {k: v for k, v in stage_stats[stage].items() if k != -1} + incomplete_count = stage_stats[stage].get(-1, 0) - # 绘制柱形图 - bars = ax.bar(steps, counts, color=['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4'][stages_order.index(stage) % 4], - alpha=0.7, edgecolor='black', linewidth=0.5) + # 准备x轴数据和标签 + if completed_data: + steps = sorted(completed_data.keys()) + counts = [completed_data[step] for step in steps] + x_labels = [str(step) for step in steps] + else: + steps = [] + counts = [] + x_labels = [] - # 在柱形上标注数值 - for bar, count in zip(bars, counts): - height = bar.get_height() - ax.text(bar.get_x() + bar.get_width()/2., height + max(counts)*0.01, - f'{count}', ha='center', va='bottom', fontsize=9, fontweight='bold') + # 如果有未完成数据,添加到最后 + if incomplete_count > 0: + steps.append(len(steps)) # 位置索引 + counts.append(incomplete_count) + x_labels.append('未完成') - # 设置子图属性 - ax.set_title(f'{subplot_titles[stage]}\n(n={sum(counts)})', fontsize=12, fontweight='bold') - ax.set_xlabel('Number of Steps', fontsize=10) - ax.set_ylabel('Number of Cases', fontsize=10) - ax.grid(True, alpha=0.3, linestyle='--') - - # 设置x轴刻度 - if steps: - ax.set_xticks(steps) - ax.set_xticklabels(steps, rotation=45) - - # 添加统计信息文本 - if counts: - mean_val = sum(s*c for s, c in zip(steps, counts)) / sum(counts) - max_val = max(steps) - min_val = min(steps) + if steps and counts: + # 绘制柱形图 + bars = ax.bar(range(len(steps)), counts, + color=['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4'][stages_order.index(stage) % 4], + alpha=0.7, edgecolor='black', linewidth=0.5) - stats_text = f'Mean: {mean_val:.1f}\nRange: {min_val}-{max_val}' - ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9, - verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) + # 在柱形上标注数值 + for i, (bar, count) in enumerate(zip(bars, counts)): + height = bar.get_height() + ax.text(bar.get_x() + bar.get_width()/2., height + max(counts)*0.01, + f'{count}', ha='center', va='bottom', fontsize=9, fontweight='bold') + + # 设置子图属性 + ax.set_title(f'{subplot_titles[stage]}\n(n={sum(counts)})', fontsize=12, fontweight='bold') + ax.set_xlabel('Number of Steps', fontsize=10) + ax.set_ylabel('Number of Cases', fontsize=10) + ax.grid(True, alpha=0.3, linestyle='--') + + # 设置x轴刻度和标签 + ax.set_xticks(range(len(steps))) + ax.set_xticklabels(x_labels, rotation=45) + + # 添加统计信息文本(只针对完成的数据) + if completed_data: + completed_steps = list(completed_data.keys()) + completed_counts = list(completed_data.values()) + mean_val = sum(s*c for s, c in zip(completed_steps, completed_counts)) / sum(completed_counts) + max_val = max(completed_steps) + min_val = min(completed_steps) + + stats_text = f'Completed Mean: {mean_val:.1f}\nCompleted Range: {min_val}-{max_val}' + if incomplete_count > 0: + stats_text += f'\nIncomplete: {incomplete_count}' + + ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9, + verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) + elif incomplete_count > 0: + stats_text = f'All Incomplete: {incomplete_count}' + ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9, + verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) + else: + ax.text(0.5, 0.5, 'No Data Available', ha='center', va='center', + transform=ax.transAxes, fontsize=12) + ax.set_title(f'{subplot_titles[stage]}\n(n=0)', fontsize=12, fontweight='bold') else: ax.text(0.5, 0.5, 'No Data Available', ha='center', va='center', transform=ax.transAxes, fontsize=12) @@ -242,7 +294,7 @@ class MedicalWorkflowAnalyzer: print(f"Four-subplot chart saved to: {output_path}") def print_statistics_summary(self, stage_steps: Dict[str, List[int]]) -> None: - """打印统计摘要""" + """打印统计摘要(包括未完成数据)""" print("\n=== Medical Workflow Step Statistics Summary ===") # 英文阶段名称映射 @@ -256,12 +308,25 @@ class MedicalWorkflowAnalyzer: for stage, steps in stage_steps.items(): stage_name = stage_names.get(stage, stage.upper()) if steps: + # 分离完成和未完成的数据 + completed_steps = [s for s in steps if s != -1] + incomplete_count = steps.count(-1) + print(f"\n{stage_name}:") print(f" Total Cases: {len(steps)}") - print(f" Mean Steps: {sum(steps)/len(steps):.2f}") - print(f" Min Steps: {min(steps)}") - print(f" Max Steps: {max(steps)}") - print(f" Step Distribution: {dict(sorted({s: steps.count(s) for s in set(steps)}.items()))}") + + if completed_steps: + print(f" Mean Steps: {sum(completed_steps)/len(completed_steps):.2f}") + print(f" Min Steps: {min(completed_steps)}") + print(f" Max Steps: {max(completed_steps)}") + + # 构建分布字典 + distribution = dict(sorted({s: completed_steps.count(s) for s in set(completed_steps)}.items())) + if incomplete_count > 0: + distribution['未完成'] = incomplete_count + print(f" Step Distribution: {distribution}") + else: + print(f" All cases incomplete: {incomplete_count}") else: print(f"\n{stage_name}: No Data") diff --git a/analysis/workflow_completeness_checker.py b/analysis/workflow_completeness_checker.py index 41e2995..bde29de 100644 --- a/analysis/workflow_completeness_checker.py +++ b/analysis/workflow_completeness_checker.py @@ -41,19 +41,32 @@ class WorkflowCompletenessChecker: """ try: with open(filepath, 'r', encoding='utf-8') as f: - # 读取最后一行 lines = f.readlines() - if not lines: + if len(lines) < 2: # 需要至少两行:倒数第二行和最后一行 return False - last_line = lines[-1].strip() - if not last_line: + # 检查倒数第二行的task_completion_summary + second_to_last_line = lines[-2].strip() + if not second_to_last_line: return False - # 解析最后一行JSON try: - last_event = json.loads(last_line) - return last_event.get('event_type') == 'workflow_complete' + second_to_last_event = json.loads(second_to_last_line) + # 检查是否有task_completion_summary字段 + task_summary = second_to_last_event.get('task_completion_summary', {}) + if not task_summary: + return False + + # 检查三个阶段的完成状态 + phases = task_summary.get('phases', {}) + required_phases = ['triage', 'hpi', 'ph'] + + for phase in required_phases: + phase_info = phases.get(phase, {}) + if not phase_info.get('is_completed', False): + return False + + return True except json.JSONDecodeError: return False diff --git a/analysis/workflow_file_cleaner.py b/analysis/workflow_file_cleaner.py new file mode 100644 index 0000000..7a46d32 --- /dev/null +++ b/analysis/workflow_file_cleaner.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +工作流文件清理器 +检测指定目录中的所有JSONL文件,删除不完整的工作流记录文件 +""" + +import json +import os +import glob +from pathlib import Path +from typing import Dict, Any, List +import argparse +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class WorkflowFileCleaner: + """工作流文件清理器""" + + def __init__(self, directory: str, dry_run: bool = False): + """ + 初始化清理器 + + Args: + directory: 要检查的目录路径 + dry_run: 是否为试运行模式(不实际删除文件) + """ + self.directory = Path(directory) + self.dry_run = dry_run + self.stats = { + 'total_files': 0, + 'complete_files': 0, + 'incomplete_files': 0, + 'deleted_files': [], + 'error_files': [] + } + + def check_workflow_completion(self, jsonl_file: str) -> bool: + """ + 检查工作流是否完整 + + Args: + jsonl_file: JSONL文件路径 + + Returns: + bool: True表示工作流完整,False表示不完整 + """ + try: + with open(jsonl_file, 'r', encoding='utf-8') as f: + lines = f.readlines() + + if not lines: + logger.warning(f"文件为空: {jsonl_file}") + return False + + # 获取最后一行 + last_line = lines[-1].strip() + if not last_line: + logger.warning(f"文件最后一行为空: {jsonl_file}") + return False + + try: + last_event = json.loads(last_line) + except json.JSONDecodeError as e: + logger.error(f"解析最后一行JSON失败 {jsonl_file}: {e}") + return False + + # 检查是否包含workflow_complete事件 + if last_event.get('event_type') != 'workflow_complete': + logger.info(f"工作流未完成 - 缺少workflow_complete事件: {jsonl_file}") + return False + + # 检查final_summary中的phases完成状态 + final_summary = last_event.get('final_summary', {}) + phases = final_summary.get('phases', {}) + + required_phases = ['triage', 'hpi', 'ph'] + for phase in required_phases: + phase_info = phases.get(phase, {}) + is_completed = phase_info.get('is_completed', False) + completion_rate = phase_info.get('completion_rate', 0.0) + + if not is_completed or completion_rate != 1.0: + logger.info(f"工作流未完成 - 阶段 {phase} 未完成: {jsonl_file}") + return False + + logger.info(f"工作流完整: {jsonl_file}") + return True + + except Exception as e: + logger.error(f"检查文件时发生错误 {jsonl_file}: {e}") + return False + + def scan_and_clean_files(self) -> None: + """扫描目录中的所有JSONL文件并清理不完整的文件""" + if not self.directory.exists(): + logger.error(f"目录不存在: {self.directory}") + return + + # 查找所有JSONL文件 + jsonl_pattern = str(self.directory / "**" / "*.jsonl") + jsonl_files = glob.glob(jsonl_pattern, recursive=True) + + self.stats['total_files'] = len(jsonl_files) + logger.info(f"找到 {len(jsonl_files)} 个JSONL文件") + + for jsonl_file in jsonl_files: + try: + is_complete = self.check_workflow_completion(jsonl_file) + + if is_complete: + self.stats['complete_files'] += 1 + else: + self.stats['incomplete_files'] += 1 + + if self.dry_run: + logger.info(f"[试运行] 将删除不完整文件: {jsonl_file}") + self.stats['deleted_files'].append(jsonl_file) + else: + os.remove(jsonl_file) + logger.info(f"已删除不完整文件: {jsonl_file}") + self.stats['deleted_files'].append(jsonl_file) + + except Exception as e: + logger.error(f"处理文件时发生错误 {jsonl_file}: {e}") + self.stats['error_files'].append(jsonl_file) + + def print_summary(self) -> None: + """打印统计摘要""" + print("\n" + "="*60) + print("工作流文件清理摘要") + print("="*60) + print(f"总文件数: {self.stats['total_files']}") + print(f"完整文件数: {self.stats['complete_files']}") + print(f"不完整文件数: {self.stats['incomplete_files']}") + print(f"删除文件数: {len(self.stats['deleted_files'])}") + print(f"错误文件数: {len(self.stats['error_files'])}") + + if self.stats['deleted_files']: + print("\n已删除的文件:") + for file in self.stats['deleted_files']: + print(f" - {file}") + + if self.stats['error_files']: + print("\n处理错误的文件:") + for file in self.stats['error_files']: + print(f" - {file}") + + if self.dry_run and self.stats['deleted_files']: + print(f"\n注意: 这是试运行模式,实际上没有删除任何文件") + + def run(self) -> Dict[str, Any]: + """ + 运行清理器 + + Returns: + Dict: 包含统计信息的字典 + """ + logger.info(f"开始检查目录: {self.directory}") + if self.dry_run: + logger.info("运行在试运行模式") + + self.scan_and_clean_files() + self.print_summary() + + return self.stats + + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description='工作流文件清理器') + parser.add_argument('directory', nargs='?', default='results/results0903', + help='要检查的目录路径 (默认: results)') + parser.add_argument('--dry-run', action='store_true', + help='试运行模式,不实际删除文件') + + args = parser.parse_args() + + cleaner = WorkflowFileCleaner(args.directory, args.dry_run) + cleaner.run() + + +if __name__ == "__main__": + main() \ No newline at end of file