新增:实现医疗问诊工作流系统并优化Monitor智能体

- 新增完整的workflow模块,包含MedicalWorkflow、TaskManager、StepExecutor和WorkflowLogger四个核心组件
- 实现分诊、现病史、既往史三阶段任务管理和状态跟踪机制
- 优化Monitor智能体支持针对特定任务的精准评估,解决任务评价针对性问题
- 完善agent_system各模块的__init__.py文件,确保正确的模块导入
- 实现详细的jsonl格式日志记录系统,支持完整workflow执行追踪

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
iomgaa 2025-08-11 20:40:33 +08:00
parent 239cd0f730
commit 399e4d4447
8 changed files with 1115 additions and 5 deletions

View File

@ -1 +1,5 @@
# Controller Agent Module
from .agent import TaskController
from .prompt import ControllerPrompt
from .response_model import ControllerDecision
__all__ = ["TaskController", "ControllerPrompt", "ControllerDecision"]

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, List, Dict
from agent_system.base import BaseAgent
from agent_system.monitor.prompt import MonitorPrompt
from agent_system.monitor.response_model import MonitorResult
@ -20,7 +20,8 @@ class Monitor(BaseAgent):
use_cache=False
)
def run(self, hpi_content: str, ph_content: str, chief_complaint: str) -> MonitorResult:
def run(self, hpi_content: str, ph_content: str, chief_complaint: str,
task_name: str = None, task_description: str = None) -> MonitorResult:
"""
监控病史质量
@ -28,12 +29,18 @@ class Monitor(BaseAgent):
hpi_content: 现病史内容
ph_content: 既往史内容
chief_complaint: 主诉
task_name: 任务名称可选用于针对性评估
task_description: 任务描述可选用于针对性评估
Returns:
MonitorResult: 包含完成度评分和评分理由
"""
# 构建评估提示
prompt = self.build_prompt(hpi_content, ph_content, chief_complaint)
# 根据是否提供任务信息选择不同的构建方式
if task_name and task_description:
prompt = self._build_task_specific_prompt(task_name, task_description,
hpi_content, ph_content, chief_complaint)
else:
prompt = self.build_prompt(hpi_content, ph_content, chief_complaint)
# 调用LLM进行评估
result = super().run(prompt)
@ -98,3 +105,55 @@ class Monitor(BaseAgent):
请基于上述标准进行客观评估"""
return prompt
def _build_task_specific_prompt(self, task_name: str, task_description: str,
hpi_content: str, ph_content: str, chief_complaint: str) -> str:
"""
构建针对特定任务的评估提示语
Args:
task_name: 任务名称
task_description: 任务描述
hpi_content: 现病史内容
ph_content: 既往史内容
chief_complaint: 主诉
Returns:
str: 构建好的任务特定评估提示语
"""
prompt = f"""请针对特定任务对病史信息进行质量监控和评估:
**评估目标任务**
任务名称{task_name}
任务描述{task_description}
**当前病史信息**
主诉{chief_complaint}
现病史{hpi_content}
既往史{ph_content}
**评估要求**
1. **专门针对任务"{task_name}"进行评估**
2. 根据任务描述"{task_description}"判断当前病史信息在这个方面的完整性
3. 重点关注与该任务相关的信息是否充分收集
4. 给出该任务的完成度评分0.0-1.0范围
5. 详细说明评分理由解释该任务还缺少哪些关键信息
**评分标准**针对该任务
- 0.9-1.0: 该任务相关信息非常完整无需补充
- 0.8-0.9: 该任务相关信息较完整仅有少量细节缺失
- 0.7-0.8: 该任务相关信息基本齐全有一些重要细节待补充
- 0.6-0.7: 该任务相关信息不够完整缺少多项关键信息
- 0.5-0.6: 该任务相关信息缺失较多需要大量补充
- 0.0-0.5: 该任务相关信息严重不足或完全缺失
**输出格式**
严格按照以下JSON格式输出
{{
"completion_score": 浮点数0.0-1.0
"reason": "针对任务'{task_name}'的详细评分理由,说明该任务完成情况和缺失信息"
}}
请基于上述要求进行针对性评估"""
return prompt

View File

@ -0,0 +1,5 @@
from .agent import VirtualPatientAgent
from .response_model import TriageVirtualPatientResponseModel
from .prompt import TriageVirtualPatientPrompt
__all__ = ["VirtualPatientAgent", "TriageVirtualPatientResponseModel", "TriageVirtualPatientPrompt"]

7
workflow/__init__.py Normal file
View File

@ -0,0 +1,7 @@
# 医疗问诊工作流模块
from .medical_workflow import MedicalWorkflow
from .task_manager import TaskManager
from .step_executor import StepExecutor
from .workflow_logger import WorkflowLogger
__all__ = ["MedicalWorkflow", "TaskManager", "StepExecutor", "WorkflowLogger"]

View File

@ -0,0 +1,231 @@
from typing import Dict, Any, Optional
import time
from .task_manager import TaskManager, TaskPhase
from .step_executor import StepExecutor
from .workflow_logger import WorkflowLogger
class MedicalWorkflow:
"""
医疗问诊工作流主控制器
负责协调整个30步问诊过程的执行
"""
def __init__(self, case_data: Dict[str, Any], model_type: str = "gpt-oss:latest",
llm_config: Optional[Dict] = None, max_steps: int = 30, log_dir: str = "logs"):
"""
初始化医疗问诊工作流
Args:
case_data: 病例数据包含病案介绍等信息
model_type: 使用的语言模型类型默认为"gpt-oss:latest"
llm_config: 语言模型配置默认为None
max_steps: 最大执行步数默认为30
log_dir: 日志目录默认为"logs"
"""
self.case_data = case_data
self.model_type = model_type
self.llm_config = llm_config or {}
self.max_steps = max_steps
# 初始化核心组件
self.task_manager = TaskManager()
self.step_executor = StepExecutor(model_type=model_type, llm_config=self.llm_config)
self.logger = WorkflowLogger(case_data=case_data, log_dir=log_dir)
# 初始化工作流状态
self.current_step = 0
self.conversation_history = ""
self.current_hpi = ""
self.current_ph = ""
self.current_chief_complaint = ""
self.workflow_completed = False
self.workflow_success = False
def run(self) -> str:
"""
执行完整的医疗问诊工作流
Returns:
str: 日志文件路径
"""
print(f"开始执行医疗问诊工作流,病例:{self.case_data.get('病案介绍', {}).get('主诉', '未知病例')}")
try:
# 执行工作流的主循环
for step in range(1, self.max_steps + 1):
self.current_step = step
# 检查是否所有任务都已完成
if self.task_manager.is_workflow_completed():
print(f"所有任务已完成,工作流在第 {step} 步结束")
self.workflow_completed = True
self.workflow_success = True
break
# 执行当前step
if not self._execute_single_step(step):
print(f"Step {step} 执行失败,工作流终止")
break
# 显示进度
self._print_step_progress(step)
# 如果达到最大步数但任务未完成
if not self.workflow_completed:
print(f"已达到最大步数 {self.max_steps},工作流结束")
self.workflow_success = False
except Exception as e:
print(f"工作流执行出现异常: {str(e)}")
self.logger.log_error(self.current_step, "workflow_error", str(e))
self.workflow_success = False
finally:
# 记录工作流完成信息
final_summary = self.task_manager.get_completion_summary()
self.logger.log_workflow_complete(
total_steps=self.current_step,
final_summary=final_summary,
success=self.workflow_success
)
print(f"工作流执行完成,日志文件:{self.logger.get_log_file_path()}")
return self.logger.get_log_file_path()
def _execute_single_step(self, step_num: int) -> bool:
"""
执行单个step
Args:
step_num: step编号
Returns:
bool: 是否执行成功
"""
try:
# 获取当前阶段和待完成任务
current_phase = self.task_manager.get_current_phase()
pending_tasks = self.task_manager.get_pending_tasks(current_phase)
# 记录step开始
self.logger.log_step_start(step_num, current_phase.value, pending_tasks)
# 确定是否为第一步
is_first_step = (step_num == 1)
# 准备医生问题(非首轮时使用上轮的结果)
doctor_question = getattr(self, '_last_doctor_question', "")
# 执行step
step_result = self.step_executor.execute_step(
step_num=step_num,
case_data=self.case_data,
task_manager=self.task_manager,
logger=self.logger,
conversation_history=self.conversation_history,
previous_hpi=self.current_hpi,
previous_ph=self.current_ph,
previous_chief_complaint=self.current_chief_complaint,
is_first_step=is_first_step,
doctor_question=doctor_question
)
# 检查执行结果
if not step_result["success"]:
print(f"Step {step_num} 执行失败: {step_result.get('errors', [])}")
return False
# 更新工作流状态
self._update_workflow_state(step_result)
# 记录step完成
self.logger.log_step_complete(
step_num=step_num,
doctor_question=step_result["doctor_question"],
conversation_history=step_result["conversation_history"],
task_completion_summary=step_result["task_completion_summary"]
)
return True
except Exception as e:
error_msg = f"Step {step_num} 执行异常: {str(e)}"
print(error_msg)
self.logger.log_error(step_num, "step_error", error_msg)
return False
def _update_workflow_state(self, step_result: Dict[str, Any]):
"""
根据step执行结果更新工作流状态
Args:
step_result: step执行结果
"""
self.conversation_history = step_result["conversation_history"]
self.current_hpi = step_result["updated_hpi"]
self.current_ph = step_result["updated_ph"]
self.current_chief_complaint = step_result["updated_chief_complaint"]
self._last_doctor_question = step_result["doctor_question"]
def _print_step_progress(self, step_num: int):
"""
打印step进度信息
Args:
step_num: step编号
"""
current_phase = self.task_manager.get_current_phase()
completion_summary = self.task_manager.get_completion_summary()
print(f"\n=== Step {step_num} 完成 ===")
print(f"当前阶段: {current_phase.value}")
# 显示各阶段完成情况
for phase_name, phase_info in completion_summary["phases"].items():
status = "" if phase_info["is_completed"] else ""
print(f"{status} {phase_name}: {phase_info['completed']}/{phase_info['total']} 任务完成 "
f"({phase_info['completion_rate']:.1%})")
print(f"对话轮次: {step_num}")
print(f"最新医生问题: {getattr(self, '_last_doctor_question', '暂无')[:50]}...")
print("-" * 60)
def get_current_status(self) -> Dict[str, Any]:
"""
获取当前工作流状态
Returns:
Dict: 工作流状态信息
"""
return {
"current_step": self.current_step,
"max_steps": self.max_steps,
"current_phase": self.task_manager.get_current_phase().value,
"workflow_completed": self.workflow_completed,
"workflow_success": self.workflow_success,
"completion_summary": self.task_manager.get_completion_summary(),
"conversation_length": len(self.conversation_history),
"log_file_path": self.logger.get_log_file_path()
}
def get_conversation_history(self) -> str:
"""
获取完整的对话历史
Returns:
str: 对话历史
"""
return self.conversation_history
def get_medical_summary(self) -> Dict[str, str]:
"""
获取当前医疗信息摘要
Returns:
Dict: 医疗信息摘要
"""
return {
"chief_complaint": self.current_chief_complaint,
"history_of_present_illness": self.current_hpi,
"past_history": self.current_ph
}

374
workflow/step_executor.py Normal file
View File

@ -0,0 +1,374 @@
import time
from typing import Dict, Any, List, Optional
from agent_system.recipient import RecipientAgent
from agent_system.monitor import Monitor
from agent_system.controller import TaskController
from agent_system.prompter import Prompter
from agent_system.inquirer import Inquirer
from agent_system.virtual_patient import VirtualPatientAgent
from .task_manager import TaskManager, TaskPhase
from .workflow_logger import WorkflowLogger
class StepExecutor:
"""
单step执行器
负责执行单个step中的完整agent pipeline流程
"""
def __init__(self, model_type: str = "gpt-oss:latest", llm_config: dict = None):
"""
初始化step执行器
Args:
model_type: 使用的语言模型类型
llm_config: 语言模型配置
"""
self.model_type = model_type
self.llm_config = llm_config or {}
# 初始化所有agent
self.recipient = RecipientAgent(model_type=model_type, llm_config=self.llm_config)
self.monitor = Monitor(model_type=model_type, llm_config=self.llm_config)
self.controller = TaskController(model_type=model_type, llm_config=self.llm_config)
self.prompter = Prompter(model_type=model_type, llm_config=self.llm_config)
self.virtual_patient = VirtualPatientAgent(model_type=model_type, llm_config=self.llm_config)
def execute_step(self,
step_num: int,
case_data: Dict[str, Any],
task_manager: TaskManager,
logger: WorkflowLogger,
conversation_history: str = "",
previous_hpi: str = "",
previous_ph: str = "",
previous_chief_complaint: str = "",
is_first_step: bool = False,
doctor_question: str = "") -> Dict[str, Any]:
"""
执行单个step的完整流程
Args:
step_num: step编号
case_data: 病例数据
task_manager: 任务管理器
logger: 日志记录器
conversation_history: 对话历史
previous_hpi: 上轮现病史
previous_ph: 上轮既往史
previous_chief_complaint: 上轮主诉
is_first_step: 是否为第一个step
doctor_question: 医生问题非首轮时
Returns:
Dict: step执行结果包含更新后的病史信息医生问题患者回应等
"""
step_result = {
"step_number": step_num,
"success": False,
"patient_response": "",
"updated_hpi": previous_hpi,
"updated_ph": previous_ph,
"updated_chief_complaint": previous_chief_complaint,
"doctor_question": "",
"conversation_history": conversation_history,
"task_completion_summary": {},
"errors": []
}
try:
# Step 1: 获取患者回应
patient_response = self._get_patient_response(
step_num, case_data, logger, is_first_step, doctor_question
)
step_result["patient_response"] = patient_response
# 更新对话历史
if is_first_step:
updated_conversation = f"患者: {patient_response}"
else:
updated_conversation = conversation_history + f"\n医生: {doctor_question}\n患者: {patient_response}"
step_result["conversation_history"] = updated_conversation
# Step 2: 使用Recipient更新病史信息
recipient_result = self._execute_recipient(
step_num, logger, updated_conversation, previous_hpi, previous_ph, previous_chief_complaint
)
step_result.update({
"updated_hpi": recipient_result.updated_HPI,
"updated_ph": recipient_result.updated_PH,
"updated_chief_complaint": recipient_result.chief_complaint
})
# Step 3: 使用Monitor评估任务完成度
monitor_results = self._execute_monitor_by_phase(
step_num, logger, task_manager, recipient_result
)
# Step 4: 更新任务分数
self._update_task_scores(step_num, logger, task_manager, monitor_results)
# Step 5: 使用Controller选择下一个任务
controller_result = self._execute_controller(
step_num, logger, task_manager, recipient_result
)
# Step 6: 使用Prompter生成询问策略
prompter_result = self._execute_prompter(
step_num, logger, recipient_result, controller_result
)
# Step 7: 使用Inquirer生成医生问题
doctor_question = self._execute_inquirer(
step_num, logger, recipient_result, prompter_result
)
step_result["doctor_question"] = doctor_question
# Step 8: 获取任务完成情况摘要
step_result["task_completion_summary"] = task_manager.get_completion_summary()
step_result["success"] = True
except Exception as e:
error_msg = f"Step {step_num} 执行失败: {str(e)}"
step_result["errors"].append(error_msg)
logger.log_error(step_num, "step_execution_error", error_msg, {"case_data": case_data})
print(error_msg)
return step_result
def _get_patient_response(self, step_num: int, case_data: Dict[str, Any],
logger: WorkflowLogger, is_first_step: bool,
doctor_question: str = "") -> str:
"""获取虚拟患者的回应"""
start_time = time.time()
try:
# 构建虚拟患者输入
if is_first_step:
worker_inquiry = "您好,请问您哪里不舒服?"
else:
worker_inquiry = doctor_question
# 调用虚拟患者agent
patient_result = self.virtual_patient.run(
worker_inquiry=worker_inquiry,
is_first_epoch=is_first_step,
patient_case=case_data
)
execution_time = time.time() - start_time
patient_response = patient_result.current_chat
# 记录日志
logger.log_agent_execution(
step_num, "virtual_patient",
{
"worker_inquiry": worker_inquiry,
"is_first_epoch": is_first_step,
"case_data": case_data
},
{"patient_response": patient_response},
execution_time
)
logger.log_patient_response(step_num, patient_response, is_first_step)
return patient_response
except Exception as e:
error_msg = f"虚拟患者执行失败: {str(e)}"
logger.log_error(step_num, "virtual_patient_error", error_msg)
# 返回默认回应
return "对不起,我不太清楚怎么描述,医生您看着办吧。"
def _execute_recipient(self, step_num: int, logger: WorkflowLogger,
conversation_history: str, previous_hpi: str,
previous_ph: str, previous_chief_complaint: str):
"""执行Recipient agent"""
start_time = time.time()
input_data = {
"conversation_history": conversation_history,
"previous_HPI": previous_hpi,
"previous_PH": previous_ph,
"previous_chief_complaint": previous_chief_complaint
}
result = self.recipient.run(**input_data)
execution_time = time.time() - start_time
output_data = {
"updated_HPI": result.updated_HPI,
"updated_PH": result.updated_PH,
"chief_complaint": result.chief_complaint
}
logger.log_agent_execution(step_num, "recipient", input_data, output_data, execution_time)
return result
def _execute_monitor_by_phase(self, step_num: int, logger: WorkflowLogger,
task_manager: TaskManager, recipient_result) -> Dict[str, Dict[str, float]]:
"""按阶段执行Monitor评估只评估当前阶段未完成的任务"""
monitor_results = {}
current_phase = task_manager.get_current_phase()
# 如果所有任务都完成了,不需要评估
if current_phase == TaskPhase.COMPLETED:
return monitor_results
# 获取当前阶段未完成的任务
pending_tasks = task_manager.get_pending_tasks(current_phase)
if not pending_tasks:
return monitor_results
start_time = time.time()
try:
# 使用for循环逐个评估所有未完成的任务
phase_scores = {}
for task in pending_tasks:
task_name = task.get("name", "")
task_description = task.get("description", "")
# 调用Monitor评估特定任务
monitor_result = self.monitor.run(
hpi_content=recipient_result.updated_HPI,
ph_content=recipient_result.updated_PH,
chief_complaint=recipient_result.chief_complaint,
task_name=task_name,
task_description=task_description
)
phase_scores[task_name] = monitor_result.completion_score
print(f"任务'{task_name}'评分: {monitor_result.completion_score:.2f} - {monitor_result.reason}")
execution_time = time.time() - start_time
monitor_results[current_phase] = phase_scores
# 记录日志
input_data = {
"hpi_content": recipient_result.updated_HPI,
"ph_content": recipient_result.updated_PH,
"chief_complaint": recipient_result.chief_complaint,
"evaluated_phase": current_phase.value,
"pending_tasks": [t["name"] for t in pending_tasks]
}
output_data = {
"phase_scores": phase_scores,
"evaluated_tasks": list(phase_scores.keys()),
"average_score": sum(phase_scores.values()) / len(phase_scores) if phase_scores else 0.0
}
logger.log_agent_execution(step_num, "monitor", input_data, output_data, execution_time)
except Exception as e:
error_msg = f"Monitor执行失败: {str(e)}"
logger.log_error(step_num, "monitor_error", error_msg)
# 返回默认的低分评估
phase_scores = {task["name"]: 0.1 for task in pending_tasks}
monitor_results[current_phase] = phase_scores
return monitor_results
def _update_task_scores(self, step_num: int, logger: WorkflowLogger,
task_manager: TaskManager, monitor_results: Dict):
"""更新任务分数"""
for phase, scores in monitor_results.items():
if scores:
old_scores = task_manager.get_task_scores(phase).copy()
task_manager.update_task_scores(phase, scores)
new_scores = task_manager.get_task_scores(phase)
logger.log_task_scores_update(step_num, phase.value, old_scores, new_scores)
def _execute_controller(self, step_num: int, logger: WorkflowLogger,
task_manager: TaskManager, recipient_result):
"""执行Controller agent"""
start_time = time.time()
# 获取当前阶段的未完成任务
current_phase = task_manager.get_current_phase()
pending_tasks = task_manager.get_pending_tasks(current_phase)
input_data = {
"pending_tasks": pending_tasks,
"chief_complaint": recipient_result.chief_complaint,
"hpi_content": recipient_result.updated_HPI,
"ph_content": recipient_result.updated_PH
}
result = self.controller.run(**input_data)
execution_time = time.time() - start_time
output_data = {
"selected_task": result.selected_task,
"specific_guidance": result.specific_guidance
}
logger.log_agent_execution(step_num, "controller", input_data, output_data, execution_time)
return result
def _execute_prompter(self, step_num: int, logger: WorkflowLogger,
recipient_result, controller_result):
"""执行Prompter agent"""
start_time = time.time()
input_data = {
"hpi_content": recipient_result.updated_HPI,
"ph_content": recipient_result.updated_PH,
"chief_complaint": recipient_result.chief_complaint,
"current_task": controller_result.selected_task,
"specific_guidance": controller_result.specific_guidance
}
result = self.prompter.run(**input_data)
execution_time = time.time() - start_time
output_data = {
"description": result.description,
"instructions": result.instructions
}
logger.log_agent_execution(step_num, "prompter", input_data, output_data, execution_time)
return result
def _execute_inquirer(self, step_num: int, logger: WorkflowLogger,
recipient_result, prompter_result) -> str:
"""执行Inquirer agent"""
start_time = time.time()
try:
# 使用Prompter生成的描述和指令初始化Inquirer
inquirer = Inquirer(
description=prompter_result.description,
instructions=prompter_result.instructions,
model_type=self.model_type,
llm_config=self.llm_config
)
input_data = {
"hpi_content": recipient_result.updated_HPI,
"ph_content": recipient_result.updated_PH,
"chief_complaint": recipient_result.chief_complaint
}
result = inquirer.run(**input_data)
execution_time = time.time() - start_time
doctor_question = result.current_chat
output_data = {"doctor_question": doctor_question}
logger.log_agent_execution(step_num, "inquirer", input_data, output_data, execution_time)
return doctor_question
except Exception as e:
error_msg = f"Inquirer执行失败: {str(e)}"
logger.log_error(step_num, "inquirer_error", error_msg)
# 返回默认问题
return "请您详细描述一下您的症状,包括什么时候开始的,有什么特点?"

183
workflow/task_manager.py Normal file
View File

@ -0,0 +1,183 @@
from typing import Dict, List, Optional
from enum import Enum
class TaskPhase(Enum):
"""任务阶段枚举"""
TRIAGE = "triage" # 分诊阶段
HPI = "hpi" # 现病史阶段
PH = "ph" # 既往史阶段
COMPLETED = "completed" # 全部完成
class TaskManager:
"""
任务管理器
负责管理分诊现病史既往史三个阶段的子任务状态和完成度评分
"""
def __init__(self):
"""初始化任务管理器"""
self.completion_threshold = 0.85 # 任务完成阈值
# 定义各阶段的子任务
self.task_definitions = {
TaskPhase.TRIAGE: {
"一级科室判定": {"description": "确定患者应就诊的一级科室(如内科、外科等)"},
"二级科室判定": {"description": "在一级科室基础上确定具体的二级科室"}
},
TaskPhase.HPI: {
"起病情况和患病时间": {"description": "了解疾病发生的时间、诱因和起病方式"},
"主要症状特征": {"description": "详细描述患者的主要症状表现和特点"},
"病情发展与演变": {"description": "了解病情从发病到现在的发展变化过程"},
"伴随症状": {"description": "询问除主要症状外的其他相关症状"},
"诊疗经过": {"description": "了解患者已接受的诊断和治疗情况"},
"病程基本情况": {"description": "掌握疾病的整体病程和基本情况"}
},
TaskPhase.PH: {
"疾病史": {"description": "了解患者既往患过的疾病"},
"手术史": {"description": "询问患者既往手术经历"},
"过敏史": {"description": "了解患者药物或其他过敏史"},
"家族史": {"description": "询问家族相关疾病史"},
"个人史": {"description": "了解患者个人生活史"},
"预防接种史": {"description": "询问患者疫苗接种情况"}
}
}
# 初始化任务完成度评分所有任务初始分数为0.0
self.task_scores = {}
for phase in self.task_definitions:
self.task_scores[phase] = {}
for task_name in self.task_definitions[phase]:
self.task_scores[phase][task_name] = 0.0
def get_current_phase(self) -> TaskPhase:
"""
获取当前应该执行的任务阶段
Returns:
TaskPhase: 当前任务阶段
"""
# 检查分诊阶段是否完成
if not self._is_phase_completed(TaskPhase.TRIAGE):
return TaskPhase.TRIAGE
# 检查现病史阶段是否完成
if not self._is_phase_completed(TaskPhase.HPI):
return TaskPhase.HPI
# 检查既往史阶段是否完成
if not self._is_phase_completed(TaskPhase.PH):
return TaskPhase.PH
# 所有阶段都完成
return TaskPhase.COMPLETED
def get_pending_tasks(self, phase: Optional[TaskPhase] = None) -> List[Dict[str, str]]:
"""
获取指定阶段的未完成任务列表
Args:
phase: 指定的任务阶段如果为None则获取当前阶段
Returns:
List[Dict]: 未完成任务列表每个任务包含name和description字段
"""
if phase is None:
phase = self.get_current_phase()
if phase == TaskPhase.COMPLETED:
return []
pending_tasks = []
phase_tasks = self.task_definitions[phase]
phase_scores = self.task_scores[phase]
for task_name, task_info in phase_tasks.items():
if phase_scores[task_name] < self.completion_threshold:
pending_tasks.append({
"name": task_name,
"description": task_info["description"]
})
return pending_tasks
def update_task_scores(self, phase: TaskPhase, task_scores: Dict[str, float]):
"""
更新指定阶段的任务完成度评分
Args:
phase: 任务阶段
task_scores: 任务评分字典格式为 {任务名: 评分}
"""
if phase not in self.task_scores:
return
for task_name, score in task_scores.items():
if task_name in self.task_scores[phase]:
self.task_scores[phase][task_name] = score
def get_task_scores(self, phase: Optional[TaskPhase] = None) -> Dict:
"""
获取任务评分信息
Args:
phase: 指定的任务阶段如果为None则返回所有阶段
Returns:
Dict: 任务评分信息
"""
if phase is None:
return self.task_scores
return self.task_scores.get(phase, {})
def get_completion_summary(self) -> Dict[str, any]:
"""
获取任务完成情况摘要
Returns:
Dict: 完成情况摘要包含各阶段完成状态和进度
"""
summary = {
"current_phase": self.get_current_phase().value,
"phases": {}
}
for phase, tasks in self.task_definitions.items():
completed_count = sum(
1 for task_name in tasks
if self.task_scores[phase][task_name] >= self.completion_threshold
)
total_count = len(tasks)
summary["phases"][phase.value] = {
"completed": completed_count,
"total": total_count,
"completion_rate": completed_count / total_count if total_count > 0 else 0,
"is_completed": self._is_phase_completed(phase)
}
return summary
def _is_phase_completed(self, phase: TaskPhase) -> bool:
"""
检查指定阶段是否完成
Args:
phase: 任务阶段
Returns:
bool: 是否完成
"""
if phase not in self.task_scores:
return False
phase_scores = self.task_scores[phase]
return all(score >= self.completion_threshold for score in phase_scores.values())
def is_workflow_completed(self) -> bool:
"""
检查整个工作流是否完成
Returns:
bool: 是否完成
"""
return self.get_current_phase() == TaskPhase.COMPLETED

247
workflow/workflow_logger.py Normal file
View File

@ -0,0 +1,247 @@
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
import hashlib
class WorkflowLogger:
"""
工作流日志记录器
负责将每个step的详细信息记录到jsonl格式文件中
"""
def __init__(self, case_data: Dict[str, Any], log_dir: str = "logs"):
"""
初始化日志记录器
Args:
case_data: 病例数据
log_dir: 日志目录默认为"logs"
"""
self.case_data = case_data
self.log_dir = log_dir
self.log_file_path = self._generate_log_file_path()
self.step_count = 0
# 确保日志目录存在
os.makedirs(log_dir, exist_ok=True)
# 初始化日志文件,记录工作流开始信息
self._log_workflow_start()
def _generate_log_file_path(self) -> str:
"""
为当前病例生成唯一的日志文件路径
Returns:
str: 日志文件路径
"""
# 生成基于病例内容的唯一标识
case_str = json.dumps(self.case_data, ensure_ascii=False, sort_keys=True)
case_hash = hashlib.md5(case_str.encode('utf-8')).hexdigest()[:8]
# 生成时间戳
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 构建文件名
filename = f"workflow_{timestamp}_{case_hash}.jsonl"
return os.path.join(self.log_dir, filename)
def _log_workflow_start(self):
"""记录工作流开始信息"""
start_log = {
"event_type": "workflow_start",
"timestamp": datetime.now().isoformat(),
"case_data": self.case_data,
"workflow_config": {
"max_steps": 30,
"completion_threshold": 0.85,
"phases": ["triage", "hpi", "ph"]
}
}
self._write_log_entry(start_log)
def log_step_start(self, step_num: int, current_phase: str, pending_tasks: list):
"""
记录step开始信息
Args:
step_num: step编号
current_phase: 当前阶段
pending_tasks: 待完成任务列表
"""
self.step_count = step_num
step_start_log = {
"event_type": "step_start",
"step_number": step_num,
"timestamp": datetime.now().isoformat(),
"current_phase": current_phase,
"pending_tasks": pending_tasks
}
self._write_log_entry(step_start_log)
def log_patient_response(self, step_num: int, patient_message: str, is_first_step: bool = False):
"""
记录患者回应
Args:
step_num: step编号
patient_message: 患者消息
is_first_step: 是否为第一个step
"""
patient_log = {
"event_type": "patient_response",
"step_number": step_num,
"timestamp": datetime.now().isoformat(),
"is_first_step": is_first_step,
"message": patient_message
}
self._write_log_entry(patient_log)
def log_agent_execution(self, step_num: int, agent_name: str,
input_data: Dict[str, Any], output_data: Dict[str, Any],
execution_time: Optional[float] = None):
"""
记录agent执行信息
Args:
step_num: step编号
agent_name: agent名称
input_data: 输入数据
output_data: 输出数据
execution_time: 执行时间
"""
agent_log = {
"event_type": "agent_execution",
"step_number": step_num,
"timestamp": datetime.now().isoformat(),
"agent_name": agent_name,
"input_data": input_data,
"output_data": output_data
}
if execution_time is not None:
agent_log["execution_time_seconds"] = execution_time
self._write_log_entry(agent_log)
def log_task_scores_update(self, step_num: int, phase: str,
old_scores: Dict[str, float],
new_scores: Dict[str, float]):
"""
记录任务评分更新
Args:
step_num: step编号
phase: 阶段名称
old_scores: 更新前的评分
new_scores: 更新后的评分
"""
scores_log = {
"event_type": "task_scores_update",
"step_number": step_num,
"timestamp": datetime.now().isoformat(),
"phase": phase,
"old_scores": old_scores,
"new_scores": new_scores,
"score_changes": {
task: new_scores[task] - old_scores.get(task, 0.0)
for task in new_scores
}
}
self._write_log_entry(scores_log)
def log_step_complete(self, step_num: int, doctor_question: str,
conversation_history: str, task_completion_summary: Dict):
"""
记录step完成信息
Args:
step_num: step编号
doctor_question: 医生生成的问题
conversation_history: 对话历史
task_completion_summary: 任务完成情况摘要
"""
step_complete_log = {
"event_type": "step_complete",
"step_number": step_num,
"timestamp": datetime.now().isoformat(),
"doctor_question": doctor_question,
"conversation_history": conversation_history,
"task_completion_summary": task_completion_summary
}
self._write_log_entry(step_complete_log)
def log_workflow_complete(self, total_steps: int, final_summary: Dict, success: bool = True):
"""
记录工作流完成信息
Args:
total_steps: 总step数
final_summary: 最终摘要
success: 是否成功完成
"""
complete_log = {
"event_type": "workflow_complete",
"timestamp": datetime.now().isoformat(),
"total_steps": total_steps,
"success": success,
"final_summary": final_summary,
"log_file_path": self.log_file_path
}
self._write_log_entry(complete_log)
def log_error(self, step_num: int, error_type: str, error_message: str,
error_context: Optional[Dict] = None):
"""
记录错误信息
Args:
step_num: step编号
error_type: 错误类型
error_message: 错误消息
error_context: 错误上下文
"""
error_log = {
"event_type": "error",
"step_number": step_num,
"timestamp": datetime.now().isoformat(),
"error_type": error_type,
"error_message": error_message
}
if error_context:
error_log["error_context"] = error_context
self._write_log_entry(error_log)
def _write_log_entry(self, log_entry: Dict[str, Any]):
"""
写入一条日志记录到jsonl文件
Args:
log_entry: 日志条目
"""
try:
with open(self.log_file_path, 'a', encoding='utf-8') as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
except Exception as e:
print(f"写入日志失败: {e}")
def get_log_file_path(self) -> str:
"""
获取日志文件路径
Returns:
str: 日志文件路径
"""
return self.log_file_path
def get_step_count(self) -> int:
"""
获取当前step计数
Returns:
int: step计数
"""
return self.step_count