本文是一份完整的 LangChain + OceanBase 开发指南,涵盖从基础概念到高级实现的 Agentic RAG 系统构建。适合快速上手 AI Infra 开发,特别是 RAG(检索增强生成)相关项目。
架构总览
🏗️ LangChain + OceanBase RAG 系统架构
系统分层架构

═══════════════════════════════════════════════════════════════════
RAG 系统六层架构
═══════════════════════════════════════════════════════════════════
📱 应用层 (Application Layer)
├─ FastAPI REST API
├─ WebSocket 实时对话
└─ CLI 交互界面
───────────────────────────────────────────────────────────────────
🤖 Agent 层 (Agent Layer)
├─ LangChain Agent (ReAct)
├─ LLM (GPT-4 / GPT-3.5)
├─ Tools (知识库搜索 / 网络搜索 / 计算)
└─ Middleware (前置/后置钩子)
───────────────────────────────────────────────────────────────────
💬 生成层 (Generation Layer)
├─ Prompt Engineering
├─ LLM Invocation (同步/流式/异步)
├─ Structured Output
└─ Response Enhancement
───────────────────────────────────────────────────────────────────
🔍 检索层 (Retrieval Layer)
├─ Query Processing (重写/扩展)
├─ Hybrid Search (向量+稀疏+全文)
├─ Document Grading (CRAG)
└─ Reranking (MMR)
───────────────────────────────────────────────────────────────────
�️ 存储层 (Storage Layer)
├─ OceanBase Vector Store
├─ Vector Index (HNSW/IVF)
├─ Full-Text Index
└─ Metadata Storage (JSON)
───────────────────────────────────────────────────────────────────
📄 数据层 (Data Layer)
├─ Document Loader (PDF/MD/TXT/Web)
├─ Text Splitter (智能分块)
├─ Embedding (OpenAI/HuggingFace/OceanBase)
└─ Data Ingestion Pipeline
═══════════════════════════════════════════════════════════════════
🔄 核心工作流程
1️⃣ 数据导入流程 (Data Ingestion)
� 文档源
↓
🔧 Document Loader (加载解析)
↓
✂️ Text Splitter (智能分块: 1000 chars, 200 overlap)
↓
🔢 Embedding (向量化: 1536维)
↓
💾 OceanBase 存储 (向量索引 + 元数据)
2️⃣ 查询处理流程 (Query Processing)
❓ 用户查询
↓
✍️ Query Rewriting (查询优化)
↓
🔍 Hybrid Retrieval (向量 60% + 关键词 40%)
↓
✅ Document Grading (CRAG 相关性评分)
↓
📚 Top-K Context (精选文档)
3️⃣ Agent 决策流程 (Agent Reasoning - ReAct)
💭 Query Input
↓
🧠 Agent (LLM) 推理
↓
├─ 需要检索? ──Yes──> 🔍 search_knowledge_base
├─ 简单问题? ──Yes──> 💬 直接回答
└─ 信息不足? ──Yes──> 🌐 web_search (fallback)
↓
🔄 迭代优化 (最多 5 轮)
↓
✨ 最终结果
4️⃣ 生成响应流程 (Response Generation)
📚 Context + ❓ Query
↓
📝 Prompt Engineering (System + Context + Few-Shot)
↓
🧠 LLM Generation (GPT-4)
↓
🔗 Post-Processing (添加引用 + 来源标注)
↓
✅ Final Response
📊 三种 RAG 模式对比
| 维度 | Two-Step RAG | Agentic RAG | CRAG |
|---|---|---|---|
| 流程 | Query→Retrieve→Generate | Query→Agent决策→Tool(0-N)→Generate | Query→Retrieve→Grade→Fallback/Generate |
| 灵活性 | ⭐⭐ 固定 | ⭐⭐⭐⭐⭐ 极高 | ⭐⭐⭐⭐ 自适应 |
| 准确性 | ⭐⭐⭐ 中等 | ⭐⭐⭐⭐ 高 | ⭐⭐⭐⭐⭐ 极高 |
| 成本 | 💰 低 | 💰💰 中 | 💰💰💰 高 |
| 延迟 | ⚡ 快 | ⚡⚡ 中 | ⚡⚡⚡ 慢 |
| 适用场景 | 简单问答 | 复杂推理 | 高质量要求 |
| 总是检索 | ✅ 是 | ❌ 否 | ✅ 是(验证) |
🛠️ 技术栈一览
┌──────────────────────────────────────┐
│ 🌐 Frontend / Client │
│ • REST API / WebSocket │
│ • CLI Interface │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ 📦 Application Layer │
│ • FastAPI (API Server) │
│ • Pydantic (Data Validation) │
│ • Uvicorn (ASGI Server) │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ 🤖 Agent Framework │
│ • LangChain (Agent + Tools) │
│ • OpenAI GPT-4 / GPT-3.5 │
│ • ReAct Pattern │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ 🔍 Retrieval Layer │
│ • LangChain Retrievers │
│ • Hybrid Search (Vector + BM25) │
│ • CRAG (Document Grading) │
│ • Tavily (Web Search) │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ �️ Storage Layer │
│ • OceanBase (Vector Store) │
│ • Vector Index (HNSW/IVF) │
│ • Full-Text Index │
│ • JSON Metadata │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ 🔢 Embedding Layer │
│ • OpenAI Embeddings (1536维) │
│ • HuggingFace Models (可选) │
│ • OceanBase AI Functions (可选) │
└──────────────────────────────────────┘
🎯 核心特性
✨ 数据处理
- 多格式支持: PDF, Markdown, TXT, HTML, 网页
- 智能分块: RecursiveCharacterTextSplitter (1000/200)
- 向量化: OpenAI text-embedding-3-small (1536维)
- 存储: OceanBase 向量数据库 + HNSW 索引
🔍 检索增强
- 混合检索: 向量检索 + BM25 + 全文检索
- 动态权重: Agent 可调整检索策略
- CRAG 验证: 文档相关性评分 + 自动回退
- 重排序: MMR 算法优化多样性
🤖 智能 Agent
- ReAct 框架: Reason (推理) + Act (行动)
- 工具调用: 知识库搜索 / 网络搜索 / 计算
- 中间件: 前置/后置钩子 + 工具包装
- 记忆管理: 短期/长期记忆 + 上下文注入
� 生成优化
- 提示工程: 动态/静态提示 + Few-Shot
- 流式输出: Token 级实时返回
- 结构化输出: Pydantic Models + JSON Schema
- 引用标注: 自动添加来源和置信度
⚡ 性能优化
- 缓存策略: Embedding 缓存 + 查询缓存
- 批处理: 批量向量化 + 并行检索
- 异步处理: Async/Await 模式
- 连接池: 数据库连接复用
前言
最近参与百度的 AI Infra 开源项目,主要聚焦在 RAG(Retrieval-Augmented Generation)方向。本文从 Document AI 开始,系统性地介绍 Agentic RAG 的核心概念和实践方法。
核心概念速览
Document AI 处理流程:
- 非结构化数据 → 结构化数据(JSON/HTML/Markdown)
- Parsing 和 Extracting 需要注意语义保留
- OCR:图像清理 + 文本识别(但不理解结构和语义)
Agentic 系统的三要素:
- 👁️ 眼睛(OCR):感知和识别
- 🧠 大脑(LLM):理解和推理
- 🤚 手(Tools):执行和操作
ReAct 框架:Reason + Act
- 观察(Observation)→ 思考(Reasoning)→ 行动(Action)
- 可调试、可追溯的执行流程
第一部分:LangChain + OceanBase 基础架构
1.1 核心组件
LLMs(大语言模型)
- 推理引擎:自然语言理解
- 计划与执行:任务分解和策略制定
- 决策中枢:协调整个系统的运行
Tools(工具)
- 行动执行:调用外部能力
- 观察返回:获取执行结果
- 能力扩展:连接外部系统和服务
Execution Loops(执行循环)
- 迭代执行:多轮交互和优化
- 动态决策:根据结果调整策略
- 终止条件:明确的完成标准

