文思AI产品笔记
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
  • AI架构

    • AI架构

Agent系统设计与实现

构建智能AI Agent:从单一Agent到多Agent协作

📋 概述

Agent是能够感知环境、做出决策并执行行动的智能体。本文介绍如何设计和实现生产级的Agent系统,包括ReAct、Plan-and-Execute、多Agent协作等模式。

🎯 Agent核心概念

什么是Agent?

定义:Agent = LLM + 记忆 + 规划 + 工具

Agent = {
    "大脑": LLM(推理和决策),
    "记忆": 对话历史和知识,
    "能力": 可调用的工具集,
    "策略": 规划和执行方法
}

Agent vs 传统程序

特性传统程序Agent
逻辑预定义规则动态推理
任务处理单一流程多步规划
适应性固定灵活
工具使用硬编码智能选择

🏗️ Agent架构模式

1. ReAct模式

原理:Reasoning(推理) + Acting(行动)交替进行

from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain.llms import OpenAI

class ReActAgent:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
        self.agent = create_react_agent(llm, tools, self.get_prompt())
        self.executor = AgentExecutor(
            agent=self.agent,
            tools=tools,
            verbose=True,
            max_iterations=5
        )
    
    def get_prompt(self):
        return """
你是一个智能助手,可以使用以下工具:

{tools}

工具格式:工具名称[参数]

请使用以下格式思考和行动:

Question: 用户的问题
Thought: 我需要思考如何解决
Action: 工具名称[参数]
Observation: 工具的返回结果
... (重复 Thought/Action/Observation 直到找到答案)
Thought: 我现在知道最终答案了
Final Answer: 最终答案

开始!

Question: {input}
Thought: {agent_scratchpad}
"""
    
    def run(self, query: str):
        return self.executor.invoke({"input": query})

# 定义工具
def search_web(query: str) -> str:
    """搜索网络"""
    # 实现搜索逻辑
    return f"搜索结果for {query}"

def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return str(result)
    except:
        return "计算错误"

tools = [
    Tool(name="Search", func=search_web, description="搜索网络信息"),
    Tool(name="Calculator", func=calculate, description="进行数学计算")
]

# 使用
agent = ReActAgent(OpenAI(), tools)
result = agent.run("北京的人口是多少?将这个数字乘以2")

2. Plan-and-Execute模式

原理:先制定完整计划,再逐步执行

class PlanAndExecuteAgent:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
    
    def run(self, task: str):
        # 1. 制定计划
        plan = self.create_plan(task)
        print(f"计划:\n{plan}\n")
        
        # 2. 执行计划
        results = []
        for step in plan['steps']:
            print(f"执行步骤:{step['description']}")
            result = self.execute_step(step)
            results.append(result)
            print(f"结果:{result}\n")
        
        # 3. 整合结果
        final_answer = self.synthesize(task, results)
        return final_answer
    
    def create_plan(self, task: str) -> dict:
        """制定计划"""
        prompt = f"""
任务:{task}

可用工具:
{self.format_tools()}

请制定详细的执行计划,以JSON格式输出:
{{
    "steps": [
        {{
            "step_number": 1,
            "description": "步骤描述",
            "tool": "工具名称",
            "parameters": "参数"
        }}
    ]
}}
"""
        
        response = self.llm.generate(prompt)
        return json.loads(response)
    
    def execute_step(self, step: dict):
        """执行单个步骤"""
        tool_name = step['tool']
        parameters = step['parameters']
        
        if tool_name in self.tools:
            tool = self.tools[tool_name]
            return tool.func(parameters)
        else:
            return f"工具 {tool_name} 不存在"
    
    def synthesize(self, task: str, results: list) -> str:
        """整合结果"""
        prompt = f"""
原始任务:{task}

执行结果:
{chr(10).join([f"{i+1}. {r}" for i, r in enumerate(results)])}

请基于这些结果,给出最终答案:
"""
        
        return self.llm.generate(prompt)

3. Reflexion模式

原理:Agent能够反思和自我改进

class ReflexionAgent:
    def __init__(self, llm, tools, max_retries=3):
        self.llm = llm
        self.tools = tools
        self.max_retries = max_retries
    
    def run(self, task: str):
        for attempt in range(self.max_retries):
            print(f"\n尝试 {attempt + 1}/{self.max_retries}")
            
            # 执行任务
            result = self.execute(task)
            
            # 反思结果
            reflection = self.reflect(task, result)
            
            if reflection['is_correct']:
                return result
            
            # 基于反思改进
            print(f"反思:{reflection['feedback']}")
            task = self.improve_task(task, reflection)
        
        return result
    
    def reflect(self, task: str, result: str) -> dict:
        """反思执行结果"""
        prompt = f"""
任务:{task}
执行结果:{result}

请评估这个结果:
1. 是否正确回答了问题?
2. 有什么问题或不足?
3. 如何改进?

以JSON格式输出:
{{
    "is_correct": true/false,
    "feedback": "反馈内容",
    "improvement_suggestions": "改进建议"
}}
"""
        
        response = self.llm.generate(prompt)
        return json.loads(response)

