文思AI产品笔记
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
  • AI架构

    • AI架构

AI应用架构总体概述

构建可扩展、高性能AI应用系统的完整指南

📋 概述

本文档系列深入讲解AI应用架构的核心概念、设计模式和最佳实践。从基础的Prompt工程到复杂的Agent系统,帮助你构建生产级的AI应用。

🏗️ AI应用架构全景

架构层次

┌─────────────────────────────────────────┐
│        用户交互层(UI/API)              │
├─────────────────────────────────────────┤
│        应用逻辑层(业务流程)            │
├─────────────────────────────────────────┤
│        AI能力层                          │
│  ┌─────────┬─────────┬─────────────┐   │
│  │ Prompt  │  RAG    │   Agent     │   │
│  │ 工程    │  系统   │   系统      │   │
│  └─────────┴─────────┴─────────────┘   │
├─────────────────────────────────────────┤
│        模型接入层(LLM API/本地模型)    │
├─────────────────────────────────────────┤
│        基础设施层                        │
│  ┌─────────┬─────────┬─────────────┐   │
│  │ 向量DB  │ 缓存    │   监控      │   │
│  └─────────┴─────────┴─────────────┘   │
└─────────────────────────────────────────┘

核心组件

1. Prompt工程

  • 提示词设计与优化
  • 上下文管理
  • Few-shot Learning
  • Chain of Thought

2. RAG系统

  • 文档处理与分块
  • 向量化与检索
  • 重排序与过滤
  • 生成与引用

3. Agent系统

  • 规划与推理
  • 工具调用
  • 记忆管理
  • 多Agent协作

4. MCP协议

  • 模型上下文协议
  • 工具与资源标准化
  • 跨平台互操作

🎯 架构设计原则

1. 模块化设计

原则:将复杂系统拆分为独立、可复用的模块

实践:

# 不好的设计:所有功能耦合在一起
def chat_with_docs(user_query, documents):
    # 文档处理
    chunks = split_documents(documents)
    # 向量化
    embeddings = create_embeddings(chunks)
    # 检索
    relevant_chunks = search(embeddings, user_query)
    # 生成
    response = llm.generate(user_query, relevant_chunks)
    return response

# 好的设计:模块化
class DocumentProcessor:
    def process(self, documents):
        return self.split_documents(documents)

class VectorStore:
    def add(self, chunks):
        embeddings = self.embed(chunks)
        self.store(embeddings)
    
    def search(self, query, k=5):
        return self.retrieve(query, k)

class ResponseGenerator:
    def generate(self, query, context):
        prompt = self.build_prompt(query, context)
        return self.llm.generate(prompt)

# 组合使用
class RAGSystem:
    def __init__(self):
        self.processor = DocumentProcessor()
        self.vector_store = VectorStore()
        self.generator = ResponseGenerator()
    
    def query(self, user_query):
        relevant_docs = self.vector_store.search(user_query)
        response = self.generator.generate(user_query, relevant_docs)
        return response

2. 可扩展性

原则:系统能够轻松适应新需求和规模增长

实践:

  • 使用接口/抽象类定义组件
  • 支持多种LLM后端
  • 插件化架构
  • 水平扩展能力
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    def generate(self, prompt: str) -> str:
        pass

class OpenAIProvider(LLMProvider):
    def generate(self, prompt: str) -> str:
        # OpenAI实现
        pass

class ClaudeProvider(LLMProvider):
    def generate(self, prompt: str) -> str:
        # Claude实现
        pass

class LocalLLMProvider(LLMProvider):
    def generate(self, prompt: str) -> str:
        # 本地模型实现
        pass

# 使用时可以轻松切换
class AIApplication:
    def __init__(self, llm_provider: LLMProvider):
        self.llm = llm_provider

3. 性能优化

原则:在满足功能的前提下,优化响应时间和资源使用

关键指标:

  • 首次响应时间(TTFB):< 500ms
  • 完整响应时间:< 3s
  • 并发处理能力:> 100 QPS
  • 成本效率:每1000次请求 < $1

优化策略:

# 1. 缓存策略
from functools import lru_cache
import hashlib

class CachedLLM:
    def __init__(self, llm):
        self.llm = llm
        self.cache = {}
    
    def generate(self, prompt: str) -> str:
        # 计算prompt的哈希值
        prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
        
        # 检查缓存
        if prompt_hash in self.cache:
            return self.cache[prompt_hash]
        
        # 生成并缓存
        response = self.llm.generate(prompt)
        self.cache[prompt_hash] = response
        return response

# 2. 批处理
class BatchProcessor:
    def __init__(self, llm, batch_size=10):
        self.llm = llm
        self.batch_size = batch_size
        self.queue = []
    
    async def process(self, prompts: list) -> list:
        results = []
        for i in range(0, len(prompts), self.batch_size):
            batch = prompts[i:i+self.batch_size]
            batch_results = await self.llm.batch_generate(batch)
            results.extend(batch_results)
        return results

# 3. 流式响应
async def stream_response(prompt: str):
    async for chunk in llm.stream(prompt):
        yield chunk
        # 立即返回给用户,提升体验

4. 可观测性