1.2 环境搭建
安装依赖
# 安装 LangChain 核心库
pip install langchain langchain-community langchain-core
# 安装 OceanBase 向量存储支持
pip install oceanbase-connector pyobvector
# 安装其他必要组件
pip install openai tiktoken faiss-cpu
OceanBase 配置
from langchain_community.vectorstores import OceanBase
from langchain_openai import OpenAIEmbeddings
# 配置 OceanBase 连接
ob_config = {
"host": "localhost",
"port": 2881,
"user": "root@test",
"password": "your_password",
"database": "rag_db"
}
# 初始化 Embedding 模型
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key="your_api_key"
)
# 创建向量存储
vector_store = OceanBase(
embedding_function=embeddings,
connection_args=ob_config,
table_name="document_vectors"
)
1.3 模型选择策略
静态加载(简单问题)
from langchain_openai import ChatOpenAI
# 直接加载固定模型
llm = ChatOpenAI(
model="gpt-4",
temperature=0.7,
max_tokens=2000
)
动态路由(复杂问题)
from langchain.chains import LLMRouterChain
from langchain.chains.router import MultiPromptChain
# 定义不同场景的提示模板
prompt_infos = [
{
"name": "simple_qa",
"description": "适用于简单的问答任务",
"prompt_template": "简单回答:{question}"
},
{
"name": "complex_reasoning",
"description": "适用于需要深度推理的任务",
"prompt_template": "详细分析:{question}"
}
]
# 创建路由链
router_chain = MultiPromptChain.from_prompts(
llm=llm,
prompt_infos=prompt_infos
)
Middleware 路由
class ModelRouter:
def __init__(self):
self.simple_model = ChatOpenAI(model="gpt-3.5-turbo")
self.complex_model = ChatOpenAI(model="gpt-4")
def route(self, query: str):
# 根据查询复杂度选择模型
if len(query.split()) < 10:
return self.simple_model
return self.complex_model
def invoke(self, query: str):
model = self.route(query)
return model.invoke(query)
1.4 系统提示词设计
静态提示词
from langchain.prompts import ChatPromptTemplate
# 定义静态系统提示词
system_prompt = """你是一个专业的 AI 助手,专注于回答技术问题。
你的回答应该:
1. 准确且基于事实
2. 结构清晰,易于理解
3. 包含代码示例(如果适用)
"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{question}")
])
动态提示词
from typing import Dict, Any
class DynamicPromptBuilder:
def build_prompt(context: Dict[str, Any]) -> str:
"""根据上下文动态生成提示词"""
user_level = context.get("user_level", "beginner")
domain = context.get("domain", "general")
if user_level == "expert":
tone = "使用专业术语,深入技术细节"
else:
tone = "使用通俗语言,提供基础解释"
return f"""你是 {domain} 领域的专家。
回答风格:{tone}
当前用户等级:{user_level}
"""
# 使用动态提示词
def get_system_message(context):
return DynamicPromptBuilder.build_prompt(context)
1.5 记忆管理
短期记忆(Chat 内部)
from langchain.memory import ConversationBufferMemory
# 创建对话缓冲记忆
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# 自动维护消息历史
from langchain.chains import ConversationChain
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True
)
# 多轮对话
response1 = conversation.predict(input="我叫张三")
response2 = conversation.predict(input="我叫什么名字?") # 会记住之前的对话
长期记忆(跨对话)
from langchain.memory import ConversationSummaryMemory
# 使用摘要记忆节省 token
summary_memory = ConversationSummaryMemory(
llm=llm,
memory_key="history"
)
# 自定义记忆 Schema
class CustomMemory:
def __init__(self):
self.user_preferences = {}
self.conversation_context = []
def add_preference(self, key: str, value: Any):
"""存储用户偏好"""
self.user_preferences[key] = value
def add_context(self, message: str):
"""添加上下文"""
self.conversation_context.append({
"timestamp": datetime.now(),
"content": message
})
def get_relevant_context(self, query: str) -> List[str]:
"""检索相关上下文"""
# 实现相关性检索逻辑
return self.conversation_context[-5:] # 返回最近5条
Middleware 实现自定义记忆
class MemoryMiddleware:
def __init__(self):
self.session_data = {}
def before_model(self, messages: List[Message]) -> List[Message]:
"""在模型调用前注入记忆"""
session_id = self.get_session_id()
if session_id in self.session_data:
# 注入历史上下文
context_msg = Message(
role="system",
content=f"历史上下文:{self.session_data[session_id]}"
)
messages.insert(0, context_msg)
return messages
def after_model(self, response: Message):
"""在模型响应后更新记忆"""
session_id = self.get_session_id()
self.session_data[session_id] = response.content
1.6 工具系统
使用 @Tool 装饰器
from langchain.tools import tool
from typing import Optional
def search_database(query: str, limit: int = 5) -> str:
"""在数据库中搜索相关文档
Args:
query: 搜索查询
limit: 返回结果数量限制
Returns:
搜索结果的字符串表示
"""
try:
# 执行数据库搜索
results = vector_store.similarity_search(query, k=limit)
return "\n".join([doc.page_content for doc in results])
except Exception as e:
return f"搜索失败:{str(e)}"
def calculate(expression: str) -> str:
"""计算数学表达式
Args:
expression: 要计算的数学表达式
Returns:
计算结果
"""
try:
result = eval(expression)
return f"计算结果:{result}"
except Exception as e:
return f"计算错误:{str(e)}"
异常处理和重试
from tenacity import retry, stop_after_attempt, wait_exponential
)
def robust_api_call(endpoint: str, params: dict) -> str:
"""带重试机制的 API 调用
Args:
endpoint: API 端点
params: 请求参数
Returns:
API 响应
"""
import requests
response = requests.get(endpoint, params=params, timeout=10)
response.raise_for_status()
return response.json()
并行工具调用
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import Tool
# 定义多个工具
tools = [
Tool(
name="Search",
func=search_database,
description="搜索知识库"
),
Tool(
name="Calculate",
func=calculate,
description="执行数学计算"
),
Tool(
name="WebSearch",
func=web_search,
description="搜索互联网"
)
]
# 创建支持并行调用的 Agent
agent = create_openai_tools_agent(
llm=llm,
tools=tools,
prompt=prompt
)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5
)
1.7 执行模式
Invoke 基础执行
from langchain.schema import HumanMessage, SystemMessage
# 阻塞式执行
messages = [
SystemMessage(content="你是一个有帮助的助手"),
HumanMessage(content="什么是 RAG?")
]
response = llm.invoke(messages)
print(response.content)
# 动态消息追加
conversation_messages = []
def chat(user_input: str):
conversation_messages.append(HumanMessage(content=user_input))
response = llm.invoke(conversation_messages)
conversation_messages.append(response)
return response.content
Streaming 流式输出
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
# 配置流式输出
streaming_llm = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
# 实时返回 token
for chunk in streaming_llm.stream("写一首关于 AI 的诗"):
print(chunk.content, end="", flush=True)
不同流式模式
from typing import AsyncIterator
class StreamingMode:
"""流式输出模式管理"""
async def update_mode(agent_executor):
"""状态更新模式:输出整个节点更新的内容"""
async for event in agent_executor.astream_events(
{"input": "查询问题"},
version="v1"
):
if event["event"] == "on_chain_end":
print(f"节点完成:{event['data']}")
async def message_mode(llm):
"""消息模式:一旦有 token 生成就输出"""
async for chunk in llm.astream("你好"):
print(chunk.content, end="", flush=True)
# 使用示例
import asyncio
async def main():
await StreamingMode.message_mode(streaming_llm)
asyncio.run(main())
1.8 中间件:ReAct 的可定义节点
Before Model Hook
from typing import List, Callable
from langchain.schema import BaseMessage
class BeforeModelMiddleware:
"""模型执行前的中间件"""
def __init__(self):
self.hooks: List[Callable] = []
def register_hook(self, hook: Callable):
"""注册钩子函数"""
self.hooks.append(hook)
def execute(self, messages: List[BaseMessage]) -> List[BaseMessage]:
"""执行所有钩子"""
for hook in self.hooks:
messages = hook(messages)
return messages
# 示例:添加时间戳
def add_timestamp(messages: List[BaseMessage]) -> List[BaseMessage]:
from datetime import datetime
timestamp_msg = SystemMessage(
content=f"当前时间:{datetime.now().isoformat()}"
)
messages.insert(0, timestamp_msg)
return messages
# 示例:添加用户上下文
def add_user_context(messages: List[BaseMessage]) -> List[BaseMessage]:
context_msg = SystemMessage(
content="用户偏好:技术深度讨论,代码示例优先"
)
messages.insert(0, context_msg)
return messages
# 使用中间件
middleware = BeforeModelMiddleware()
middleware.register_hook(add_timestamp)
middleware.register_hook(add_user_context)
# 在调用模型前应用
processed_messages = middleware.execute(messages)
response = llm.invoke(processed_messages)
After Model Hook
class AfterModelMiddleware:
"""模型执行后的中间件"""
def __init__(self):
self.hooks: List[Callable] = []
def register_hook(self, hook: Callable):
self.hooks.append(hook)
def execute(self, response: BaseMessage) -> BaseMessage:
for hook in self.hooks:
response = hook(response)
return response
# 示例:日志记录
def log_response(response: BaseMessage) -> BaseMessage:
import logging
logging.info(f"模型响应:{response.content[:100]}...")
return response
# 示例:内容过滤
def filter_sensitive_content(response: BaseMessage) -> BaseMessage:
sensitive_words = ["密码", "token", "secret"]
content = response.content
for word in sensitive_words:
content = content.replace(word, "***")
response.content = content
return response
# 使用
after_middleware = AfterModelMiddleware()
after_middleware.register_hook(log_response)
after_middleware.register_hook(filter_sensitive_content)
Tool Calls 中间件
from langchain.schema import ToolMessage
class ToolCallMiddleware:
"""工具调用中间件"""
def wrap_tool_call(
self,
tool_name: str,
tool_func: Callable,
*args,
**kwargs
) -> ToolMessage:
"""包装工具调用,添加额外逻辑"""
# 调用前:验证参数
print(f"调用工具:{tool_name}")
print(f"参数:{args}, {kwargs}")
try:
# 执行工具
result = tool_func(*args, **kwargs)
# 调用后:处理结果
print(f"工具返回:{result}")
return ToolMessage(
content=str(result),
tool_call_id=f"{tool_name}_{id(result)}"
)
except Exception as e:
# 错误处理
print(f"工具调用失败:{str(e)}")
return ToolMessage(
content=f"错误:{str(e)}",
tool_call_id=f"{tool_name}_error"
)
1.9 结构化输出
自动化结构化输出
from pydantic import BaseModel, Field
from typing import List
# 定义输出结构
class TechArticle(BaseModel):
"""技术文章结构"""
title: str = Field(description="文章标题")
summary: str = Field(description="文章摘要")
key_points: List[str] = Field(description="关键要点列表")
difficulty: str = Field(description="难度等级:初级/中级/高级")
tags: List[str] = Field(description="标签列表")
# 使用结构化输出
from langchain_openai import ChatOpenAI
structured_llm = ChatOpenAI(model="gpt-4").with_structured_output(TechArticle)
# 调用并获得结构化结果
result = structured_llm.invoke("总结一篇关于 RAG 的技术文章")
print(f"标题:{result.title}")
print(f"摘要:{result.summary}")
print(f"要点:{result.key_points}")
Tool 工具调用模式
from langchain.output_parsers import PydanticToolsParser
# 定义工具输出结构
class SearchResult(BaseModel):
"""搜索结果结构"""
query: str = Field(description="搜索查询")
results: List[str] = Field(description="搜索结果列表")
relevance_scores: List[float] = Field(description="相关性分数")
# 创建工具
def structured_search(query: str) -> SearchResult:
"""执行结构化搜索"""
# 执行搜索逻辑
results = vector_store.similarity_search_with_score(query, k=5)
return SearchResult(
query=query,
results=[doc.page_content for doc, _ in results],
relevance_scores=[score for _, score in results]
)
Provider 原生模式
# 使用模型提供商的原生结构化输出
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="gpt-4",
model_kwargs={
"response_format": {"type": "json_object"}
}
)
# 在提示中指定 JSON schema
prompt = """请以 JSON 格式返回结果,包含以下字段:
{
"answer": "回答内容",
"confidence": 0.95,
"sources": ["来源1", "来源2"]
}
问题:什么是 RAG?
"""
response = llm.invoke(prompt)
import json
structured_response = json.loads(response.content)
第二部分:RAG 系统构建
2.1 RAG 核心概念
RAG(Retrieval-Augmented Generation)本质上是上下文管理,核心是基于 LLM 的生成能力。
RAG 的优势
- ✅ 外部信息检索:突破模型训练数据的限制
- ✅ 可追溯的信息来源:提供引用和证据
- ✅ 动态知识更新:无需重新训练模型
- ✅ 降低幻觉:基于真实文档生成答案
基础架构

核心流程:
System Prompt + Context + Query → LLM → Response
2.2 Two-Step RAG(基础 RAG)
架构特点
Query → Retrieve → Generate
- 总是会执行检索
- 流程简单直接
- 适合确定性场景
实现代码
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import OceanBase
# 1. 初始化组件
llm = ChatOpenAI(model="gpt-4", temperature=0)
embeddings = OpenAIEmbeddings()
# 2. 创建向量存储
vector_store = OceanBase(
embedding_function=embeddings,
connection_args={
"host": "localhost",
"port": 2881,
"user": "root@test",
"password": "password",
"database": "rag_db"
},
table_name="documents"
)
# 3. 创建检索器
retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# 4. 构建 RAG Chain
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # 将所有文档塞入上下文
retriever=retriever,
return_source_documents=True,
verbose=True
)
# 5. 使用
result = rag_chain.invoke({"query": "什么是向量数据库?"})
print(f"答案:{result['result']}")
print(f"来源:{result['source_documents']}")
自定义提示模板
from langchain.prompts import PromptTemplate
# 定义自定义提示模板
template = """使用以下上下文回答问题。如果不知道答案,就说不知道,不要编造答案。
上下文:
{context}
问题:{question}
详细回答:"""
PROMPT = PromptTemplate(
template=template,
input_variables=["context", "question"]
)
# 使用自定义提示
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=retriever,
chain_type_kwargs={"prompt": PROMPT}
)
2.3 Agentic RAG(智能 RAG)
架构特点
Query → Agent 决策 → Tool Use (0-N 轮召回) → Generate
- Agent 决定是否需要检索
- 可以调用多个外部工具
- 更加灵活和智能
核心:构建 RAG 工具
from langchain.tools import tool
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
def search_knowledge_base(query: str) -> str:
"""在知识库中搜索相关信息
Args:
query: 搜索查询字符串
Returns:
相关文档内容
"""
# 执行向量搜索
docs = vector_store.similarity_search(query, k=3)
# 格式化结果
results = []
for i, doc in enumerate(docs, 1):
results.append(f"文档 {i}:\n{doc.page_content}\n")
return "\n".join(results)
def get_current_time() -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 定义工具列表
tools = [search_knowledge_base, get_current_time]
创建 Agent
# 定义 Agent 提示
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个智能助手。你可以使用以下工具:
- search_knowledge_base: 搜索知识库
- get_current_time: 获取当前时间
根据用户问题,决定是否需要使用工具。
如果问题可以直接回答,就不要使用工具。
"""),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# 创建 Agent
agent = create_openai_tools_agent(
llm=llm,
tools=tools,
prompt=prompt
)
# 创建执行器
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5,
handle_parsing_errors=True
)
# 使用 Agent
response = agent_executor.invoke({
"input": "搜索关于 OceanBase 的信息"
})
print(response["output"])
Agent 决策示例
# 示例 1:需要检索
query1 = "OceanBase 的向量搜索功能有哪些特点?"
# Agent 会调用 search_knowledge_base 工具
# 示例 2:不需要检索
query2 = "1 + 1 等于几?"
# Agent 直接回答,不调用工具
# 示例 3:多工具调用
query3 = "现在几点了?OceanBase 支持哪些数据类型?"
# Agent 会先调用 get_current_time,再调用 search_knowledge_base
2.4 知识库构建流程
完整的数据处理管道
Document Loader → Text Splitter → Embedding → Vector Store
1. Document Loader(文档加载)
from langchain_community.document_loaders import (
TextLoader,
PyPDFLoader,
UnstructuredMarkdownLoader,
DirectoryLoader
)
# 加载单个文件
text_loader = TextLoader("document.txt")
pdf_loader = PyPDFLoader("paper.pdf")
md_loader = UnstructuredMarkdownLoader("readme.md")
# 批量加载目录
directory_loader = DirectoryLoader(
"docs/",
glob="**/*.md",
loader_cls=UnstructuredMarkdownLoader
)
documents = directory_loader.load()
print(f"加载了 {len(documents)} 个文档")
2. Text Splitter(文本分片)
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter
)
# 递归字符分割器(推荐)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 每个块的大小
chunk_overlap=200, # 块之间的重叠
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
# Markdown 特定分割器
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
)
# 执行分割
chunks = text_splitter.split_documents(documents)
print(f"分割成 {len(chunks)} 个块")
# 查看分割结果
for i, chunk in enumerate(chunks[:3]):
print(f"\n块 {i+1}:")
print(chunk.page_content[:200])
print(f"元数据:{chunk.metadata}")
3. Embedding(向量化)
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
# 使用 OpenAI Embeddings
openai_embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key="your_api_key"
)
# 使用开源 Embeddings
hf_embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
# 测试 Embedding
text = "这是一个测试文本"
vector = openai_embeddings.embed_query(text)
print(f"向量维度:{len(vector)}")
print(f"向量前5个值:{vector[:5]}")
4. Vector Store(向量存储)
from langchain_community.vectorstores import OceanBase
# 创建 OceanBase 向量存储
vector_store = OceanBase.from_documents(
documents=chunks,
embedding=openai_embeddings,
connection_args={
"host": "localhost",
"port": 2881,
"user": "root@test",
"password": "password",
"database": "rag_db"
},
table_name="document_vectors",
# 可选:指定向量维度
vector_dimension=1536
)
print("文档已成功存储到 OceanBase")
完整的知识库构建脚本
def build_knowledge_base(
source_dir: str,
db_config: dict,
table_name: str = "documents"
):
"""构建完整的知识库"""
# 1. 加载文档
print("正在加载文档...")
loader = DirectoryLoader(
source_dir,
glob="**/*.md",
loader_cls=UnstructuredMarkdownLoader
)
documents = loader.load()
print(f"✓ 加载了 {len(documents)} 个文档")
# 2. 分割文本
print("正在分割文本...")
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = splitter.split_documents(documents)
print(f"✓ 分割成 {len(chunks)} 个块")
# 3. 创建 Embeddings
print("正在初始化 Embedding 模型...")
embeddings = OpenAIEmbeddings()
print("✓ Embedding 模型就绪")
# 4. 存储到向量数据库
print("正在存储到 OceanBase...")
vector_store = OceanBase.from_documents(
documents=chunks,
embedding=embeddings,
connection_args=db_config,
table_name=table_name
)
print("✓ 知识库构建完成!")
return vector_store
# 使用示例
if __name__ == "__main__":
db_config = {
"host": "localhost",
"port": 2881,
"user": "root@test",
"password": "password",
"database": "rag_db"
}
vector_store = build_knowledge_base(
source_dir="./docs",
db_config=db_config,
table_name="tech_docs"
)
第三部分:CRAG(自纠正 RAG)
3.1 传统 RAG 的问题
检索失效场景
- ❌ 检索文档不匹配:返回的文档与问题无关
- ❌ 过时信息:知识库中的信息已经过期
- ❌ 二义性问题:查询意图不明确导致检索偏差
对用户的影响
- 😵 幻觉回答:基于错误信息生成答案
- 😟 缺乏置信度:无法判断答案的可靠性
- 😞 没有恢复方法:错误后无法自动纠正
3.2 CRAG 解决方案
核心机制
- Validation(验证):评估检索文档的相关性
- Fallback(回退):提供多种信息来源
- Agentic(智能):Agent 决定下一步行动
流程图

