文思AI产品笔记
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
首页
最新文章
AI编程
AI架构
关于我
  • AI生态圈
  • AI大模型
  • 多维表格
  • AI创业笔记
  • AI落地案例
  • 个人产品推介
  • 问题集
  • 简体中文
  • English
  • AI架构

    • AI架构

RAG系统架构与实践

检索增强生成(Retrieval-Augmented Generation)完整指南

📋 概述

RAG是当前最流行的AI应用架构模式之一,通过结合外部知识库和大语言模型,解决LLM的知识过时、幻觉等问题。本文深入讲解RAG的原理、架构和实现。

🎯 RAG核心概念

什么是RAG?

RAG = 检索(Retrieval) + 增强(Augmented) + 生成(Generation)

核心思想:在生成答案前,先从知识库中检索相关信息,将检索到的信息作为上下文提供给LLM。

RAG vs Fine-tuning

维度RAGFine-tuning
知识更新实时更新需要重新训练
成本低高
可解释性高(可追溯来源)低
适用场景知识问答、文档检索特定任务、风格迁移
部署难度中高

🏗️ RAG系统架构

基础架构

┌─────────────────────────────────────────┐
│          1. 文档预处理                   │
│  加载 → 分块 → 向量化 → 存储             │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│          2. 查询处理                     │
│  用户问题 → 向量化 → 检索 → 重排序        │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│          3. 生成回答                     │
│  构建Prompt → LLM生成 → 后处理           │
└─────────────────────────────────────────┘

完整实现

from typing import List, Dict
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

class RAGSystem:
    def __init__(
        self,
        documents_path: str,
        embedding_model: str = "text-embedding-ada-002",
        llm_model: str = "gpt-3.5-turbo",
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.documents_path = documents_path
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # 初始化组件
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = OpenAI(model=llm_model, temperature=0)
        self.vector_store = None
        
        # 构建知识库
        self.build_knowledge_base()
    
    def build_knowledge_base(self):
        """构建知识库"""
        print("开始加载文档...")
        
        # 1. 加载文档
        loader = DirectoryLoader(
            self.documents_path,
            glob="**/*.txt",
            show_progress=True
        )
        documents = loader.load()
        print(f"加载了 {len(documents)} 个文档")
        
        # 2. 文档分块
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=["\n\n", "\n", "。", ".", " ", ""]
        )
        chunks = text_splitter.split_documents(documents)
        print(f"分割为 {len(chunks)} 个块")
        
        # 3. 创建向量存储
        self.vector_store = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory="./chroma_db"
        )
        print("知识库构建完成")
    
    def query(self, question: str, k: int = 3) -> Dict:
        """查询"""
        # 创建检索QA链
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vector_store.as_retriever(
                search_kwargs={"k": k}
            ),
            return_source_documents=True
        )
        
        # 执行查询
        result = qa_chain({"query": question})
        
        return {
            "answer": result["result"],
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "metadata": doc.metadata
                }
                for doc in result["source_documents"]
            ]
        }

# 使用
rag = RAGSystem(documents_path="./knowledge_base")
result = rag.query("什么是机器学习?")
print(f"答案:{result['answer']}")
print(f"来源:{len(result['sources'])}个文档")

📝 文档处理策略

1. 文档加载

支持的格式:

from langchain.document_loaders import (
    TextLoader,
    PDFLoader,
    UnstructuredMarkdownLoader,
    UnstructuredHTMLLoader,
    CSVLoader,
    JSONLoader
)

# PDF
pdf_loader = PDFLoader("document.pdf")

# Markdown
md_loader = UnstructuredMarkdownLoader("document.md")

# HTML
html_loader = UnstructuredHTMLLoader("page.html")

# 批量加载
from langchain.document_loaders import DirectoryLoader

loader = DirectoryLoader(
    "docs/",
    glob="**/*.md",
    loader_cls=UnstructuredMarkdownLoader
)

2. 文档分块策略

为什么需要分块?

  • 向量搜索需要合适的粒度
  • 超过模型上下文窗口限制
  • 提高检索精度

分块方法对比:

# 方法1:固定长度分块
from langchain.text_splitter import CharacterTextSplitter

