文思AI产品笔记
MCP Protocol Explained

Model Context Protocol: A Unified Standard for AI Tools and Resources

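📋 Overview

MCP (Model Context Protocol) is an open protocol introduced by Anthropic that standardizes how AI models interact with external tools and data sources. This article covers MCP's core concepts, how to implement it, and best practices.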

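🎯 MCP Core Concepts

What Is MCP?

MCP is a standardized protocol that defines three primitives:

  • Resources: data the AI can read
  • Tools: functions the AI can call
  • Prompts: predefined interaction templates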
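As a rough illustration of the three primitives, the sketch below shows the kind of descriptor a server might advertise for each. The camelCase field names (`inputSchema`, `mimeType`) follow the MCP specification; the concrete values (`search`, `file:///docs/readme.md`, `summarize`) are made up for this example.

```python
# Hypothetical descriptors for the three MCP primitives.
resource = {
    "uri": "file:///docs/readme.md",   # unique identifier of the data
    "name": "README",
    "mimeType": "text/markdown",
}

tool = {
    "name": "search",
    "description": "Search the web for information",
    "inputSchema": {                   # JSON Schema describing the arguments
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
}

prompt = {
    "name": "summarize",
    "description": "Summarize a document",
    "arguments": [{"name": "text", "required": True}],
}

# Print the field names each descriptor carries.
for kind, descriptor in [("resource", resource), ("tool", tool), ("prompt", prompt)]:
    print(kind, "->", sorted(descriptor))
```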

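MCP's Advantages

  1. Standardized: a single, uniform interface definition
  2. Portable: works across platforms and across models
  3. Secure: permissions and access control
  4. Extensible: new capabilities are easy to add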

🏗️ MCP Architecture

Basic Architecture

┌─────────────┐
│  AI Client  │  (Claude, GPT, etc.)
└──────┬──────┘
       │ MCP Protocol
┌──────▼──────┐
│ MCP Server  │
└──────┬──────┘
       │
   ┌───┴────┬────────┐
   │        │        │
┌──▼──┐  ┌──▼──┐  ┌──▼───┐
│Tool │  │Res  │  │Prompt│
└─────┘  └─────┘  └──────┘
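On the wire, MCP messages are JSON-RPC 2.0 objects. The sketch below builds a `tools/call` request and a matching success response as plain dictionaries. The helper names `make_request` and `make_result` are illustrative and not part of any SDK.

```python
import itertools
import json
from typing import Optional

_ids = itertools.count(1)  # each JSON-RPC request carries a unique id


def make_request(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request envelope."""
    request = {"jsonrpc": "2.0", "id": next(_ids), "method": method}
    if params is not None:
        request["params"] = params
    return request


def make_result(request: dict, result: dict) -> dict:
    """Build the success response that answers `request` (same id)."""
    return {"jsonrpc": "2.0", "id": request["id"], "result": result}


call = make_request("tools/call", {"name": "search", "arguments": {"query": "MCP"}})
reply = make_result(call, {"content": [{"type": "text", "text": "..."}]})
print(json.dumps(call, indent=2))
print(json.dumps(reply, indent=2))
```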

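MCP Server Implementation

from __future__ import annotations  # lets signatures name Tool/Resource before they are defined

import json
from dataclasses import dataclass
from typing import Any, Dict, List

# NOTE: this is a simplified, in-process model for illustration. It does not use
# the official `mcp` SDK; Tool and Resource are the classes defined in the next
# two sections. Real MCP traffic is JSON-RPC 2.0, where the request name travels
# in a "method" field rather than the "type" field used here.


@dataclass
class Prompt:
    """Minimal prompt template (a stand-in; the article does not define one)."""
    name: str
    description: str
    template: str  # str.format-style template, e.g. "Summarize: {text}"


class MCPServer:
    def __init__(self, name: str):
        self.name = name
        self.tools: Dict[str, Tool] = {}
        self.resources: Dict[str, Resource] = {}
        self.prompts: Dict[str, Prompt] = {}

    def register_tool(self, tool: Tool):
        """Register a tool under its name."""
        self.tools[tool.name] = tool

    def register_resource(self, resource: Resource):
        """Register a resource under its URI."""
        self.resources[resource.uri] = resource

    def register_prompt(self, prompt: Prompt):
        """Register a prompt under its name."""
        self.prompts[prompt.name] = prompt

    def handle_request(self, request: dict) -> dict:
        """Route a request to the matching handler."""
        request_type = request.get("type")

        if request_type == "tools/list":
            return self.list_tools()
        elif request_type == "tools/call":
            return self.call_tool(request)
        elif request_type == "resources/list":
            return self.list_resources()
        elif request_type == "resources/read":
            return self.read_resource(request)
        elif request_type == "prompts/list":
            return self.list_prompts()
        elif request_type == "prompts/get":
            return self.get_prompt(request)
        else:
            return {"error": "Unknown request type"}

    def list_tools(self) -> dict:
        """List all registered tools."""
        return {
            "tools": [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": tool.input_schema
                }
                for tool in self.tools.values()
            ]
        }

    def call_tool(self, request: dict) -> dict:
        """Call a tool by name with the supplied arguments."""
        params = request.get("params", {})
        tool_name = params.get("name")
        arguments = params.get("arguments", {})

        if tool_name not in self.tools:
            return {"error": f"Tool {tool_name} not found"}

        tool = self.tools[tool_name]
        try:
            result = tool.execute(**arguments)
            # Text content must be a string, whatever the tool returned.
            return {"content": [{"type": "text", "text": str(result)}]}
        except Exception as e:
            return {"error": str(e)}

    def list_resources(self) -> dict:
        """List all registered resources."""
        return {
            "resources": [
                {
                    "uri": r.uri,
                    "name": r.name,
                    "description": r.description,
                    "mimeType": r.mime_type
                }
                for r in self.resources.values()
            ]
        }

    def read_resource(self, request: dict) -> dict:
        """Read one resource by URI."""
        uri = request.get("params", {}).get("uri")
        if uri not in self.resources:
            return {"error": f"Resource {uri} not found"}
        try:
            return {"contents": [self.resources[uri].read()]}
        except Exception as e:
            return {"error": str(e)}

    def list_prompts(self) -> dict:
        """List all registered prompts."""
        return {
            "prompts": [
                {"name": p.name, "description": p.description}
                for p in self.prompts.values()
            ]
        }

    def get_prompt(self, request: dict) -> dict:
        """Fill in a prompt template with the supplied arguments."""
        params = request.get("params", {})
        name = params.get("name")
        if name not in self.prompts:
            return {"error": f"Prompt {name} not found"}
        prompt = self.prompts[name]
        try:
            text = prompt.template.format(**params.get("arguments", {}))
        except KeyError as e:
            return {"error": f"Missing prompt argument: {e}"}
        return {
            "description": prompt.description,
            "messages": [{"role": "user", "content": {"type": "text", "text": text}}]
        }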
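The `if`/`elif` chain in `handle_request` can also be written as a lookup table, which keeps routing in one place as request types are added. Below is a minimal, self-contained sketch of that alternative. `MiniServer` and its single `ping` tool are invented for the example and only stand in for the class above.

```python
from typing import Callable, Dict


class MiniServer:
    """Toy server that routes requests through a dispatch table."""

    def __init__(self) -> None:
        self.tools: Dict[str, Callable[..., str]] = {"ping": lambda: "pong"}
        # Map each request type to the bound method that handles it.
        self.handlers: Dict[str, Callable[[dict], dict]] = {
            "tools/list": self.list_tools,
            "tools/call": self.call_tool,
        }

    def handle_request(self, request: dict) -> dict:
        handler = self.handlers.get(request.get("type"))
        if handler is None:
            return {"error": "Unknown request type"}
        return handler(request)

    def list_tools(self, request: dict) -> dict:
        return {"tools": [{"name": name} for name in self.tools]}

    def call_tool(self, request: dict) -> dict:
        params = request.get("params", {})
        tool = self.tools.get(params.get("name"))
        if tool is None:
            return {"error": f"Tool {params.get('name')} not found"}
        text = tool(**params.get("arguments", {}))
        return {"content": [{"type": "text", "text": text}]}


server = MiniServer()
print(server.handle_request({"type": "tools/list"}))
print(server.handle_request({"type": "tools/call", "params": {"name": "ping"}}))
print(server.handle_request({"type": "nope"}))
```

Every handler takes the request, even when it ignores it, so the table can hold them all under one signature.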

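Tool Definition

import ast
import operator
from typing import Any, Callable


class Tool:
    def __init__(
        self,
        name: str,
        description: str,
        input_schema: dict,
        function: Callable[..., Any]
    ):
        self.name = name
        self.description = description
        self.input_schema = input_schema  # JSON Schema for the arguments
        self.function = function

    def execute(self, **kwargs: Any) -> Any:
        """Run the tool with the given keyword arguments."""
        return self.function(**kwargs)

# Example: search tool (returns a placeholder string, not real search results)
search_tool = Tool(
    name="search",
    description="Search the web for information",
    input_schema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query"
            }
        },
        "required": ["query"]
    },
    function=lambda query: f"Search results for: {query}"
)

# Calling eval() on model-supplied input allows arbitrary code execution, so the
# calculator walks the parsed expression and permits only basic arithmetic.
_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.USub: operator.neg,
}


def safe_eval(expression: str) -> float:
    """Evaluate +, -, *, / on numeric literals; reject everything else."""
    def evaluate(node: ast.AST) -> float:
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPERATORS:
            return _OPERATORS[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPERATORS:
            return _OPERATORS[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression: {expression}")
    return evaluate(ast.parse(expression, mode="eval").body)

# Example: calculator tool
calculator_tool = Tool(
    name="calculator",
    description="Perform mathematical calculations",
    input_schema={
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "Mathematical expression to evaluate"
            }
        },
        "required": ["expression"]
    },
    function=lambda expression: str(safe_eval(expression))
)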
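Declaring an `input_schema` does not by itself validate anything: `execute` passes whatever arguments arrive straight to the function. Below is a minimal sketch of checking arguments first. It covers only the `required` keyword and primitive `type` values, far less than a full JSON Schema validator, and the name `validate_arguments` is an assumption for this example.

```python
from typing import Any, Dict, List

# JSON Schema primitive type names mapped to Python types.
_TYPES = {
    "string": str, "integer": int, "number": (int, float),
    "boolean": bool, "object": dict, "array": list,
}


def validate_arguments(schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments pass."""
    errors = []
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        spec = schema.get("properties", {}).get(name)
        if spec is None:
            # Stricter than JSON Schema's default, but an unknown keyword
            # argument would make the tool function raise TypeError anyway.
            errors.append(f"unexpected argument: {name}")
            continue
        expected = _TYPES.get(spec.get("type"))
        # bool is a subclass of int in Python, so reject it for numeric types.
        bool_as_number = isinstance(value, bool) and spec.get("type") in ("integer", "number")
        if expected and (not isinstance(value, expected) or bool_as_number):
            errors.append(f"{name}: expected {spec['type']}")
    return errors


schema = {
    "type": "object",
    "properties": {"query": {"type": "string"}},
    "required": ["query"],
}
print(validate_arguments(schema, {"query": "mcp"}))   # passes
print(validate_arguments(schema, {}))                 # missing argument
print(validate_arguments(schema, {"query": 42}))      # wrong type
```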

Resource Definition

import json
from pathlib import Path
from typing import Callable


class Resource:
    def __init__(
        self,
        uri: str,
        name: str,
        description: str,
        mime_type: str,
        content_fn: Callable[[], str]
    ):
        self.uri = uri
        self.name = name
        self.description = description
        self.mime_type = mime_type
        self.content_fn = content_fn  # called lazily, on every read

    def read(self) -> dict:
        """Read the resource and return its contents."""
        content = self.content_fn()
        return {
            "uri": self.uri,
            "mimeType": self.mime_type,
            "text": content
        }

# Example: file resource
file_resource = Resource(
    uri="file:///documents/readme.md",
    name="README",
    description="Project README file",
    mime_type="text/markdown",
    # read_text() closes the file, unlike a bare open(...).read()
    content_fn=lambda: Path("readme.md").read_text(encoding="utf-8")
)


def fetch_users_from_db() -> list:
    """Placeholder: replace with a real database query."""
    return []

# Example: database resource
db_resource = Resource(
    uri="database://users",
    name="Users Database",
    description="User information database",
    mime_type="application/json",
    content_fn=lambda: json.dumps(fetch_users_from_db())
)
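The `read` method above always returns a `text` field, which suits Markdown or JSON. In the MCP specification, binary resources are instead returned in a base64-encoded `blob` field. Below is a small sketch of choosing between the two, where `make_contents` is a hypothetical helper.

```python
import base64
from typing import Union


def make_contents(uri: str, mime_type: str, data: Union[str, bytes]) -> dict:
    """Build a resource-contents entry: `text` for str, base64 `blob` for bytes."""
    contents = {"uri": uri, "mimeType": mime_type}
    if isinstance(data, bytes):
        contents["blob"] = base64.b64encode(data).decode("ascii")
    else:
        contents["text"] = data
    return contents


print(make_contents("file:///notes.md", "text/markdown", "# Notes"))
print(make_contents("file:///logo.png", "image/png", b"\x89PNG"))
```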

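🔌 MCP Client Integration

Basic Client

from typing import List

import aiohttp


class MCPClient:
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.available_tools = []
        self.available_resources = []

    async def connect(self):
        """Connect to the MCP server and cache what it offers."""
        # Fetch the available tools
        self.available_tools = await self.list_tools()

        # Fetch the available resources
        self.available_resources = await self.list_resources()

    async def list_tools(self) -> List[dict]:
        """List the available tools."""
        response = await self.send_request({
            "type": "tools/list"
        })
        return response.get("tools", [])

    async def list_resources(self) -> List[dict]:
        """List the available resources."""
        response = await self.send_request({
            "type": "resources/list"
        })
        return response.get("resources", [])

    async def call_tool(self, tool_name: str, arguments: dict) -> str:
        """Call a tool and return the text of its first content item."""
        response = await self.send_request({
            "type": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            }
        })

        if "error" in response:
            raise RuntimeError(response["error"])

        return response["content"][0]["text"]

    async def read_resource(self, uri: str) -> dict:
        """Read a resource by URI."""
        response = await self.send_request({
            "type": "resources/read",
            "params": {
                "uri": uri
            }
        })
        return response

    async def send_request(self, request: dict) -> dict:
        """POST the request as JSON and return the decoded JSON reply."""
        # A new session per request keeps the example short; a long-lived
        # client would create one session and reuse it.
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.server_url,
                json=request
            ) as response:
                response.raise_for_status()  # surface HTTP errors early
                return await response.json()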
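`send_request` gives up on the first network error. One common hardening step is to retry transient failures with exponential backoff. The sketch below is independent of `aiohttp`: `with_retries` is a hypothetical helper, and `flaky_request` merely simulates a server that fails twice before answering.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    """Run `operation`, retrying on ConnectionError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")


calls = 0


async def flaky_request() -> dict:
    """Simulated request that fails twice, then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("temporary failure")
    return {"tools": []}


result = asyncio.run(with_retries(flaky_request))
print(result, "after", calls, "calls")
```

Only `ConnectionError` is retried here. Which exceptions count as transient depends on the HTTP library in use.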

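Using MCP in an AI Application

import json


class MCPEnabledAgent:
    def __init__(self, llm, mcp_client: MCPClient):
        self.llm = llm  # any object with an async generate(prompt) -> str method
        self.mcp = mcp_client

    def format_tools(self, tools: list) -> str:
        """Render each tool as one '- name: description' line for the prompt."""
        return "\n".join(
            f"- {t['name']}: {t.get('description', '')}" for t in tools
        )

    def is_tool_call(self, response: str) -> bool:
        """Return True if the reply is a JSON object requesting a tool."""
        try:
            data = json.loads(response)
        except json.JSONDecodeError:
            return False
        return isinstance(data, dict) and data.get("action") == "use_tool"

    async def run(self, query: str):
        # 1. Connect to the MCP server
        await self.mcp.connect()

        # 2. Build a prompt that describes the available tools
        tools_desc = self.format_tools(self.mcp.available_tools)

        prompt = f"""
You can use the following tools:

{tools_desc}

User question: {query}

Decide whether a tool is needed. If so, reply with JSON only:
{{
    "action": "use_tool",
    "tool": "<tool name>",
    "arguments": {{...}}
}}

If no tool is needed, answer directly.
"""

        # 3. Let the LLM decide
        response = await self.llm.generate(prompt)

        # 4. Parse the reply and run the tool
        if self.is_tool_call(response):
            tool_call = json.loads(response)
            result = await self.mcp.call_tool(
                tool_call["tool"],
                tool_call.get("arguments", {})
            )

            # 5. Generate the final answer from the tool result
            final_prompt = f"""
Question: {query}
Tool result: {result}

Answer the question based on the tool result:
"""
            final_answer = await self.llm.generate(final_prompt)
            return final_answer
        else:
            return response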
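Models often wrap JSON in prose, so calling `json.loads` on the raw reply is fragile. Below is a minimal sketch of a more tolerant parser. `extract_tool_call` is a hypothetical helper, and its heuristic (take the span from the first `{` to the last `}`) is deliberately simple.

```python
import json
from typing import Optional


def extract_tool_call(reply: str) -> Optional[dict]:
    """Return the tool-call dict embedded in `reply`, or None if there is none."""
    start, end = reply.find("{"), reply.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        data = json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or data.get("action") != "use_tool":
        return None
    if not isinstance(data.get("tool"), str):
        return None
    data.setdefault("arguments", {})  # tolerate a missing arguments field
    return data


wrapped = (
    "Sure, I will search.\n"
    '{"action": "use_tool", "tool": "search", "arguments": {"query": "MCP"}}\n'
    "Done."
)
print(extract_tool_call(wrapped))
print(extract_tool_call("The answer is 4."))
```

A reply containing two separate JSON objects would defeat this heuristic. Native tool-calling APIs, where the model provider offers them, avoid the parsing problem altogether.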

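🛠️ Practical Examples

Example 1: File System MCP Server

from pathlib import Path


class FileSystemMCPServer(MCPServer):
    def __init__(self, root_path: str):
        super().__init__("filesystem")
        self.root_path = Path(root_path).resolve()

        # Register the tools
        self.register_tool(Tool(
            name="list_files",
            description="List files in a directory",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string"}
                }
            },
            function=self.list_files
        ))

        self.register_tool(Tool(
            name="read_file",
            description="Read file content",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string"}
                },
                "required": ["path"]
            },
            function=self.read_file
        ))

        # Discover and register file resources automatically
        self.discover_resources()

    def _resolve(self, path: str) -> Path:
        """Resolve `path` under the root and reject anything that escapes it."""
        full_path = (self.root_path / path).resolve()
        # Without this check, "../../etc/passwd" would read outside the root.
        if full_path != self.root_path and self.root_path not in full_path.parents:
            raise ValueError(f"Path escapes root: {path}")
        return full_path

    def list_files(self, path: str = ".") -> str:
        """List the entries of a directory, one name per line."""
        full_path = self._resolve(path)
        return "\n".join(sorted(f.name for f in full_path.glob("*")))

    def read_file(self, path: str) -> str:
        """Read a file's text content."""
        full_path = self._resolve(path)
        if not full_path.is_file():
            raise ValueError(f"Not a file: {path}")
        return full_path.read_text(encoding="utf-8")

    def discover_resources(self):
        """Register every Markdown file under the root as a resource."""
        for file_path in self.root_path.rglob("*.md"):
            relative_path = file_path.relative_to(self.root_path)

            resource = Resource(
                # as_posix() keeps forward slashes in the URI on Windows too
                uri=f"file:///{relative_path.as_posix()}",
                name=file_path.name,
                description=f"Markdown file: {relative_path}",
                mime_type="text/markdown",
                content_fn=lambda p=file_path: p.read_text(encoding="utf-8")
            )

            self.register_resource(resource)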
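The `p=file_path` default in `discover_resources` matters. A plain `lambda: file_path.read_text()` would look up `file_path` only when called, after the loop has finished, so every resource would read the last file found. A short demonstration of the difference, using strings in place of paths:

```python
names = ["a.md", "b.md", "c.md"]

# Late binding: each lambda looks up `name` when called, after the loop ended.
late = [lambda: name for name in names]

# Early binding: the default argument captures the value at definition time.
early = [lambda n=name: n for name in names]

print([f() for f in late])   # ['c.md', 'c.md', 'c.md']
print([f() for f in early])  # ['a.md', 'b.md', 'c.md']
```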

Example 2: API Integration MCP Server

import requests


class APIMCPServer(MCPServer):
    def __init__(self, api_key: str):
        super().__init__("api_server")
        # One key is shared by both tools to keep the example short; in
        # practice each upstream service issues its own key.
        self.api_key = api_key

        # Register the API tools
        self.register_tool(Tool(
            name="weather",
            description="Get weather information",
            input_schema={
                "type": "object",
                "properties": {
                    "city": {"type": "string"}
                },
                "required": ["city"]
            },
            function=self.get_weather
        ))

        self.register_tool(Tool(
            name="news",
            description="Get latest news",
            input_schema={
                "type": "object",
                "properties": {
                    "topic": {"type": "string"},
                    "limit": {"type": "integer", "default": 5}
                },
                "required": ["topic"]
            },
            function=self.get_news
        ))

    def get_weather(self, city: str) -> str:
        """Fetch the weather for a city."""
        # Placeholder endpoint and response fields ("temp", "condition");
        # substitute your weather provider's actual API.
        response = requests.get(
            "https://api.weather.com/v1/forecast",
            params={"city": city, "apikey": self.api_key},
            timeout=10
        )
        response.raise_for_status()

        data = response.json()
        return f"Temperature: {data['temp']}°C, Condition: {data['condition']}"

    def get_news(self, topic: str, limit: int = 5) -> str:
        """Fetch the latest news on a topic."""
        response = requests.get(
            "https://newsapi.org/v2/everything",
            params={
                "q": topic,
                "pageSize": limit,
                "apiKey": self.api_key
            },
            timeout=10
        )
        response.raise_for_status()

        articles = response.json()["articles"]
        return "\n\n".join([
            f"Title: {a['title']}\nDescription: {a['description']}"
            for a in articles
        ])
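Both tools above let exceptions from the HTTP call propagate. In the MCP specification, a failure inside a tool is reported as an ordinary result with `isError` set to true, so the model can read the message and react. Below is a minimal sketch of such a wrapper; `as_tool_result` and the failing `broken_weather` function are invented for illustration.

```python
from typing import Any, Callable, Dict


def as_tool_result(function: Callable[..., Any], arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Call `function` and turn its return value or exception into a tool result."""
    try:
        text, is_error = str(function(**arguments)), False
    except Exception as exc:  # report the failure instead of crashing the server
        text, is_error = f"{type(exc).__name__}: {exc}", True
    return {"content": [{"type": "text", "text": text}], "isError": is_error}


def broken_weather(city: str) -> str:
    """Stand-in for an upstream API that is currently unreachable."""
    raise TimeoutError(f"weather service timed out for {city}")


print(as_tool_result(lambda city: f"Sunny in {city}", {"city": "Paris"}))
print(as_tool_result(broken_weather, {"city": "Paris"}))
```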

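🔒 Security Considerations

1. Permission Control

class SecureMCPServer(MCPServer):
    def __init__(self, name: str):
        super().__init__(name)
        self.permissions = {}  # client_id -> {tool_name: allowed}

    def set_permission(self, client_id: str, tool_name: str, allowed: bool):
        """Grant or revoke a client's access to a tool."""
        if client_id not in self.permissions:
            self.permissions[client_id] = {}
        self.permissions[client_id][tool_name] = allowed

    def check_permission(self, client_id: str, tool_name: str) -> bool:
        """Check access; anything not explicitly granted is denied."""
        if client_id not in self.permissions:
            return False
        return self.permissions[client_id].get(tool_name, False)

    def handle_request(self, request: dict, client_id: str = "") -> dict:
        """Pass the caller's identity through to tool calls."""
        if request.get("type") == "tools/call":
            return self.call_tool(request, client_id)
        return super().handle_request(request)

    def call_tool(self, request: dict, client_id: str = "") -> dict:
        """Call a tool after a permission check."""
        tool_name = request.get("params", {}).get("name")

        # Check permission; an empty client_id is never granted access
        if not self.check_permission(client_id, tool_name):
            return {"error": "Permission denied"}

        # Call the tool
        return super().call_tool(request)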

2. Rate Limiting

from collections import defaultdict
import time

class RateLimitedMCPServer(MCPServer):
    def __init__(self, name: str, rate_limit: int = 100):
        super().__init__(name)
        self.rate_limit = rate_limit  # requests per minute, per client
        self.request_counts = defaultdict(list)  # client_id -> request timestamps

    def check_rate_limit(self, client_id: str) -> bool:
        """Return True if the client is still under its limit."""
        now = time.time()
        minute_ago = now - 60

        # Drop timestamps older than one minute
        self.request_counts[client_id] = [
            t for t in self.request_counts[client_id]
            if t > minute_ago
        ]

        # Enforce the limit
        if len(self.request_counts[client_id]) >= self.rate_limit:
            return False

        # Record the new request
        self.request_counts[client_id].append(now)
        return True

    def handle_request(self, request: dict, client_id: str) -> dict:
        """Handle a request after a rate-limit check."""
        if not self.check_rate_limit(client_id):
            return {"error": "Rate limit exceeded"}

        return super().handle_request(request)
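The sliding-window list above stores one timestamp per request. A token bucket is a common alternative (not the approach used in this article) that keeps only two numbers per client and tolerates short bursts. Below is a minimal sketch with an injectable clock so it can be exercised without sleeping; the class and parameter names are assumptions.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, capacity: int, rate: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.rate = rate
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


fake_now = [0.0]  # mutable fake clock for the demo
bucket = TokenBucket(capacity=2, rate=1.0, clock=lambda: fake_now[0])
print(bucket.allow(), bucket.allow(), bucket.allow())  # burst of two, then denied
fake_now[0] += 1.0  # one second later, one token has been refilled
print(bucket.allow())
```

A server would keep one bucket per `client_id`, for example in a dictionary, much as `request_counts` does above.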

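💡 Best Practices

Do's ✅

  1. Clear tool and resource descriptions
  2. Structured input schemas
  3. Error handling and validation
  4. Permission and security controls
  5. Logging and monitoring
  6. Version management

Don'ts ❌

  1. Vague tool descriptions
  2. Missing input validation
  3. Neglecting security
  4. Overly complex interfaces
  5. Missing documentation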

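🔗 Related Articles

  • 00 - Overview of AI Application Architecture
  • 03 - Agent System Design and Implementation
  • 05 - Agent-to-Agent Communication (A2A)

Last updated: December 22, 2024

Most recent update: 2025/12/22 14:25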
Contributors: wsyx