#!/usr/bin/env python3 """ 文本分析综合示例 基于SubAgent系统的复杂应用示例,展示: 1. 多Agent协作系统 2. 复杂的数据处理pipeline 3. 结构化输出和错误恢复 4. 性能监控和质量评估 """ import sys import os import time from typing import List, Dict, Any, Optional, Tuple from datetime import datetime import uuid # 添加项目根路径到Python路径 project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.append(project_root) from src.agent_system import SubAgent, create_json_agent from example_models import ( TextAnalysisResult, DocumentClassificationResult, StructuredDataExtraction, ComprehensiveAnalysisResult, SentimentAnalysis, KeywordExtraction, DataExtractionItem, CategoryClassification, TaskExecutionResult ) class TextAnalysisEngine: """文本分析引擎 - 多Agent协作系统""" def __init__(self): """初始化文本分析引擎""" self.agents = {} self.processing_stats = { "total_processed": 0, "successful_analyses": 0, "failed_analyses": 0, "average_processing_time": 0.0 } # 初始化所有Agent self._initialize_agents() def _initialize_agents(self): """初始化所有分析Agent""" print("🔧 初始化文本分析引擎...") try: # 1. 情感分析Agent self.agents['sentiment'] = self._create_sentiment_agent() # 2. 关键词提取Agent self.agents['keywords'] = self._create_keyword_agent() # 3. 文本分类Agent self.agents['classification'] = self._create_classification_agent() # 4. 数据提取Agent self.agents['extraction'] = self._create_extraction_agent() # 5. 综合分析Agent self.agents['comprehensive'] = self._create_comprehensive_agent() print(f"✅ 成功初始化 {len(self.agents)} 个Agent") except Exception as e: print(f"❌ Agent初始化失败: {e}") raise def _create_sentiment_agent(self) -> SubAgent: """创建情感分析Agent""" instructions = [ "你是专业的文本情感分析专家", "准确识别文本的情感倾向和情感强度", "提供详细的分析依据和相关关键词", "对分析结果给出可信度评估" ] prompt_template = """ 请对以下文本进行深入的情感分析: 【文本内容】 {text} 【分析要求】 1. 识别主要情感倾向(positive/negative/neutral) 2. 评估情感强度和置信度(0-1) 3. 提供分析说明和判断依据 4. 提取影响情感判断的关键词和短语 5. 考虑语言的细微差别和上下文含义 请提供准确、专业的分析结果。 """ return SubAgent( provider="aliyun", model_name="qwen-max", name="sentiment_analyzer", description="专业的情感分析系统", instructions=instructions, prompt_template=prompt_template, response_model=SentimentAnalysis ) def _create_keyword_agent(self) -> SubAgent: """创建关键词提取Agent""" instructions = [ "你是专业的关键词提取专家", "从文本中识别最重要和最相关的关键词", "评估关键词的重要性和频率", "对关键词进行合理的分类" ] prompt_template = """ 请从以下文本中提取关键词: 【文本内容】 {text} 【提取要求】 1. 识别最重要的关键词和短语 2. 统计关键词出现频率 3. 评估每个关键词的重要性(0-1) 4. 对关键词进行分类(如:人物、地点、概念、技术等) 5. 排除停用词和无意义词汇 请提供结构化的关键词提取结果。 """ # 创建专门处理关键词列表的响应模型 from pydantic import BaseModel, Field from typing import List class KeywordExtractionResult(BaseModel): keywords: List[KeywordExtraction] = Field(description="提取的关键词列表") total_count: int = Field(description="关键词总数", ge=0) text_complexity: float = Field(description="文本复杂度", ge=0.0, le=1.0) return SubAgent( provider="aliyun", model_name="qwen-max", name="keyword_extractor", description="智能关键词提取系统", instructions=instructions, prompt_template=prompt_template, response_model=KeywordExtractionResult ) def _create_classification_agent(self) -> SubAgent: """创建文档分类Agent""" instructions = [ "你是专业的文档分类专家", "准确识别文档的类型和主题", "提供多级分类和置信度评估", "考虑文档的内容、风格和用途" ] prompt_template = """ 请对以下文档进行分类: 【文档内容】 {text} 【分类体系】 主要分类:技术文档、商业文档、学术论文、新闻报道、个人写作、法律文档、医学文档等 详细分类:根据具体内容进一步细分 【分类要求】 1. 确定主要分类和置信度 2. 提供所有可能分类的概率分布 3. 识别用于分类判断的关键特征 4. 评估分类的可信度 请提供准确的分类结果。 """ return SubAgent( provider="aliyun", model_name="qwen-max", name="document_classifier", description="智能文档分类系统", instructions=instructions, prompt_template=prompt_template, response_model=DocumentClassificationResult ) def _create_extraction_agent(self) -> SubAgent: """创建数据提取Agent""" instructions = [ "你是专业的结构化数据提取专家", "从非结构化文本中提取有价值的信息", "确保提取的数据准确性和完整性", "评估提取质量和可靠性" ] prompt_template = """ 请从以下文本中提取结构化数据: 【文本内容】 {text} 【提取目标】 根据文本内容自动识别可提取的数据类型,可能包括: - 人名、地名、机构名 - 日期、时间、数量 - 联系方式、地址 - 专业术语、概念 - 关键指标、统计数据 【提取要求】 1. 自动识别文本中的结构化信息 2. 为每个提取项提供置信度评估 3. 记录提取依据和来源文本片段 4. 评估整体提取质量和完整性 请提供详细的数据提取结果。 """ return SubAgent( provider="aliyun", model_name="qwen-max", name="data_extractor", description="智能数据提取系统", instructions=instructions, prompt_template=prompt_template, response_model=StructuredDataExtraction ) def _create_comprehensive_agent(self) -> SubAgent: """创建综合分析Agent""" instructions = [ "你是文本综合分析专家", "整合多种分析结果提供整体评估", "识别分析中的一致性和矛盾之处", "提供改进建议和深度见解" ] prompt_template = """ 基于以下多维度分析结果,请提供综合评估: 【原始文本】 {original_text} 【分析结果】 情感分析: {sentiment_result} 关键词提取: {keyword_result} 文档分类: {classification_result} 数据提取: {extraction_result} 【综合评估要求】 1. 评估各项分析结果的一致性 2. 识别潜在的分析矛盾或问题 3. 提供整体质量评估 4. 给出置信度评估 5. 提出改进建议 请提供专业的综合分析报告。 """ return SubAgent( provider="aliyun", model_name="qwen-max", name="comprehensive_analyzer", description="文本综合分析系统", instructions=instructions, prompt_template=prompt_template, response_model=ComprehensiveAnalysisResult ) def analyze_text(self, text: str, analysis_id: Optional[str] = None) -> ComprehensiveAnalysisResult: """执行完整的文本分析""" if analysis_id is None: analysis_id = f"analysis_{uuid.uuid4().hex[:8]}" start_time = time.time() self.processing_stats["total_processed"] += 1 print(f"\n🔍 开始分析 [{analysis_id}]") print(f"文本长度: {len(text)} 字符") try: # 阶段1: 基础分析 print("📊 执行基础分析...") sentiment_result = self._analyze_sentiment(text) keyword_result = self._extract_keywords(text) # 阶段2: 高级分析 print("🧠 执行高级分析...") classification_result = self._classify_document(text) extraction_result = self._extract_data(text) # 阶段3: 综合分析 print("🎯 执行综合分析...") comprehensive_result = self._comprehensive_analysis( text, sentiment_result, keyword_result, classification_result, extraction_result, analysis_id ) # 更新统计信息 processing_time = time.time() - start_time self.processing_stats["successful_analyses"] += 1 self._update_processing_stats(processing_time) print(f"✅ 分析完成 [{analysis_id}] - 耗时: {processing_time:.2f}秒") return comprehensive_result except Exception as e: self.processing_stats["failed_analyses"] += 1 print(f"❌ 分析失败 [{analysis_id}]: {e}") raise def _analyze_sentiment(self, text: str) -> SentimentAnalysis: """执行情感分析""" try: result = self.agents['sentiment'].run(template_vars={"text": text}) print(f" 情感: {result.sentiment} (置信度: {result.confidence:.3f})") return result except Exception as e: print(f" ⚠️ 情感分析失败: {e}") # 返回默认结果 return SentimentAnalysis( sentiment="neutral", confidence=0.0, explanation=f"分析失败: {e}", keywords=[] ) def _extract_keywords(self, text: str): """提取关键词""" try: result = self.agents['keywords'].run(template_vars={"text": text}) print(f" 关键词: {result.total_count} 个") return result except Exception as e: print(f" ⚠️ 关键词提取失败: {e}") # 返回空结果 from pydantic import BaseModel, Field from typing import List class KeywordExtractionResult(BaseModel): keywords: List[KeywordExtraction] = Field(default_factory=list) total_count: int = Field(default=0) text_complexity: float = Field(default=0.5) return KeywordExtractionResult() def _classify_document(self, text: str) -> DocumentClassificationResult: """执行文档分类""" try: result = self.agents['classification'].run(template_vars={"text": text}) print(f" 分类: {result.primary_category} (置信度: {result.confidence:.3f})") return result except Exception as e: print(f" ⚠️ 文档分类失败: {e}") # 返回默认结果 return DocumentClassificationResult( primary_category="未知", confidence=0.0, all_categories=[ CategoryClassification( category="未知", confidence=0.0, probability=0.0 ) ] ) def _extract_data(self, text: str) -> StructuredDataExtraction: """提取结构化数据""" try: result = self.agents['extraction'].run(template_vars={"text": text}) print(f" 数据提取: {result.extracted_fields}/{result.total_fields} 字段") return result except Exception as e: print(f" ⚠️ 数据提取失败: {e}") # 返回空结果 return StructuredDataExtraction( extracted_data={}, extraction_items=[], extraction_quality="poor", completeness=0.0, accuracy=0.0, total_fields=0, extracted_fields=0, failed_fields=0 ) def _comprehensive_analysis( self, original_text: str, sentiment_result: SentimentAnalysis, keyword_result, classification_result: DocumentClassificationResult, extraction_result: StructuredDataExtraction, analysis_id: str ) -> ComprehensiveAnalysisResult: """执行综合分析""" try: # 准备模板变量 template_vars = { "original_text": original_text[:500] + ("..." if len(original_text) > 500 else ""), "sentiment_result": f"情感:{sentiment_result.sentiment}, 置信度:{sentiment_result.confidence}", "keyword_result": f"关键词数量:{getattr(keyword_result, 'total_count', 0)}", "classification_result": f"分类:{classification_result.primary_category}, 置信度:{classification_result.confidence}", "extraction_result": f"提取质量:{extraction_result.extraction_quality}" } result = self.agents['comprehensive'].run(template_vars=template_vars) # 补充一些字段 result.analysis_id = analysis_id result.input_summary = f"长度:{len(original_text)}字符, 类型:{classification_result.primary_category}" result.text_analysis = self._build_text_analysis_result( original_text, sentiment_result, keyword_result ) result.classification = classification_result result.data_extraction = extraction_result print(f" 综合评估: {result.overall_quality}") return result except Exception as e: print(f" ⚠️ 综合分析失败: {e}") # 构建基本的综合结果 return ComprehensiveAnalysisResult( analysis_id=analysis_id, input_summary=f"长度:{len(original_text)}字符", overall_quality="poor", confidence_level=0.0, total_processing_time=0.0, components_completed=0, components_failed=4, recommendations=["分析失败,请检查输入文本和系统配置"] ) def _build_text_analysis_result( self, text: str, sentiment: SentimentAnalysis, keyword_result ) -> TextAnalysisResult: """构建文本分析结果""" # 获取关键词列表 keywords = getattr(keyword_result, 'keywords', []) return TextAnalysisResult( text_length=len(text), word_count=len(text.split()), language="zh", summary=f"文本分析摘要: 情感倾向为{sentiment.sentiment}", sentiment=sentiment, keywords=keywords, readability="medium", complexity=getattr(keyword_result, 'text_complexity', 0.5) ) def _update_processing_stats(self, processing_time: float): """更新处理统计信息""" total = self.processing_stats["total_processed"] current_avg = self.processing_stats["average_processing_time"] # 计算新的平均处理时间 new_avg = ((current_avg * (total - 1)) + processing_time) / total self.processing_stats["average_processing_time"] = new_avg def get_processing_stats(self) -> Dict[str, Any]: """获取处理统计信息""" return self.processing_stats.copy() def display_stats(self): """显示处理统计""" stats = self.get_processing_stats() print("\n📈 处理统计信息:") print(f" 总处理数: {stats['total_processed']}") print(f" 成功数: {stats['successful_analyses']}") print(f" 失败数: {stats['failed_analyses']}") if stats['total_processed'] > 0: success_rate = stats['successful_analyses'] / stats['total_processed'] * 100 print(f" 成功率: {success_rate:.1f}%") print(f" 平均处理时间: {stats['average_processing_time']:.2f}秒") def demo_single_text_analysis(): """演示单个文本分析""" print("🔍 单文本分析演示") print("="*50) # 创建分析引擎 engine = TextAnalysisEngine() # 测试文本 test_text = """ 人工智能技术正在快速发展,深度学习和机器学习算法在各个领域都取得了显著的进展。 从自然语言处理到计算机视觉,从推荐系统到自动驾驶,AI技术正在改变我们的生活方式。 然而,我们也需要关注AI发展带来的挑战,包括隐私保护、算法偏见、就业影响等问题。 只有在技术发展和社会责任之间找到平衡,AI才能真正造福人类社会。 总的来说,人工智能的未来充满希望,但也需要我们谨慎对待,确保技术发展的方向符合人类的长远利益。 """ try: # 执行分析 result = engine.analyze_text(test_text) # 显示详细结果 display_analysis_result(result) # 显示统计信息 engine.display_stats() return True except Exception as e: print(f"❌ 演示失败: {e}") return False def demo_batch_analysis(): """演示批量文本分析""" print("\n🔄 批量文本分析演示") print("="*50) # 创建分析引擎 engine = TextAnalysisEngine() # 测试文本集合 test_texts = [ "今天天气真好,阳光明媚,心情特别愉快!", "公司最新发布的季度财报显示,营收同比增长15%,净利润达到2.3亿元。董事会决定向股东分红每股0.5元。", "机器学习是人工智能的一个重要分支,通过算法让计算机能够从数据中学习模式。常见的机器学习算法包括线性回归、决策树、神经网络等。", "服务态度恶劣,产品质量很差,完全不值这个价格。强烈不推荐大家购买!", "根据《合同法》第一百二十一条规定,当事人一方因第三人的原因造成违约的,应当向对方承担违约责任。" ] results = [] start_time = time.time() for i, text in enumerate(test_texts, 1): print(f"\n处理第 {i}/{len(test_texts)} 个文本...") try: result = engine.analyze_text(text, f"batch_{i}") results.append(result) # 显示简要结果 print(f" 结果: {result.overall_quality} | 置信度: {result.confidence_level:.3f}") except Exception as e: print(f" ❌ 处理失败: {e}") results.append(None) total_time = time.time() - start_time # 显示批量处理总结 print(f"\n📊 批量处理完成 - 总耗时: {total_time:.2f}秒") engine.display_stats() # 显示成功处理的结果统计 successful_results = [r for r in results if r is not None] if successful_results: print(f"\n🎯 成功处理 {len(successful_results)} 个文本:") quality_stats = {} for result in successful_results: quality = result.overall_quality quality_stats[quality] = quality_stats.get(quality, 0) + 1 for quality, count in quality_stats.items(): print(f" {quality}: {count} 个") return len(successful_results) > 0 def display_analysis_result(result: ComprehensiveAnalysisResult): """显示详细的分析结果""" print(f"\n📋 详细分析结果 [{result.analysis_id}]") print("="*60) print(f"输入摘要: {result.input_summary}") print(f"分析时间: {result.analysis_timestamp}") print(f"整体质量: {result.overall_quality}") print(f"置信度: {result.confidence_level:.3f}") print(f"处理时间: {result.total_processing_time:.2f}秒") # 文本分析结果 if result.text_analysis: ta = result.text_analysis print(f"\n📝 文本分析:") print(f" 长度: {ta.text_length} 字符") print(f" 词数: {ta.word_count}") print(f" 摘要: {ta.summary}") print(f" 情感: {ta.sentiment.sentiment} (置信度: {ta.sentiment.confidence:.3f})") print(f" 可读性: {ta.readability}") if ta.keywords: top_keywords = ta.keywords[:5] print(f" 关键词: {[k.keyword for k in top_keywords]}") # 分类结果 if result.classification: cls = result.classification print(f"\n🏷️ 文档分类:") print(f" 主分类: {cls.primary_category}") print(f" 置信度: {cls.confidence:.3f}") if len(cls.all_categories) > 1: other_cats = cls.all_categories[1:3] print(f" 其他可能: {[c.category for c in other_cats]}") # 数据提取结果 if result.data_extraction: de = result.data_extraction print(f"\n🔍 数据提取:") print(f" 质量: {de.extraction_quality}") print(f" 完整性: {de.completeness:.3f}") print(f" 准确性: {de.accuracy:.3f}") print(f" 字段统计: {de.extracted_fields}/{de.total_fields}") if de.extraction_items: print(" 提取项目:") for item in de.extraction_items[:3]: # 显示前3个 print(f" {item.field_name}: {item.field_value} (置信度: {item.confidence:.3f})") # 改进建议 if result.recommendations: print(f"\n💡 改进建议:") for i, rec in enumerate(result.recommendations[:3], 1): print(f" {i}. {rec}") def interactive_analysis(): """交互式分析功能""" print("\n💬 交互式文本分析") print("="*50) print("输入文本进行综合分析,输入'quit'退出") try: engine = TextAnalysisEngine() while True: print("\n" + "-"*30) user_input = input("请输入要分析的文本: ").strip() if user_input.lower() == 'quit': print("分析结束,再见!") break if not user_input: continue if len(user_input) < 10: print("⚠️ 文本太短,请输入至少10个字符") continue try: result = engine.analyze_text(user_input) display_analysis_result(result) except Exception as e: print(f"❌ 分析失败: {e}") # 显示最终统计 engine.display_stats() except KeyboardInterrupt: print("\n程序已中断") except Exception as e: print(f"❌ 交互式分析失败: {e}") def test_engine_initialization(): """测试引擎初始化""" print("正在测试文本分析引擎初始化...") try: engine = TextAnalysisEngine() print(f"✅ 引擎初始化成功,包含 {len(engine.agents)} 个Agent") # 显示Agent信息 for name, agent in engine.agents.items(): info = agent.get_model_info() print(f" {name}: {info['model_name']} ({info['provider']})") return True except Exception as e: print(f"❌ 引擎初始化失败: {e}") return False def main(): """主函数""" print("🚀 文本分析综合示例") print("="*60) # 运行各种演示 demos = [ ("引擎初始化测试", test_engine_initialization), ("单文本分析", demo_single_text_analysis), ("批量文本分析", demo_batch_analysis), ] results = {} for name, demo_func in demos: print(f"\n开始: {name}") try: success = demo_func() results[name] = success print(f"{'✅' if success else '❌'} {name} {'成功' if success else '失败'}") except Exception as e: print(f"❌ {name} 异常: {e}") results[name] = False # 显示总结 print(f"\n📊 演示总结") print("="*60) successful_demos = sum(results.values()) total_demos = len(results) for name, success in results.items(): status = "✅ 成功" if success else "❌ 失败" print(f" {name}: {status}") print(f"\n🎯 总计: {successful_demos}/{total_demos} 个演示成功") # 询问是否运行交互式演示 if successful_demos > 0: try: choice = input("\n是否运行交互式分析?(y/n): ").strip().lower() if choice in ['y', 'yes', '是']: interactive_analysis() except (KeyboardInterrupt, EOFError): print("\n程序结束") return successful_demos == total_demos if __name__ == "__main__": # 可以选择运行测试或完整演示 import sys if len(sys.argv) > 1 and sys.argv[1] == "--test": # 仅运行初始化测试 success = test_engine_initialization() exit(0 if success else 1) else: # 运行完整演示 main()