Query → Retrieve → Grade → [相关] → Generate
↓ [不相关]
Fallback (Web Search / 其他 KB)
3.3 Document Grading(文档评分)
实现评分器
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
class GradeDocuments(BaseModel):
"""文档相关性评分结果"""
binary_score: str = Field(
description="文档是否相关:'yes' 或 'no'"
)
reasoning: str = Field(
description="评分理由"
)
# 创建评分 LLM
grader_llm = ChatOpenAI(
model="gpt-4",
temperature=0
).with_structured_output(GradeDocuments)
# 评分提示模板
grade_prompt = """你是一个文档相关性评估专家。
用户问题:{question}
检索到的文档:
{document}
请评估该文档是否与用户问题相关。
如果文档包含与问题相关的关键词或语义,返回 'yes'。
如果文档与问题完全无关,返回 'no'。
"""
def grade_document(question: str, document: str) -> GradeDocuments:
"""评估文档相关性"""
prompt = grade_prompt.format(
question=question,
document=document
)
return grader_llm.invoke(prompt)
# 使用示例
question = "OceanBase 支持哪些索引类型?"
document = "OceanBase 是一个分布式数据库,支持 B+ 树索引和向量索引..."
grade = grade_document(question, document)
print(f"相关性:{grade.binary_score}")
print(f"理由:{grade.reasoning}")
批量评分
def grade_documents_batch(
question: str,
documents: List[str],
threshold: float = 0.5
) -> List[tuple[str, GradeDocuments]]:
"""批量评估文档相关性"""
results = []
relevant_count = 0
for doc in documents:
grade = grade_document(question, doc)
results.append((doc, grade))
if grade.binary_score.lower() == "yes":
relevant_count += 1
relevance_ratio = relevant_count / len(documents)
print(f"相关文档比例:{relevance_ratio:.2%}")
return results, relevance_ratio
# 使用
docs = vector_store.similarity_search(question, k=5)
doc_contents = [doc.page_content for doc in docs]
results, ratio = grade_documents_batch(question, doc_contents)
3.4 Fallback 机制
多源信息获取
from langchain_community.tools.tavily_search import TavilySearchResults
class FallbackManager:
"""回退机制管理器"""
def __init__(self):
# 主知识库
self.primary_kb = vector_store
# 备用知识库
self.secondary_kb = None # 可以是另一个向量库
# 网络搜索
self.web_search = TavilySearchResults(
max_results=3,
api_key="your_tavily_api_key"
)
def retrieve_with_fallback(
self,
query: str,
min_relevance: float = 0.5
) -> tuple[List[str], str]:
"""带回退的检索
Returns:
(documents, source) - 文档列表和来源标识
"""
# 1. 尝试主知识库
print("🔍 检索主知识库...")
docs = self.primary_kb.similarity_search(query, k=5)
doc_contents = [doc.page_content for doc in docs]
# 2. 评估相关性
_, relevance_ratio = grade_documents_batch(query, doc_contents)
if relevance_ratio >= min_relevance:
print("✓ 主知识库检索成功")
return doc_contents, "primary_kb"
# 3. 回退到备用知识库
if self.secondary_kb:
print("⚠️ 主知识库相关性不足,尝试备用知识库...")
docs = self.secondary_kb.similarity_search(query, k=5)
doc_contents = [doc.page_content for doc in docs]
_, relevance_ratio = grade_documents_batch(query, doc_contents)
if relevance_ratio >= min_relevance:
print("✓ 备用知识库检索成功")
return doc_contents, "secondary_kb"
# 4. 回退到网络搜索
print("⚠️ 知识库相关性不足,使用网络搜索...")
web_results = self.web_search.invoke(query)
web_contents = [result["content"] for result in web_results]
print("✓ 网络搜索完成")
return web_contents, "web_search"
# 使用示例
fallback_manager = FallbackManager()
query = "2026年最新的 AI 技术趋势"
documents, source = fallback_manager.retrieve_with_fallback(query)
print(f"\n信息来源:{source}")
print(f"获取到 {len(documents)} 个文档")
查询重写
from langchain.prompts import ChatPromptTemplate
class QueryRewriter:
"""查询重写器"""
def __init__(self, llm):
self.llm = llm
self.prompt = ChatPromptTemplate.from_template("""
你是一个查询优化专家。请将用户的查询重写为更适合检索的形式。
原始查询:{query}
重写要求:
1. 提取关键概念
2. 消除歧义
3. 添加相关同义词
4. 保持查询意图
重写后的查询:
""")
def rewrite(self, query: str) -> str:
"""重写查询"""
response = self.llm.invoke(
self.prompt.format(query=query)
)
return response.content.strip()
def multi_query(self, query: str, n: int = 3) -> List[str]:
"""生成多个查询变体"""
prompt = f"""
生成 {n} 个不同的查询变体,用于检索与以下问题相关的文档:
原始问题:{query}
请生成 {n} 个不同角度的查询,每行一个:
"""
response = self.llm.invoke(prompt)
queries = response.content.strip().split("\n")
return [q.strip() for q in queries if q.strip()]
# 使用示例
rewriter = QueryRewriter(llm)
original_query = "OB 怎么用"
rewritten_query = rewriter.rewrite(original_query)
print(f"原始查询:{original_query}")
print(f"重写查询:{rewritten_query}")
# 多查询检索
multi_queries = rewriter.multi_query(original_query)
print(f"\n查询变体:")
for i, q in enumerate(multi_queries, 1):
print(f"{i}. {q}")
3.5 使用 LangChain 构建完整 CRAG
中间件方式实现
from typing import Callable
from langchain.schema import ToolCallRequest, ToolMessage
class CRAGMiddleware:
"""CRAG 中间件实现"""
def __init__(
self,
grader_llm,
rag_tool_name: str = "search_knowledge_base",
fallback_tool_name: str = "web_search"
):
self.grader_llm = grader_llm
self.rag_tool_name = rag_tool_name
self.fallback_tool_name = fallback_tool_name
self._pending_grading = None
def wrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], ToolMessage]
) -> ToolMessage:
"""包装工具调用,添加评分逻辑"""
# 执行工具
result = handler(request)
# 如果是 RAG 工具,进行评分
if request.tool_call.get("name") == self.rag_tool_name:
query = request.tool_call.get("args", {}).get("query")
# 评估文档相关性
grade = self.grader_llm.invoke({
"question": query,
"document": result.content[:2000] # 限制长度
})
is_relevant = grade.binary_score.lower() == "yes"
# 存储评分结果
self._pending_grading = {
"last_rag_query": query,
"documents_relevant": is_relevant,
"grade_reasoning": grade.reasoning,
}
print(f"\n📊 文档评分:{grade.binary_score}")
print(f"💭 评分理由:{grade.reasoning}")
# 如果不相关,触发回退
if not is_relevant:
print("⚠️ 文档相关性不足,触发回退机制...")
# 这里可以自动调用 fallback 工具
return result
def before_model(self, messages: List[BaseMessage]) -> List[BaseMessage]:
"""在模型调用前注入评分信息"""
if self._pending_grading:
grading_info = self._pending_grading
if not grading_info["documents_relevant"]:
# 注入回退提示
fallback_msg = SystemMessage(content=f"""
注意:上次检索的文档相关性不足。
原因:{grading_info['grade_reasoning']}
建议:考虑使用网络搜索或重新表述查询。
""")
messages.insert(0, fallback_msg)
# 清除待处理的评分
self._pending_grading = None
return messages
# 使用 CRAG 中间件
crag_middleware = CRAGMiddleware(
grader_llm=grader_llm,
rag_tool_name="search_knowledge_base"
)
完整的 CRAG Agent
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
# 定义工具
def search_knowledge_base(query: str) -> str:
"""搜索知识库"""
docs = vector_store.similarity_search(query, k=3)
return "\n\n".join([doc.page_content for doc in docs])
def web_search(query: str) -> str:
"""网络搜索(回退机制)"""
search_tool = TavilySearchResults(max_results=3)
results = search_tool.invoke(query)
return "\n\n".join([r["content"] for r in results])
tools = [search_knowledge_base, web_search]
# 定义 CRAG Agent 提示
crag_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个智能 RAG 助手,具有自我纠正能力。
工作流程:
1. 首先尝试使用 search_knowledge_base 搜索知识库
2. 如果知识库信息不足或不相关,使用 web_search 进行网络搜索
3. 基于检索到的信息生成准确的答案
4. 如果无法找到相关信息,诚实地告知用户
注意:
- 优先使用知识库
- 必要时使用网络搜索作为补充
- 始终标注信息来源
"""),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# 创建 CRAG Agent
crag_agent = create_openai_tools_agent(
llm=llm,
tools=tools,
prompt=crag_prompt
)
crag_executor = AgentExecutor(
agent=crag_agent,
tools=tools,
verbose=True,
max_iterations=5,
handle_parsing_errors=True
)
# 使用 CRAG
def ask_with_crag(question: str) -> dict:
"""使用 CRAG 回答问题"""
print(f"\n❓ 问题:{question}\n")
result = crag_executor.invoke({"input": question})
print(f"\n✅ 答案:{result['output']}\n")
return result
# 测试
ask_with_crag("OceanBase 的向量搜索功能有哪些特点?")
ask_with_crag("2026年最新的 AI 发展趋势是什么?") # 会触发网络搜索
3.6 两种回滚方式对比
Node Style(节点式)
特点:
- 中间件决定
- before_model 注入
- 确定性(Deterministic)
- 适用于严格的机制
class NodeStyleCRAG:
"""节点式 CRAG 实现"""
def __init__(self, grader, retriever, fallback):
self.grader = grader
self.retriever = retriever
self.fallback = fallback
def execute(self, query: str) -> str:
"""执行 CRAG 流程"""
# 1. 检索
docs = self.retriever.invoke(query)
# 2. 评分
grade = self.grader.invoke({
"question": query,
"document": docs
})
# 3. 决策节点
if grade.binary_score.lower() == "yes":
# 相关:直接使用
return docs
else:
# 不相关:回退
print("触发回退机制")
return self.fallback.invoke(query)
# 使用
node_crag = NodeStyleCRAG(
grader=grader_llm,
retriever=vector_store.as_retriever(),
fallback=web_search_tool
)
result = node_crag.execute("查询问题")
Wrap Style(包装式)
特点:
- 模型决定
- wrap_model_call 过滤
- 灵活(Flexible)
- 适用于 Agent 行为级别
class WrapStyleCRAG:
"""包装式 CRAG 实现"""
def __init__(self, llm, grader, tools):
self.llm = llm
self.grader = grader
self.tools = {tool.name: tool for tool in tools}
self.last_grade = None
def wrap_model_call(
self,
messages: List[BaseMessage]
) -> BaseMessage:
"""包装模型调用"""
# 调用模型
response = self.llm.invoke(messages)
# 如果模型决定使用 RAG 工具
if hasattr(response, 'tool_calls') and response.tool_calls:
for tool_call in response.tool_calls:
if tool_call['name'] == 'search_knowledge_base':
# 执行工具
tool = self.tools['search_knowledge_base']
result = tool.invoke(tool_call['args'])
# 评分
grade = self.grader.invoke({
"question": tool_call['args']['query'],
"document": result
})
self.last_grade = grade
# 如果不相关,建议使用其他工具
if grade.binary_score.lower() == "no":
# 修改响应,建议使用 web_search
response.content += "\n[系统提示:知识库信息不足,建议使用网络搜索]"
return response
# 使用
wrap_crag = WrapStyleCRAG(
llm=llm,
grader=grader_llm,
tools=tools
)
选择建议
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 严格的质量控制 | Node Style | 确定性强,流程可控 |
| 灵活的对话系统 | Wrap Style | Agent 可以自主决策 |
| 简单的 RAG 应用 | Node Style | 实现简单,易于调试 |
| 复杂的多工具场景 | Wrap Style | 更好的工具协调能力 |
第四部分:高级检索技术
4.1 多模态检索(Hybrid Search)
三种检索模态
1. Vector Search(向量检索)
特点:
- 基于余弦相似度
- 适合概念性查询
- 语义理解能力强
# 纯向量检索
docs = vector_store.similarity_search(
query="分布式数据库的优势",
k=5
)
2. Sparse Search(稀疏检索)
特点:
- 基于关键词匹配
- 不需要 Embedding 模型
- 精确匹配能力强
from langchain.retrievers import BM25Retriever
# BM25 稀疏检索
bm25_retriever = BM25Retriever.from_documents(documents)
docs = bm25_retriever.get_relevant_documents(
"OceanBase 索引"
)
3. Full-Text Search(全文检索)
特点:
- 精确字符串匹配
- 支持复杂查询语法
- 适合精确查找
# OceanBase 全文检索
query = """
SELECT * FROM documents
WHERE MATCH(content) AGAINST('向量数据库' IN NATURAL LANGUAGE MODE)
"""
Hybrid Search 实现
from langchain.retrievers import EnsembleRetriever
class HybridRetriever:
"""混合检索器"""
def __init__(
self,
vector_store,
documents,
vector_weight: float = 0.5,
sparse_weight: float = 0.5
):
# 向量检索器
self.vector_retriever = vector_store.as_retriever(
search_kwargs={"k": 5}
)
# 稀疏检索器(BM25)
self.sparse_retriever = BM25Retriever.from_documents(
documents
)
self.sparse_retriever.k = 5
# 组合检索器
self.ensemble_retriever = EnsembleRetriever(
retrievers=[self.vector_retriever, self.sparse_retriever],
weights=[vector_weight, sparse_weight]
)
def retrieve(self, query: str, k: int = 5) -> List[Document]:
"""执行混合检索"""
return self.ensemble_retriever.get_relevant_documents(query)
def adjust_weights(self, vector_weight: float, sparse_weight: float):
"""动态调整权重"""
self.ensemble_retriever.weights = [vector_weight, sparse_weight]
# 使用混合检索
hybrid_retriever = HybridRetriever(
vector_store=vector_store,
documents=documents,
vector_weight=0.6, # 向量检索权重
sparse_weight=0.4 # 稀疏检索权重
)
results = hybrid_retriever.retrieve("OceanBase 向量索引")
Agentic 权重配置
def configure_search_weights(
query_type: str,
vector_weight: float = 0.5,
sparse_weight: float = 0.5
) -> str:
"""动态配置检索权重
Args:
query_type: 查询类型(conceptual/keyword/mixed)
vector_weight: 向量检索权重
sparse_weight: 稀疏检索权重
Returns:
配置结果
"""
# 根据查询类型自动调整权重
if query_type == "conceptual":
# 概念性查询:偏向向量检索
vector_weight = 0.8
sparse_weight = 0.2
elif query_type == "keyword":
# 关键词查询:偏向稀疏检索
vector_weight = 0.2
sparse_weight = 0.8
else:
# 混合查询:平衡权重
vector_weight = 0.5
sparse_weight = 0.5
hybrid_retriever.adjust_weights(vector_weight, sparse_weight)
return f"已配置权重 - 向量:{vector_weight}, 稀疏:{sparse_weight}"
# Agent 可以根据查询自动调整权重
tools.append(configure_search_weights)
4.2 OceanBase 向量搜索优化
创建向量索引
-- 创建向量表
CREATE TABLE document_vectors (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
content TEXT,
embedding VECTOR(1536), -- OpenAI embedding 维度
metadata JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建向量索引(提升检索性能)
CREATE VECTOR INDEX idx_embedding ON document_vectors(embedding)
WITH (
distance_metric = 'cosine',
index_type = 'hnsw',
m = 16,
ef_construction = 200
);
-- 创建全文索引
CREATE FULLTEXT INDEX idx_content ON document_vectors(content);
高级查询
from langchain_community.vectorstores import OceanBase
class OptimizedOceanBaseRetriever:
"""优化的 OceanBase 检索器"""
def __init__(self, vector_store: OceanBase):
self.vector_store = vector_store
def similarity_search_with_score(
self,
query: str,
k: int = 5,
score_threshold: float = 0.7
) -> List[tuple[Document, float]]:
"""带分数的相似度搜索"""
results = self.vector_store.similarity_search_with_score(
query=query,
k=k
)
# 过滤低分结果
filtered_results = [
(doc, score) for doc, score in results
if score >= score_threshold
]
return filtered_results
def mmr_search(
self,
query: str,
k: int = 5,
fetch_k: int = 20,
lambda_mult: float = 0.5
) -> List[Document]:
"""最大边际相关性搜索(减少冗余)"""
return self.vector_store.max_marginal_relevance_search(
query=query,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult # 0=多样性, 1=相关性
)
def metadata_filter_search(
self,
query: str,
filter_dict: dict,
k: int = 5
) -> List[Document]:
"""基于元数据过滤的搜索"""
return self.vector_store.similarity_search(
query=query,
k=k,
filter=filter_dict
)
# 使用示例
retriever = OptimizedOceanBaseRetriever(vector_store)
# 1. 带分数阈值的搜索
results = retriever.similarity_search_with_score(
query="向量数据库",
k=5,
score_threshold=0.75
)
for doc, score in results:
print(f"分数: {score:.3f} - {doc.page_content[:100]}")
# 2. MMR 搜索(减少重复)
diverse_results = retriever.mmr_search(
query="分布式系统",
k=5,
lambda_mult=0.3 # 更注重多样性
)
# 3. 元数据过滤
filtered_results = retriever.metadata_filter_search(
query="数据库索引",
filter_dict={"category": "技术文档", "year": 2026},
k=5
)
性能优化技巧
class PerformanceOptimizer:
"""性能优化器"""
def batch_embed(texts: List[str], batch_size: int = 100):
"""批量 Embedding(提升效率)"""
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
batch_embeddings = embedding_model.embed_documents(batch)
embeddings.extend(batch_embeddings)
return embeddings
def cache_embeddings(query: str, cache: dict):
"""缓存 Embedding 结果"""
if query in cache:
return cache[query]
embedding = embedding_model.embed_query(query)
cache[query] = embedding
return embedding
def parallel_search(queries: List[str], retriever):
"""并行检索"""
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(
lambda q: retriever.get_relevant_documents(q),
queries
))
return results
# 使用缓存
embedding_cache = {}
def cached_search(query: str):
embedding = PerformanceOptimizer.cache_embeddings(
query,
embedding_cache
)
# 使用缓存的 embedding 进行搜索
return vector_store.similarity_search_by_vector(embedding, k=5)
第五部分:快捷 RAG(Zero-Config RAG)
5.1 OceanBase 内置 RAG 功能
两种构建方式
方式一:内置 Embedding
from langchain_community.vectorstores import OceanBase
# 使用 OceanBase 内置的 Embedding 功能
easy_vector_store = OceanBase.from_texts(
texts=["文本1", "文本2", "文本3"],
embedding=None, # 使用内置 embedding
connection_args=db_config,
table_name="easy_rag",
use_builtin_embedding=True # 启用内置功能
)
# 直接搜索(无需手动 embedding)
results = easy_vector_store.similarity_search("查询文本")
方式二:AI 函数调用
# 使用 OceanBase 的 AI 函数
class OceanBaseAIFunction:
"""OceanBase AI 函数封装"""
def __init__(self, connection):
self.conn = connection
def ai_embed(self, text: str) -> List[float]:
"""使用 AI 函数生成 Embedding"""
query = "SELECT AI_EMBED(%s) as embedding"
cursor = self.conn.cursor()
cursor.execute(query, (text,))
result = cursor.fetchone()
return result['embedding']
def ai_search(self, query: str, table: str, k: int = 5):
"""使用 AI 函数搜索"""
sql = f"""
SELECT content,
AI_DISTANCE(embedding, AI_EMBED(%s)) as distance
FROM {table}
ORDER BY distance
LIMIT %s
"""
cursor = self.conn.cursor()
cursor.execute(sql, (query, k))
return cursor.fetchall()
def ai_generate(self, prompt: str, context: str):
"""使用 AI 函数生成回答"""
sql = "SELECT AI_GENERATE(%s, %s) as response"
cursor = self.conn.cursor()
cursor.execute(sql, (prompt, context))
result = cursor.fetchone()
return result['response']
# 使用示例
ai_func = OceanBaseAIFunction(connection)
# 1. 生成 Embedding
embedding = ai_func.ai_embed("测试文本")
# 2. 搜索
results = ai_func.ai_search("查询问题", "documents", k=5)
# 3. 生成答案
context = "\n".join([r['content'] for r in results])
answer = ai_func.ai_generate("回答问题", context)
5.2 All-in-One RAG 实现
数据处理两阶段
Data In(数据导入)
Load → Chunk → Embed → Store
class EasyRAGDataIn:
"""简化的数据导入流程"""
def __init__(self, vector_store):
self.vector_store = vector_store
def ingest_directory(self, directory: str):
"""一键导入目录"""
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Load
loader = DirectoryLoader(directory, glob="**/*.md")
documents = loader.load()
# Chunk
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = splitter.split_documents(documents)
# Embed & Store(自动完成)
self.vector_store.add_documents(chunks)
return len(chunks)
def ingest_url(self, url: str):
"""一键导入网页"""
from langchain_community.document_loaders import WebBaseLoader
loader = WebBaseLoader(url)
documents = loader.load()
self.vector_store.add_documents(documents)
return len(documents)
# 使用
data_in = EasyRAGDataIn(vector_store)
# 导入本地文档
count = data_in.ingest_directory("./docs")
print(f"导入了 {count} 个文档块")
# 导入网页
data_in.ingest_url("https://example.com/article")
Data Out(数据检索)
Query → Search → Retrieve → Generate
class EasyRAGDataOut:
"""简化的数据检索流程"""
def __init__(self, vector_store, llm):
self.vector_store = vector_store
self.llm = llm
def ask(self, question: str, k: int = 3) -> dict:
"""一键问答"""
# Search & Retrieve
docs = self.vector_store.similarity_search(question, k=k)
context = "\n\n".join([doc.page_content for doc in docs])
# Generate
prompt = f"""基于以下上下文回答问题:
上下文:
{context}
问题:{question}
回答:"""
answer = self.llm.invoke(prompt).content
return {
"question": question,
"answer": answer,
"sources": docs
}
# 使用
data_out = EasyRAGDataOut(vector_store, llm)
result = data_out.ask("OceanBase 有哪些特性?")
print(result["answer"])
完整的 Zero-Config RAG
class ZeroConfigRAG:
"""零配置 RAG 系统"""
def __init__(self, db_config: dict):
"""只需要数据库配置即可启动"""
# 自动初始化所有组件
self.embeddings = OpenAIEmbeddings()
self.llm = ChatOpenAI(model="gpt-4")
self.vector_store = OceanBase(
embedding_function=self.embeddings,
connection_args=db_config,
table_name="zero_config_rag"
)
self.data_in = EasyRAGDataIn(self.vector_store)
self.data_out = EasyRAGDataOut(self.vector_store, self.llm)
def add_documents(self, source: str):
"""添加文档(自动识别类型)"""
import os
if os.path.isdir(source):
return self.data_in.ingest_directory(source)
elif source.startswith("http"):
return self.data_in.ingest_url(source)
else:
# 单个文件
from langchain_community.document_loaders import TextLoader
loader = TextLoader(source)
docs = loader.load()
self.vector_store.add_documents(docs)
return len(docs)
def ask(self, question: str) -> str:
"""提问"""
result = self.data_out.ask(question)
return result["answer"]
def chat(self):
"""交互式对话"""
print("Zero-Config RAG 已启动!输入 'quit' 退出。\n")
while True:
question = input("你: ")
if question.lower() == 'quit':
break
answer = self.ask(question)
print(f"AI: {answer}\n")
# 使用示例
if __name__ == "__main__":
# 只需要数据库配置
db_config = {
"host": "localhost",
"port": 2881,
"user": "root@test",
"password": "password",
"database": "rag_db"
}
# 创建 RAG 系统
rag = ZeroConfigRAG(db_config)
# 添加文档
rag.add_documents("./docs")
rag.add_documents("https://example.com/article")
# 开始对话
rag.chat()
第六部分:实战项目
6.1 完整的生产级 RAG 系统
项目结构
rag_system/
├── config/
│ ├── __init__.py
│ ├── settings.py # 配置管理
│ └── prompts.py # 提示模板
├── core/
│ ├── __init__.py
│ ├── embeddings.py # Embedding 管理
│ ├── vector_store.py # 向量存储
│ └── llm.py # LLM 管理
├── retrieval/
│ ├── __init__.py
│ ├── hybrid_retriever.py # 混合检索
│ ├── grader.py # 文档评分
│ └── reranker.py # 重排序
├── agents/
│ ├── __init__.py
│ ├── rag_agent.py # RAG Agent
│ └── tools.py # 工具定义
├── middleware/
│ ├── __init__.py
│ ├── crag.py # CRAG 中间件
│ └── logging.py # 日志中间件
├── api/
│ ├── __init__.py
│ ├── routes.py # API 路由
│ └── models.py # 数据模型
├── utils/
│ ├── __init__.py
│ ├── document_loader.py # 文档加载
│ └── text_splitter.py # 文本分割
├── tests/
│ └── test_rag.py
├── main.py # 主程序
└── requirements.txt
配置管理(config/settings.py)
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""系统配置"""
# OpenAI 配置
openai_api_key: str
openai_model: str = "gpt-4"
embedding_model: str = "text-embedding-3-small"
# OceanBase 配置
ob_host: str = "localhost"
ob_port: int = 2881
ob_user: str = "root@test"
ob_password: str
ob_database: str = "rag_db"
ob_table: str = "documents"
# RAG 配置
chunk_size: int = 1000
chunk_overlap: int = 200
retrieval_k: int = 5
score_threshold: float = 0.7
# CRAG 配置
enable_crag: bool = True
min_relevance: float = 0.5
# API 配置
api_host: str = "0.0.0.0"
api_port: int = 8000
class Config:
env_file = ".env"
settings = Settings()
核心组件(core/vector_store.py)
from langchain_community.vectorstores import OceanBase
from langchain_openai import OpenAIEmbeddings
from config.settings import settings
class VectorStoreManager:
"""向量存储管理器"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.embeddings = OpenAIEmbeddings(
model=settings.embedding_model,
openai_api_key=settings.openai_api_key
)
self.vector_store = OceanBase(
embedding_function=self.embeddings,
connection_args={
"host": settings.ob_host,
"port": settings.ob_port,
"user": settings.ob_user,
"password": settings.ob_password,
"database": settings.ob_database
},
table_name=settings.ob_table
)
self._initialized = True
def get_vector_store(self):
return self.vector_store
def get_retriever(self, **kwargs):
return self.vector_store.as_retriever(**kwargs)
# 单例实例
vector_store_manager = VectorStoreManager()
RAG Agent(agents/rag_agent.py)
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from agents.tools import get_rag_tools
from config.settings import settings
class RAGAgent:
"""RAG Agent 封装"""
def __init__(self):
self.llm = ChatOpenAI(
model=settings.openai_model,
temperature=0,
openai_api_key=settings.openai_api_key
)
self.tools = get_rag_tools()
self.prompt = ChatPromptTemplate.from_messages([
("system", self._get_system_prompt()),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
self.agent = create_openai_tools_agent(
llm=self.llm,
tools=self.tools,
prompt=self.prompt
)
self.executor = AgentExecutor(
agent=self.agent,
tools=self.tools,
verbose=True,
max_iterations=5,
handle_parsing_errors=True
)
def _get_system_prompt(self) -> str:
return """你是一个专业的 AI 助手,具有以下能力:
1. 知识库搜索:使用 search_knowledge_base 工具
2. 网络搜索:使用 web_search 工具(当知识库信息不足时)
3. 文档评分:自动评估检索结果的相关性
工作原则:
- 优先使用知识库
- 必要时使用网络搜索补充
- 始终提供信息来源
- 如果无法找到答案,诚实告知
请根据用户问题,选择合适的工具并生成准确的答案。
"""
def invoke(self, question: str) -> dict:
"""执行查询"""
return self.executor.invoke({"input": question})
async def ainvoke(self, question: str) -> dict:
"""异步执行查询"""
return await self.executor.ainvoke({"input": question})
FastAPI 接口(api/routes.py)
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from agents.rag_agent import RAGAgent
from utils.document_loader import DocumentLoader
from config.settings import settings
app = FastAPI(title="RAG System API", version="1.0.0")
# CORS 配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 初始化 Agent
rag_agent = RAGAgent()
doc_loader = DocumentLoader()
# 请求模型
class QueryRequest(BaseModel):
question: str
k: Optional[int] = 5
class DocumentRequest(BaseModel):
source: str # 文件路径或 URL
source_type: str # "file", "directory", "url"
class QueryResponse(BaseModel):
question: str
answer: str
sources: List[dict]
# API 路由
async def root():
return {"message": "RAG System API", "version": "1.0.0"}
async def query(request: QueryRequest):
"""查询接口"""
try:
result = rag_agent.invoke(request.question)
return QueryResponse(
question=request.question,
answer=result["output"],
sources=[] # 可以从 result 中提取来源
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def add_documents(request: DocumentRequest):
"""添加文档接口"""
try:
count = doc_loader.load_and_store(
source=request.source,
source_type=request.source_type
)
return {
"message": "Documents added successfully",
"count": count
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def health_check():
"""健康检查"""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host=settings.api_host,
port=settings.api_port
)
文档加载工具(utils/document_loader.py)
from langchain_community.document_loaders import (
DirectoryLoader,
TextLoader,
PyPDFLoader,
WebBaseLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from core.vector_store import vector_store_manager
from config.settings import settings
import os
class DocumentLoader:
"""文档加载和处理"""
def __init__(self):
self.vector_store = vector_store_manager.get_vector_store()
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=settings.chunk_size,
chunk_overlap=settings.chunk_overlap
)
def load_and_store(self, source: str, source_type: str) -> int:
"""加载并存储文档"""
# 加载文档
if source_type == "directory":
documents = self._load_directory(source)
elif source_type == "file":
documents = self._load_file(source)
elif source_type == "url":
documents = self._load_url(source)
else:
raise ValueError(f"Unsupported source type: {source_type}")
# 分割文档
chunks = self.text_splitter.split_documents(documents)
# 存储到向量数据库
self.vector_store.add_documents(chunks)
return len(chunks)
def _load_directory(self, directory: str):
"""加载目录"""
loader = DirectoryLoader(
directory,
glob="**/*.{md,txt,pdf}",
show_progress=True
)
return loader.load()
def _load_file(self, file_path: str):
"""加载单个文件"""
ext = os.path.splitext(file_path)[1].lower()
if ext == ".pdf":
loader = PyPDFLoader(file_path)
else:
loader = TextLoader(file_path)
return loader.load()
def _load_url(self, url: str):
"""加载网页"""
loader = WebBaseLoader(url)
return loader.load()
主程序(main.py)
import argparse
from api.routes import app
from utils.document_loader import DocumentLoader
from agents.rag_agent import RAGAgent
from config.settings import settings
import uvicorn
def start_api():
"""启动 API 服务"""
print(f"Starting RAG API on {settings.api_host}:{settings.api_port}")
uvicorn.run(
app,
host=settings.api_host,
port=settings.api_port
)
def interactive_mode():
"""交互式模式"""
print("RAG System - Interactive Mode")
print("输入 'quit' 退出\n")
agent = RAGAgent()
while True:
question = input("你: ")
if question.lower() == 'quit':
break
try:
result = agent.invoke(question)
print(f"AI: {result['output']}\n")
except Exception as e:
print(f"错误: {str(e)}\n")
def load_documents(source: str, source_type: str):
"""加载文档"""
loader = DocumentLoader()
count = loader.load_and_store(source, source_type)
print(f"成功加载 {count} 个文档块")
def main():
parser = argparse.ArgumentParser(description="RAG System")
parser.add_argument(
"mode",
choices=["api", "chat", "load"],
help="运行模式"
)
parser.add_argument(
"--source",
help="文档源(用于 load 模式)"
)
parser.add_argument(
"--type",
choices=["file", "directory", "url"],
help="源类型(用于 load 模式)"
)
args = parser.parse_args()
if args.mode == "api":
start_api()
elif args.mode == "chat":
interactive_mode()
elif args.mode == "load":
if not args.source or not args.type:
print("错误: load 模式需要 --source 和 --type 参数")
return
load_documents(args.source, args.type)
if __name__ == "__main__":
main()
使用示例
# 1. 安装依赖
pip install -r requirements.txt
# 2. 配置环境变量(.env 文件)
OPENAI_API_KEY=your_api_key
OB_PASSWORD=your_password
# 3. 加载文档
python main.py load --source ./docs --type directory
# 4. 启动 API 服务
python main.py api
# 5. 或者使用交互式模式
python main.py chat
API 调用示例
import requests
# 查询
response = requests.post(
"http://localhost:8000/query",
json={"question": "OceanBase 有哪些特性?"}
)
print(response.json())
# 添加文档
response = requests.post(
"http://localhost:8000/documents/add",
json={
"source": "./new_docs",
"source_type": "directory"
}
)
print(response.json())
6.2 性能监控和优化
日志中间件(middleware/logging.py)
import time
import logging
from typing import List
from langchain.schema import BaseMessage
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LoggingMiddleware:
"""日志和性能监控中间件"""
def __init__(self):
self.metrics = {
"total_queries": 0,
"total_time": 0,
"avg_time": 0
}
def before_model(self, messages: List[BaseMessage]) -> List[BaseMessage]:
"""记录请求开始"""
self.start_time = time.time()
logger.info(f"开始处理查询,消息数: {len(messages)}")
return messages
def after_model(self, response: BaseMessage) -> BaseMessage:
"""记录请求结束"""
elapsed = time.time() - self.start_time
self.metrics["total_queries"] += 1
self.metrics["total_time"] += elapsed
self.metrics["avg_time"] = (
self.metrics["total_time"] / self.metrics["total_queries"]
)
logger.info(f"查询完成,耗时: {elapsed:.2f}s")
logger.info(f"平均响应时间: {self.metrics['avg_time']:.2f}s")
return response
def get_metrics(self) -> dict:
"""获取性能指标"""
return self.metrics
# 使用
logging_middleware = LoggingMiddleware()
缓存优化
from functools import lru_cache
import hashlib
import json
class CacheManager:
"""缓存管理器"""
def __init__(self, max_size: int = 1000):
self.cache = {}
self.max_size = max_size
def _get_cache_key(self, query: str, k: int) -> str:
"""生成缓存键"""
content = f"{query}_{k}"
return hashlib.md5(content.encode()).hexdigest()
def get(self, query: str, k: int):
"""获取缓存"""
key = self._get_cache_key(query, k)
return self.cache.get(key)
def set(self, query: str, k: int, result):
"""设置缓存"""
if len(self.cache) >= self.max_size:
# 简单的 FIFO 策略
self.cache.pop(next(iter(self.cache)))
key = self._get_cache_key(query, k)
self.cache[key] = result
def clear(self):
"""清空缓存"""
self.cache.clear()
# 使用缓存的检索器
class CachedRetriever:
def __init__(self, retriever, cache_manager: CacheManager):
self.retriever = retriever
self.cache = cache_manager
def get_relevant_documents(self, query: str, k: int = 5):
"""带缓存的检索"""
# 尝试从缓存获取
cached_result = self.cache.get(query, k)
if cached_result:
logger.info("缓存命中")
return cached_result
# 执行检索
logger.info("缓存未命中,执行检索")
result = self.retriever.get_relevant_documents(query)
# 存入缓存
self.cache.set(query, k, result)
return result
# 使用
cache_manager = CacheManager(max_size=500)
cached_retriever = CachedRetriever(retriever, cache_manager)
批处理优化
from typing import List
import asyncio
class BatchProcessor:
"""批处理器"""
def __init__(self, batch_size: int = 10):
self.batch_size = batch_size
async def process_batch(
self,
queries: List[str],
processor_func
) -> List:
"""批量处理查询"""
results = []
for i in range(0, len(queries), self.batch_size):
batch = queries[i:i + self.batch_size]
# 并行处理批次
batch_results = await asyncio.gather(*[
processor_func(query) for query in batch
])
results.extend(batch_results)
return results
# 使用示例
async def process_query(query: str):
"""异步处理单个查询"""
result = await rag_agent.ainvoke(query)
return result
async def main():
queries = [
"问题1",
"问题2",
"问题3",
# ... 更多查询
]
batch_processor = BatchProcessor(batch_size=5)
results = await batch_processor.process_batch(
queries,
process_query
)
return results
# 运行
# asyncio.run(main())
第七部分:最佳实践和技巧
7.1 提示词工程
系统提示词模板
SYSTEM_PROMPTS = {
"technical": """你是一个技术专家,专注于提供准确的技术信息。
特点:
- 使用专业术语
- 提供代码示例
- 引用官方文档
- 解释技术原理
""",
"beginner_friendly": """你是一个友好的导师,擅长用简单的语言解释复杂概念。
特点:
- 避免过多术语
- 使用类比和比喻
- 循序渐进讲解
- 鼓励提问
""",
"concise": """你是一个简洁的助手,提供精炼的答案。
特点:
- 直接回答要点
- 避免冗余信息
- 使用列表和要点
- 控制回答长度
"""
}
def get_prompt_for_user(user_level: str, domain: str) -> str:
"""根据用户特征选择提示词"""
if user_level == "expert":
return SYSTEM_PROMPTS["technical"]
elif user_level == "beginner":
return SYSTEM_PROMPTS["beginner_friendly"]
else:
return SYSTEM_PROMPTS["concise"]
Few-Shot 示例
FEW_SHOT_EXAMPLES = """
示例 1:
问题:什么是向量数据库?
回答:向量数据库是专门用于存储和检索向量数据的数据库系统。它使用向量相似度算法(如余弦相似度)来快速找到相似的数据点,常用于 AI 应用中的语义搜索。
示例 2:
问题:如何优化 RAG 系统的检索性能?
回答:优化 RAG 检索性能的方法包括:
1. 使用混合检索(向量+关键词)
2. 实现查询缓存
3. 优化文档分块策略
4. 使用重排序模型
5. 批量处理查询
现在请回答用户的问题:
"""
# 在提示中使用
prompt_with_examples = f"{FEW_SHOT_EXAMPLES}\n问题:{user_question}"
7.2 错误处理和重试
健壮的错误处理
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import logging
logger = logging.getLogger(__name__)
class RobustRAGAgent:
"""带错误处理的 RAG Agent"""
def __init__(self, agent):
self.agent = agent
)
def invoke_with_retry(self, question: str) -> dict:
"""带重试的调用"""
try:
return self.agent.invoke(question)
except Exception as e:
logger.error(f"查询失败: {str(e)}")
raise
def invoke_safe(self, question: str) -> dict:
"""安全调用(不抛出异常)"""
try:
return self.invoke_with_retry(question)
except Exception as e:
logger.error(f"查询最终失败: {str(e)}")
return {
"output": "抱歉,系统遇到了问题,请稍后重试。",
"error": str(e)
}
# 使用
robust_agent = RobustRAGAgent(rag_agent)
result = robust_agent.invoke_safe("查询问题")
降级策略
class FallbackStrategy:
"""降级策略"""
def __init__(self, primary_agent, fallback_agent):
self.primary = primary_agent
self.fallback = fallback_agent
def invoke(self, question: str) -> dict:
"""尝试主 Agent,失败则降级"""
try:
# 尝试主 Agent
return self.primary.invoke(question)
except Exception as e:
logger.warning(f"主 Agent 失败,使用降级策略: {str(e)}")
try:
# 使用降级 Agent
return self.fallback.invoke(question)
except Exception as e2:
logger.error(f"降级 Agent 也失败: {str(e2)}")
return {
"output": "系统暂时不可用,请稍后重试。",
"error": str(e2)
}
# 使用
fallback_strategy = FallbackStrategy(
primary_agent=advanced_rag_agent,
fallback_agent=simple_rag_agent
)
7.3 评估和测试
RAG 系统评估
from typing import List, Dict
import numpy as np
class RAGEvaluator:
"""RAG 系统评估器"""
def __init__(self, rag_system):
self.rag_system = rag_system
def evaluate_retrieval(
self,
test_cases: List[Dict[str, any]]
) -> Dict[str, float]:
"""评估检索质量"""
precision_scores = []
recall_scores = []
for case in test_cases:
query = case["query"]
expected_docs = set(case["expected_doc_ids"])
# 执行检索
retrieved_docs = self.rag_system.retrieve(query, k=5)
retrieved_ids = set([doc.metadata["id"] for doc in retrieved_docs])
# 计算精确率和召回率
true_positives = len(expected_docs & retrieved_ids)
precision = true_positives / len(retrieved_ids) if retrieved_ids else 0
recall = true_positives / len(expected_docs) if expected_docs else 0
precision_scores.append(precision)
recall_scores.append(recall)
return {
"avg_precision": np.mean(precision_scores),
"avg_recall": np.mean(recall_scores),
"f1_score": 2 * np.mean(precision_scores) * np.mean(recall_scores) /
(np.mean(precision_scores) + np.mean(recall_scores))
}
def evaluate_generation(
self,
test_cases: List[Dict[str, str]]
) -> Dict[str, float]:
"""评估生成质量"""
from rouge import Rouge
rouge = Rouge()
rouge_scores = []
for case in test_cases:
query = case["query"]
reference = case["reference_answer"]
# 生成答案
result = self.rag_system.ask(query)
generated = result["answer"]
# 计算 ROUGE 分数
scores = rouge.get_scores(generated, reference)[0]
rouge_scores.append(scores["rouge-l"]["f"])
return {
"avg_rouge_l": np.mean(rouge_scores)
}
# 使用示例
test_cases = [
{
"query": "什么是向量数据库?",
"expected_doc_ids": ["doc1", "doc3", "doc5"],
"reference_answer": "向量数据库是..."
},
# 更多测试用例
]
evaluator = RAGEvaluator(rag_system)
retrieval_metrics = evaluator.evaluate_retrieval(test_cases)
generation_metrics = evaluator.evaluate_generation(test_cases)
print(f"检索精确率: {retrieval_metrics['avg_precision']:.2f}")
print(f"检索召回率: {retrieval_metrics['avg_recall']:.2f}")
print(f"生成质量 (ROUGE-L): {generation_metrics['avg_rouge_l']:.2f}")
7.4 常见问题和解决方案
问题 1:检索结果不相关
原因:
- Embedding 模型不适合
- 文档分块策略不当
- 查询表述不清晰
解决方案:
# 1. 使用更好的 Embedding 模型
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(
model="text-embedding-3-large" # 更大的模型
)
# 2. 优化分块策略
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 减小块大小
chunk_overlap=100, # 增加重叠
separators=["\n\n", "\n", "。", "!", "?", " ", ""] # 中文分隔符
)
# 3. 查询重写
def rewrite_query(query: str) -> str:
"""重写查询以提高检索效果"""
prompt = f"将以下查询重写为更适合检索的形式:{query}"
return llm.invoke(prompt).content
问题 2:生成答案有幻觉
原因:
- 检索到的文档不相关
- 模型过度推理
- 缺乏约束
解决方案:
# 1. 使用 CRAG 验证检索结果
# 2. 强化提示词约束
STRICT_PROMPT = """基于以下上下文回答问题。
重要规则:
1. 只使用上下文中的信息
2. 如果上下文中没有答案,明确说"我不知道"
3. 不要推测或编造信息
4. 引用具体的上下文片段
上下文:
{context}
问题:{question}
回答:"""
# 3. 添加置信度评估
class ConfidenceEvaluator:
def evaluate(self, answer: str, context: str) -> float:
"""评估答案的置信度"""
prompt = f"""
评估以下答案是否完全基于上下文:
上下文:{context}
答案:{answer}
返回 0-1 之间的置信度分数。
"""
response = llm.invoke(prompt)
return float(response.content)
问题 3:响应速度慢
原因:
- Embedding 计算慢
- 向量检索慢
- LLM 调用慢
解决方案:
# 1. 使用缓存
cache_manager = CacheManager(max_size=1000)
# 2. 批量处理
batch_processor = BatchProcessor(batch_size=10)
# 3. 异步处理
async def async_query(question: str):
result = await rag_agent.ainvoke(question)
return result
# 4. 使用更快的模型
fast_llm = ChatOpenAI(
model="gpt-3.5-turbo", # 更快的模型
temperature=0
)
# 5. 优化向量索引
# 在 OceanBase 中创建 HNSW 索引
问题 4:成本过高
原因:
- 频繁调用 API
- 使用昂贵的模型
- 没有缓存
解决方案:
# 1. 使用本地 Embedding 模型
from langchain_community.embeddings import HuggingFaceEmbeddings
local_embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-zh-v1.5" # 中文模型
)
# 2. 混合使用模型
class CostOptimizedAgent:
def __init__(self):
self.cheap_model = ChatOpenAI(model="gpt-3.5-turbo")
self.expensive_model = ChatOpenAI(model="gpt-4")
def invoke(self, question: str, complexity: str = "simple"):
if complexity == "simple":
return self.cheap_model.invoke(question)
else:
return self.expensive_model.invoke(question)
# 3. 实现智能缓存
# 4. 批量处理减少调用次数
7.5 部署建议
Docker 部署
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "main.py", "api"]
# docker-compose.yml
version: '3.8'
services:
rag-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OB_HOST=oceanbase
- OB_PASSWORD=${OB_PASSWORD}
depends_on:
- oceanbase
oceanbase:
image: oceanbase/oceanbase-ce:latest
ports:
- "2881:2881"
environment:
- MODE=slim
volumes:
- ob-data:/root/ob
volumes:
ob-data:
生产环境配置
# config/production.py
PRODUCTION_CONFIG = {
# 性能优化
"enable_cache": True,
"cache_size": 5000,
"batch_size": 20,
# 可靠性
"max_retries": 3,
"timeout": 30,
"enable_fallback": True,
# 监控
"enable_logging": True,
"log_level": "INFO",
"enable_metrics": True,
# 安全
"enable_rate_limit": True,
"rate_limit": "100/minute",
"enable_auth": True,
}