From 399e4d44475b500b34cb5b56f8ba49f52e8185bb Mon Sep 17 00:00:00 2001 From: iomgaa Date: Mon, 11 Aug 2025 20:40:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=8C=BB=E7=96=97=E9=97=AE=E8=AF=8A=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=B9=B6=E4=BC=98=E5=8C=96Monitor=E6=99=BA?= =?UTF-8?q?=E8=83=BD=E4=BD=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增完整的workflow模块,包含MedicalWorkflow、TaskManager、StepExecutor和WorkflowLogger四个核心组件 - 实现分诊、现病史、既往史三阶段任务管理和状态跟踪机制 - 优化Monitor智能体支持针对特定任务的精准评估,解决任务评价针对性问题 - 完善agent_system各模块的__init__.py文件,确保正确的模块导入 - 实现详细的jsonl格式日志记录系统,支持完整workflow执行追踪 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- agent_system/controller/__init__.py | 6 +- agent_system/monitor/agent.py | 67 +++- agent_system/virtual_patient/__init__.py | 5 + workflow/__init__.py | 7 + workflow/medical_workflow.py | 231 ++++++++++++++ workflow/step_executor.py | 374 +++++++++++++++++++++++ workflow/task_manager.py | 183 +++++++++++ workflow/workflow_logger.py | 247 +++++++++++++++ 8 files changed, 1115 insertions(+), 5 deletions(-) create mode 100644 agent_system/virtual_patient/__init__.py create mode 100644 workflow/__init__.py create mode 100644 workflow/medical_workflow.py create mode 100644 workflow/step_executor.py create mode 100644 workflow/task_manager.py create mode 100644 workflow/workflow_logger.py diff --git a/agent_system/controller/__init__.py b/agent_system/controller/__init__.py index 4f35c5f..6cd2124 100644 --- a/agent_system/controller/__init__.py +++ b/agent_system/controller/__init__.py @@ -1 +1,5 @@ -# Controller Agent Module \ No newline at end of file +from .agent import TaskController +from .prompt import ControllerPrompt +from .response_model import ControllerDecision + +__all__ = ["TaskController", "ControllerPrompt", "ControllerDecision"] \ No newline at end of file diff --git a/agent_system/monitor/agent.py b/agent_system/monitor/agent.py index bb6ed9d..81c19ea 100644 --- a/agent_system/monitor/agent.py +++ b/agent_system/monitor/agent.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, List, Dict from agent_system.base import BaseAgent from agent_system.monitor.prompt import MonitorPrompt from agent_system.monitor.response_model import MonitorResult @@ -20,7 +20,8 @@ class Monitor(BaseAgent): use_cache=False ) - def run(self, hpi_content: str, ph_content: str, chief_complaint: str) -> MonitorResult: + def run(self, hpi_content: str, ph_content: str, chief_complaint: str, + task_name: str = None, task_description: str = None) -> MonitorResult: """ 监控病史质量 @@ -28,12 +29,18 @@ class Monitor(BaseAgent): hpi_content: 现病史内容 ph_content: 既往史内容 chief_complaint: 主诉 + task_name: 任务名称(可选,用于针对性评估) + task_description: 任务描述(可选,用于针对性评估) Returns: MonitorResult: 包含完成度评分和评分理由 """ - # 构建评估提示 - prompt = self.build_prompt(hpi_content, ph_content, chief_complaint) + # 根据是否提供任务信息选择不同的构建方式 + if task_name and task_description: + prompt = self._build_task_specific_prompt(task_name, task_description, + hpi_content, ph_content, chief_complaint) + else: + prompt = self.build_prompt(hpi_content, ph_content, chief_complaint) # 调用LLM进行评估 result = super().run(prompt) @@ -97,4 +104,56 @@ class Monitor(BaseAgent): 请基于上述标准进行客观评估。""" + return prompt + + def _build_task_specific_prompt(self, task_name: str, task_description: str, + hpi_content: str, ph_content: str, chief_complaint: str) -> str: + """ + 构建针对特定任务的评估提示语 + + Args: + task_name: 任务名称 + task_description: 任务描述 + hpi_content: 现病史内容 + ph_content: 既往史内容 + chief_complaint: 主诉 + + Returns: + str: 构建好的任务特定评估提示语 + """ + prompt = f"""请针对特定任务对病史信息进行质量监控和评估: + +**评估目标任务**: +任务名称:{task_name} +任务描述:{task_description} + +**当前病史信息**: +主诉:{chief_complaint} +现病史:{hpi_content} +既往史:{ph_content} + +**评估要求**: +1. **专门针对任务"{task_name}"进行评估** +2. 根据任务描述"{task_description}",判断当前病史信息在这个方面的完整性 +3. 重点关注与该任务相关的信息是否充分收集 +4. 给出该任务的完成度评分(0.0-1.0范围) +5. 详细说明评分理由,解释该任务还缺少哪些关键信息 + +**评分标准**(针对该任务): +- 0.9-1.0: 该任务相关信息非常完整,无需补充 +- 0.8-0.9: 该任务相关信息较完整,仅有少量细节缺失 +- 0.7-0.8: 该任务相关信息基本齐全,有一些重要细节待补充 +- 0.6-0.7: 该任务相关信息不够完整,缺少多项关键信息 +- 0.5-0.6: 该任务相关信息缺失较多,需要大量补充 +- 0.0-0.5: 该任务相关信息严重不足或完全缺失 + +**输出格式**: +严格按照以下JSON格式输出: +{{ + "completion_score": 浮点数(0.0-1.0), + "reason": "针对任务'{task_name}'的详细评分理由,说明该任务完成情况和缺失信息" +}} + +请基于上述要求进行针对性评估。""" + return prompt \ No newline at end of file diff --git a/agent_system/virtual_patient/__init__.py b/agent_system/virtual_patient/__init__.py new file mode 100644 index 0000000..5b74087 --- /dev/null +++ b/agent_system/virtual_patient/__init__.py @@ -0,0 +1,5 @@ +from .agent import VirtualPatientAgent +from .response_model import TriageVirtualPatientResponseModel +from .prompt import TriageVirtualPatientPrompt + +__all__ = ["VirtualPatientAgent", "TriageVirtualPatientResponseModel", "TriageVirtualPatientPrompt"] \ No newline at end of file diff --git a/workflow/__init__.py b/workflow/__init__.py new file mode 100644 index 0000000..48aa0fc --- /dev/null +++ b/workflow/__init__.py @@ -0,0 +1,7 @@ +# 医疗问诊工作流模块 +from .medical_workflow import MedicalWorkflow +from .task_manager import TaskManager +from .step_executor import StepExecutor +from .workflow_logger import WorkflowLogger + +__all__ = ["MedicalWorkflow", "TaskManager", "StepExecutor", "WorkflowLogger"] \ No newline at end of file diff --git a/workflow/medical_workflow.py b/workflow/medical_workflow.py new file mode 100644 index 0000000..084956d --- /dev/null +++ b/workflow/medical_workflow.py @@ -0,0 +1,231 @@ +from typing import Dict, Any, Optional +import time +from .task_manager import TaskManager, TaskPhase +from .step_executor import StepExecutor +from .workflow_logger import WorkflowLogger + +class MedicalWorkflow: + """ + 医疗问诊工作流主控制器 + 负责协调整个30步问诊过程的执行 + """ + + def __init__(self, case_data: Dict[str, Any], model_type: str = "gpt-oss:latest", + llm_config: Optional[Dict] = None, max_steps: int = 30, log_dir: str = "logs"): + """ + 初始化医疗问诊工作流 + + Args: + case_data: 病例数据,包含病案介绍等信息 + model_type: 使用的语言模型类型,默认为"gpt-oss:latest" + llm_config: 语言模型配置,默认为None + max_steps: 最大执行步数,默认为30 + log_dir: 日志目录,默认为"logs" + """ + self.case_data = case_data + self.model_type = model_type + self.llm_config = llm_config or {} + self.max_steps = max_steps + + # 初始化核心组件 + self.task_manager = TaskManager() + self.step_executor = StepExecutor(model_type=model_type, llm_config=self.llm_config) + self.logger = WorkflowLogger(case_data=case_data, log_dir=log_dir) + + # 初始化工作流状态 + self.current_step = 0 + self.conversation_history = "" + self.current_hpi = "" + self.current_ph = "" + self.current_chief_complaint = "" + self.workflow_completed = False + self.workflow_success = False + + def run(self) -> str: + """ + 执行完整的医疗问诊工作流 + + Returns: + str: 日志文件路径 + """ + print(f"开始执行医疗问诊工作流,病例:{self.case_data.get('病案介绍', {}).get('主诉', '未知病例')}") + + try: + # 执行工作流的主循环 + for step in range(1, self.max_steps + 1): + self.current_step = step + + # 检查是否所有任务都已完成 + if self.task_manager.is_workflow_completed(): + print(f"所有任务已完成,工作流在第 {step} 步结束") + self.workflow_completed = True + self.workflow_success = True + break + + # 执行当前step + if not self._execute_single_step(step): + print(f"Step {step} 执行失败,工作流终止") + break + + # 显示进度 + self._print_step_progress(step) + + # 如果达到最大步数但任务未完成 + if not self.workflow_completed: + print(f"已达到最大步数 {self.max_steps},工作流结束") + self.workflow_success = False + + except Exception as e: + print(f"工作流执行出现异常: {str(e)}") + self.logger.log_error(self.current_step, "workflow_error", str(e)) + self.workflow_success = False + + finally: + # 记录工作流完成信息 + final_summary = self.task_manager.get_completion_summary() + self.logger.log_workflow_complete( + total_steps=self.current_step, + final_summary=final_summary, + success=self.workflow_success + ) + + print(f"工作流执行完成,日志文件:{self.logger.get_log_file_path()}") + return self.logger.get_log_file_path() + + def _execute_single_step(self, step_num: int) -> bool: + """ + 执行单个step + + Args: + step_num: step编号 + + Returns: + bool: 是否执行成功 + """ + try: + # 获取当前阶段和待完成任务 + current_phase = self.task_manager.get_current_phase() + pending_tasks = self.task_manager.get_pending_tasks(current_phase) + + # 记录step开始 + self.logger.log_step_start(step_num, current_phase.value, pending_tasks) + + # 确定是否为第一步 + is_first_step = (step_num == 1) + + # 准备医生问题(非首轮时使用上轮的结果) + doctor_question = getattr(self, '_last_doctor_question', "") + + # 执行step + step_result = self.step_executor.execute_step( + step_num=step_num, + case_data=self.case_data, + task_manager=self.task_manager, + logger=self.logger, + conversation_history=self.conversation_history, + previous_hpi=self.current_hpi, + previous_ph=self.current_ph, + previous_chief_complaint=self.current_chief_complaint, + is_first_step=is_first_step, + doctor_question=doctor_question + ) + + # 检查执行结果 + if not step_result["success"]: + print(f"Step {step_num} 执行失败: {step_result.get('errors', [])}") + return False + + # 更新工作流状态 + self._update_workflow_state(step_result) + + # 记录step完成 + self.logger.log_step_complete( + step_num=step_num, + doctor_question=step_result["doctor_question"], + conversation_history=step_result["conversation_history"], + task_completion_summary=step_result["task_completion_summary"] + ) + + return True + + except Exception as e: + error_msg = f"Step {step_num} 执行异常: {str(e)}" + print(error_msg) + self.logger.log_error(step_num, "step_error", error_msg) + return False + + def _update_workflow_state(self, step_result: Dict[str, Any]): + """ + 根据step执行结果更新工作流状态 + + Args: + step_result: step执行结果 + """ + self.conversation_history = step_result["conversation_history"] + self.current_hpi = step_result["updated_hpi"] + self.current_ph = step_result["updated_ph"] + self.current_chief_complaint = step_result["updated_chief_complaint"] + self._last_doctor_question = step_result["doctor_question"] + + def _print_step_progress(self, step_num: int): + """ + 打印step进度信息 + + Args: + step_num: step编号 + """ + current_phase = self.task_manager.get_current_phase() + completion_summary = self.task_manager.get_completion_summary() + + print(f"\n=== Step {step_num} 完成 ===") + print(f"当前阶段: {current_phase.value}") + + # 显示各阶段完成情况 + for phase_name, phase_info in completion_summary["phases"].items(): + status = "✓" if phase_info["is_completed"] else "○" + print(f"{status} {phase_name}: {phase_info['completed']}/{phase_info['total']} 任务完成 " + f"({phase_info['completion_rate']:.1%})") + + print(f"对话轮次: {step_num}") + print(f"最新医生问题: {getattr(self, '_last_doctor_question', '暂无')[:50]}...") + print("-" * 60) + + def get_current_status(self) -> Dict[str, Any]: + """ + 获取当前工作流状态 + + Returns: + Dict: 工作流状态信息 + """ + return { + "current_step": self.current_step, + "max_steps": self.max_steps, + "current_phase": self.task_manager.get_current_phase().value, + "workflow_completed": self.workflow_completed, + "workflow_success": self.workflow_success, + "completion_summary": self.task_manager.get_completion_summary(), + "conversation_length": len(self.conversation_history), + "log_file_path": self.logger.get_log_file_path() + } + + def get_conversation_history(self) -> str: + """ + 获取完整的对话历史 + + Returns: + str: 对话历史 + """ + return self.conversation_history + + def get_medical_summary(self) -> Dict[str, str]: + """ + 获取当前医疗信息摘要 + + Returns: + Dict: 医疗信息摘要 + """ + return { + "chief_complaint": self.current_chief_complaint, + "history_of_present_illness": self.current_hpi, + "past_history": self.current_ph + } \ No newline at end of file diff --git a/workflow/step_executor.py b/workflow/step_executor.py new file mode 100644 index 0000000..1124286 --- /dev/null +++ b/workflow/step_executor.py @@ -0,0 +1,374 @@ +import time +from typing import Dict, Any, List, Optional +from agent_system.recipient import RecipientAgent +from agent_system.monitor import Monitor +from agent_system.controller import TaskController +from agent_system.prompter import Prompter +from agent_system.inquirer import Inquirer +from agent_system.virtual_patient import VirtualPatientAgent +from .task_manager import TaskManager, TaskPhase +from .workflow_logger import WorkflowLogger + +class StepExecutor: + """ + 单step执行器 + 负责执行单个step中的完整agent pipeline流程 + """ + + def __init__(self, model_type: str = "gpt-oss:latest", llm_config: dict = None): + """ + 初始化step执行器 + + Args: + model_type: 使用的语言模型类型 + llm_config: 语言模型配置 + """ + self.model_type = model_type + self.llm_config = llm_config or {} + + # 初始化所有agent + self.recipient = RecipientAgent(model_type=model_type, llm_config=self.llm_config) + self.monitor = Monitor(model_type=model_type, llm_config=self.llm_config) + self.controller = TaskController(model_type=model_type, llm_config=self.llm_config) + self.prompter = Prompter(model_type=model_type, llm_config=self.llm_config) + self.virtual_patient = VirtualPatientAgent(model_type=model_type, llm_config=self.llm_config) + + def execute_step(self, + step_num: int, + case_data: Dict[str, Any], + task_manager: TaskManager, + logger: WorkflowLogger, + conversation_history: str = "", + previous_hpi: str = "", + previous_ph: str = "", + previous_chief_complaint: str = "", + is_first_step: bool = False, + doctor_question: str = "") -> Dict[str, Any]: + """ + 执行单个step的完整流程 + + Args: + step_num: step编号 + case_data: 病例数据 + task_manager: 任务管理器 + logger: 日志记录器 + conversation_history: 对话历史 + previous_hpi: 上轮现病史 + previous_ph: 上轮既往史 + previous_chief_complaint: 上轮主诉 + is_first_step: 是否为第一个step + doctor_question: 医生问题(非首轮时) + + Returns: + Dict: step执行结果,包含更新后的病史信息、医生问题、患者回应等 + """ + step_result = { + "step_number": step_num, + "success": False, + "patient_response": "", + "updated_hpi": previous_hpi, + "updated_ph": previous_ph, + "updated_chief_complaint": previous_chief_complaint, + "doctor_question": "", + "conversation_history": conversation_history, + "task_completion_summary": {}, + "errors": [] + } + + try: + # Step 1: 获取患者回应 + patient_response = self._get_patient_response( + step_num, case_data, logger, is_first_step, doctor_question + ) + step_result["patient_response"] = patient_response + + # 更新对话历史 + if is_first_step: + updated_conversation = f"患者: {patient_response}" + else: + updated_conversation = conversation_history + f"\n医生: {doctor_question}\n患者: {patient_response}" + step_result["conversation_history"] = updated_conversation + + # Step 2: 使用Recipient更新病史信息 + recipient_result = self._execute_recipient( + step_num, logger, updated_conversation, previous_hpi, previous_ph, previous_chief_complaint + ) + step_result.update({ + "updated_hpi": recipient_result.updated_HPI, + "updated_ph": recipient_result.updated_PH, + "updated_chief_complaint": recipient_result.chief_complaint + }) + + # Step 3: 使用Monitor评估任务完成度 + monitor_results = self._execute_monitor_by_phase( + step_num, logger, task_manager, recipient_result + ) + + # Step 4: 更新任务分数 + self._update_task_scores(step_num, logger, task_manager, monitor_results) + + # Step 5: 使用Controller选择下一个任务 + controller_result = self._execute_controller( + step_num, logger, task_manager, recipient_result + ) + + # Step 6: 使用Prompter生成询问策略 + prompter_result = self._execute_prompter( + step_num, logger, recipient_result, controller_result + ) + + # Step 7: 使用Inquirer生成医生问题 + doctor_question = self._execute_inquirer( + step_num, logger, recipient_result, prompter_result + ) + step_result["doctor_question"] = doctor_question + + # Step 8: 获取任务完成情况摘要 + step_result["task_completion_summary"] = task_manager.get_completion_summary() + + step_result["success"] = True + + except Exception as e: + error_msg = f"Step {step_num} 执行失败: {str(e)}" + step_result["errors"].append(error_msg) + logger.log_error(step_num, "step_execution_error", error_msg, {"case_data": case_data}) + print(error_msg) + + return step_result + + def _get_patient_response(self, step_num: int, case_data: Dict[str, Any], + logger: WorkflowLogger, is_first_step: bool, + doctor_question: str = "") -> str: + """获取虚拟患者的回应""" + start_time = time.time() + + try: + # 构建虚拟患者输入 + if is_first_step: + worker_inquiry = "您好,请问您哪里不舒服?" + else: + worker_inquiry = doctor_question + + # 调用虚拟患者agent + patient_result = self.virtual_patient.run( + worker_inquiry=worker_inquiry, + is_first_epoch=is_first_step, + patient_case=case_data + ) + + execution_time = time.time() - start_time + patient_response = patient_result.current_chat + + # 记录日志 + logger.log_agent_execution( + step_num, "virtual_patient", + { + "worker_inquiry": worker_inquiry, + "is_first_epoch": is_first_step, + "case_data": case_data + }, + {"patient_response": patient_response}, + execution_time + ) + + logger.log_patient_response(step_num, patient_response, is_first_step) + + return patient_response + + except Exception as e: + error_msg = f"虚拟患者执行失败: {str(e)}" + logger.log_error(step_num, "virtual_patient_error", error_msg) + # 返回默认回应 + return "对不起,我不太清楚怎么描述,医生您看着办吧。" + + def _execute_recipient(self, step_num: int, logger: WorkflowLogger, + conversation_history: str, previous_hpi: str, + previous_ph: str, previous_chief_complaint: str): + """执行Recipient agent""" + start_time = time.time() + + input_data = { + "conversation_history": conversation_history, + "previous_HPI": previous_hpi, + "previous_PH": previous_ph, + "previous_chief_complaint": previous_chief_complaint + } + + result = self.recipient.run(**input_data) + execution_time = time.time() - start_time + + output_data = { + "updated_HPI": result.updated_HPI, + "updated_PH": result.updated_PH, + "chief_complaint": result.chief_complaint + } + + logger.log_agent_execution(step_num, "recipient", input_data, output_data, execution_time) + + return result + + def _execute_monitor_by_phase(self, step_num: int, logger: WorkflowLogger, + task_manager: TaskManager, recipient_result) -> Dict[str, Dict[str, float]]: + """按阶段执行Monitor评估,只评估当前阶段未完成的任务""" + monitor_results = {} + current_phase = task_manager.get_current_phase() + + # 如果所有任务都完成了,不需要评估 + if current_phase == TaskPhase.COMPLETED: + return monitor_results + + # 获取当前阶段未完成的任务 + pending_tasks = task_manager.get_pending_tasks(current_phase) + if not pending_tasks: + return monitor_results + + start_time = time.time() + + try: + # 使用for循环逐个评估所有未完成的任务 + phase_scores = {} + for task in pending_tasks: + task_name = task.get("name", "") + task_description = task.get("description", "") + + # 调用Monitor评估特定任务 + monitor_result = self.monitor.run( + hpi_content=recipient_result.updated_HPI, + ph_content=recipient_result.updated_PH, + chief_complaint=recipient_result.chief_complaint, + task_name=task_name, + task_description=task_description + ) + + phase_scores[task_name] = monitor_result.completion_score + print(f"任务'{task_name}'评分: {monitor_result.completion_score:.2f} - {monitor_result.reason}") + + execution_time = time.time() - start_time + monitor_results[current_phase] = phase_scores + + # 记录日志 + input_data = { + "hpi_content": recipient_result.updated_HPI, + "ph_content": recipient_result.updated_PH, + "chief_complaint": recipient_result.chief_complaint, + "evaluated_phase": current_phase.value, + "pending_tasks": [t["name"] for t in pending_tasks] + } + + output_data = { + "phase_scores": phase_scores, + "evaluated_tasks": list(phase_scores.keys()), + "average_score": sum(phase_scores.values()) / len(phase_scores) if phase_scores else 0.0 + } + + logger.log_agent_execution(step_num, "monitor", input_data, output_data, execution_time) + + except Exception as e: + error_msg = f"Monitor执行失败: {str(e)}" + logger.log_error(step_num, "monitor_error", error_msg) + # 返回默认的低分评估 + phase_scores = {task["name"]: 0.1 for task in pending_tasks} + monitor_results[current_phase] = phase_scores + + return monitor_results + + def _update_task_scores(self, step_num: int, logger: WorkflowLogger, + task_manager: TaskManager, monitor_results: Dict): + """更新任务分数""" + for phase, scores in monitor_results.items(): + if scores: + old_scores = task_manager.get_task_scores(phase).copy() + task_manager.update_task_scores(phase, scores) + new_scores = task_manager.get_task_scores(phase) + + logger.log_task_scores_update(step_num, phase.value, old_scores, new_scores) + + def _execute_controller(self, step_num: int, logger: WorkflowLogger, + task_manager: TaskManager, recipient_result): + """执行Controller agent""" + start_time = time.time() + + # 获取当前阶段的未完成任务 + current_phase = task_manager.get_current_phase() + pending_tasks = task_manager.get_pending_tasks(current_phase) + + input_data = { + "pending_tasks": pending_tasks, + "chief_complaint": recipient_result.chief_complaint, + "hpi_content": recipient_result.updated_HPI, + "ph_content": recipient_result.updated_PH + } + + result = self.controller.run(**input_data) + execution_time = time.time() - start_time + + output_data = { + "selected_task": result.selected_task, + "specific_guidance": result.specific_guidance + } + + logger.log_agent_execution(step_num, "controller", input_data, output_data, execution_time) + + return result + + def _execute_prompter(self, step_num: int, logger: WorkflowLogger, + recipient_result, controller_result): + """执行Prompter agent""" + start_time = time.time() + + input_data = { + "hpi_content": recipient_result.updated_HPI, + "ph_content": recipient_result.updated_PH, + "chief_complaint": recipient_result.chief_complaint, + "current_task": controller_result.selected_task, + "specific_guidance": controller_result.specific_guidance + } + + result = self.prompter.run(**input_data) + execution_time = time.time() - start_time + + output_data = { + "description": result.description, + "instructions": result.instructions + } + + logger.log_agent_execution(step_num, "prompter", input_data, output_data, execution_time) + + return result + + def _execute_inquirer(self, step_num: int, logger: WorkflowLogger, + recipient_result, prompter_result) -> str: + """执行Inquirer agent""" + start_time = time.time() + + try: + # 使用Prompter生成的描述和指令初始化Inquirer + inquirer = Inquirer( + description=prompter_result.description, + instructions=prompter_result.instructions, + model_type=self.model_type, + llm_config=self.llm_config + ) + + input_data = { + "hpi_content": recipient_result.updated_HPI, + "ph_content": recipient_result.updated_PH, + "chief_complaint": recipient_result.chief_complaint + } + + result = inquirer.run(**input_data) + execution_time = time.time() - start_time + + doctor_question = result.current_chat + + output_data = {"doctor_question": doctor_question} + + logger.log_agent_execution(step_num, "inquirer", input_data, output_data, execution_time) + + return doctor_question + + except Exception as e: + error_msg = f"Inquirer执行失败: {str(e)}" + logger.log_error(step_num, "inquirer_error", error_msg) + # 返回默认问题 + return "请您详细描述一下您的症状,包括什么时候开始的,有什么特点?" \ No newline at end of file diff --git a/workflow/task_manager.py b/workflow/task_manager.py new file mode 100644 index 0000000..4fd505b --- /dev/null +++ b/workflow/task_manager.py @@ -0,0 +1,183 @@ +from typing import Dict, List, Optional +from enum import Enum + +class TaskPhase(Enum): + """任务阶段枚举""" + TRIAGE = "triage" # 分诊阶段 + HPI = "hpi" # 现病史阶段 + PH = "ph" # 既往史阶段 + COMPLETED = "completed" # 全部完成 + +class TaskManager: + """ + 任务管理器 + 负责管理分诊、现病史、既往史三个阶段的子任务状态和完成度评分 + """ + + def __init__(self): + """初始化任务管理器""" + self.completion_threshold = 0.85 # 任务完成阈值 + + # 定义各阶段的子任务 + self.task_definitions = { + TaskPhase.TRIAGE: { + "一级科室判定": {"description": "确定患者应就诊的一级科室(如内科、外科等)"}, + "二级科室判定": {"description": "在一级科室基础上确定具体的二级科室"} + }, + TaskPhase.HPI: { + "起病情况和患病时间": {"description": "了解疾病发生的时间、诱因和起病方式"}, + "主要症状特征": {"description": "详细描述患者的主要症状表现和特点"}, + "病情发展与演变": {"description": "了解病情从发病到现在的发展变化过程"}, + "伴随症状": {"description": "询问除主要症状外的其他相关症状"}, + "诊疗经过": {"description": "了解患者已接受的诊断和治疗情况"}, + "病程基本情况": {"description": "掌握疾病的整体病程和基本情况"} + }, + TaskPhase.PH: { + "疾病史": {"description": "了解患者既往患过的疾病"}, + "手术史": {"description": "询问患者既往手术经历"}, + "过敏史": {"description": "了解患者药物或其他过敏史"}, + "家族史": {"description": "询问家族相关疾病史"}, + "个人史": {"description": "了解患者个人生活史"}, + "预防接种史": {"description": "询问患者疫苗接种情况"} + } + } + + # 初始化任务完成度评分(所有任务初始分数为0.0) + self.task_scores = {} + for phase in self.task_definitions: + self.task_scores[phase] = {} + for task_name in self.task_definitions[phase]: + self.task_scores[phase][task_name] = 0.0 + + def get_current_phase(self) -> TaskPhase: + """ + 获取当前应该执行的任务阶段 + + Returns: + TaskPhase: 当前任务阶段 + """ + # 检查分诊阶段是否完成 + if not self._is_phase_completed(TaskPhase.TRIAGE): + return TaskPhase.TRIAGE + + # 检查现病史阶段是否完成 + if not self._is_phase_completed(TaskPhase.HPI): + return TaskPhase.HPI + + # 检查既往史阶段是否完成 + if not self._is_phase_completed(TaskPhase.PH): + return TaskPhase.PH + + # 所有阶段都完成 + return TaskPhase.COMPLETED + + def get_pending_tasks(self, phase: Optional[TaskPhase] = None) -> List[Dict[str, str]]: + """ + 获取指定阶段的未完成任务列表 + + Args: + phase: 指定的任务阶段,如果为None则获取当前阶段 + + Returns: + List[Dict]: 未完成任务列表,每个任务包含name和description字段 + """ + if phase is None: + phase = self.get_current_phase() + + if phase == TaskPhase.COMPLETED: + return [] + + pending_tasks = [] + phase_tasks = self.task_definitions[phase] + phase_scores = self.task_scores[phase] + + for task_name, task_info in phase_tasks.items(): + if phase_scores[task_name] < self.completion_threshold: + pending_tasks.append({ + "name": task_name, + "description": task_info["description"] + }) + + return pending_tasks + + def update_task_scores(self, phase: TaskPhase, task_scores: Dict[str, float]): + """ + 更新指定阶段的任务完成度评分 + + Args: + phase: 任务阶段 + task_scores: 任务评分字典,格式为 {任务名: 评分} + """ + if phase not in self.task_scores: + return + + for task_name, score in task_scores.items(): + if task_name in self.task_scores[phase]: + self.task_scores[phase][task_name] = score + + def get_task_scores(self, phase: Optional[TaskPhase] = None) -> Dict: + """ + 获取任务评分信息 + + Args: + phase: 指定的任务阶段,如果为None则返回所有阶段 + + Returns: + Dict: 任务评分信息 + """ + if phase is None: + return self.task_scores + return self.task_scores.get(phase, {}) + + def get_completion_summary(self) -> Dict[str, any]: + """ + 获取任务完成情况摘要 + + Returns: + Dict: 完成情况摘要,包含各阶段完成状态和进度 + """ + summary = { + "current_phase": self.get_current_phase().value, + "phases": {} + } + + for phase, tasks in self.task_definitions.items(): + completed_count = sum( + 1 for task_name in tasks + if self.task_scores[phase][task_name] >= self.completion_threshold + ) + total_count = len(tasks) + + summary["phases"][phase.value] = { + "completed": completed_count, + "total": total_count, + "completion_rate": completed_count / total_count if total_count > 0 else 0, + "is_completed": self._is_phase_completed(phase) + } + + return summary + + def _is_phase_completed(self, phase: TaskPhase) -> bool: + """ + 检查指定阶段是否完成 + + Args: + phase: 任务阶段 + + Returns: + bool: 是否完成 + """ + if phase not in self.task_scores: + return False + + phase_scores = self.task_scores[phase] + return all(score >= self.completion_threshold for score in phase_scores.values()) + + def is_workflow_completed(self) -> bool: + """ + 检查整个工作流是否完成 + + Returns: + bool: 是否完成 + """ + return self.get_current_phase() == TaskPhase.COMPLETED \ No newline at end of file diff --git a/workflow/workflow_logger.py b/workflow/workflow_logger.py new file mode 100644 index 0000000..6624e7f --- /dev/null +++ b/workflow/workflow_logger.py @@ -0,0 +1,247 @@ +import json +import os +from datetime import datetime +from typing import Dict, Any, Optional +import hashlib + +class WorkflowLogger: + """ + 工作流日志记录器 + 负责将每个step的详细信息记录到jsonl格式文件中 + """ + + def __init__(self, case_data: Dict[str, Any], log_dir: str = "logs"): + """ + 初始化日志记录器 + + Args: + case_data: 病例数据 + log_dir: 日志目录,默认为"logs" + """ + self.case_data = case_data + self.log_dir = log_dir + self.log_file_path = self._generate_log_file_path() + self.step_count = 0 + + # 确保日志目录存在 + os.makedirs(log_dir, exist_ok=True) + + # 初始化日志文件,记录工作流开始信息 + self._log_workflow_start() + + def _generate_log_file_path(self) -> str: + """ + 为当前病例生成唯一的日志文件路径 + + Returns: + str: 日志文件路径 + """ + # 生成基于病例内容的唯一标识 + case_str = json.dumps(self.case_data, ensure_ascii=False, sort_keys=True) + case_hash = hashlib.md5(case_str.encode('utf-8')).hexdigest()[:8] + + # 生成时间戳 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # 构建文件名 + filename = f"workflow_{timestamp}_{case_hash}.jsonl" + return os.path.join(self.log_dir, filename) + + def _log_workflow_start(self): + """记录工作流开始信息""" + start_log = { + "event_type": "workflow_start", + "timestamp": datetime.now().isoformat(), + "case_data": self.case_data, + "workflow_config": { + "max_steps": 30, + "completion_threshold": 0.85, + "phases": ["triage", "hpi", "ph"] + } + } + self._write_log_entry(start_log) + + def log_step_start(self, step_num: int, current_phase: str, pending_tasks: list): + """ + 记录step开始信息 + + Args: + step_num: step编号 + current_phase: 当前阶段 + pending_tasks: 待完成任务列表 + """ + self.step_count = step_num + step_start_log = { + "event_type": "step_start", + "step_number": step_num, + "timestamp": datetime.now().isoformat(), + "current_phase": current_phase, + "pending_tasks": pending_tasks + } + self._write_log_entry(step_start_log) + + def log_patient_response(self, step_num: int, patient_message: str, is_first_step: bool = False): + """ + 记录患者回应 + + Args: + step_num: step编号 + patient_message: 患者消息 + is_first_step: 是否为第一个step + """ + patient_log = { + "event_type": "patient_response", + "step_number": step_num, + "timestamp": datetime.now().isoformat(), + "is_first_step": is_first_step, + "message": patient_message + } + self._write_log_entry(patient_log) + + def log_agent_execution(self, step_num: int, agent_name: str, + input_data: Dict[str, Any], output_data: Dict[str, Any], + execution_time: Optional[float] = None): + """ + 记录agent执行信息 + + Args: + step_num: step编号 + agent_name: agent名称 + input_data: 输入数据 + output_data: 输出数据 + execution_time: 执行时间(秒) + """ + agent_log = { + "event_type": "agent_execution", + "step_number": step_num, + "timestamp": datetime.now().isoformat(), + "agent_name": agent_name, + "input_data": input_data, + "output_data": output_data + } + + if execution_time is not None: + agent_log["execution_time_seconds"] = execution_time + + self._write_log_entry(agent_log) + + def log_task_scores_update(self, step_num: int, phase: str, + old_scores: Dict[str, float], + new_scores: Dict[str, float]): + """ + 记录任务评分更新 + + Args: + step_num: step编号 + phase: 阶段名称 + old_scores: 更新前的评分 + new_scores: 更新后的评分 + """ + scores_log = { + "event_type": "task_scores_update", + "step_number": step_num, + "timestamp": datetime.now().isoformat(), + "phase": phase, + "old_scores": old_scores, + "new_scores": new_scores, + "score_changes": { + task: new_scores[task] - old_scores.get(task, 0.0) + for task in new_scores + } + } + self._write_log_entry(scores_log) + + def log_step_complete(self, step_num: int, doctor_question: str, + conversation_history: str, task_completion_summary: Dict): + """ + 记录step完成信息 + + Args: + step_num: step编号 + doctor_question: 医生生成的问题 + conversation_history: 对话历史 + task_completion_summary: 任务完成情况摘要 + """ + step_complete_log = { + "event_type": "step_complete", + "step_number": step_num, + "timestamp": datetime.now().isoformat(), + "doctor_question": doctor_question, + "conversation_history": conversation_history, + "task_completion_summary": task_completion_summary + } + self._write_log_entry(step_complete_log) + + def log_workflow_complete(self, total_steps: int, final_summary: Dict, success: bool = True): + """ + 记录工作流完成信息 + + Args: + total_steps: 总step数 + final_summary: 最终摘要 + success: 是否成功完成 + """ + complete_log = { + "event_type": "workflow_complete", + "timestamp": datetime.now().isoformat(), + "total_steps": total_steps, + "success": success, + "final_summary": final_summary, + "log_file_path": self.log_file_path + } + self._write_log_entry(complete_log) + + def log_error(self, step_num: int, error_type: str, error_message: str, + error_context: Optional[Dict] = None): + """ + 记录错误信息 + + Args: + step_num: step编号 + error_type: 错误类型 + error_message: 错误消息 + error_context: 错误上下文 + """ + error_log = { + "event_type": "error", + "step_number": step_num, + "timestamp": datetime.now().isoformat(), + "error_type": error_type, + "error_message": error_message + } + + if error_context: + error_log["error_context"] = error_context + + self._write_log_entry(error_log) + + def _write_log_entry(self, log_entry: Dict[str, Any]): + """ + 写入一条日志记录到jsonl文件 + + Args: + log_entry: 日志条目 + """ + try: + with open(self.log_file_path, 'a', encoding='utf-8') as f: + f.write(json.dumps(log_entry, ensure_ascii=False) + '\n') + except Exception as e: + print(f"写入日志失败: {e}") + + def get_log_file_path(self) -> str: + """ + 获取日志文件路径 + + Returns: + str: 日志文件路径 + """ + return self.log_file_path + + def get_step_count(self) -> int: + """ + 获取当前step计数 + + Returns: + int: step计数 + """ + return self.step_count \ No newline at end of file