原则:系统运行状态可见、可追踪、可分析

实践:

import logging
from datetime import datetime
from typing import Dict, Any

class ObservableAISystem:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.metrics = MetricsCollector()
    
    def query(self, user_query: str) -> str:
        start_time = datetime.now()
        
        # 记录请求
        self.logger.info(f"Query received: {user_query[:50]}...")
        
        try:
            # 处理请求
            response = self._process(user_query)
            
            # 记录成功
            duration = (datetime.now() - start_time).total_seconds()
            self.metrics.record('query_success', duration)
            self.logger.info(f"Query completed in {duration:.2f}s")
            
            return response
            
        except Exception as e:
            # 记录失败
            self.logger.error(f"Query failed: {str(e)}")
            self.metrics.record('query_failure', 1)
            raise
    
    def _process(self, query: str) -> str:
        # 细粒度追踪
        with self.metrics.timer('retrieval'):
            docs = self.retrieve(query)
        
        with self.metrics.timer('generation'):
            response = self.generate(query, docs)
        
        return response

# 监控指标
class MetricsCollector:
    def __init__(self):
        self.metrics: Dict[str, list] = {}
    
    def record(self, metric_name: str, value: float):
        if metric_name not in self.metrics:
            self.metrics[metric_name] = []
        self.metrics[metric_name].append({
            'value': value,
            'timestamp': datetime.now()
        })
    
    def get_stats(self, metric_name: str) -> Dict[str, float]:
        values = [m['value'] for m in self.metrics.get(metric_name, [])]
        if not values:
            return {}
        
        return {
            'count': len(values),
            'mean': sum(values) / len(values),
            'max': max(values),
            'min': min(values)
        }

5. 安全性

原则:保护用户数据,防止滥用和攻击

关键措施:

class SecureAISystem:
    def __init__(self):
        self.content_filter = ContentFilter()
        self.rate_limiter = RateLimiter()
        self.auth = Authentication()
    
    def process_query(self, user_id: str, query: str, api_key: str) -> str:
        # 1. 身份验证
        if not self.auth.verify(user_id, api_key):
            raise AuthenticationError("Invalid credentials")
        
        # 2. 速率限制
        if not self.rate_limiter.allow(user_id):
            raise RateLimitError("Too many requests")
        
        # 3. 输入过滤
        if self.content_filter.is_malicious(query):
            self.logger.warning(f"Malicious input detected: {user_id}")
            raise SecurityError("Invalid input")
        
        # 4. 处理请求
        response = self.ai_system.query(query)
        
        # 5. 输出过滤
        response = self.content_filter.sanitize_output(response)
        
        return response

class ContentFilter:
    def __init__(self):
        self.banned_patterns = [
            r'ignore previous instructions',
            r'prompt injection',
            # ... 更多模式
        ]
    
    def is_malicious(self, text: str) -> bool:
        import re
        for pattern in self.banned_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return True
        return False
    
    def sanitize_output(self, text: str) -> str:
        # 移除敏感信息
        # 过滤有害内容
        return text

class RateLimiter:
    def __init__(self, max_requests=100, window=3600):
        self.max_requests = max_requests
        self.window = window  # 秒
        self.requests = {}  # user_id -> [timestamps]
    
    def allow(self, user_id: str) -> bool:
        now = datetime.now().timestamp()
        
        # 清理过期记录
        if user_id in self.requests:
            self.requests[user_id] = [
                ts for ts in self.requests[user_id]
                if now - ts < self.window
            ]
        else:
            self.requests[user_id] = []
        
        # 检查限制
        if len(self.requests[user_id]) >= self.max_requests:
            return False
        
        # 记录请求
        self.requests[user_id].append(now)
        return True

🔧 技术选型

LLM选择

场景推荐模型原因
通用对话GPT-4、Claude 3能力强,效果好
代码生成GPT-4、Claude 3、Codex代码理解能力强
文档问答GPT-3.5-Turbo性价比高
本地部署LLaMA 2、Qwen开源,可控
多语言GPT-4、Claude 3多语言支持好
长文本Claude 3100K上下文

向量数据库选择

数据库优势适用场景
Pinecone托管服务,易用快速原型,中小规模
Milvus开源,性能好大规模,自部署
QdrantRust编写,高性能性能要求高
Chroma轻量,易集成开发测试
Weaviate功能丰富复杂查询需求

框架选择

框架特点推荐场景
LangChain生态丰富,功能全复杂应用
LlamaIndex专注RAG文档问答
Semantic Kernel微软出品.NET生态
Haystack专注NLP搜索场景
自研灵活可控特殊需求

📊 典型架构模式

1. 简单问答系统

用户输入 → Prompt构建 → LLM → 输出

适用场景:

  • 简单对话
  • 通用助手
  • 内容生成

代码示例:

class SimpleQA:
    def __init__(self, llm):
        self.llm = llm
    
    def answer(self, question: str) -> str:
        prompt = f"请回答以下问题:\n{question}"
        response = self.llm.generate(prompt)
        return response

2. RAG增强问答

用户输入 → 检索相关文档 → 构建Prompt → LLM → 输出

