文思AI产品笔记
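Agent-to-Agent Communication (A2A)

Communication mechanisms for building multi-agent collaborative systems

📋 Overview

Agent-to-Agent (A2A) communication is the core of a multi-agent system. This article covers A2A communication patterns, protocol design, and implementation.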

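🎯 A2A Core Concepts

Why A2A?

A single agent has limited capabilities. Multiple agents working together can:

  • Divide the work to improve efficiency
  • Combine specialized skills to solve complex problems
  • Process in parallel to finish sooner
  • Provide fault tolerance and redundancy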

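🏗️ Communication Architecture Patterns

1. Point-to-Point Communication

from datetime import datetime

class DirectCommunication:
    """Point-to-point communication: delivers messages directly by agent name."""
    def __init__(self):
        self.agents = {}  # agent name -> Agent

    def register_agent(self, agent):
        """Make an agent reachable under its name."""
        self.agents[agent.name] = agent

    def send_message(self, sender: str, receiver: str, message: dict):
        """Send a message."""
        # A message for an unregistered receiver is silently dropped.
        if receiver in self.agents:
            self.agents[receiver].receive_message(sender, message)

class Agent:
    def __init__(self, name: str, comm: DirectCommunication):
        self.name = name
        self.comm = comm
        self.inbox = []

    def send_to(self, receiver: str, message: dict):
        """Send a message to another agent."""
        self.comm.send_message(self.name, receiver, message)

    def receive_message(self, sender: str, message: dict):
        """Receive a message and store it in the inbox."""
        self.inbox.append({
            "sender": sender,
            "message": message,
            "timestamp": datetime.now()
        })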
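A short usage sketch may help here. The classes below are condensed copies of the ones above so that the snippet runs on its own; the agent names `planner` and `researcher` are made up for illustration.

```python
from datetime import datetime


class DirectCommunication:
    """Condensed registry: maps agent names to agent objects."""
    def __init__(self):
        self.agents = {}

    def register_agent(self, agent):
        self.agents[agent.name] = agent

    def send_message(self, sender, receiver, message):
        # Unknown receivers are ignored, as in the class above.
        if receiver in self.agents:
            self.agents[receiver].receive_message(sender, message)


class Agent:
    def __init__(self, name, comm):
        self.name = name
        self.comm = comm
        self.inbox = []

    def send_to(self, receiver, message):
        self.comm.send_message(self.name, receiver, message)

    def receive_message(self, sender, message):
        self.inbox.append(
            {"sender": sender, "message": message, "timestamp": datetime.now()}
        )


comm = DirectCommunication()
planner = Agent("planner", comm)        # hypothetical agent names
researcher = Agent("researcher", comm)
comm.register_agent(planner)            # agents must be registered explicitly
comm.register_agent(researcher)

planner.send_to("researcher", {"task": "collect sources"})
planner.send_to("nobody", {"task": "lost"})  # no such agent: dropped

print(researcher.inbox[0]["sender"], researcher.inbox[0]["message"])
```

Note that the sender gets no signal when a message is dropped; a production system would likely want an error or a delivery receipt.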

2. Message Bus

from typing import Callable, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Message:
    sender: str
    receiver: str
    content: dict
    message_type: str
    # default_factory stamps each message at creation time.
    timestamp: datetime = field(default_factory=datetime.now)

class MessageBus:
    """Message bus: routes published messages to subscribed callbacks."""
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.message_history: List[Message] = []

    def subscribe(self, agent_name: str, callback: Callable):
        """Subscribe a callback to messages addressed to agent_name."""
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)

    def publish(self, message: Message):
        """Publish a message."""
        # Record history
        self.message_history.append(message)

        # Route the message
        if message.receiver == "broadcast":
            # Broadcast to every subscriber (the sender included, if subscribed)
            for callbacks in self.subscribers.values():
                for callback in callbacks:
                    callback(message)
        elif message.receiver in self.subscribers:
            # Deliver to the specific receiver
            for callback in self.subscribers[message.receiver]:
                callback(message)

    def get_history(self, agent_name: Optional[str] = None) -> List[Message]:
        """Return the message history, optionally filtered to one agent."""
        if agent_name:
            return [m for m in self.message_history
                    if m.sender == agent_name or m.receiver == agent_name]
        # Return a copy so callers cannot mutate the bus's own log.
        return list(self.message_history)

class BusAgent:
    def __init__(self, name: str, bus: MessageBus):
        self.name = name
        self.bus = bus
        self.inbox = []

        # Subscribe to messages addressed to this agent
        self.bus.subscribe(self.name, self.on_message)

    def send(self, receiver: str, content: dict, msg_type: str = "info"):
        """Send a message."""
        message = Message(
            sender=self.name,
            receiver=receiver,
            content=content,
            message_type=msg_type
        )
        self.bus.publish(message)

    def broadcast(self, content: dict, msg_type: str = "info"):
        """Broadcast a message."""
        self.send("broadcast", content, msg_type)

    def on_message(self, message: Message):
        """Handle an incoming message."""
        self.inbox.append(message)

        # Dispatch on message type
        if message.message_type == "request":
            self.handle_request(message)
        elif message.message_type == "response":
            self.handle_response(message)
        elif message.message_type == "info":
            self.handle_info(message)
        elif message.message_type == "task":
            # "task" is the type the Coordinator pattern uses to assign work.
            self.handle_task(message)

    def handle_request(self, message: Message):
        """Handle a request."""
        pass  # implemented by subclasses

    def handle_response(self, message: Message):
        """Handle a response."""
        pass

    def handle_info(self, message: Message):
        """Handle an informational message."""
        pass

    def handle_task(self, message: Message):
        """Handle a task assignment."""
        pass
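As a rough illustration of how routing behaves, the sketch below uses a condensed bus (same routing rule, no history) with two plain callbacks instead of full agent classes. The names `translator`, `reviewer` and `system` are hypothetical.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, List


@dataclass
class Message:
    sender: str
    receiver: str
    content: dict
    message_type: str = "info"
    timestamp: datetime = field(default_factory=datetime.now)


class MessageBus:
    """Condensed bus: routes by receiver name; "broadcast" reaches everyone."""
    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[Message], None]]] = {}

    def subscribe(self, name: str, callback: Callable[[Message], None]):
        self.subscribers.setdefault(name, []).append(callback)

    def publish(self, message: Message):
        if message.receiver == "broadcast":
            targets = [cb for cbs in self.subscribers.values() for cb in cbs]
        else:
            targets = self.subscribers.get(message.receiver, [])
        for callback in targets:
            callback(message)


bus = MessageBus()
received = {"translator": [], "reviewer": []}  # hypothetical agent names


def translator_handler(msg: Message):
    received["translator"].append(msg)
    if msg.message_type == "request":
        # Reply to whoever asked.
        bus.publish(Message("translator", msg.sender, {"text": "hello"}, "response"))


bus.subscribe("translator", translator_handler)
bus.subscribe("reviewer", received["reviewer"].append)

bus.publish(Message("reviewer", "translator", {"text": "bonjour"}, "request"))
bus.publish(Message("system", "broadcast", {"notice": "maintenance at noon"}))

print([m.message_type for m in received["reviewer"]])
print([m.message_type for m in received["translator"]])
```

Because delivery is synchronous, the response reaches the reviewer before the first `publish` call returns. A real deployment would probably queue messages instead.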

3. Coordinator Pattern

import uuid
from typing import List, Optional

class Coordinator:
    """Coordinator: assigns tasks to agents by capability."""
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.agents = {}
        self.task_queue = []
        self.results = {}

    def register_agent(self, agent: BusAgent, capabilities: List[str]):
        """Register an agent together with its capabilities."""
        self.agents[agent.name] = {
            "agent": agent,
            "capabilities": capabilities,
            "busy": False
        }

    def assign_task(self, task: dict) -> str:
        """Assign a task and return its generated ID."""
        task_id = str(uuid.uuid4())
        task["id"] = task_id

        # Find a suitable agent
        suitable_agent = self.find_agent_for_task(task)

        if suitable_agent:
            # Assign directly
            self.send_task(suitable_agent, task)
        else:
            # No idle agent with the capability: queue the task
            self.task_queue.append(task)

        return task_id

    def find_agent_for_task(self, task: dict) -> Optional[str]:
        """Return the name of an idle agent with the required capability, or None."""
        required_capability = task.get("capability")

        for agent_name, info in self.agents.items():
            if (required_capability in info["capabilities"] and
                not info["busy"]):
                return agent_name

        return None

    def send_task(self, agent_name: str, task: dict):
        """Send a task to an agent and mark it busy."""
        message = Message(
            sender="coordinator",
            receiver=agent_name,
            content=task,
            message_type="task"
        )

        self.agents[agent_name]["busy"] = True
        self.bus.publish(message)

    def handle_result(self, agent_name: str, result: dict):
        """Record a result and dispatch queued work."""
        # Must be called by whatever code receives an agent's result;
        # the coordinator does not subscribe to the bus itself.
        task_id = result.get("task_id")
        self.results[task_id] = result

        # Mark the agent as idle
        self.agents[agent_name]["busy"] = False

        # Dispatch the first queued task that an idle agent can handle.
        # Tasks with no available agent stay queued instead of being dropped.
        for index, queued_task in enumerate(self.task_queue):
            suitable_agent = self.find_agent_for_task(queued_task)
            if suitable_agent:
                self.send_task(suitable_agent, self.task_queue.pop(index))
                break
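The queueing behaviour is easier to see in isolation. The following is a minimal sketch, assuming a stripped-down `MiniCoordinator` (a hypothetical name) that leaves out the bus and only logs which agent received which task.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class MiniCoordinator:
    """Capability-based dispatch with a FIFO backlog (no bus, for brevity)."""
    def __init__(self):
        self.capabilities: Dict[str, List[str]] = {}
        self.busy: Dict[str, bool] = {}
        self.backlog: Deque[dict] = deque()
        self.assignments: List[Tuple[str, str]] = []  # (agent_name, task_id) log

    def register(self, name: str, capabilities: List[str]):
        self.capabilities[name] = capabilities
        self.busy[name] = False

    def _find(self, task: dict) -> Optional[str]:
        for name, caps in self.capabilities.items():
            if task["capability"] in caps and not self.busy[name]:
                return name
        return None

    def _dispatch(self, agent: str, task: dict):
        self.busy[agent] = True
        self.assignments.append((agent, task["id"]))

    def submit(self, task: dict):
        agent = self._find(task)
        if agent is None:
            self.backlog.append(task)   # wait until a capable agent is idle
        else:
            self._dispatch(agent, task)

    def complete(self, agent: str):
        """Mark an agent idle, then dispatch the first backlog task that fits."""
        self.busy[agent] = False
        for task in list(self.backlog):
            candidate = self._find(task)
            if candidate is not None:
                self.backlog.remove(task)
                self._dispatch(candidate, task)
                break


coord = MiniCoordinator()
coord.register("coder", ["code"])
coord.register("writer", ["docs"])

coord.submit({"id": "t1", "capability": "code"})
coord.submit({"id": "t2", "capability": "code"})   # coder is busy: queued
coord.submit({"id": "t3", "capability": "docs"})
coord.complete("coder")                            # frees coder: t2 dispatched

print(coord.assignments)
```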

💬 Communication Protocol Design

Message Format

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional

class MessageType(Enum):
    """Message types."""
    REQUEST = "request"        # request
    RESPONSE = "response"      # response
    INFO = "info"              # information
    ERROR = "error"            # error
    TASK = "task"              # task assignment
    RESULT = "result"          # task result
    QUERY = "query"            # query
    ANSWER = "answer"          # answer

@dataclass
class StandardMessage:
    """Standard message format."""
    # Metadata
    id: str
    sender: str
    receiver: str
    timestamp: datetime
    message_type: MessageType

    # Payload
    content: Any

    # Optional fields
    priority: int = 1                        # priority (1-10)
    expires_at: Optional[datetime] = None    # expiry time
    reply_to: Optional[str] = None           # ID of the message being answered
    requires_ack: bool = False               # whether an acknowledgement is required
    # default_factory gives every message its own dict.
    metadata: dict = field(default_factory=dict)  # extra metadata

    def to_dict(self) -> dict:
        """Convert to a dictionary (datetimes become ISO 8601 strings)."""
        return {
            "id": self.id,
            "sender": self.sender,
            "receiver": self.receiver,
            "timestamp": self.timestamp.isoformat(),
            "message_type": self.message_type.value,
            "content": self.content,
            "priority": self.priority,
            "expires_at": self.expires_at.isoformat() if self.expires_at else None,
            "reply_to": self.reply_to,
            "requires_ack": self.requires_ack,
            "metadata": self.metadata
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'StandardMessage':
        """Create a message from a dictionary produced by to_dict."""
        return cls(
            id=data["id"],
            sender=data["sender"],
            receiver=data["receiver"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            message_type=MessageType(data["message_type"]),
            content=data["content"],
            priority=data.get("priority", 1),
            expires_at=datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None,
            reply_to=data.get("reply_to"),
            requires_ack=data.get("requires_ack", False),
            # "or {}" also covers an explicit null in the input.
            metadata=data.get("metadata") or {}
        )
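The `priority` and `expires_at` fields suggest an inbox that serves urgent messages first and discards stale ones. One possible sketch follows, built on the standard `heapq` module. `QueuedMessage` and `PriorityInbox` are hypothetical names, and the code assumes that a higher number means higher urgency, which the format above does not specify.

```python
import heapq
import itertools
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, List, Optional, Tuple


@dataclass
class QueuedMessage:
    """Hypothetical, trimmed-down stand-in for StandardMessage."""
    id: str
    content: Any
    priority: int = 1                      # 1-10; higher = more urgent (assumed)
    expires_at: Optional[datetime] = None


class PriorityInbox:
    def __init__(self):
        self._heap: List[Tuple[int, int, QueuedMessage]] = []
        self._counter = itertools.count()  # tie-breaker keeps FIFO order

    def put(self, msg: QueuedMessage):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-msg.priority, next(self._counter), msg))

    def get(self, now: Optional[datetime] = None) -> Optional[QueuedMessage]:
        """Return the most urgent unexpired message, or None if none is left."""
        now = now or datetime.now()
        while self._heap:
            _, _, msg = heapq.heappop(self._heap)
            if msg.expires_at is None or msg.expires_at > now:
                return msg
            # Expired messages are discarded silently.
        return None


now = datetime.now()
inbox = PriorityInbox()
inbox.put(QueuedMessage("m1", "routine status", priority=1))
inbox.put(QueuedMessage("m2", "stale alert", priority=9,
                        expires_at=now - timedelta(seconds=1)))
inbox.put(QueuedMessage("m3", "urgent task", priority=5))
inbox.put(QueuedMessage("m4", "another urgent task", priority=5))

order = []
while (msg := inbox.get(now)) is not None:
    order.append(msg.id)
print(order)
```

The counter in the heap tuple means two messages with equal priority are never compared directly, so the dataclass does not need to define ordering.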

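Request-Response Pattern

import asyncio
import uuid
from datetime import datetime

class RequestResponseAgent(BusAgent):
    def __init__(self, name: str, bus: MessageBus):
        super().__init__(name, bus)
        self.pending_requests = {}  # request ID -> Future awaiting the response

    def on_message(self, message):
        """Dispatch StandardMessage objects, whose type is a MessageType enum."""
        # BusAgent.on_message compares against plain strings, so normalize first.
        kind = getattr(message.message_type, "value", message.message_type)
        self.inbox.append(message)
        if kind == "request":
            self.handle_request(message)
        elif kind == "response":
            self.handle_response(message)
        elif kind == "info":
            self.handle_info(message)

    async def request(
        self,
        receiver: str,
        content: dict,
        timeout: float = 30.0
    ) -> dict:
        """Send a request and wait for the response."""
        request_id = str(uuid.uuid4())

        # Create a future on the running loop to wait for the response
        future = asyncio.get_running_loop().create_future()
        self.pending_requests[request_id] = future

        # Send the request
        message = StandardMessage(
            id=request_id,
            sender=self.name,
            receiver=receiver,
            timestamp=datetime.now(),
            message_type=MessageType.REQUEST,
            content=content,
            requires_ack=True
        )

        self.bus.publish(message)

        # Wait for the response (with a timeout)
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request {request_id} timed out") from None
        finally:
            # Clean up whether the request succeeded, timed out, or was cancelled.
            self.pending_requests.pop(request_id, None)

    def handle_response(self, message: StandardMessage):
        """Resolve the pending request that this response answers."""
        request_id = message.reply_to

        future = self.pending_requests.pop(request_id, None)
        # Ignore late or duplicate responses.
        if future is not None and not future.done():
            future.set_result(message.content)

    def send_response(self, request_msg: StandardMessage, content: dict):
        """Send a response to a request."""
        response = StandardMessage(
            id=str(uuid.uuid4()),
            sender=self.name,
            receiver=request_msg.sender,
            timestamp=datetime.now(),
            message_type=MessageType.RESPONSE,
            content=content,
            reply_to=request_msg.id
        )

        self.bus.publish(response)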
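The core idea, correlating a response with its request through an ID and a future, can be tried end to end with a much smaller setup. The sketch below is an assumption-laden miniature: `MiniBus`, `Requester` and `make_echo_responder` are hypothetical names, messages are plain dicts, and delivery is synchronous.

```python
import asyncio
import uuid
from typing import Callable, Dict


class MiniBus:
    """Hypothetical synchronous bus: one handler per agent name."""
    def __init__(self):
        self.handlers: Dict[str, Callable[[dict], None]] = {}

    def publish(self, message: dict):
        handler = self.handlers.get(message["receiver"])
        if handler:
            handler(message)


class Requester:
    def __init__(self, name: str, bus: MiniBus):
        self.name, self.bus = name, bus
        self.pending: Dict[str, asyncio.Future] = {}
        bus.handlers[name] = self.on_message

    async def request(self, receiver: str, content: dict, timeout: float = 1.0):
        request_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[request_id] = future
        self.bus.publish({"id": request_id, "sender": self.name,
                          "receiver": receiver, "content": content})
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self.pending.pop(request_id, None)  # clean up on success and timeout

    def on_message(self, message: dict):
        # Match the response to its request through reply_to.
        future = self.pending.get(message.get("reply_to"))
        if future is not None and not future.done():
            future.set_result(message["content"])


def make_echo_responder(name: str, bus: MiniBus):
    def on_message(message: dict):
        bus.publish({"id": str(uuid.uuid4()), "sender": name,
                     "receiver": message["sender"], "reply_to": message["id"],
                     "content": {"echo": message["content"]}})
    bus.handlers[name] = on_message


async def main():
    bus = MiniBus()
    alice = Requester("alice", bus)
    make_echo_responder("bob", bus)
    reply = await alice.request("bob", {"q": "ping"})
    try:
        await alice.request("nobody", {"q": "ping"}, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply, timed_out, len(alice.pending)


reply, timed_out, pending_left = asyncio.run(main())
print(reply, timed_out, pending_left)
```

The `finally` clause is the important part: without it, every timed-out request would leave an entry behind in `pending`.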

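🤝 Collaboration Patterns

1. Pipeline Pattern

import asyncio

class PipelineCoordinator:
    """Pipeline coordinator: runs stages in order, feeding each result to the next."""
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.pipeline = []

    def add_stage(self, agent: BusAgent):
        """Append a stage."""
        self.pipeline.append(agent)

    async def execute(self, initial_data: dict) -> dict:
        """Run the pipeline."""
        data = initial_data

        for agent in self.pipeline:
            # Each stage agent must define `async def process(data) -> dict`;
            # BusAgent itself does not provide it.
            # The result of one stage is the input of the next.
            data = await agent.process(data)

        return data

# Example: document-processing pipeline
# load → chunk → embed → store
# DocumentLoaderAgent, ChunkerAgent, EmbedderAgent and StorageAgent are
# BusAgent subclasses that this article does not define.

async def main():
    bus = MessageBus()

    loader_agent = DocumentLoaderAgent("loader", bus)
    chunker_agent = ChunkerAgent("chunker", bus)
    embedder_agent = EmbedderAgent("embedder", bus)
    storage_agent = StorageAgent("storage", bus)

    pipeline = PipelineCoordinator(bus)
    pipeline.add_stage(loader_agent)
    pipeline.add_stage(chunker_agent)
    pipeline.add_stage(embedder_agent)
    pipeline.add_stage(storage_agent)

    # `await` is only valid inside a coroutine, hence the main() wrapper.
    return await pipeline.execute({"file_path": "document.pdf"})

result = asyncio.run(main())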
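Since the stage agents in the example are left undefined, here is a runnable miniature of the same idea. It is only a sketch: `SplitStage` and `LengthStage` are hypothetical stand-ins for chunking and embedding, and `run_pipeline` adds one thing the coordinator above lacks, naming the stage that failed.

```python
import asyncio
from typing import List, Protocol


class Stage(Protocol):
    """Anything with a name and an async process() can be a stage."""
    name: str

    async def process(self, data: dict) -> dict: ...


class SplitStage:
    """Hypothetical stage: splits text into fixed-size chunks."""
    name = "chunker"

    async def process(self, data: dict) -> dict:
        text, size = data["text"], data.get("chunk_size", 4)
        chunks = [text[i:i + size] for i in range(0, len(text), size)]
        return {**data, "chunks": chunks}


class LengthStage:
    """Hypothetical stand-in for embedding: maps each chunk to its length."""
    name = "embedder"

    async def process(self, data: dict) -> dict:
        return {**data, "vectors": [len(c) for c in data["chunks"]]}


async def run_pipeline(stages: List[Stage], data: dict) -> dict:
    for stage in stages:
        try:
            data = await stage.process(data)
        except Exception as exc:
            # Name the failing stage so the error is traceable.
            raise RuntimeError(f"stage {stage.name!r} failed") from exc
    return data


result = asyncio.run(
    run_pipeline([SplitStage(), LengthStage()], {"text": "abcdefghij"})
)
print(result["chunks"], result["vectors"])
```

Each stage returns a new dict that carries the earlier keys forward, so later stages can still read what earlier ones produced.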

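2. Voting Pattern

import asyncio

class VotingCoordinator:
    """Voting coordinator."""
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.agents = []

    def add_agent(self, agent: BusAgent):
        self.agents.append(agent)

    async def decide(self, question: dict) -> dict:
        """Decide by confidence-weighted vote."""
        if not self.agents:
            raise ValueError("no agents registered to vote")

        # Collect every agent's opinion concurrently. Each agent must define
        # `async def vote(question)` returning "decision", "confidence"
        # and "reasoning".
        raw_votes = await asyncio.gather(
            *(agent.vote(question) for agent in self.agents)
        )
        votes = [
            {
                "agent": agent.name,
                "decision": vote["decision"],
                "confidence": vote["confidence"],
                "reasoning": vote["reasoning"]
            }
            for agent, vote in zip(self.agents, raw_votes)
        ]

        # Tally the votes
        decision_counts = {}
        weighted_votes = {}

        for vote in votes:
            decision = vote["decision"]
            confidence = vote["confidence"]

            decision_counts[decision] = decision_counts.get(decision, 0) + 1
            weighted_votes[decision] = weighted_votes.get(decision, 0) + confidence

        # The winner is the decision with the highest total confidence,
        # which is not necessarily the one with the most votes.
        winner = max(weighted_votes.items(), key=lambda x: x[1])

        return {
            "decision": winner[0],
            "votes": votes,
            "vote_counts": decision_counts,
            "total_confidence": winner[1]
        }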
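Weighting by confidence can pick a different winner than a simple head count. The sketch below makes that visible with stub agents; `StubVoter` and its fixed votes are invented for the demonstration.

```python
import asyncio
from collections import Counter, defaultdict


class StubVoter:
    """Hypothetical agent that always returns a fixed vote."""
    def __init__(self, name, decision, confidence):
        self.name = name
        self._decision = decision
        self._confidence = confidence

    async def vote(self, question: dict) -> dict:
        return {"decision": self._decision, "confidence": self._confidence}


async def tally(agents, question):
    # Ask all agents concurrently.
    votes = await asyncio.gather(*(a.vote(question) for a in agents))
    counts = Counter(v["decision"] for v in votes)
    weighted = defaultdict(float)
    for v in votes:
        weighted[v["decision"]] += v["confidence"]
    return {
        "majority": counts.most_common(1)[0][0],      # most votes
        "weighted": max(weighted, key=weighted.get),  # highest total confidence
    }


agents = [
    StubVoter("a1", "approve", 0.4),
    StubVoter("a2", "approve", 0.3),
    StubVoter("a3", "reject", 0.9),
]
outcome = asyncio.run(tally(agents, {"text": "Ship the release?"}))
print(outcome["majority"], outcome["weighted"])
```

Two lukewarm approvals lose to one confident rejection here. Whether that is desirable depends on how well calibrated the agents' confidence values are.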

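3. Auction Pattern

from typing import Optional, Tuple

class AuctionCoordinator:
    """Auction coordinator."""
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.agents = []

    def add_agent(self, agent: BusAgent):
        self.agents.append(agent)

    async def auction_task(self, task: dict) -> Tuple[Optional[BusAgent], Optional[dict]]:
        """Auction a task; returns (winning agent, winning bid) or (None, None)."""
        bids = []

        # Solicit bids. Each agent must define `async def bid(task)` and
        # return a falsy value to decline.
        for agent in self.agents:
            bid = await agent.bid(task)
            if bid:
                bids.append({
                    "agent": agent,
                    "price": bid["price"],
                    "estimated_time": bid["estimated_time"],
                    "quality_score": bid["quality_score"]
                })

        if not bids:
            return None, None

        # Pick the best bid (weighing price, time, and quality)
        best_bid = max(bids, key=self.evaluate_bid)

        return best_bid["agent"], best_bid

    def evaluate_bid(self, bid: dict) -> float:
        """Score a bid; higher is better."""
        # Weighted score (the weights can be customized).
        # The three inputs are used raw, so they should share a comparable
        # scale; otherwise the largest-valued field dominates the score.
        score = (
            bid["quality_score"] * 0.5 -
            bid["price"] * 0.3 -
            bid["estimated_time"] * 0.2
        )
        return score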
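If price, time and quality arrive on different scales, one option is to min-max normalize each field across the bids before applying the weights. This is a sketch of that variation, not the scoring used above; `normalize` and `rank_bids` are hypothetical helpers and the bid values are made up.

```python
from typing import Dict, List


def normalize(values: List[float]) -> List[float]:
    """Min-max scale to [0, 1]; a constant column maps to all zeros."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def rank_bids(bids: List[Dict], weights=(0.5, 0.3, 0.2)) -> List[Dict]:
    """Score = w_q * quality - w_p * price - w_t * time, on normalized scales."""
    w_quality, w_price, w_time = weights
    quality = normalize([b["quality_score"] for b in bids])
    price = normalize([b["price"] for b in bids])
    time = normalize([b["estimated_time"] for b in bids])
    scored = [
        {**b, "score": w_quality * q - w_price * p - w_time * t}
        for b, q, p, t in zip(bids, quality, price, time)
    ]
    return sorted(scored, key=lambda b: b["score"], reverse=True)


bids = [  # made-up numbers for illustration
    {"agent": "fast", "price": 100.0, "estimated_time": 10.0, "quality_score": 0.6},
    {"agent": "cheap", "price": 20.0, "estimated_time": 60.0, "quality_score": 0.7},
    {"agent": "premium", "price": 80.0, "estimated_time": 30.0, "quality_score": 0.9},
]
ranking = rank_bids(bids)
print([b["agent"] for b in ranking])
```

With the raw formula the same three bids would be ranked almost entirely by price and time, because a quality score between 0 and 1 contributes at most 0.5 to the total.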

📊 Monitoring and Debugging

Message Tracing

from datetime import datetime
from typing import List

class MessageTracer:
    """Message tracer: records lifecycle events for each message."""
    def __init__(self):
        self.traces = []

    def trace(self, message: StandardMessage, event: str):
        """Record a message event."""
        self.traces.append({
            "message_id": message.id,
            "event": event,  # sent, received, processed, error
            "timestamp": datetime.now(),
            "sender": message.sender,
            "receiver": message.receiver,
            "message_type": message.message_type.value
        })

    def get_message_trace(self, message_id: str) -> List[dict]:
        """Return all recorded events for one message."""
        return [t for t in self.traces if t["message_id"] == message_id]

    def get_conversation_trace(self, agent_a: str, agent_b: str) -> List[dict]:
        """Return the events exchanged between two agents, in either direction."""
        return [t for t in self.traces
                if (t["sender"] == agent_a and t["receiver"] == agent_b) or
                   (t["sender"] == agent_b and t["receiver"] == agent_a)]
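Trace records like these can feed simple diagnostics. As a sketch, the two hypothetical helpers below compute the time from "sent" to "processed" for one message and list messages that were sent but never processed. They assume trace dicts with the same `message_id`, `event` and `timestamp` keys as above.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def delivery_latency(traces: List[dict], message_id: str) -> Optional[timedelta]:
    """Time between the first "sent" and first "processed" event of a message."""
    sent = processed = None
    for t in traces:
        if t["message_id"] != message_id:
            continue
        if t["event"] == "sent" and sent is None:
            sent = t["timestamp"]
        elif t["event"] == "processed" and processed is None:
            processed = t["timestamp"]
    if sent is None or processed is None:
        return None
    return processed - sent


def unprocessed_messages(traces: List[dict]) -> List[str]:
    """IDs that were sent but never reached the "processed" event."""
    sent = [t["message_id"] for t in traces if t["event"] == "sent"]
    done = {t["message_id"] for t in traces if t["event"] == "processed"}
    return [mid for mid in sent if mid not in done]


t0 = datetime(2024, 12, 22, 12, 0, 0)  # fixed timestamps for a reproducible demo
traces = [
    {"message_id": "m1", "event": "sent", "timestamp": t0},
    {"message_id": "m2", "event": "sent",
     "timestamp": t0 + timedelta(milliseconds=5)},
    {"message_id": "m1", "event": "received",
     "timestamp": t0 + timedelta(milliseconds=20)},
    {"message_id": "m1", "event": "processed",
     "timestamp": t0 + timedelta(milliseconds=120)},
]
print(delivery_latency(traces, "m1"), unprocessed_messages(traces))
```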

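💡 Best Practices

Do's ✅

  1. Define a clear message format
  2. Implement timeouts and retries
  3. Log all communication
  4. Handle lost and out-of-order messages
  5. Use priority queues
  6. Monitor communication performance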
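For the timeout-and-retry item, a minimal sketch might look like the following. `call_with_retry` and `flaky_send` are hypothetical names, and the attempt count, timeout and delays are arbitrary values chosen to keep the demo fast.

```python
import asyncio


async def call_with_retry(operation, *, attempts=3, timeout=0.05, base_delay=0.01):
    """Await operation() with a per-attempt timeout and exponential backoff."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(operation(), timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Wait 1x, 2x, 4x, ... the base delay before the next try.
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error


calls = {"count": 0}


async def flaky_send():
    """Hypothetical send that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("peer unavailable")
    return {"status": "delivered"}


result = asyncio.run(call_with_retry(flaky_send))
print(result, calls["count"])
```

Retrying is only safe when the receiver can tolerate duplicates, for example by deduplicating on the message ID.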

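Don'ts ❌

  1. Create circular dependencies between agents
  2. Allow unbounded recursive calls
  3. Ignore error handling
  4. Omit timeout controls
  5. Couple agents too tightly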
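One common guard against the first two items is a hop counter carried in each message, so that a forwarding loop ends after a bounded number of deliveries. The sketch below assumes a hypothetical `HopLimitedBus` and an arbitrary `MAX_HOPS` of 4.

```python
from typing import Callable, Dict, List

MAX_HOPS = 4  # assumed limit; tune per system


class HopLimitedBus:
    """Hypothetical bus that drops messages forwarded MAX_HOPS times."""
    def __init__(self):
        self.handlers: Dict[str, Callable[[dict], None]] = {}
        self.dropped: List[dict] = []
        self.delivered = 0

    def publish(self, message: dict):
        if message.get("hops", 0) >= MAX_HOPS:
            # Break the cycle instead of recursing forever.
            self.dropped.append(message)
            return
        handler = self.handlers.get(message["receiver"])
        if handler:
            self.delivered += 1
            handler(message)


bus = HopLimitedBus()


def forward_to(target: str) -> Callable[[dict], None]:
    # Each forward copies the message and increments its hop counter.
    def handler(message: dict):
        bus.publish({**message, "receiver": target,
                     "hops": message.get("hops", 0) + 1})
    return handler


# Two agents that blindly forward to each other: a circular dependency.
bus.handlers["a"] = forward_to("b")
bus.handlers["b"] = forward_to("a")

bus.publish({"receiver": "a", "content": "ping"})
print(bus.delivered, len(bus.dropped))
```

Without the hop check, the same two handlers would recurse until Python raised `RecursionError`.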

🔗 Related Articles

  • 00-AI Application Architecture Overview
  • 03-Agent System Design and Implementation
  • 04-MCP Protocol Explained

Last updated: December 22, 2024

Recently updated: 2025/12/22 14:25
Contributors: wsyx