RAG System Architecture and Practice
A Complete Guide to Retrieval-Augmented Generation
📋 Overview
RAG is one of the most popular architecture patterns for AI applications today. By combining an external knowledge base with a large language model, it mitigates problems such as outdated knowledge and hallucination in LLMs. This article walks through RAG's principles, architecture, and implementation.
🎯 RAG Core Concepts
What is RAG?
RAG = Retrieval + Augmented + Generation
Core idea: before generating an answer, first retrieve relevant information from a knowledge base, then pass the retrieved information to the LLM as context.
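In code, this reduces to three steps: retrieve, build a prompt, generate. A minimal sketch (retriever and llm here are placeholder callables, not a specific library API):
def rag_answer(question: str, retriever, llm, k: int = 3) -> str:
    # 1. Retrieve the k most relevant chunks from the knowledge base
    context_chunks = retriever(question, k=k)
    # 2. Put the retrieved chunks into the prompt as context
    context = "\n\n".join(context_chunks)
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )
    # 3. Generate the answer with the LLM
    return llm(prompt)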
RAG vs Fine-tuning
| Dimension | RAG | Fine-tuning |
|---|---|---|
| Knowledge updates | Updated in real time | Requires retraining |
| Cost | Low | High |
| Explainability | High (sources are traceable) | Low |
| Typical use cases | Knowledge Q&A, document retrieval | Task-specific behavior, style transfer |
| Deployment difficulty | Medium | High |
🏗️ RAG System Architecture
Basic architecture
┌─────────────────────────────────────────┐
│ 1. Document preprocessing               │
│    Load → Chunk → Embed → Store         │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│ 2. Query processing                     │
│    Question → Embed → Retrieve → Rerank │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│ 3. Answer generation                    │
│    Build prompt → LLM → Post-process    │
└─────────────────────────────────────────┘
Complete implementation
from typing import List, Dict
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
class RAGSystem:
    def __init__(
        self,
        documents_path: str,
        embedding_model: str = "text-embedding-ada-002",
        llm_model: str = "gpt-3.5-turbo",
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.documents_path = documents_path
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Initialize components (gpt-3.5-turbo is a chat model, so use ChatOpenAI)
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        self.vector_store = None
        # Build the knowledge base
        self.build_knowledge_base()
    def build_knowledge_base(self):
        """Build the knowledge base."""
        print("Loading documents...")
        # 1. Load documents
        loader = DirectoryLoader(
            self.documents_path,
            glob="**/*.txt",
            show_progress=True
        )
        documents = loader.load()
        print(f"Loaded {len(documents)} documents")
        # 2. Split documents into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=["\n\n", "\n", "。", ".", " ", ""]
        )
        chunks = text_splitter.split_documents(documents)
        print(f"Split into {len(chunks)} chunks")
        # 3. Create the vector store
        self.vector_store = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory="./chroma_db"
        )
        print("Knowledge base built")
    def query(self, question: str, k: int = 3) -> Dict:
        """Answer a question against the knowledge base."""
        # Create the retrieval QA chain
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vector_store.as_retriever(
                search_kwargs={"k": k}
            ),
            return_source_documents=True
        )
        # Run the query
        result = qa_chain({"query": question})
        return {
            "answer": result["result"],
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "metadata": doc.metadata
                }
                for doc in result["source_documents"]
            ]
        }
# Usage
rag = RAGSystem(documents_path="./knowledge_base")
result = rag.query("What is machine learning?")
print(f"Answer: {result['answer']}")
print(f"Sources: {len(result['sources'])} documents")
📝 Document Processing Strategies
1. Document loading
Supported formats:
from langchain.document_loaders import (
    TextLoader,
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    UnstructuredHTMLLoader,
    CSVLoader,
    JSONLoader
)
# PDF
pdf_loader = PyPDFLoader("document.pdf")
# Markdown
md_loader = UnstructuredMarkdownLoader("document.md")
# HTML
html_loader = UnstructuredHTMLLoader("page.html")
# Batch loading from a directory
from langchain.document_loaders import DirectoryLoader
loader = DirectoryLoader(
    "docs/",
    glob="**/*.md",
    loader_cls=UnstructuredMarkdownLoader
)
2. Document chunking strategies
Why chunk at all?
- Vector search works best at an appropriate granularity
- Whole documents can exceed the model's context window
- Smaller chunks improve retrieval precision
Comparison of chunking methods:
# Method 1: fixed-length chunking
from langchain.text_splitter import CharacterTextSplitter
splitter = CharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separator="\n"
)
# Method 2: recursive chunking (recommended)
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", "。", ".", " ", ""]
)
# Method 3: token-based chunking
from langchain.text_splitter import TokenTextSplitter
splitter = TokenTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)
# Method 4: semantic chunking
from langchain_experimental.text_splitter import SemanticChunker
splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile"
)
Tuning the chunking parameters:
class ChunkOptimizer:
    def test_chunk_sizes(
        self,
        document: str,
        sizes: List[int] = [500, 1000, 1500, 2000],
        test_queries: List[str] = None
    ):
        results = {}
        for size in sizes:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=size,
                chunk_overlap=size // 5
            )
            chunks = splitter.split_text(document)
            results[size] = {
                'num_chunks': len(chunks),
                'avg_chunk_length': sum(len(c) for c in chunks) / len(chunks),
                # test_retrieval is an evaluation helper you supply (not shown here)
                'retrieval_quality': self.test_retrieval(chunks, test_queries)
            }
        return results
3. Metadata management
import os
class DocumentWithMetadata:
    def __init__(self, content: str, metadata: Dict):
        self.content = content
        self.metadata = metadata
    @classmethod
    def from_file(cls, file_path: str):
        # Extract file-level metadata
        metadata = {
            'source': file_path,
            'filename': os.path.basename(file_path),
            'created_at': os.path.getctime(file_path),
            'modified_at': os.path.getmtime(file_path),
            'file_type': os.path.splitext(file_path)[1],
        }
        # Read the content
        with open(file_path, 'r') as f:
            content = f.read()
        # Extract title, author, etc. (if present)
        metadata.update(cls.extract_metadata(content))
        return cls(content, metadata)
    @staticmethod
    def extract_metadata(content: str) -> Dict:
        """Extract metadata from the content itself."""
        metadata = {}
        # Markdown front matter
        if content.startswith('---'):
            import yaml
            end = content.find('---', 3)
            if end != -1:
                front_matter = content[3:end]
                metadata.update(yaml.safe_load(front_matter))
        return metadata
🔍 Retrieval Optimization
1. Vector similarity search
# Basic retrieval
results = vector_store.similarity_search(
    query="What is machine learning?",
    k=5
)
# Retrieval with scores
results = vector_store.similarity_search_with_score(
    query="What is machine learning?",
    k=5
)
for doc, score in results:
    print(f"Score: {score:.4f}")
    print(f"Content: {doc.page_content[:100]}...")
# MMR retrieval (Maximal Marginal Relevance)
results = vector_store.max_marginal_relevance_search(
    query="What is machine learning?",
    k=5,
    fetch_k=20,
    lambda_mult=0.5  # diversity vs. relevance trade-off
)
2. Hybrid retrieval
Combine keyword and vector search:
class HybridRetriever:
    def __init__(self, vector_store, bm25_retriever):
        self.vector_store = vector_store
        self.bm25 = bm25_retriever
    def retrieve(self, query: str, k: int = 5, alpha: float = 0.5):
        # Vector retrieval
        vector_results = self.vector_store.similarity_search_with_score(
            query, k=k*2
        )
        # BM25 retrieval (k is configured on the BM25 retriever itself)
        bm25_results = self.bm25.get_relevant_documents(query)
        # Merge and rerank
        combined = self.merge_results(
            vector_results,
            bm25_results,
            alpha
        )
        return combined[:k]
    def merge_results(self, vector_results, bm25_results, alpha):
        # Normalize scores and merge
        scores = {}
        # Vector search scores
        for doc, score in vector_results:
            doc_id = self.get_doc_id(doc)
            scores[doc_id] = alpha * (1 - score)  # convert distance to similarity
        # BM25 scores
        for doc in bm25_results:
            doc_id = self.get_doc_id(doc)
            bm25_score = doc.metadata.get('score', 0)
            if doc_id in scores:
                scores[doc_id] += (1 - alpha) * bm25_score
            else:
                scores[doc_id] = (1 - alpha) * bm25_score
        # Sort by combined score (get_doc_id / get_doc_by_id map documents to stable ids)
        sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return [self.get_doc_by_id(doc_id) for doc_id, _ in sorted_docs]
3. Reranking
from sentence_transformers import CrossEncoder
class Reranker:
    def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.model = CrossEncoder(model_name)
    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 3
    ) -> List[tuple]:
        # Score each (query, document) pair for relevance
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)
        # Sort by score
        results = list(zip(documents, scores))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]
# Usage
reranker = Reranker()
# Initial retrieval
initial_results = vector_store.similarity_search(user_query, k=20)
# Rerank
final_results = reranker.rerank(
    query=user_query,
    documents=[doc.page_content for doc in initial_results],
    top_k=5
)
4. Query optimization
Query rewriting:
class QueryRewriter:
    def __init__(self, llm):
        self.llm = llm
    def rewrite(self, query: str) -> List[str]:
        prompt = f"""
Original query: {query}
Generate 3 rewritten queries that are:
1. More specific
2. Enriched with relevant synonyms
3. Phrased from different angles
Rewritten queries (one per line):
"""
        response = self.llm.predict(prompt)
        rewrites = response.strip().split('\n')
        return [query] + rewrites
# Retrieve with multiple queries
def multi_query_retrieval(original_query: str, vector_store):
    rewriter = QueryRewriter(llm)
    queries = rewriter.rewrite(original_query)
    all_results = []
    for query in queries:
        results = vector_store.similarity_search(query, k=5)
        all_results.extend(results)
    # Deduplicate by content (Document objects are not hashable)
    seen = set()
    unique_results = []
    for doc in all_results:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique_results.append(doc)
    return unique_results
🎨 Advanced RAG Techniques
1. Hypothetical Document Embeddings (HyDE)
Idea: have the LLM generate a hypothetical answer first, then retrieve using that answer instead of the raw question.
class HyDE:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    def retrieve(self, query: str, k: int = 5):
        # 1. Generate a hypothetical answer
        hyde_prompt = f"""
Question: {query}
Write a detailed hypothetical answer (even if you are unsure, answer from general knowledge):
"""
        hypothetical_answer = self.llm.predict(hyde_prompt)
        # 2. Retrieve using the hypothetical answer
        results = self.vector_store.similarity_search(
            hypothetical_answer,
            k=k
        )
        return results
2. Self-RAG
Idea: let the model decide for itself whether retrieval is needed.
class SelfRAG:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    def answer(self, query: str):
        # 1. Decide whether retrieval is needed
        need_retrieval = self.check_if_need_retrieval(query)
        if not need_retrieval:
            # Answer directly
            return self.llm.predict(query)
        # 2. Retrieve
        docs = self.vector_store.similarity_search(query, k=3)
        # 3. Generate an answer
        answer = self.generate_with_docs(query, docs)
        # 4. Validate the answer
        is_valid = self.validate_answer(query, answer, docs)
        if not is_valid:
            # Regenerate
            answer = self.regenerate(query, docs, answer)
        return answer
    def check_if_need_retrieval(self, query: str) -> bool:
        prompt = f"""
Decide whether the following question requires retrieval from an external knowledge base:
Question: {query}
Criteria:
- Needs specific facts or data → retrieval required
- General common-sense question → no retrieval
- Math calculation → no retrieval
- Specialized domain question → retrieval required
Answer only "yes" or "no":
"""
        response = self.llm.predict(prompt).strip().lower()
        return "yes" in response
3. RAPTOR (tree-structured RAG)
Idea: organize documents hierarchically so retrieval can happen at different levels of granularity.
class RAPTOR:
    def __init__(self, llm, embeddings):
        self.llm = llm
        self.embeddings = embeddings
        self.tree = {}  # one vector store per tree level
    def build_tree(self, documents: List[str], max_depth=3):
        # Level 0: raw document chunks
        self.tree[0] = self.create_vector_store(documents)
        # Build the higher levels
        for depth in range(1, max_depth + 1):
            # Cluster the level below
            clusters = self.cluster_documents(self.tree[depth-1])
            # Summarize each cluster
            summaries = []
            for cluster in clusters:
                summary = self.summarize_cluster(cluster)
                summaries.append(summary)
            # Create a vector store for this level
            self.tree[depth] = self.create_vector_store(summaries)
    def query(self, question: str):
        # Retrieve at every level of the tree
        all_results = []
        for depth in range(len(self.tree)):
            results = self.tree[depth].similarity_search(question, k=2)
            all_results.extend(results)
        # Generate the answer
        answer = self.generate_answer(question, all_results)
        return answer
4. Adaptive RAG
Pick a retrieval strategy based on the type of question:
class AdaptiveRAG:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
    def answer(self, query: str):
        # 1. Classify the question type
        query_type = self.classify_query(query)
        # 2. Choose a strategy
        if query_type == "factual":
            # Factual question: narrow, precise retrieval
            docs = self.vector_store.similarity_search(query, k=2)
        elif query_type == "analytical":
            # Analytical question: retrieve broadly
            docs = self.vector_store.similarity_search(query, k=5)
        elif query_type == "creative":
            # Creative question: little or no retrieval
            docs = self.vector_store.similarity_search(query, k=1)
        else:
            # Fallback: default retrieval
            docs = self.vector_store.similarity_search(query, k=3)
        # 3. Generate the answer
        return self.generate_with_docs(query, docs, query_type)
    def classify_query(self, query: str) -> str:
        prompt = f"""
Classify the type of the following question:
Question: {query}
Options:
- factual: asks for specific facts or data
- analytical: requires analysis or reasoning
- creative: open-ended, creative question
Output only the type:
"""
        return self.llm.predict(prompt).strip().lower()
📊 RAG Performance Optimization
1. Caching
import hashlib
class CachedRAG:
    def __init__(self, rag_system):
        self.rag = rag_system
        self.cache = {}
    def query(self, question: str):
        # Hash the query text
        query_hash = hashlib.md5(question.encode()).hexdigest()
        # Check the cache
        if query_hash in self.cache:
            return self.cache[query_hash]
        # Run the query
        result = self.rag.query(question)
        # Cache the result
        self.cache[query_hash] = result
        return result
2. Batching
import asyncio
class BatchRAG:
    def __init__(self, rag_system, batch_size=10):
        self.rag = rag_system
        self.batch_size = batch_size
    async def batch_query(self, questions: List[str]):
        results = []
        for i in range(0, len(questions), self.batch_size):
            batch = questions[i:i+self.batch_size]
            # Retrieve in parallel
            batch_docs = await asyncio.gather(*[
                self.rag.vector_store.asimilarity_search(q, k=3)
                for q in batch
            ])
            # Generate in batch (build_prompt is a prompt-assembly helper, not shown)
            prompts = [
                self.build_prompt(q, docs)
                for q, docs in zip(batch, batch_docs)
            ]
            answers = await self.rag.llm.abatch(prompts)
            results.extend(answers)
        return results
3. Incremental updates
import glob
class IncrementalRAG:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.indexed_files = set()
    def update(self, documents_path: str):
        # Scan for new documents
        new_files = self.scan_new_files(documents_path)
        if not new_files:
            print("No new documents")
            return
        print(f"Found {len(new_files)} new documents")
        # Load and chunk them (load_and_chunk is a helper, not shown)
        new_docs = self.load_and_chunk(new_files)
        # Add them to the vector store
        self.vector_store.add_documents(new_docs)
        # Record the newly indexed files
        self.indexed_files.update(new_files)
        print("Update complete")
    def scan_new_files(self, path: str) -> List[str]:
        all_files = set(glob.glob(f"{path}/**/*.txt", recursive=True))
        new_files = all_files - self.indexed_files
        return list(new_files)
💡 Best Practices
1. Document preparation
- Clean up formatting and strip noise
- Preserve important metadata
- Use a sensible chunk size (500-1500 characters)
- Use moderate overlap (10-20% of the chunk size)
2. Retrieval strategy
- Use hybrid retrieval (vector + keyword)
- Rerank to improve precision
- Tune k for the scenario
- Consider MMR to increase diversity
3. Prompt design (see the template sketch after this list)
- State clearly where the context comes from
- Ask the model to cite its sources
- Handle the case where no relevant information is found
- Guard against hallucination
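A prompt template along these lines covers the points above (the wording and the build_prompt helper are illustrative, not from any specific library):
RAG_PROMPT_TEMPLATE = """You are an assistant answering questions from a document knowledge base.
Context (retrieved excerpts, with their sources):
{context}
Question: {question}
Instructions:
- Answer using only the context above.
- Cite the source of each claim, e.g. [source: filename].
- If the context does not contain the answer, say you could not find it instead of guessing.
Answer:"""
def build_prompt(question: str, docs) -> str:
    # Label each chunk with its source so the model can cite it
    context = "\n\n".join(
        f"[source: {d.metadata.get('source', 'unknown')}]\n{d.page_content}"
        for d in docs
    )
    return RAG_PROMPT_TEMPLATE.format(context=context, question=question)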
4. Evaluation and monitoring (a retrieval hit-rate sketch follows this list)
- Retrieval accuracy
- Answer quality
- Response latency
- User feedback
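A simple starting point for retrieval accuracy is hit rate over a small labeled set of (question, expected source) pairs — a rough sketch, assuming the vector_store interface used earlier:
def retrieval_hit_rate(vector_store, labeled_queries, k: int = 5) -> float:
    """labeled_queries: list of (question, expected_source) pairs."""
    hits = 0
    for question, expected_source in labeled_queries:
        docs = vector_store.similarity_search(question, k=k)
        # A hit means the expected source document shows up in the top-k results
        if any(doc.metadata.get("source") == expected_source for doc in docs):
            hits += 1
    return hits / len(labeled_queries)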
Last updated: December 22, 2024