适用场景:

  • 知识库问答
  • 文档查询
  • 企业内部助手

代码示例:

class RAGSystem:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    
    def answer(self, question: str) -> str:
        # 检索相关文档
        relevant_docs = self.vector_store.search(question, k=3)
        
        # 构建prompt
        context = "\n".join([doc.content for doc in relevant_docs])
        prompt = f"""基于以下信息回答问题:

信息:
{context}

问题:{question}

请根据上述信息回答问题。如果信息中没有答案,请说"我不知道"。
"""
        
        response = self.llm.generate(prompt)
        return response

3. Agent系统

用户输入 → Agent规划 → 工具调用 → 结果整合 → 输出

适用场景:

  • 需要多步推理
  • 需要外部工具
  • 复杂任务分解

代码示例:

class AgentSystem:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
    
    def execute(self, task: str) -> str:
        # 规划
        plan = self.plan(task)
        
        # 执行
        results = []
        for step in plan:
            if step.requires_tool:
                tool = self.tools[step.tool_name]
                result = tool.execute(step.parameters)
                results.append(result)
            else:
                result = self.llm.generate(step.prompt)
                results.append(result)
        
        # 整合
        final_result = self.synthesize(results)
        return final_result

4. 多Agent协作

任务 → 任务分解 → Agent1 ↘
                           → 协调 → 结果整合
              → Agent2 ↗

适用场景:

  • 复杂项目
  • 多角色协作
  • 并行处理

🚀 部署架构

开发环境

本地开发 → Docker容器 → 本地测试

配置示例:

# docker-compose.yml
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./app:/app
  
  vector_db:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
  
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

生产环境

负载均衡 → API服务集群 → LLM服务
                        → 向量数据库
                        → 缓存服务
                        → 监控系统

架构图:

               ┌─────────────┐
               │ Load Balancer│
               └──────┬───────┘
                      │
        ┌─────────────┼─────────────┐
        │             │             │
   ┌────▼────┐  ┌────▼────┐  ┌────▼────┐
   │ API-1   │  │ API-2   │  │ API-3   │
   └────┬────┘  └────┬────┘  └────┬────┘
        │             │             │
        └─────────────┼─────────────┘
                      │
         ┌────────────┼────────────┐
         │            │            │
    ┌────▼────┐  ┌───▼────┐  ┌───▼────┐
    │ Vector  │  │ Redis  │  │  LLM   │
    │   DB    │  │ Cache  │  │ Service│
    └─────────┘  └────────┘  └────────┘

📖 系列文章导航

本系列包含以下详细文章:

  1. 00-AI应用架构总体概述(本文)

    • 架构全景
    • 设计原则
    • 技术选型
  2. 01-Prompt工程与上下文管理

    • Prompt设计模式
    • 上下文优化
    • Few-shot学习
    • Chain of Thought
  3. 02-RAG系统架构与实践

    • RAG核心流程
    • 文档处理策略
    • 检索优化
    • 高级RAG技术
  4. 03-Agent系统设计与实现

    • Agent架构
    • 工具集成
    • 推理与规划
    • 多Agent协作
  5. 04-MCP协议详解

    • MCP协议规范
    • 工具标准化
    • 跨平台集成
    • 实战案例
  6. 05-Agent间通信A2A

    • A2A通信协议
    • 消息格式
    • 协作模式
    • 实现示例

💡 学习路径建议

初学者(0-3个月)

  1. 阅读总体概述(本文)
  2. 学习Prompt工程
  3. 实现简单的RAG系统
  4. 理解基本架构概念

进阶者(3-6个月)

  1. 深入学习RAG优化技术
  2. 掌握Agent系统设计
  3. 实践MCP协议集成
  4. 构建完整应用

高级者(6个月+)

  1. 多Agent系统设计
  2. 生产级部署优化
  3. 性能调优与监控
  4. 架构创新与研究

🎯 实战建议

1. 从简单开始

  • 先实现基础功能
  • 逐步增加复杂度
  • 迭代优化

2. 重视测试

  • 单元测试
  • 集成测试
  • 性能测试
  • 用户测试

3. 持续优化

  • 监控指标
  • 收集反馈
  • 迭代改进

4. 关注成本

  • Token使用优化
  • 缓存策略
  • 批处理优化
  • 模型选择

📚 参考资源

官方文档

  • OpenAI:https://platform.openai.com/docs
  • Anthropic Claude:https://docs.anthropic.com
  • LangChain:https://python.langchain.com
  • LlamaIndex:https://docs.llamaindex.ai

开源项目

  • LangChain:https://github.com/langchain-ai/langchain
  • AutoGPT:https://github.com/Significant-Gravitas/AutoGPT
  • BabyAGI:https://github.com/yoheinakajima/babyagi

社区资源

  • Reddit r/LangChain
  • Discord: LangChain社区
  • GitHub Discussions

🔄 持续更新

本系列文档会持续更新,包括:

  • 新技术和最佳实践
  • 实战案例
  • 性能优化技巧
  • 常见问题解答

欢迎反馈和建议!


最后更新:2024年12月22日

最近更新: 2025/12/22 14:25
Contributors: wsyx