splitter = CharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separator="\n"
)

# 方法2:递归分块(推荐)
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", "。", ".", " ", ""]
)

# 方法3:按token分块
from langchain.text_splitter import TokenTextSplitter

splitter = TokenTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)

# 方法4:语义分块
from langchain.text_splitter import SemanticChunker

splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile"
)

分块参数调优:

class ChunkOptimizer:
    def test_chunk_sizes(
        self,
        document: str,
        sizes: List[int] = [500, 1000, 1500, 2000],
        test_queries: List[str] = None
    ):
        results = {}
        
        for size in sizes:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=size,
                chunk_overlap=size // 5
            )
            
            chunks = splitter.split_text(document)
            
            results[size] = {
                'num_chunks': len(chunks),
                'avg_chunk_length': sum(len(c) for c in chunks) / len(chunks),
                'retrieval_quality': self.test_retrieval(chunks, test_queries)
            }
        
        return results

3. 元数据管理

class DocumentWithMetadata:
    def __init__(self, content: str, metadata: Dict):
        self.content = content
        self.metadata = metadata
    
    @classmethod
    def from_file(cls, file_path: str):
        # 提取元数据
        metadata = {
            'source': file_path,
            'filename': os.path.basename(file_path),
            'created_at': os.path.getctime(file_path),
            'modified_at': os.path.getmtime(file_path),
            'file_type': os.path.splitext(file_path)[1],
        }
        
        # 读取内容
        with open(file_path, 'r') as f:
            content = f.read()
        
        # 提取标题、作者等(如果有)
        metadata.update(cls.extract_metadata(content))
        
        return cls(content, metadata)
    
    @staticmethod
    def extract_metadata(content: str) -> Dict:
        """从内容中提取元数据"""
        metadata = {}
        
        # Markdown front matter
        if content.startswith('---'):
            import yaml
            end = content.find('---', 3)
            if end != -1:
                front_matter = content[3:end]
                metadata.update(yaml.safe_load(front_matter))
        
        return metadata

🔍 检索优化

1. 向量相似度搜索

# 基础检索
results = vector_store.similarity_search(
    query="什么是机器学习?",
    k=5
)

# 带分数的检索
results = vector_store.similarity_search_with_score(
    query="什么是机器学习?",
    k=5
)

for doc, score in results:
    print(f"相似度: {score:.4f}")
    print(f"内容: {doc.page_content[:100]}...")

# MMR检索(最大边际相关性)
results = vector_store.max_marginal_relevance_search(
    query="什么是机器学习?",
    k=5,
    fetch_k=20,
    lambda_mult=0.5  # 多样性vs相关性权衡
)

2. 混合检索

结合关键词和向量:

class HybridRetriever:
    def __init__(self, vector_store, bm25_retriever):
        self.vector_store = vector_store
        self.bm25 = bm25_retriever
    
    def retrieve(self, query: str, k: int = 5, alpha: float = 0.5):
        # 向量检索
        vector_results = self.vector_store.similarity_search_with_score(
            query, k=k*2
        )
        
        # BM25检索
        bm25_results = self.bm25.get_relevant_documents(query, k=k*2)
        
        # 合并和重排序
        combined = self.merge_results(
            vector_results,
            bm25_results,
            alpha
        )
        
        return combined[:k]
    
    def merge_results(self, vector_results, bm25_results, alpha):
        # 归一化分数并合并
        scores = {}
        
        # 向量搜索分数
        for doc, score in vector_results:
            doc_id = self.get_doc_id(doc)
            scores[doc_id] = alpha * (1 - score)  # 距离转相似度
        
        # BM25分数
        for doc in bm25_results:
            doc_id = self.get_doc_id(doc)
            bm25_score = doc.metadata.get('score', 0)
            if doc_id in scores:
                scores[doc_id] += (1 - alpha) * bm25_score
            else:
                scores[doc_id] = (1 - alpha) * bm25_score
        
        # 排序
        sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return [self.get_doc_by_id(doc_id) for doc_id, _ in sorted_docs]

3. 重排序(Reranking)

