392 lines
17 KiB
Python
Raw Permalink Normal View History

from typing import Dict, Any, List
from agent_system.base import BaseAgent
from agent_system.controller.prompt import ControllerPrompt
from agent_system.controller.response_model import ControllerDecision
class TaskController(BaseAgent):
"""
任务控制器智能体
负责根据患者的临床信息现病史既往史主诉从未完成的任务列表中
选择最合适的下一步任务并提供预问诊询问指导建议
核心功能:
1. 分析患者的临床信息和病情特征
2. 选择最重要的询问任务
3. 提供针对性的预问诊询问指导
4. 确保指导内容仅限于医生可询问的信息
Attributes:
model_type (str): 使用的大语言模型类型默认为 gpt-oss:latest
llm_config (dict): LLM模型配置参数
simple_mode (bool): 简化模式标志True时自动选择第一个任务并返回固定指导
"""
def __init__(self, model_type: str = "gpt-oss:latest", llm_config: dict = None,
simple_mode: bool = False, score_driven_mode: bool = False):
"""
初始化任务控制器智能体
Args:
model_type (str): 大语言模型类型默认使用 gpt-oss:latest
llm_config (dict): LLM模型的配置参数如果为None则使用默认配置
simple_mode (bool): 简化模式如果为True则自动选择第一个任务并返回固定指导默认为False
score_driven_mode (bool): 分数驱动模式如果为True则选择当前任务组中分数最低的任务默认为False
Note:
score_driven_mode和simple_mode不能同时为True如果同时为True则优先使用score_driven_mode
"""
self.simple_mode = simple_mode
self.score_driven_mode = score_driven_mode
super().__init__(
model_type=model_type,
description="医疗任务控制器,负责任务选择和预问诊询问指导",
instructions=ControllerPrompt.instructions,
response_model=ControllerDecision,
llm_config=llm_config or {},
structured_outputs=True,
markdown=False,
use_cache=False
)
def run(self,
pending_tasks: List[Dict[str, str]],
chief_complaint: str,
hpi_content: str = "",
ph_content: str = "",
additional_info: str = "",
task_manager = None) -> ControllerDecision:
"""
执行任务控制决策
基于患者的临床信息和待执行的任务列表选择最合适的任务
并提供具体的执行指导建议
Args:
pending_tasks (List[Dict[str, str]]): 待执行的任务列表每个任务包含namedescription字段
chief_complaint (str): 患者主诉
hpi_content (str, optional): 现病史内容默认为空字符串
ph_content (str, optional): 既往史内容默认为空字符串
additional_info (str, optional): 附加信息可能包含补充的临床信息默认为空字符串
Returns:
ControllerDecision: 包含任务选择决策和预问诊询问指导的结构化数据包括
- selected_task: 选择的任务名称
- specific_guidance: 针对选定任务的预问诊询问指导建议
Raises:
Exception: 当LLM调用失败时返回包含默认信息的ControllerDecision
"""
try:
# 分数驱动模式:选择当前任务组中分数最低的任务
if self.score_driven_mode and task_manager is not None:
return self._get_score_driven_result(pending_tasks, task_manager)
# 简化模式:直接选择第一个任务并返回固定指导
elif self.simple_mode:
return self._get_simple_mode_result(pending_tasks)
# 构建决策提示词
prompt = self._build_decision_prompt(
pending_tasks, chief_complaint, hpi_content, ph_content, additional_info
)
# 调用基类的run方法执行LLM推理
result = super().run(prompt)
# 确保返回正确的类型并进行类型转换
return self._ensure_result_type(result)
except Exception as e:
# 当决策失败时记录错误并返回默认结果
print(f"任务控制决策失败: {str(e)}")
return self._get_fallback_result(pending_tasks)
def _ensure_result_type(self, result: Any) -> ControllerDecision:
"""
确保返回结果为正确的类型
Args:
result (Any): LLM返回的原始结果
Returns:
ControllerDecision: 转换后的结构化结果
"""
if isinstance(result, ControllerDecision):
return result
elif isinstance(result, dict):
return ControllerDecision(**result)
else:
# 如果类型不匹配,返回默认结果
return self._get_fallback_result([])
def _get_score_driven_result(self, pending_tasks: List[Dict[str, str]], task_manager) -> ControllerDecision:
"""
分数驱动模式下生成决策结果
在分数驱动模式下从当前任务组的未完成任务中选择分数最低的任务
并返回相应的询问指导这是基于数值比较的算法选择无需LLM参与
Args:
pending_tasks (List[Dict[str, str]]): 待执行的任务列表包含name和description
task_manager: 任务管理器实例用于获取任务分数信息
Returns:
ControllerDecision: 包含分数驱动模式任务选择和指导的结果
"""
if not pending_tasks:
return ControllerDecision(
selected_task="基本信息收集",
specific_guidance="当前没有待执行任务,请按照标准医疗询问流程进行患者评估。"
)
# 获取当前任务阶段
current_phase = task_manager.get_current_phase()
# 获取当前阶段的任务分数
phase_scores = task_manager.get_task_scores(current_phase)
# 在待执行任务中找到分数最低的任务
lowest_score_task = None
lowest_score = float('inf')
for task in pending_tasks:
task_name = task.get("name", "")
task_score = phase_scores.get(task_name, 0.0)
if task_score < lowest_score:
lowest_score = task_score
lowest_score_task = task
# 如果没有找到合适的任务,选择第一个并记录错误日志
if lowest_score_task is None:
# 使用logger记录错误如果没有logger则使用print作为后备
error_msg = f"Controller-ScoreDriven警告在阶段{current_phase.value}中未找到合适任务,使用默认第一个任务"
try:
import logging
logger = logging.getLogger(__name__)
logger.error(error_msg)
except:
print(f"[ERROR] {error_msg}")
lowest_score_task = pending_tasks[0]
lowest_score = phase_scores.get(lowest_score_task.get("name", ""), 0.0)
selected_task_name = lowest_score_task.get("name", "未知任务")
# 使用和simple模式相同的固定指导
return ControllerDecision(
selected_task=selected_task_name,
specific_guidance="请按照标准医疗询问流程进行患者评估,基于患者临床信息选择最重要的询问任务,提供针对性的、具体的、可操作的询问指导建议,确保指导内容仅限于医生可以通过询问获取的信息。"
)
def _get_simple_mode_result(self, pending_tasks: List[Dict[str, str]]) -> ControllerDecision:
"""
简化模式下生成决策结果
在简化模式下直接选择第一个待执行任务并返回固定的询问指导
Args:
pending_tasks (List[Dict[str, str]]): 待执行的任务列表
Returns:
ControllerDecision: 包含简化模式任务选择和固定指导的结果
"""
# 如果有待执行任务,选择第一个作为默认任务
if pending_tasks:
selected_task = pending_tasks[0]
selected_task_name = selected_task.get("name", "未知任务")
else:
selected_task_name = "基本信息收集"
return ControllerDecision(
selected_task=selected_task_name,
specific_guidance="请按照标准医疗询问流程进行患者评估,基于患者临床信息选择最重要的询问任务,提供针对性的、具体的、可操作的询问指导建议,确保指导内容仅限于医生可以通过询问获取的信息。"
)
def _get_fallback_result(self, pending_tasks: List[Dict[str, str]]) -> ControllerDecision:
"""
生成决策失败时的默认结果
Args:
pending_tasks (List[Dict[str, str]]): 待执行的任务列表
Returns:
ControllerDecision: 包含默认任务选择的结果
"""
# 如果有待执行任务,选择第一个作为默认任务
if pending_tasks:
default_task = pending_tasks[0]
selected_task_name = default_task.get("name", "未知任务")
else:
selected_task_name = "基本信息收集"
return ControllerDecision(
selected_task=selected_task_name,
specific_guidance="由于系统异常,请按照标准临床询问流程进行患者评估,重点询问患者的主要症状、起病过程和伴随症状等基本病史信息。"
)
def _build_decision_prompt(self,
pending_tasks: List[Dict[str, str]],
chief_complaint: str,
hpi_content: str,
ph_content: str,
additional_info: str = "") -> str:
"""
构建任务控制决策的提示词模板
根据待执行任务列表和患者临床信息构建简洁高效的决策提示词
引导LLM进行专业的任务选择和指导建议
Args:
pending_tasks (List[Dict[str, str]]): 待执行的任务列表
chief_complaint (str): 患者主诉
hpi_content (str): 现病史内容
ph_content (str): 既往史内容
additional_info (str, optional): 附加信息默认为空字符串
Returns:
str: 精简的决策提示词
"""
# 格式化待执行任务列表
tasks_display = ""
for i, task in enumerate(pending_tasks, 1):
task_name = task.get("name", "未知任务")
task_desc = task.get("description", "无描述")
tasks_display += f"{i}. 任务名称: {task_name}\n 描述: {task_desc}\n\n"
if not tasks_display.strip():
tasks_display = "当前没有待执行的任务。"
# 确保临床信息的合理显示
hpi_display = hpi_content.strip() if hpi_content.strip() else "暂无现病史信息"
ph_display = ph_content.strip() if ph_content.strip() else "暂无既往史信息"
# 处理附加信息
additional_info_section = ""
if additional_info.strip():
additional_info_section = f"\n附加信息: {additional_info.strip()}"
# 检查是否包含科室判定任务,并生成特殊指导
department_guidance = self._generate_department_guidance(pending_tasks, additional_info)
# 从prompt类获取示例输出格式
example_output = ControllerPrompt.get_example_output()
prompt = f"""患者临床信息:
主诉: {chief_complaint}
现病史: {hpi_display}
既往史: {ph_display}{additional_info_section}
待执行任务列表
{tasks_display}
请根据患者的临床信息分析病情特征从上述任务列表中选择最合适的下一步任务并提供具体的执行指导建议
{department_guidance}
输出格式示例
{example_output}
请严格按照上述JSON格式输出
输出内容为:"""
return prompt
def _generate_department_guidance(self, pending_tasks: List[Dict[str, str]], additional_info: str) -> str:
"""
为科室判定任务生成特殊指导
当任务列表中包含科室判定相关任务时利用附加信息医院科室信息和上一轮分诊结果
生成针对科室分诊的特殊指导重点关注容易混淆的科室和误判风险
Args:
pending_tasks (List[Dict[str, str]]): 待执行的任务列表
additional_info (str): 分诊附加信息包含医院科室信息和上一轮分诊结果
Returns:
str: 科室判定的特殊指导内容如果没有相关任务则返回空字符串
"""
# 检查是否有科室判定相关的任务
department_tasks = []
for task in pending_tasks:
task_name = task.get("name", "").lower()
if any(keyword in task_name for keyword in ["科室", "分诊", "department"]):
department_tasks.append(task)
if not department_tasks or not additional_info.strip():
return ""
# 生成科室判定的特殊指导
guidance_parts = ["## 科室判定任务特别指导"]
guidance_parts.append(
"如果选择科室判定任务,请在询问时重点关注:"
)
for task in department_tasks:
task_name = task.get("name", "")
if "一级科室" in task_name:
guidance_parts.extend([
"- 详细询问患者症状特征,结合医院科室设置判断最适合的一级科室",
"- 询问症状的具体表现,与其他一级科室易混淆症状进行鉴别",
"- 通过询问病史和症状发展过程,排除其他可能的科室选择",
"- 重点询问与科室判定相关的关键症状和体征"
])
elif "二级科室" in task_name:
guidance_parts.extend([
"- 基于一级科室判定结果,询问更详细的专业相关症状",
"- 通过询问了解患者症状的专业特征,区分二级科室内不同专业方向",
"- 询问既往相关疾病史和家族史,辅助二级科室判定",
"- 重点询问能帮助细分专业科室的特异性症状"
])
# 将附加信息内容整合到指导中
if additional_info.strip():
guidance_parts.append("\n## 医院具体情况参考:")
guidance_parts.extend(additional_info.splitlines())
return "\n".join(guidance_parts)
def select_optimal_task(self,
tasks: List[Dict[str, str]],
patient_info: Dict[str, str]) -> ControllerDecision:
"""
基于患者信息选择最优任务的便捷接口
这是一个专门用于任务选择的简化接口接受结构化的患者信息
Args:
tasks (List[Dict[str, str]]): 待执行的任务列表
patient_info (Dict[str, str]): 患者信息字典包含chief_complainthpiphadditional_info等字段
Returns:
ControllerDecision: 任务选择决策结果
"""
chief_complaint = patient_info.get("chief_complaint", "")
hpi_content = patient_info.get("hpi", "")
ph_content = patient_info.get("ph", "")
additional_info = patient_info.get("additional_info", "")
return self.run(
pending_tasks=tasks,
chief_complaint=chief_complaint,
hpi_content=hpi_content,
ph_content=ph_content,
additional_info=additional_info
)
def get_task_guidance(self, result: ControllerDecision) -> Dict[str, Any]:
"""
获取任务指导的结构化信息
Args:
result (ControllerDecision): 控制器决策结果
Returns:
Dict[str, Any]: 包含任务指导信息的字典
"""
return {
"task_name": result.selected_task,
"guidance": result.specific_guidance
}