🔧 工具集成

1. 工具定义

from typing import Callable, Optional
from pydantic import BaseModel, Field

class ToolParameter(BaseModel):
    """工具参数"""
    name: str
    type: str
    description: str
    required: bool = True

class Tool(BaseModel):
    """工具定义"""
    name: str = Field(description="工具名称")
    description: str = Field(description="工具功能描述")
    parameters: list[ToolParameter] = Field(description="参数列表")
    function: Callable = Field(description="实际执行的函数")
    
    def execute(self, **kwargs):
        """执行工具"""
        try:
            return self.function(**kwargs)
        except Exception as e:
            return f"执行错误:{str(e)}"
    
    def get_schema(self) -> dict:
        """获取工具schema(用于LLM理解)"""
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    param.name: {
                        "type": param.type,
                        "description": param.description
                    }
                    for param in self.parameters
                },
                "required": [p.name for p in self.parameters if p.required]
            }
        }

2. 常用工具实现

# 搜索工具
def create_search_tool():
    def search(query: str) -> str:
        """搜索网络信息"""
        # 使用 SerpAPI 或其他搜索API
        import requests
        
        response = requests.get(
            "https://api.serpapi.com/search",
            params={
                "q": query,
                "api_key": "YOUR_API_KEY"
            }
        )
        
        results = response.json()
        # 格式化结果
        return format_search_results(results)
    
    return Tool(
        name="search",
        description="搜索互联网信息",
        parameters=[
            ToolParameter(
                name="query",
                type="string",
                description="搜索查询"
            )
        ],
        function=search
    )

# 计算器工具
def create_calculator_tool():
    def calculate(expression: str) -> str:
        """计算数学表达式"""
        try:
            # 安全的数学计算
            import ast
            import operator
            
            ops = {
                ast.Add: operator.add,
                ast.Sub: operator.sub,
                ast.Mult: operator.mul,
                ast.Div: operator.truediv,
                ast.Pow: operator.pow
            }
            
            def eval_expr(node):
                if isinstance(node, ast.Num):
                    return node.n
                elif isinstance(node, ast.BinOp):
                    return ops[type(node.op)](
                        eval_expr(node.left),
                        eval_expr(node.right)
                    )
                else:
                    raise TypeError(node)
            
            return str(eval_expr(ast.parse(expression).body[0].value))
        except:
            return "计算错误"
    
    return Tool(
        name="calculator",
        description="计算数学表达式",
        parameters=[
            ToolParameter(
                name="expression",
                type="string",
                description="数学表达式,如:2+3*4"
            )
        ],
        function=calculate
    )

# 数据库查询工具
def create_database_tool(db_config):
    def query_database(sql: str) -> str:
        """查询数据库"""
        import sqlite3
        
        conn = sqlite3.connect(db_config['database'])
        cursor = conn.cursor()
        
        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            return str(results)
        except Exception as e:
            return f"查询错误:{str(e)}"
        finally:
            conn.close()
    
    return Tool(
        name="database",
        description="查询数据库",
        parameters=[
            ToolParameter(
                name="sql",
                type="string",
                description="SQL查询语句"
            )
        ],
        function=query_database
    )

# Python代码执行工具
def create_python_tool():
    def execute_python(code: str) -> str:
        """执行Python代码"""
        import sys
        from io import StringIO
        
        # 捕获输出
        old_stdout = sys.stdout
        sys.stdout = StringIO()
        
        try:
            exec(code)
            output = sys.stdout.getvalue()
            return output
        except Exception as e:
            return f"执行错误:{str(e)}"
        finally:
            sys.stdout = old_stdout
    
    return Tool(
        name="python",
        description="执行Python代码",
        parameters=[
            ToolParameter(
                name="code",
                type="string",
                description="要执行的Python代码"
            )
        ],
        function=execute_python
    )

🤝 多Agent协作

1. 角色分工模式

class MultiAgentSystem:
    def __init__(self, llm):
        self.llm = llm
        self.agents = {
            "researcher": ResearchAgent(llm),
            "writer": WriterAgent(llm),
            "critic": CriticAgent(llm)
        }
    
    def solve(self, task: str):
        # 1. 研究员收集信息
        research_results = self.agents["researcher"].research(task)
        
        # 2. 写作者生成内容
        draft = self.agents["writer"].write(task, research_results)
        
        # 3. 评论家review并提出改进
        feedback = self.agents["critic"].critique(draft)
        
        # 4. 写作者根据反馈修改
        final_version = self.agents["writer"].revise(draft, feedback)
        
        return final_version

class ResearchAgent:
    def __init__(self, llm):
        self.llm = llm
        self.tools = [create_search_tool()]
    
    def research(self, topic: str) -> dict:
        """研究主题"""
        prompt = f"""
作为研究员,请收集关于以下主题的信息:

主题:{topic}

使用搜索工具收集:
1. 主要事实和数据
2. 相关案例
3. 专家观点

研究报告:
"""
        
        # 使用工具收集信息
        results = {}
        # ... 执行搜索
        
        return results