from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.model = CrossEncoder(model_name)
    
    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 3
    ) -> List[tuple]:
        # 计算相关性分数
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)
        
        # 排序
        results = list(zip(documents, scores))
        results.sort(key=lambda x: x[1], reverse=True)
        
        return results[:top_k]

# 使用
reranker = Reranker()

# 初步检索
初始结果 = vector_store.similarity_search(query, k=20)

# 重排序
final_results = reranker.rerank(
    query=user_query,
    documents=[doc.page_content for doc in initial_results],
    top_k=5
)

4. 查询优化

查询重写:

class QueryRewriter:
    def __init__(self, llm):
        self.llm = llm
    
    def rewrite(self, query: str) -> List[str]:
        prompt = f"""
原始查询:{query}

请生成3个改写的查询,使其:
1. 更具体
2. 包含相关同义词
3. 从不同角度表达

改写查询(每行一个):
"""
        
        response = self.llm.generate(prompt)
        rewrites = response.strip().split('\n')
        return [query] + rewrites

# 使用多个查询检索
def multi_query_retrieval(original_query: str, vector_store):
    rewriter = QueryRewriter(llm)
    queries = rewriter.rewrite(original_query)
    
    all_results = []
    for query in queries:
        results = vector_store.similarity_search(query, k=5)
        all_results.extend(results)
    
    # 去重
    unique_results = list(set(all_results))
    return unique_results

🎨 高级RAG技术

1. Hypothetical Document Embeddings (HyDE)

原理:先让LLM生成假设的答案,再用答案去检索

class HyDE:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    
    def retrieve(self, query: str, k: int = 5):
        # 1. 生成假设答案
        hyde_prompt = f"""
问题:{query}

请生成一个详细的假设答案(即使你不确定,也请基于常识生成):
"""
        
        hypothetical_answer = self.llm.generate(hyde_prompt)
        
        # 2. 用假设答案检索
        results = self.vector_store.similarity_search(
            hypothetical_answer,
            k=k
        )
        
        return results

2. Self-RAG

原理:让模型自己判断是否需要检索

class SelfRAG:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    
    def answer(self, query: str):
        # 1. 判断是否需要检索
        need_retrieval = self.check_if_need_retrieval(query)
        
        if not need_retrieval:
            # 直接回答
            return self.llm.generate(query)
        
        # 2. 检索
        docs = self.vector_store.similarity_search(query, k=3)
        
        # 3. 生成答案
        answer = self.generate_with_docs(query, docs)
        
        # 4. 验证答案
        is_valid = self.validate_answer(query, answer, docs)
        
        if not is_valid:
            # 重新生成
            answer = self.regenerate(query, docs, answer)
        
        return answer
    
    def check_if_need_retrieval(self, query: str) -> bool:
        prompt = f"""
判断以下问题是否需要检索外部知识库:

问题:{query}

判断标准:
- 需要特定事实或数据 → 需要检索
- 通用常识问题 → 不需要检索
- 数学计算 → 不需要检索
- 特定领域专业问题 → 需要检索

只回答"是"或"否":
"""
        
        response = self.llm.generate(prompt).strip().lower()
        return "是" in response or "yes" in response

3. RAPTOR(树状RAG)

原理:层次化组织文档,支持不同粒度的检索

class RAPTOR:
    def __init__(self, llm, embeddings):
        self.llm = llm
        self.embeddings = embeddings
        self.tree = {}  # 多层向量存储
    
    def build_tree(self, documents: List[str], max_depth=3):
        # 第0层:原始文档块
        self.tree[0] = self.create_vector_store(documents)
        
        # 构建上层
        for depth in range(1, max_depth + 1):
            # 聚类下一层
            clusters = self.cluster_documents(self.tree[depth-1])
            
            # 为每个簇生成摘要
            summaries = []
            for cluster in clusters:
                summary = self.summarize_cluster(cluster)
                summaries.append(summary)
            
            # 创建向量存储
            self.tree[depth] = self.create_vector_store(summaries)
    
    def query(self, question: str):
        # 在每一层检索
        all_results = []
        
        for depth in range(len(self.tree)):
            results = self.tree[depth].similarity_search(question, k=2)
            all_results.extend(results)
        
        # 生成答案
        answer = self.generate_answer(question, all_results)
        return answer

