triage/workflow/task_manager.py

183 lines
6.6 KiB
Python
Raw Normal View History

from typing import Dict, List, Optional
from enum import Enum
class TaskPhase(Enum):
"""任务阶段枚举"""
TRIAGE = "triage" # 分诊阶段
HPI = "hpi" # 现病史阶段
PH = "ph" # 既往史阶段
COMPLETED = "completed" # 全部完成
class TaskManager:
"""
任务管理器
负责管理分诊现病史既往史三个阶段的子任务状态和完成度评分
"""
def __init__(self):
"""初始化任务管理器"""
self.completion_threshold = 0.85 # 任务完成阈值
# 定义各阶段的子任务
self.task_definitions = {
TaskPhase.TRIAGE: {
"一级科室判定": {"description": "确定患者应就诊的一级科室(如内科、外科等)"},
"二级科室判定": {"description": "在一级科室基础上确定具体的二级科室"}
},
TaskPhase.HPI: {
"起病情况和患病时间": {"description": "了解疾病发生的时间、诱因和起病方式"},
"主要症状特征": {"description": "详细描述患者的主要症状表现和特点"},
"病情发展与演变": {"description": "了解病情从发病到现在的发展变化过程"},
"伴随症状": {"description": "询问除主要症状外的其他相关症状"},
"诊疗经过": {"description": "了解患者已接受的诊断和治疗情况"},
"病程基本情况": {"description": "掌握疾病的整体病程和基本情况"}
},
TaskPhase.PH: {
"疾病史": {"description": "了解患者既往患过的疾病"},
"手术史": {"description": "询问患者既往手术经历"},
"过敏史": {"description": "了解患者药物或其他过敏史"},
"家族史": {"description": "询问家族相关疾病史"},
"个人史": {"description": "了解患者个人生活史"},
"预防接种史": {"description": "询问患者疫苗接种情况"}
}
}
# 初始化任务完成度评分所有任务初始分数为0.0
self.task_scores = {}
for phase in self.task_definitions:
self.task_scores[phase] = {}
for task_name in self.task_definitions[phase]:
self.task_scores[phase][task_name] = 0.0
def get_current_phase(self) -> TaskPhase:
"""
获取当前应该执行的任务阶段
Returns:
TaskPhase: 当前任务阶段
"""
# 检查分诊阶段是否完成
if not self._is_phase_completed(TaskPhase.TRIAGE):
return TaskPhase.TRIAGE
# 检查现病史阶段是否完成
if not self._is_phase_completed(TaskPhase.HPI):
return TaskPhase.HPI
# 检查既往史阶段是否完成
if not self._is_phase_completed(TaskPhase.PH):
return TaskPhase.PH
# 所有阶段都完成
return TaskPhase.COMPLETED
def get_pending_tasks(self, phase: Optional[TaskPhase] = None) -> List[Dict[str, str]]:
"""
获取指定阶段的未完成任务列表
Args:
phase: 指定的任务阶段如果为None则获取当前阶段
Returns:
List[Dict]: 未完成任务列表每个任务包含name和description字段
"""
if phase is None:
phase = self.get_current_phase()
if phase == TaskPhase.COMPLETED:
return []
pending_tasks = []
phase_tasks = self.task_definitions[phase]
phase_scores = self.task_scores[phase]
for task_name, task_info in phase_tasks.items():
if phase_scores[task_name] < self.completion_threshold:
pending_tasks.append({
"name": task_name,
"description": task_info["description"]
})
return pending_tasks
def update_task_scores(self, phase: TaskPhase, task_scores: Dict[str, float]):
"""
更新指定阶段的任务完成度评分
Args:
phase: 任务阶段
task_scores: 任务评分字典格式为 {任务名: 评分}
"""
if phase not in self.task_scores:
return
for task_name, score in task_scores.items():
if task_name in self.task_scores[phase]:
self.task_scores[phase][task_name] = score
def get_task_scores(self, phase: Optional[TaskPhase] = None) -> Dict:
"""
获取任务评分信息
Args:
phase: 指定的任务阶段如果为None则返回所有阶段
Returns:
Dict: 任务评分信息
"""
if phase is None:
return self.task_scores
return self.task_scores.get(phase, {})
def get_completion_summary(self) -> Dict[str, any]:
"""
获取任务完成情况摘要
Returns:
Dict: 完成情况摘要包含各阶段完成状态和进度
"""
summary = {
"current_phase": self.get_current_phase().value,
"phases": {}
}
for phase, tasks in self.task_definitions.items():
completed_count = sum(
1 for task_name in tasks
if self.task_scores[phase][task_name] >= self.completion_threshold
)
total_count = len(tasks)
summary["phases"][phase.value] = {
"completed": completed_count,
"total": total_count,
"completion_rate": completed_count / total_count if total_count > 0 else 0,
"is_completed": self._is_phase_completed(phase)
}
return summary
def _is_phase_completed(self, phase: TaskPhase) -> bool:
"""
检查指定阶段是否完成
Args:
phase: 任务阶段
Returns:
bool: 是否完成
"""
if phase not in self.task_scores:
return False
phase_scores = self.task_scores[phase]
return all(score >= self.completion_threshold for score in phase_scores.values())
def is_workflow_completed(self) -> bool:
"""
检查整个工作流是否完成
Returns:
bool: 是否完成
"""
return self.get_current_phase() == TaskPhase.COMPLETED