class WriterAgent:
    def __init__(self, llm):
        self.llm = llm
    
    def write(self, topic: str, research: dict) -> str:
        """撰写内容"""
        prompt = f"""
作为内容作者,基于以下研究结果撰写文章:

主题:{topic}
研究结果:{research}

要求:
- 结构清晰
- 论据充分
- 语言流畅

文章:
"""
        
        return self.llm.generate(prompt)
    
    def revise(self, draft: str, feedback: dict) -> str:
        """修改内容"""
        prompt = f"""
基于以下反馈修改文章:

原稿:{draft}

反馈:{feedback}

修改后的文章:
"""
        
        return self.llm.generate(prompt)

class CriticAgent:
    def __init__(self, llm):
        self.llm = llm
    
    def critique(self, content: str) -> dict:
        """评论内容"""
        prompt = f"""
作为评论家,请review以下内容:

内容:{content}

从以下维度评分(1-10):
1. 逻辑性
2. 准确性
3. 可读性
4. 完整性

并提出具体的改进建议。

评论(JSON格式):
"""
        
        response = self.llm.generate(prompt)
        return json.loads(response)

2. 协作通信模式

class Message:
    """Agent间的消息"""
    def __init__(
        self,
        sender: str,
        receiver: str,
        content: str,
        message_type: str = "info"
    ):
        self.sender = sender
        self.receiver = receiver
        self.content = content
        self.message_type = message_type
        self.timestamp = datetime.now()

class MessageBus:
    """消息总线"""
    def __init__(self):
        self.messages = []
        self.subscribers = {}
    
    def subscribe(self, agent_name: str, callback: Callable):
        """订阅消息"""
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)
    
    def publish(self, message: Message):
        """发布消息"""
        self.messages.append(message)
        
        # 通知接收者
        if message.receiver in self.subscribers:
            for callback in self.subscribers[message.receiver]:
                callback(message)
        
        # 广播消息
        if message.receiver == "all":
            for callbacks in self.subscribers.values():
                for callback in callbacks:
                    callback(message)

class CollaborativeAgent:
    def __init__(self, name: str, llm, message_bus: MessageBus):
        self.name = name
        self.llm = llm
        self.bus = message_bus
        
        # 订阅消息
        self.bus.subscribe(self.name, self.handle_message)
    
    def send_message(self, receiver: str, content: str, msg_type: str = "info"):
        """发送消息"""
        message = Message(self.name, receiver, content, msg_type)
        self.bus.publish(message)
    
    def handle_message(self, message: Message):
        """处理接收到的消息"""
        if message.message_type == "request":
            # 处理请求并回复
            response = self.process_request(message.content)
            self.send_message(message.sender, response, "response")
        elif message.message_type == "info":
            # 处理信息
            self.process_info(message.content)

📊 Agent评估与优化

1. 性能指标

class AgentMetrics:
    def __init__(self):
        self.metrics = {
            'success_rate': [],
            'execution_time': [],
            'tool_usage': [],
            'iterations': []
        }
    
    def record(self, result: dict):
        """记录执行结果"""
        self.metrics['success_rate'].append(result['success'])
        self.metrics['execution_time'].append(result['time'])
        self.metrics['tool_usage'].append(result['tools_used'])
        self.metrics['iterations'].append(result['iterations'])
    
    def get_stats(self) -> dict:
        """获取统计数据"""
        return {
            'success_rate': np.mean(self.metrics['success_rate']),
            'avg_time': np.mean(self.metrics['execution_time']),
            'avg_iterations': np.mean(self.metrics['iterations']),
            'most_used_tools': Counter(
                [t for tools in self.metrics['tool_usage'] for t in tools]
            ).most_common(5)
        }

2. A/B测试

def compare_agents(agent_a, agent_b, test_cases):
    """对比两个Agent"""
    results_a = []
    results_b = []
    
    for test_case in test_cases:
        # 测试Agent A
        result_a = agent_a.run(test_case['query'])
        score_a = evaluate_result(result_a, test_case['expected'])
        results_a.append(score_a)
        
        # 测试Agent B
        result_b = agent_b.run(test_case['query'])
        score_b = evaluate_result(result_b, test_case['expected'])
        results_b.append(score_b)
    
    return {
        'agent_a_avg': np.mean(results_a),
        'agent_b_avg': np.mean(results_b),
        'winner': 'A' if np.mean(results_a) > np.mean(results_b) else 'B'
    }

💡 最佳实践

Do's ✅

  1. 明确Agent的角色和能力边界
  2. 设计合理的工具集
  3. 实现错误处理和重试机制
  4. 记录执行过程便于调试
  5. 设置最大迭代次数防止死循环
  6. 使用结构化输出
  7. 定期评估和优化

Don'ts ❌

  1. 工具描述不清晰
  2. 缺少执行限制
  3. 忽视安全性
  4. 过度依赖单一工具
  5. 没有失败处理
  6. 缺少监控和日志

🔗 相关文章

  • 00-AI应用架构总体概述
  • 02-RAG系统架构与实践
  • 04-MCP协议详解
  • 05-Agent间通信A2A

最后更新:2024年12月22日

最近更新: 2025/12/22 14:25
Contributors: wsyx