4. Adaptive RAG

根据问题类型选择策略:

class AdaptiveRAG:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    
    def answer(self, query: str):
        # 1. 分类问题类型
        query_type = self.classify_query(query)
        
        # 2. 选择策略
        if query_type == "factual":
            # 事实性问题:精确检索
            docs = self.vector_store.similarity_search(query, k=2)
        elif query_type == "analytical":
            # 分析性问题:广泛检索
            docs = self.vector_store.similarity_search(query, k=5)
        elif query_type == "creative":
            # 创意性问题:少量检索或不检索
            docs = self.vector_store.similarity_search(query, k=1)
        
        # 3. 生成答案
        return self.generate_with_docs(query, docs, query_type)
    
    def classify_query(self, query: str) -> str:
        prompt = f"""
分类以下问题的类型:

问题:{query}

类型选项:
- factual: 询问具体事实或数据
- analytical: 需要分析或推理
- creative: 开放性、创意性问题

只输出类型:
"""
        
        return self.llm.generate(prompt).strip().lower()

📊 RAG性能优化

1. 缓存策略

from functools import lru_cache
import hashlib

class CachedRAG:
    def __init__(self, rag_system):
        self.rag = rag_system
        self.cache = {}
    
    def query(self, question: str):
        # 计算查询的哈希
        query_hash = hashlib.md5(question.encode()).hexdigest()
        
        # 检查缓存
        if query_hash in self.cache:
            return self.cache[query_hash]
        
        # 执行查询
        result = self.rag.query(question)
        
        # 缓存结果
        self.cache[query_hash] = result
        
        return result

2. 批处理

class BatchRAG:
    def __init__(self, rag_system, batch_size=10):
        self.rag = rag_system
        self.batch_size = batch_size
    
    async def batch_query(self, questions: List[str]):
        results = []
        
        for i in range(0, len(questions), self.batch_size):
            batch = questions[i:i+self.batch_size]
            
            # 并行检索
            batch_docs = await asyncio.gather(*[
                self.rag.vector_store.asimilarity_search(q, k=3)
                for q in batch
            ])
            
            # 批量生成
            prompts = [
                self.build_prompt(q, docs)
                for q, docs in zip(batch, batch_docs)
            ]
            
            answers = await self.rag.llm.abatch(prompts)
            
            results.extend(answers)
        
        return results

3. 增量更新

class IncrementalRAG:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.indexed_files = set()
    
    def update(self, documents_path: str):
        # 扫描新文档
        new_files = self.scan_new_files(documents_path)
        
        if not new_files:
            print("没有新文档")
            return
        
        print(f"发现 {len(new_files)} 个新文档")
        
        # 加载和分块
        new_docs = self.load_and_chunk(new_files)
        
        # 添加到向量存储
        self.vector_store.add_documents(new_docs)
        
        # 更新索引记录
        self.indexed_files.update(new_files)
        
        print("更新完成")
    
    def scan_new_files(self, path: str) -> List[str]:
        all_files = set(glob.glob(f"{path}/**/*.txt", recursive=True))
        new_files = all_files - self.indexed_files
        return list(new_files)

💡 最佳实践

1. 文档准备

  • 清理格式,移除噪声
  • 保留重要的元数据
  • 合适的分块大小(500-1500字符)
  • 适当的重叠(10-20%)

2. 检索策略

  • 使用混合检索(向量+关键词)
  • 重排序提高精度
  • 根据场景调整k值
  • 考虑使用MMR增加多样性

3. Prompt设计

  • 明确指出上下文来源
  • 要求引用来源
  • 处理未找到信息的情况
  • 避免幻觉

4. 评估监控

  • 检索准确率
  • 答案质量
  • 响应时间
  • 用户反馈

🔗 相关文章

  • 00-AI应用架构总体概述
  • 01-Prompt工程与上下文管理
  • 03-Agent系统设计与实现

最后更新:2024年12月22日

最近更新: 2025/12/22 14:25
Contributors: wsyx