本文是一份完整的 LangChain + OceanBase 开发指南,涵盖从基础概念到高级实现的 Agentic RAG 系统构建。适合快速上手 AI Infra 开发,特别是 RAG(检索增强生成)相关项目。

架构总览

🏗️ LangChain + OceanBase RAG 系统架构

系统分层架构

架构图

═══════════════════════════════════════════════════════════════════
                    RAG 系统六层架构
═══════════════════════════════════════════════════════════════════

📱 应用层 (Application Layer)
   ├─ FastAPI REST API
   ├─ WebSocket 实时对话
   └─ CLI 交互界面

───────────────────────────────────────────────────────────────────

🤖 Agent 层 (Agent Layer)
   ├─ LangChain Agent (ReAct)
   ├─ LLM (GPT-4 / GPT-3.5)
   ├─ Tools (知识库搜索 / 网络搜索 / 计算)
   └─ Middleware (前置/后置钩子)

───────────────────────────────────────────────────────────────────

💬 生成层 (Generation Layer)
   ├─ Prompt Engineering
   ├─ LLM Invocation (同步/流式/异步)
   ├─ Structured Output
   └─ Response Enhancement

───────────────────────────────────────────────────────────────────

🔍 检索层 (Retrieval Layer)
   ├─ Query Processing (重写/扩展)
   ├─ Hybrid Search (向量+稀疏+全文)
   ├─ Document Grading (CRAG)
   └─ Reranking (MMR)

───────────────────────────────────────────────────────────────────

�️ 存储层 (Storage Layer)
   ├─ OceanBase Vector Store
   ├─ Vector Index (HNSW/IVF)
   ├─ Full-Text Index
   └─ Metadata Storage (JSON)

───────────────────────────────────────────────────────────────────

📄 数据层 (Data Layer)
   ├─ Document Loader (PDF/MD/TXT/Web)
   ├─ Text Splitter (智能分块)
   ├─ Embedding (OpenAI/HuggingFace/OceanBase)
   └─ Data Ingestion Pipeline

═══════════════════════════════════════════════════════════════════

🔄 核心工作流程

1️⃣ 数据导入流程 (Data Ingestion)

� 文档源 
   ↓
🔧 Document Loader (加载解析)
   ↓
✂️ Text Splitter (智能分块: 1000 chars, 200 overlap)
   ↓
🔢 Embedding (向量化: 1536维)
   ↓
💾 OceanBase 存储 (向量索引 + 元数据)

2️⃣ 查询处理流程 (Query Processing)

❓ 用户查询
   ↓
✍️ Query Rewriting (查询优化)
   ↓
🔍 Hybrid Retrieval (向量 60% + 关键词 40%)
   ↓
✅ Document Grading (CRAG 相关性评分)
   ↓
📚 Top-K Context (精选文档)

3️⃣ Agent 决策流程 (Agent Reasoning - ReAct)

💭 Query Input
   ↓
🧠 Agent (LLM) 推理
   ↓
   ├─ 需要检索? ──Yes──> 🔍 search_knowledge_base
   ├─ 简单问题? ──Yes──> 💬 直接回答
   └─ 信息不足? ──Yes──> 🌐 web_search (fallback)
   ↓
🔄 迭代优化 (最多 5 轮)
   ↓
✨ 最终结果

4️⃣ 生成响应流程 (Response Generation)

📚 Context + ❓ Query
   ↓
📝 Prompt Engineering (System + Context + Few-Shot)
   ↓
🧠 LLM Generation (GPT-4)
   ↓
🔗 Post-Processing (添加引用 + 来源标注)
   ↓
✅ Final Response

📊 三种 RAG 模式对比

维度 Two-Step RAG Agentic RAG CRAG
流程 Query→Retrieve→Generate Query→Agent决策→Tool(0-N)→Generate Query→Retrieve→Grade→Fallback/Generate
灵活性 ⭐⭐ 固定 ⭐⭐⭐⭐⭐ 极高 ⭐⭐⭐⭐ 自适应
准确性 ⭐⭐⭐ 中等 ⭐⭐⭐⭐ 高 ⭐⭐⭐⭐⭐ 极高
成本 💰 低 💰💰 中 💰💰💰 高
延迟 ⚡ 快 ⚡⚡ 中 ⚡⚡⚡ 慢
适用场景 简单问答 复杂推理 高质量要求
总是检索 ✅ 是 ❌ 否 ✅ 是(验证)

🛠️ 技术栈一览

┌──────────────────────────────────────┐
│  🌐 Frontend / Client                │
│  • REST API / WebSocket              │
│  • CLI Interface                     │
└──────────────────────────────────────┘
              ↓
┌──────────────────────────────────────┐
│  📦 Application Layer                │
│  • FastAPI (API Server)              │
│  • Pydantic (Data Validation)        │
│  • Uvicorn (ASGI Server)             │
└──────────────────────────────────────┘
              ↓
┌──────────────────────────────────────┐
│  🤖 Agent Framework                  │
│  • LangChain (Agent + Tools)         │
│  • OpenAI GPT-4 / GPT-3.5            │
│  • ReAct Pattern                     │
└──────────────────────────────────────┘
              ↓
┌──────────────────────────────────────┐
│  🔍 Retrieval Layer                  │
│  • LangChain Retrievers              │
│  • Hybrid Search (Vector + BM25)     │
│  • CRAG (Document Grading)           │
│  • Tavily (Web Search)               │
└──────────────────────────────────────┘
              ↓
┌──────────────────────────────────────┐
│  �️ Storage Layer                    │
│  • OceanBase (Vector Store)          │
│  • Vector Index (HNSW/IVF)           │
│  • Full-Text Index                   │
│  • JSON Metadata                     │
└──────────────────────────────────────┘
              ↓
┌──────────────────────────────────────┐
│  🔢 Embedding Layer                  │
│  • OpenAI Embeddings (1536维)        │
│  • HuggingFace Models (可选)         │
│  • OceanBase AI Functions (可选)     │
└──────────────────────────────────────┘

🎯 核心特性

✨ 数据处理

  • 多格式支持: PDF, Markdown, TXT, HTML, 网页
  • 智能分块: RecursiveCharacterTextSplitter (1000/200)
  • 向量化: OpenAI text-embedding-3-small (1536维)
  • 存储: OceanBase 向量数据库 + HNSW 索引

🔍 检索增强

  • 混合检索: 向量检索 + BM25 + 全文检索
  • 动态权重: Agent 可调整检索策略
  • CRAG 验证: 文档相关性评分 + 自动回退
  • 重排序: MMR 算法优化多样性

🤖 智能 Agent

  • ReAct 框架: Reason (推理) + Act (行动)
  • 工具调用: 知识库搜索 / 网络搜索 / 计算
  • 中间件: 前置/后置钩子 + 工具包装
  • 记忆管理: 短期/长期记忆 + 上下文注入

� 生成优化

  • 提示工程: 动态/静态提示 + Few-Shot
  • 流式输出: Token 级实时返回
  • 结构化输出: Pydantic Models + JSON Schema
  • 引用标注: 自动添加来源和置信度

⚡ 性能优化

  • 缓存策略: Embedding 缓存 + 查询缓存
  • 批处理: 批量向量化 + 并行检索
  • 异步处理: Async/Await 模式
  • 连接池: 数据库连接复用

前言

最近参与百度的 AI Infra 开源项目,主要聚焦在 RAG(Retrieval-Augmented Generation)方向。本文从 Document AI 开始,系统性地介绍 Agentic RAG 的核心概念和实践方法。

核心概念速览

Document AI 处理流程

  • 非结构化数据 → 结构化数据(JSON/HTML/Markdown)
  • Parsing 和 Extracting 需要注意语义保留
  • OCR:图像清理 + 文本识别(但不理解结构和语义)

Agentic 系统的三要素

  • 👁️ 眼睛(OCR):感知和识别
  • 🧠 大脑(LLM):理解和推理
  • 🤚 手(Tools):执行和操作

ReAct 框架:Reason + Act

  • 观察(Observation)→ 思考(Reasoning)→ 行动(Action)
  • 可调试、可追溯的执行流程

第一部分:LangChain + OceanBase 基础架构

1.1 核心组件

LLMs(大语言模型)

  • 推理引擎:自然语言理解
  • 计划与执行:任务分解和策略制定
  • 决策中枢:协调整个系统的运行

Tools(工具)

  • 行动执行:调用外部能力
  • 观察返回:获取执行结果
  • 能力扩展:连接外部系统和服务

Execution Loops(执行循环)

  • 迭代执行:多轮交互和优化
  • 动态决策:根据结果调整策略
  • 终止条件:明确的完成标准

架构图

1.2 环境搭建

安装依赖

# 安装 LangChain 核心库
pip install langchain langchain-community langchain-core

# 安装 OceanBase 向量存储支持
pip install oceanbase-connector pyobvector

# 安装其他必要组件
pip install openai tiktoken faiss-cpu

OceanBase 配置

from langchain_community.vectorstores import OceanBase
from langchain_openai import OpenAIEmbeddings

# 配置 OceanBase 连接
ob_config = {
    "host": "localhost",
    "port": 2881,
    "user": "root@test",
    "password": "your_password",
    "database": "rag_db"
}

# 初始化 Embedding 模型
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key="your_api_key"
)

# 创建向量存储
vector_store = OceanBase(
    embedding_function=embeddings,
    connection_args=ob_config,
    table_name="document_vectors"
)

1.3 模型选择策略

静态加载(简单问题)

from langchain_openai import ChatOpenAI

# 直接加载固定模型
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0.7,
    max_tokens=2000
)

动态路由(复杂问题)

from langchain.chains import LLMRouterChain
from langchain.chains.router import MultiPromptChain

# 定义不同场景的提示模板
prompt_infos = [
    {
        "name": "simple_qa",
        "description": "适用于简单的问答任务",
        "prompt_template": "简单回答:{question}"
    },
    {
        "name": "complex_reasoning",
        "description": "适用于需要深度推理的任务",
        "prompt_template": "详细分析:{question}"
    }
]

# 创建路由链
router_chain = MultiPromptChain.from_prompts(
    llm=llm,
    prompt_infos=prompt_infos
)

Middleware 路由

class ModelRouter:
    def __init__(self):
        self.simple_model = ChatOpenAI(model="gpt-3.5-turbo")
        self.complex_model = ChatOpenAI(model="gpt-4")
    
    def route(self, query: str):
        # 根据查询复杂度选择模型
        if len(query.split()) < 10:
            return self.simple_model
        return self.complex_model
    
    def invoke(self, query: str):
        model = self.route(query)
        return model.invoke(query)

1.4 系统提示词设计

静态提示词

from langchain.prompts import ChatPromptTemplate

# 定义静态系统提示词
system_prompt = """你是一个专业的 AI 助手,专注于回答技术问题。
你的回答应该:
1. 准确且基于事实
2. 结构清晰,易于理解
3. 包含代码示例(如果适用)
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "{question}")
])

动态提示词

from typing import Dict, Any

class DynamicPromptBuilder:
    @staticmethod
    def build_prompt(context: Dict[str, Any]) -> str:
        """根据上下文动态生成提示词"""
        user_level = context.get("user_level", "beginner")
        domain = context.get("domain", "general")
        
        if user_level == "expert":
            tone = "使用专业术语,深入技术细节"
        else:
            tone = "使用通俗语言,提供基础解释"
        
        return f"""你是 {domain} 领域的专家。
回答风格:{tone}
当前用户等级:{user_level}
"""

# 使用动态提示词
@dynamic_prompt
def get_system_message(context):
    return DynamicPromptBuilder.build_prompt(context)

1.5 记忆管理

短期记忆(Chat 内部)

from langchain.memory import ConversationBufferMemory

# 创建对话缓冲记忆
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 自动维护消息历史
from langchain.chains import ConversationChain

conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True
)

# 多轮对话
response1 = conversation.predict(input="我叫张三")
response2 = conversation.predict(input="我叫什么名字?")  # 会记住之前的对话

长期记忆(跨对话)

from langchain.memory import ConversationSummaryMemory

# 使用摘要记忆节省 token
summary_memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="history"
)

# 自定义记忆 Schema
class CustomMemory:
    def __init__(self):
        self.user_preferences = {}
        self.conversation_context = []
    
    def add_preference(self, key: str, value: Any):
        """存储用户偏好"""
        self.user_preferences[key] = value
    
    def add_context(self, message: str):
        """添加上下文"""
        self.conversation_context.append({
            "timestamp": datetime.now(),
            "content": message
        })
    
    def get_relevant_context(self, query: str) -> List[str]:
        """检索相关上下文"""
        # 实现相关性检索逻辑
        return self.conversation_context[-5:]  # 返回最近5条

Middleware 实现自定义记忆

class MemoryMiddleware:
    def __init__(self):
        self.session_data = {}
    
    def before_model(self, messages: List[Message]) -> List[Message]:
        """在模型调用前注入记忆"""
        session_id = self.get_session_id()
        if session_id in self.session_data:
            # 注入历史上下文
            context_msg = Message(
                role="system",
                content=f"历史上下文:{self.session_data[session_id]}"
            )
            messages.insert(0, context_msg)
        return messages
    
    def after_model(self, response: Message):
        """在模型响应后更新记忆"""
        session_id = self.get_session_id()
        self.session_data[session_id] = response.content

1.6 工具系统

使用 @Tool 装饰器

from langchain.tools import tool
from typing import Optional

@tool
def search_database(query: str, limit: int = 5) -> str:
    """在数据库中搜索相关文档
    
    Args:
        query: 搜索查询
        limit: 返回结果数量限制
    
    Returns:
        搜索结果的字符串表示
    """
    try:
        # 执行数据库搜索
        results = vector_store.similarity_search(query, k=limit)
        return "\n".join([doc.page_content for doc in results])
    except Exception as e:
        return f"搜索失败:{str(e)}"

@tool
def calculate(expression: str) -> str:
    """计算数学表达式
    
    Args:
        expression: 要计算的数学表达式
    
    Returns:
        计算结果
    """
    try:
        result = eval(expression)
        return f"计算结果:{result}"
    except Exception as e:
        return f"计算错误:{str(e)}"

异常处理和重试

from tenacity import retry, stop_after_attempt, wait_exponential

@tool
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def robust_api_call(endpoint: str, params: dict) -> str:
    """带重试机制的 API 调用
    
    Args:
        endpoint: API 端点
        params: 请求参数
    
    Returns:
        API 响应
    """
    import requests
    response = requests.get(endpoint, params=params, timeout=10)
    response.raise_for_status()
    return response.json()

并行工具调用

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import Tool

# 定义多个工具
tools = [
    Tool(
        name="Search",
        func=search_database,
        description="搜索知识库"
    ),
    Tool(
        name="Calculate",
        func=calculate,
        description="执行数学计算"
    ),
    Tool(
        name="WebSearch",
        func=web_search,
        description="搜索互联网"
    )
]

# 创建支持并行调用的 Agent
agent = create_openai_tools_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

1.7 执行模式

Invoke 基础执行

from langchain.schema import HumanMessage, SystemMessage

# 阻塞式执行
messages = [
    SystemMessage(content="你是一个有帮助的助手"),
    HumanMessage(content="什么是 RAG?")
]

response = llm.invoke(messages)
print(response.content)

# 动态消息追加
conversation_messages = []

def chat(user_input: str):
    conversation_messages.append(HumanMessage(content=user_input))
    response = llm.invoke(conversation_messages)
    conversation_messages.append(response)
    return response.content

Streaming 流式输出

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# 配置流式输出
streaming_llm = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# 实时返回 token
for chunk in streaming_llm.stream("写一首关于 AI 的诗"):
    print(chunk.content, end="", flush=True)

不同流式模式

from typing import AsyncIterator

class StreamingMode:
    """流式输出模式管理"""
    
    @staticmethod
    async def update_mode(agent_executor):
        """状态更新模式:输出整个节点更新的内容"""
        async for event in agent_executor.astream_events(
            {"input": "查询问题"},
            version="v1"
        ):
            if event["event"] == "on_chain_end":
                print(f"节点完成:{event['data']}")
    
    @staticmethod
    async def message_mode(llm):
        """消息模式:一旦有 token 生成就输出"""
        async for chunk in llm.astream("你好"):
            print(chunk.content, end="", flush=True)

# 使用示例
import asyncio

async def main():
    await StreamingMode.message_mode(streaming_llm)

asyncio.run(main())

1.8 中间件:ReAct 的可定义节点

Before Model Hook

from typing import List, Callable
from langchain.schema import BaseMessage

class BeforeModelMiddleware:
    """模型执行前的中间件"""
    
    def __init__(self):
        self.hooks: List[Callable] = []
    
    def register_hook(self, hook: Callable):
        """注册钩子函数"""
        self.hooks.append(hook)
    
    def execute(self, messages: List[BaseMessage]) -> List[BaseMessage]:
        """执行所有钩子"""
        for hook in self.hooks:
            messages = hook(messages)
        return messages

# 示例:添加时间戳
def add_timestamp(messages: List[BaseMessage]) -> List[BaseMessage]:
    from datetime import datetime
    timestamp_msg = SystemMessage(
        content=f"当前时间:{datetime.now().isoformat()}"
    )
    messages.insert(0, timestamp_msg)
    return messages

# 示例:添加用户上下文
def add_user_context(messages: List[BaseMessage]) -> List[BaseMessage]:
    context_msg = SystemMessage(
        content="用户偏好:技术深度讨论,代码示例优先"
    )
    messages.insert(0, context_msg)
    return messages

# 使用中间件
middleware = BeforeModelMiddleware()
middleware.register_hook(add_timestamp)
middleware.register_hook(add_user_context)

# 在调用模型前应用
processed_messages = middleware.execute(messages)
response = llm.invoke(processed_messages)

After Model Hook

class AfterModelMiddleware:
    """模型执行后的中间件"""
    
    def __init__(self):
        self.hooks: List[Callable] = []
    
    def register_hook(self, hook: Callable):
        self.hooks.append(hook)
    
    def execute(self, response: BaseMessage) -> BaseMessage:
        for hook in self.hooks:
            response = hook(response)
        return response

# 示例:日志记录
def log_response(response: BaseMessage) -> BaseMessage:
    import logging
    logging.info(f"模型响应:{response.content[:100]}...")
    return response

# 示例:内容过滤
def filter_sensitive_content(response: BaseMessage) -> BaseMessage:
    sensitive_words = ["密码", "token", "secret"]
    content = response.content
    for word in sensitive_words:
        content = content.replace(word, "***")
    response.content = content
    return response

# 使用
after_middleware = AfterModelMiddleware()
after_middleware.register_hook(log_response)
after_middleware.register_hook(filter_sensitive_content)

Tool Calls 中间件

from langchain.schema import ToolMessage

class ToolCallMiddleware:
    """工具调用中间件"""
    
    def wrap_tool_call(
        self,
        tool_name: str,
        tool_func: Callable,
        *args,
        **kwargs
    ) -> ToolMessage:
        """包装工具调用,添加额外逻辑"""
        
        # 调用前:验证参数
        print(f"调用工具:{tool_name}")
        print(f"参数:{args}, {kwargs}")
        
        try:
            # 执行工具
            result = tool_func(*args, **kwargs)
            
            # 调用后:处理结果
            print(f"工具返回:{result}")
            
            return ToolMessage(
                content=str(result),
                tool_call_id=f"{tool_name}_{id(result)}"
            )
        except Exception as e:
            # 错误处理
            print(f"工具调用失败:{str(e)}")
            return ToolMessage(
                content=f"错误:{str(e)}",
                tool_call_id=f"{tool_name}_error"
            )

1.9 结构化输出

自动化结构化输出

from pydantic import BaseModel, Field
from typing import List

# 定义输出结构
class TechArticle(BaseModel):
    """技术文章结构"""
    title: str = Field(description="文章标题")
    summary: str = Field(description="文章摘要")
    key_points: List[str] = Field(description="关键要点列表")
    difficulty: str = Field(description="难度等级:初级/中级/高级")
    tags: List[str] = Field(description="标签列表")

# 使用结构化输出
from langchain_openai import ChatOpenAI

structured_llm = ChatOpenAI(model="gpt-4").with_structured_output(TechArticle)

# 调用并获得结构化结果
result = structured_llm.invoke("总结一篇关于 RAG 的技术文章")
print(f"标题:{result.title}")
print(f"摘要:{result.summary}")
print(f"要点:{result.key_points}")

Tool 工具调用模式

from langchain.output_parsers import PydanticToolsParser

# 定义工具输出结构
class SearchResult(BaseModel):
    """搜索结果结构"""
    query: str = Field(description="搜索查询")
    results: List[str] = Field(description="搜索结果列表")
    relevance_scores: List[float] = Field(description="相关性分数")

# 创建工具
@tool(args_schema=SearchResult)
def structured_search(query: str) -> SearchResult:
    """执行结构化搜索"""
    # 执行搜索逻辑
    results = vector_store.similarity_search_with_score(query, k=5)
    
    return SearchResult(
        query=query,
        results=[doc.page_content for doc, _ in results],
        relevance_scores=[score for _, score in results]
    )

Provider 原生模式

# 使用模型提供商的原生结构化输出
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4",
    model_kwargs={
        "response_format": {"type": "json_object"}
    }
)

# 在提示中指定 JSON schema
prompt = """请以 JSON 格式返回结果,包含以下字段:
{
    "answer": "回答内容",
    "confidence": 0.95,
    "sources": ["来源1", "来源2"]
}

问题:什么是 RAG?
"""

response = llm.invoke(prompt)
import json
structured_response = json.loads(response.content)

第二部分:RAG 系统构建

2.1 RAG 核心概念

RAG(Retrieval-Augmented Generation)本质上是上下文管理,核心是基于 LLM 的生成能力。

RAG 的优势

  • 外部信息检索:突破模型训练数据的限制
  • 可追溯的信息来源:提供引用和证据
  • 动态知识更新:无需重新训练模型
  • 降低幻觉:基于真实文档生成答案

基础架构

RAG 架构

核心流程

System Prompt + Context + Query → LLM → Response

2.2 Two-Step RAG(基础 RAG)

架构特点

Query → Retrieve → Generate
  • 总是会执行检索
  • 流程简单直接
  • 适合确定性场景

实现代码

from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import OceanBase

# 1. 初始化组件
llm = ChatOpenAI(model="gpt-4", temperature=0)
embeddings = OpenAIEmbeddings()

# 2. 创建向量存储
vector_store = OceanBase(
    embedding_function=embeddings,
    connection_args={
        "host": "localhost",
        "port": 2881,
        "user": "root@test",
        "password": "password",
        "database": "rag_db"
    },
    table_name="documents"
)

# 3. 创建检索器
retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# 4. 构建 RAG Chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # 将所有文档塞入上下文
    retriever=retriever,
    return_source_documents=True,
    verbose=True
)

# 5. 使用
result = rag_chain.invoke({"query": "什么是向量数据库?"})
print(f"答案:{result['result']}")
print(f"来源:{result['source_documents']}")

自定义提示模板

from langchain.prompts import PromptTemplate

# 定义自定义提示模板
template = """使用以下上下文回答问题。如果不知道答案,就说不知道,不要编造答案。

上下文:
{context}

问题:{question}

详细回答:"""

PROMPT = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# 使用自定义提示
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type_kwargs={"prompt": PROMPT}
)

2.3 Agentic RAG(智能 RAG)

架构特点

Query → Agent 决策 → Tool Use (0-N 轮召回) → Generate
  • Agent 决定是否需要检索
  • 可以调用多个外部工具
  • 更加灵活和智能

核心:构建 RAG 工具

from langchain.tools import tool
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

@tool
def search_knowledge_base(query: str) -> str:
    """在知识库中搜索相关信息
    
    Args:
        query: 搜索查询字符串
    
    Returns:
        相关文档内容
    """
    # 执行向量搜索
    docs = vector_store.similarity_search(query, k=3)
    
    # 格式化结果
    results = []
    for i, doc in enumerate(docs, 1):
        results.append(f"文档 {i}:\n{doc.page_content}\n")
    
    return "\n".join(results)

@tool
def get_current_time() -> str:
    """获取当前时间"""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# 定义工具列表
tools = [search_knowledge_base, get_current_time]

创建 Agent

# 定义 Agent 提示
prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个智能助手。你可以使用以下工具:
    - search_knowledge_base: 搜索知识库
    - get_current_time: 获取当前时间
    
    根据用户问题,决定是否需要使用工具。
    如果问题可以直接回答,就不要使用工具。
    """),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# 创建 Agent
agent = create_openai_tools_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

# 创建执行器
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5,
    handle_parsing_errors=True
)

# 使用 Agent
response = agent_executor.invoke({
    "input": "搜索关于 OceanBase 的信息"
})
print(response["output"])

Agent 决策示例

# 示例 1:需要检索
query1 = "OceanBase 的向量搜索功能有哪些特点?"
# Agent 会调用 search_knowledge_base 工具

# 示例 2:不需要检索
query2 = "1 + 1 等于几?"
# Agent 直接回答,不调用工具

# 示例 3:多工具调用
query3 = "现在几点了?OceanBase 支持哪些数据类型?"
# Agent 会先调用 get_current_time,再调用 search_knowledge_base

2.4 知识库构建流程

完整的数据处理管道

Document Loader → Text Splitter → Embedding → Vector Store

1. Document Loader(文档加载)

from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    DirectoryLoader
)

# 加载单个文件
text_loader = TextLoader("document.txt")
pdf_loader = PyPDFLoader("paper.pdf")
md_loader = UnstructuredMarkdownLoader("readme.md")

# 批量加载目录
directory_loader = DirectoryLoader(
    "docs/",
    glob="**/*.md",
    loader_cls=UnstructuredMarkdownLoader
)

documents = directory_loader.load()
print(f"加载了 {len(documents)} 个文档")

2. Text Splitter(文本分片)

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter
)

# 递归字符分割器(推荐)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,        # 每个块的大小
    chunk_overlap=200,      # 块之间的重叠
    length_function=len,
    separators=["\n\n", "\n", " ", ""]
)

# Markdown 特定分割器
markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
    ]
)

# 执行分割
chunks = text_splitter.split_documents(documents)
print(f"分割成 {len(chunks)} 个块")

# 查看分割结果
for i, chunk in enumerate(chunks[:3]):
    print(f"\n块 {i+1}:")
    print(chunk.page_content[:200])
    print(f"元数据:{chunk.metadata}")

3. Embedding(向量化)

from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings

# 使用 OpenAI Embeddings
openai_embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key="your_api_key"
)

# 使用开源 Embeddings
hf_embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={'device': 'cpu'},
    encode_kwargs={'normalize_embeddings': True}
)

# 测试 Embedding
text = "这是一个测试文本"
vector = openai_embeddings.embed_query(text)
print(f"向量维度:{len(vector)}")
print(f"向量前5个值:{vector[:5]}")

4. Vector Store(向量存储)

from langchain_community.vectorstores import OceanBase

# 创建 OceanBase 向量存储
vector_store = OceanBase.from_documents(
    documents=chunks,
    embedding=openai_embeddings,
    connection_args={
        "host": "localhost",
        "port": 2881,
        "user": "root@test",
        "password": "password",
        "database": "rag_db"
    },
    table_name="document_vectors",
    # 可选:指定向量维度
    vector_dimension=1536
)

print("文档已成功存储到 OceanBase")

完整的知识库构建脚本

def build_knowledge_base(
    source_dir: str,
    db_config: dict,
    table_name: str = "documents"
):
    """构建完整的知识库"""
    
    # 1. 加载文档
    print("正在加载文档...")
    loader = DirectoryLoader(
        source_dir,
        glob="**/*.md",
        loader_cls=UnstructuredMarkdownLoader
    )
    documents = loader.load()
    print(f"✓ 加载了 {len(documents)} 个文档")
    
    # 2. 分割文本
    print("正在分割文本...")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200
    )
    chunks = splitter.split_documents(documents)
    print(f"✓ 分割成 {len(chunks)} 个块")
    
    # 3. 创建 Embeddings
    print("正在初始化 Embedding 模型...")
    embeddings = OpenAIEmbeddings()
    print("✓ Embedding 模型就绪")
    
    # 4. 存储到向量数据库
    print("正在存储到 OceanBase...")
    vector_store = OceanBase.from_documents(
        documents=chunks,
        embedding=embeddings,
        connection_args=db_config,
        table_name=table_name
    )
    print("✓ 知识库构建完成!")
    
    return vector_store

# 使用示例
if __name__ == "__main__":
    db_config = {
        "host": "localhost",
        "port": 2881,
        "user": "root@test",
        "password": "password",
        "database": "rag_db"
    }
    
    vector_store = build_knowledge_base(
        source_dir="./docs",
        db_config=db_config,
        table_name="tech_docs"
    )

第三部分:CRAG(自纠正 RAG)

3.1 传统 RAG 的问题

检索失效场景

  • 检索文档不匹配:返回的文档与问题无关
  • 过时信息:知识库中的信息已经过期
  • 二义性问题:查询意图不明确导致检索偏差

对用户的影响

  • 😵 幻觉回答:基于错误信息生成答案
  • 😟 缺乏置信度:无法判断答案的可靠性
  • 😞 没有恢复方法:错误后无法自动纠正

3.2 CRAG 解决方案

核心机制

  1. Validation(验证):评估检索文档的相关性
  2. Fallback(回退):提供多种信息来源
  3. Agentic(智能):Agent 决定下一步行动

流程图

CRAG 流程

Query → Retrieve → Grade → [相关] → Generate
                         ↓ [不相关]
                    Fallback (Web Search / 其他 KB)

3.3 Document Grading(文档评分)

实现评分器

from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

class GradeDocuments(BaseModel):
    """文档相关性评分结果"""
    binary_score: str = Field(
        description="文档是否相关:'yes' 或 'no'"
    )
    reasoning: str = Field(
        description="评分理由"
    )

# 创建评分 LLM
grader_llm = ChatOpenAI(
    model="gpt-4",
    temperature=0
).with_structured_output(GradeDocuments)

# 评分提示模板
grade_prompt = """你是一个文档相关性评估专家。

用户问题:{question}

检索到的文档:
{document}

请评估该文档是否与用户问题相关。
如果文档包含与问题相关的关键词或语义,返回 'yes'。
如果文档与问题完全无关,返回 'no'。
"""

def grade_document(question: str, document: str) -> GradeDocuments:
    """评估文档相关性"""
    prompt = grade_prompt.format(
        question=question,
        document=document
    )
    return grader_llm.invoke(prompt)

# 使用示例
question = "OceanBase 支持哪些索引类型?"
document = "OceanBase 是一个分布式数据库,支持 B+ 树索引和向量索引..."

grade = grade_document(question, document)
print(f"相关性:{grade.binary_score}")
print(f"理由:{grade.reasoning}")

批量评分

def grade_documents_batch(
    question: str,
    documents: List[str],
    threshold: float = 0.5
) -> List[tuple[str, GradeDocuments]]:
    """批量评估文档相关性"""
    
    results = []
    relevant_count = 0
    
    for doc in documents:
        grade = grade_document(question, doc)
        results.append((doc, grade))
        
        if grade.binary_score.lower() == "yes":
            relevant_count += 1
    
    relevance_ratio = relevant_count / len(documents)
    
    print(f"相关文档比例:{relevance_ratio:.2%}")
    
    return results, relevance_ratio

# 使用
docs = vector_store.similarity_search(question, k=5)
doc_contents = [doc.page_content for doc in docs]
results, ratio = grade_documents_batch(question, doc_contents)

3.4 Fallback 机制

多源信息获取

from langchain_community.tools.tavily_search import TavilySearchResults

class FallbackManager:
    """回退机制管理器"""
    
    def __init__(self):
        # 主知识库
        self.primary_kb = vector_store
        
        # 备用知识库
        self.secondary_kb = None  # 可以是另一个向量库
        
        # 网络搜索
        self.web_search = TavilySearchResults(
            max_results=3,
            api_key="your_tavily_api_key"
        )
    
    def retrieve_with_fallback(
        self,
        query: str,
        min_relevance: float = 0.5
    ) -> tuple[List[str], str]:
        """带回退的检索
        
        Returns:
            (documents, source) - 文档列表和来源标识
        """
        
        # 1. 尝试主知识库
        print("🔍 检索主知识库...")
        docs = self.primary_kb.similarity_search(query, k=5)
        doc_contents = [doc.page_content for doc in docs]
        
        # 2. 评估相关性
        _, relevance_ratio = grade_documents_batch(query, doc_contents)
        
        if relevance_ratio >= min_relevance:
            print("✓ 主知识库检索成功")
            return doc_contents, "primary_kb"
        
        # 3. 回退到备用知识库
        if self.secondary_kb:
            print("⚠️ 主知识库相关性不足,尝试备用知识库...")
            docs = self.secondary_kb.similarity_search(query, k=5)
            doc_contents = [doc.page_content for doc in docs]
            _, relevance_ratio = grade_documents_batch(query, doc_contents)
            
            if relevance_ratio >= min_relevance:
                print("✓ 备用知识库检索成功")
                return doc_contents, "secondary_kb"
        
        # 4. 回退到网络搜索
        print("⚠️ 知识库相关性不足,使用网络搜索...")
        web_results = self.web_search.invoke(query)
        web_contents = [result["content"] for result in web_results]
        print("✓ 网络搜索完成")
        
        return web_contents, "web_search"

# 使用示例
fallback_manager = FallbackManager()

query = "2026年最新的 AI 技术趋势"
documents, source = fallback_manager.retrieve_with_fallback(query)

print(f"\n信息来源:{source}")
print(f"获取到 {len(documents)} 个文档")

查询重写

from langchain.prompts import ChatPromptTemplate

class QueryRewriter:
    """查询重写器"""
    
    def __init__(self, llm):
        self.llm = llm
        self.prompt = ChatPromptTemplate.from_template("""
你是一个查询优化专家。请将用户的查询重写为更适合检索的形式。

原始查询:{query}

重写要求:
1. 提取关键概念
2. 消除歧义
3. 添加相关同义词
4. 保持查询意图

重写后的查询:
""")
    
    def rewrite(self, query: str) -> str:
        """重写查询"""
        response = self.llm.invoke(
            self.prompt.format(query=query)
        )
        return response.content.strip()
    
    def multi_query(self, query: str, n: int = 3) -> List[str]:
        """生成多个查询变体"""
        prompt = f"""
生成 {n} 个不同的查询变体,用于检索与以下问题相关的文档:

原始问题:{query}

请生成 {n} 个不同角度的查询,每行一个:
"""
        response = self.llm.invoke(prompt)
        queries = response.content.strip().split("\n")
        return [q.strip() for q in queries if q.strip()]

# 使用示例
rewriter = QueryRewriter(llm)

original_query = "OB 怎么用"
rewritten_query = rewriter.rewrite(original_query)
print(f"原始查询:{original_query}")
print(f"重写查询:{rewritten_query}")

# 多查询检索
multi_queries = rewriter.multi_query(original_query)
print(f"\n查询变体:")
for i, q in enumerate(multi_queries, 1):
    print(f"{i}. {q}")

3.5 使用 LangChain 构建完整 CRAG

中间件方式实现

from typing import Callable
from langchain.schema import ToolCallRequest, ToolMessage

class CRAGMiddleware:
    """CRAG 中间件实现"""
    
    def __init__(
        self,
        grader_llm,
        rag_tool_name: str = "search_knowledge_base",
        fallback_tool_name: str = "web_search"
    ):
        self.grader_llm = grader_llm
        self.rag_tool_name = rag_tool_name
        self.fallback_tool_name = fallback_tool_name
        self._pending_grading = None
    
    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage]
    ) -> ToolMessage:
        """包装工具调用,添加评分逻辑"""
        
        # 执行工具
        result = handler(request)
        
        # 如果是 RAG 工具,进行评分
        if request.tool_call.get("name") == self.rag_tool_name:
            query = request.tool_call.get("args", {}).get("query")
            
            # 评估文档相关性
            grade = self.grader_llm.invoke({
                "question": query,
                "document": result.content[:2000]  # 限制长度
            })
            
            is_relevant = grade.binary_score.lower() == "yes"
            
            # 存储评分结果
            self._pending_grading = {
                "last_rag_query": query,
                "documents_relevant": is_relevant,
                "grade_reasoning": grade.reasoning,
            }
            
            print(f"\n📊 文档评分:{grade.binary_score}")
            print(f"💭 评分理由:{grade.reasoning}")
            
            # 如果不相关,触发回退
            if not is_relevant:
                print("⚠️ 文档相关性不足,触发回退机制...")
                # 这里可以自动调用 fallback 工具
        
        return result
    
    def before_model(self, messages: List[BaseMessage]) -> List[BaseMessage]:
        """在模型调用前注入评分信息"""
        
        if self._pending_grading:
            grading_info = self._pending_grading
            
            if not grading_info["documents_relevant"]:
                # 注入回退提示
                fallback_msg = SystemMessage(content=f"""
注意:上次检索的文档相关性不足。
原因:{grading_info['grade_reasoning']}
建议:考虑使用网络搜索或重新表述查询。
""")
                messages.insert(0, fallback_msg)
            
            # 清除待处理的评分
            self._pending_grading = None
        
        return messages

# 使用 CRAG 中间件
crag_middleware = CRAGMiddleware(
    grader_llm=grader_llm,
    rag_tool_name="search_knowledge_base"
)

完整的 CRAG Agent

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

# 定义工具
@tool
def search_knowledge_base(query: str) -> str:
    """搜索知识库"""
    docs = vector_store.similarity_search(query, k=3)
    return "\n\n".join([doc.page_content for doc in docs])

@tool
def web_search(query: str) -> str:
    """网络搜索(回退机制)"""
    search_tool = TavilySearchResults(max_results=3)
    results = search_tool.invoke(query)
    return "\n\n".join([r["content"] for r in results])

tools = [search_knowledge_base, web_search]

# 定义 CRAG Agent 提示
crag_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个智能 RAG 助手,具有自我纠正能力。

工作流程:
1. 首先尝试使用 search_knowledge_base 搜索知识库
2. 如果知识库信息不足或不相关,使用 web_search 进行网络搜索
3. 基于检索到的信息生成准确的答案
4. 如果无法找到相关信息,诚实地告知用户

注意:
- 优先使用知识库
- 必要时使用网络搜索作为补充
- 始终标注信息来源
"""),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# 创建 CRAG Agent
crag_agent = create_openai_tools_agent(
    llm=llm,
    tools=tools,
    prompt=crag_prompt
)

crag_executor = AgentExecutor(
    agent=crag_agent,
    tools=tools,
    verbose=True,
    max_iterations=5,
    handle_parsing_errors=True
)

# 使用 CRAG
def ask_with_crag(question: str) -> dict:
    """使用 CRAG 回答问题"""
    print(f"\n❓ 问题:{question}\n")
    
    result = crag_executor.invoke({"input": question})
    
    print(f"\n✅ 答案:{result['output']}\n")
    
    return result

# 测试
ask_with_crag("OceanBase 的向量搜索功能有哪些特点?")
ask_with_crag("2026年最新的 AI 发展趋势是什么?")  # 会触发网络搜索

3.6 两种回滚方式对比

Node Style(节点式)

特点

  • 中间件决定
  • before_model 注入
  • 确定性(Deterministic)
  • 适用于严格的机制
class NodeStyleCRAG:
    """节点式 CRAG 实现"""
    
    def __init__(self, grader, retriever, fallback):
        self.grader = grader
        self.retriever = retriever
        self.fallback = fallback
    
    def execute(self, query: str) -> str:
        """执行 CRAG 流程"""
        
        # 1. 检索
        docs = self.retriever.invoke(query)
        
        # 2. 评分
        grade = self.grader.invoke({
            "question": query,
            "document": docs
        })
        
        # 3. 决策节点
        if grade.binary_score.lower() == "yes":
            # 相关:直接使用
            return docs
        else:
            # 不相关:回退
            print("触发回退机制")
            return self.fallback.invoke(query)

# 使用
node_crag = NodeStyleCRAG(
    grader=grader_llm,
    retriever=vector_store.as_retriever(),
    fallback=web_search_tool
)

result = node_crag.execute("查询问题")

Wrap Style(包装式)

特点

  • 模型决定
  • wrap_model_call 过滤
  • 灵活(Flexible)
  • 适用于 Agent 行为级别
class WrapStyleCRAG:
    """包装式 CRAG 实现"""
    
    def __init__(self, llm, grader, tools):
        self.llm = llm
        self.grader = grader
        self.tools = {tool.name: tool for tool in tools}
        self.last_grade = None
    
    def wrap_model_call(
        self,
        messages: List[BaseMessage]
    ) -> BaseMessage:
        """包装模型调用"""
        
        # 调用模型
        response = self.llm.invoke(messages)
        
        # 如果模型决定使用 RAG 工具
        if hasattr(response, 'tool_calls') and response.tool_calls:
            for tool_call in response.tool_calls:
                if tool_call['name'] == 'search_knowledge_base':
                    # 执行工具
                    tool = self.tools['search_knowledge_base']
                    result = tool.invoke(tool_call['args'])
                    
                    # 评分
                    grade = self.grader.invoke({
                        "question": tool_call['args']['query'],
                        "document": result
                    })
                    
                    self.last_grade = grade
                    
                    # 如果不相关,建议使用其他工具
                    if grade.binary_score.lower() == "no":
                        # 修改响应,建议使用 web_search
                        response.content += "\n[系统提示:知识库信息不足,建议使用网络搜索]"
        
        return response

# 使用
wrap_crag = WrapStyleCRAG(
    llm=llm,
    grader=grader_llm,
    tools=tools
)

选择建议

场景 推荐方式 原因
严格的质量控制 Node Style 确定性强,流程可控
灵活的对话系统 Wrap Style Agent 可以自主决策
简单的 RAG 应用 Node Style 实现简单,易于调试
复杂的多工具场景 Wrap Style 更好的工具协调能力

第四部分:高级检索技术

4.1 多模态检索(Hybrid Search)

三种检索模态

1. Vector Search(向量检索)

特点

  • 基于余弦相似度
  • 适合概念性查询
  • 语义理解能力强
# 纯向量检索
docs = vector_store.similarity_search(
    query="分布式数据库的优势",
    k=5
)

2. Sparse Search(稀疏检索)

特点

  • 基于关键词匹配
  • 不需要 Embedding 模型
  • 精确匹配能力强
from langchain.retrievers import BM25Retriever

# BM25 稀疏检索
bm25_retriever = BM25Retriever.from_documents(documents)
docs = bm25_retriever.get_relevant_documents(
    "OceanBase 索引"
)

3. Full-Text Search(全文检索)

特点

  • 精确字符串匹配
  • 支持复杂查询语法
  • 适合精确查找
# OceanBase 全文检索
query = """
SELECT * FROM documents 
WHERE MATCH(content) AGAINST('向量数据库' IN NATURAL LANGUAGE MODE)
"""

Hybrid Search 实现

from langchain.retrievers import EnsembleRetriever

class HybridRetriever:
    """混合检索器"""
    
    def __init__(
        self,
        vector_store,
        documents,
        vector_weight: float = 0.5,
        sparse_weight: float = 0.5
    ):
        # 向量检索器
        self.vector_retriever = vector_store.as_retriever(
            search_kwargs={"k": 5}
        )
        
        # 稀疏检索器(BM25)
        self.sparse_retriever = BM25Retriever.from_documents(
            documents
        )
        self.sparse_retriever.k = 5
        
        # 组合检索器
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.vector_retriever, self.sparse_retriever],
            weights=[vector_weight, sparse_weight]
        )
    
    def retrieve(self, query: str, k: int = 5) -> List[Document]:
        """执行混合检索"""
        return self.ensemble_retriever.get_relevant_documents(query)
    
    def adjust_weights(self, vector_weight: float, sparse_weight: float):
        """动态调整权重"""
        self.ensemble_retriever.weights = [vector_weight, sparse_weight]

# 使用混合检索
hybrid_retriever = HybridRetriever(
    vector_store=vector_store,
    documents=documents,
    vector_weight=0.6,  # 向量检索权重
    sparse_weight=0.4   # 稀疏检索权重
)

results = hybrid_retriever.retrieve("OceanBase 向量索引")

Agentic 权重配置

@tool
def configure_search_weights(
    query_type: str,
    vector_weight: float = 0.5,
    sparse_weight: float = 0.5
) -> str:
    """动态配置检索权重
    
    Args:
        query_type: 查询类型(conceptual/keyword/mixed)
        vector_weight: 向量检索权重
        sparse_weight: 稀疏检索权重
    
    Returns:
        配置结果
    """
    
    # 根据查询类型自动调整权重
    if query_type == "conceptual":
        # 概念性查询:偏向向量检索
        vector_weight = 0.8
        sparse_weight = 0.2
    elif query_type == "keyword":
        # 关键词查询:偏向稀疏检索
        vector_weight = 0.2
        sparse_weight = 0.8
    else:
        # 混合查询:平衡权重
        vector_weight = 0.5
        sparse_weight = 0.5
    
    hybrid_retriever.adjust_weights(vector_weight, sparse_weight)
    
    return f"已配置权重 - 向量:{vector_weight}, 稀疏:{sparse_weight}"

# Agent 可以根据查询自动调整权重
tools.append(configure_search_weights)

4.2 OceanBase 向量搜索优化

创建向量索引

-- 创建向量表
CREATE TABLE document_vectors (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    content TEXT,
    embedding VECTOR(1536),  -- OpenAI embedding 维度
    metadata JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建向量索引(提升检索性能)
CREATE VECTOR INDEX idx_embedding ON document_vectors(embedding)
WITH (
    distance_metric = 'cosine',
    index_type = 'hnsw',
    m = 16,
    ef_construction = 200
);

-- 创建全文索引
CREATE FULLTEXT INDEX idx_content ON document_vectors(content);

高级查询

from langchain_community.vectorstores import OceanBase

class OptimizedOceanBaseRetriever:
    """优化的 OceanBase 检索器"""
    
    def __init__(self, vector_store: OceanBase):
        self.vector_store = vector_store
    
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 5,
        score_threshold: float = 0.7
    ) -> List[tuple[Document, float]]:
        """带分数的相似度搜索"""
        
        results = self.vector_store.similarity_search_with_score(
            query=query,
            k=k
        )
        
        # 过滤低分结果
        filtered_results = [
            (doc, score) for doc, score in results
            if score >= score_threshold
        ]
        
        return filtered_results
    
    def mmr_search(
        self,
        query: str,
        k: int = 5,
        fetch_k: int = 20,
        lambda_mult: float = 0.5
    ) -> List[Document]:
        """最大边际相关性搜索(减少冗余)"""
        
        return self.vector_store.max_marginal_relevance_search(
            query=query,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult  # 0=多样性, 1=相关性
        )
    
    def metadata_filter_search(
        self,
        query: str,
        filter_dict: dict,
        k: int = 5
    ) -> List[Document]:
        """基于元数据过滤的搜索"""
        
        return self.vector_store.similarity_search(
            query=query,
            k=k,
            filter=filter_dict
        )

# 使用示例
retriever = OptimizedOceanBaseRetriever(vector_store)

# 1. 带分数阈值的搜索
results = retriever.similarity_search_with_score(
    query="向量数据库",
    k=5,
    score_threshold=0.75
)

for doc, score in results:
    print(f"分数: {score:.3f} - {doc.page_content[:100]}")

# 2. MMR 搜索(减少重复)
diverse_results = retriever.mmr_search(
    query="分布式系统",
    k=5,
    lambda_mult=0.3  # 更注重多样性
)

# 3. 元数据过滤
filtered_results = retriever.metadata_filter_search(
    query="数据库索引",
    filter_dict={"category": "技术文档", "year": 2026},
    k=5
)

性能优化技巧

class PerformanceOptimizer:
    """性能优化器"""
    
    @staticmethod
    def batch_embed(texts: List[str], batch_size: int = 100):
        """批量 Embedding(提升效率)"""
        embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            batch_embeddings = embedding_model.embed_documents(batch)
            embeddings.extend(batch_embeddings)
        
        return embeddings
    
    @staticmethod
    def cache_embeddings(query: str, cache: dict):
        """缓存 Embedding 结果"""
        if query in cache:
            return cache[query]
        
        embedding = embedding_model.embed_query(query)
        cache[query] = embedding
        return embedding
    
    @staticmethod
    def parallel_search(queries: List[str], retriever):
        """并行检索"""
        from concurrent.futures import ThreadPoolExecutor
        
        with ThreadPoolExecutor(max_workers=5) as executor:
            results = list(executor.map(
                lambda q: retriever.get_relevant_documents(q),
                queries
            ))
        
        return results

# 使用缓存
embedding_cache = {}

def cached_search(query: str):
    embedding = PerformanceOptimizer.cache_embeddings(
        query,
        embedding_cache
    )
    # 使用缓存的 embedding 进行搜索
    return vector_store.similarity_search_by_vector(embedding, k=5)

第五部分:快捷 RAG(Zero-Config RAG)

5.1 OceanBase 内置 RAG 功能

两种构建方式

方式一:内置 Embedding

from langchain_community.vectorstores import OceanBase

# 使用 OceanBase 内置的 Embedding 功能
easy_vector_store = OceanBase.from_texts(
    texts=["文本1", "文本2", "文本3"],
    embedding=None,  # 使用内置 embedding
    connection_args=db_config,
    table_name="easy_rag",
    use_builtin_embedding=True  # 启用内置功能
)

# 直接搜索(无需手动 embedding)
results = easy_vector_store.similarity_search("查询文本")

方式二:AI 函数调用

# 使用 OceanBase 的 AI 函数
class OceanBaseAIFunction:
    """OceanBase AI 函数封装"""
    
    def __init__(self, connection):
        self.conn = connection
    
    def ai_embed(self, text: str) -> List[float]:
        """使用 AI 函数生成 Embedding"""
        query = "SELECT AI_EMBED(%s) as embedding"
        cursor = self.conn.cursor()
        cursor.execute(query, (text,))
        result = cursor.fetchone()
        return result['embedding']
    
    def ai_search(self, query: str, table: str, k: int = 5):
        """使用 AI 函数搜索"""
        sql = f"""
        SELECT content, 
               AI_DISTANCE(embedding, AI_EMBED(%s)) as distance
        FROM {table}
        ORDER BY distance
        LIMIT %s
        """
        cursor = self.conn.cursor()
        cursor.execute(sql, (query, k))
        return cursor.fetchall()
    
    def ai_generate(self, prompt: str, context: str):
        """使用 AI 函数生成回答"""
        sql = "SELECT AI_GENERATE(%s, %s) as response"
        cursor = self.conn.cursor()
        cursor.execute(sql, (prompt, context))
        result = cursor.fetchone()
        return result['response']

# 使用示例
ai_func = OceanBaseAIFunction(connection)

# 1. 生成 Embedding
embedding = ai_func.ai_embed("测试文本")

# 2. 搜索
results = ai_func.ai_search("查询问题", "documents", k=5)

# 3. 生成答案
context = "\n".join([r['content'] for r in results])
answer = ai_func.ai_generate("回答问题", context)

5.2 All-in-One RAG 实现

数据处理两阶段

Data In(数据导入)

Load → Chunk → Embed → Store
class EasyRAGDataIn:
    """简化的数据导入流程"""
    
    def __init__(self, vector_store):
        self.vector_store = vector_store
    
    def ingest_directory(self, directory: str):
        """一键导入目录"""
        from langchain_community.document_loaders import DirectoryLoader
        from langchain.text_splitter import RecursiveCharacterTextSplitter
        
        # Load
        loader = DirectoryLoader(directory, glob="**/*.md")
        documents = loader.load()
        
        # Chunk
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = splitter.split_documents(documents)
        
        # Embed & Store(自动完成)
        self.vector_store.add_documents(chunks)
        
        return len(chunks)
    
    def ingest_url(self, url: str):
        """一键导入网页"""
        from langchain_community.document_loaders import WebBaseLoader
        
        loader = WebBaseLoader(url)
        documents = loader.load()
        
        self.vector_store.add_documents(documents)
        
        return len(documents)

# 使用
data_in = EasyRAGDataIn(vector_store)

# 导入本地文档
count = data_in.ingest_directory("./docs")
print(f"导入了 {count} 个文档块")

# 导入网页
data_in.ingest_url("https://example.com/article")

Data Out(数据检索)

Query → Search → Retrieve → Generate
class EasyRAGDataOut:
    """简化的数据检索流程"""
    
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm
    
    def ask(self, question: str, k: int = 3) -> dict:
        """一键问答"""
        
        # Search & Retrieve
        docs = self.vector_store.similarity_search(question, k=k)
        context = "\n\n".join([doc.page_content for doc in docs])
        
        # Generate
        prompt = f"""基于以下上下文回答问题:

上下文:
{context}

问题:{question}

回答:"""
        
        answer = self.llm.invoke(prompt).content
        
        return {
            "question": question,
            "answer": answer,
            "sources": docs
        }

# 使用
data_out = EasyRAGDataOut(vector_store, llm)

result = data_out.ask("OceanBase 有哪些特性?")
print(result["answer"])

完整的 Zero-Config RAG

class ZeroConfigRAG:
    """零配置 RAG 系统"""
    
    def __init__(self, db_config: dict):
        """只需要数据库配置即可启动"""
        
        # 自动初始化所有组件
        self.embeddings = OpenAIEmbeddings()
        self.llm = ChatOpenAI(model="gpt-4")
        
        self.vector_store = OceanBase(
            embedding_function=self.embeddings,
            connection_args=db_config,
            table_name="zero_config_rag"
        )
        
        self.data_in = EasyRAGDataIn(self.vector_store)
        self.data_out = EasyRAGDataOut(self.vector_store, self.llm)
    
    def add_documents(self, source: str):
        """添加文档(自动识别类型)"""
        import os
        
        if os.path.isdir(source):
            return self.data_in.ingest_directory(source)
        elif source.startswith("http"):
            return self.data_in.ingest_url(source)
        else:
            # 单个文件
            from langchain_community.document_loaders import TextLoader
            loader = TextLoader(source)
            docs = loader.load()
            self.vector_store.add_documents(docs)
            return len(docs)
    
    def ask(self, question: str) -> str:
        """提问"""
        result = self.data_out.ask(question)
        return result["answer"]
    
    def chat(self):
        """交互式对话"""
        print("Zero-Config RAG 已启动!输入 'quit' 退出。\n")
        
        while True:
            question = input("你: ")
            if question.lower() == 'quit':
                break
            
            answer = self.ask(question)
            print(f"AI: {answer}\n")

# 使用示例
if __name__ == "__main__":
    # 只需要数据库配置
    db_config = {
        "host": "localhost",
        "port": 2881,
        "user": "root@test",
        "password": "password",
        "database": "rag_db"
    }
    
    # 创建 RAG 系统
    rag = ZeroConfigRAG(db_config)
    
    # 添加文档
    rag.add_documents("./docs")
    rag.add_documents("https://example.com/article")
    
    # 开始对话
    rag.chat()

第六部分:实战项目

6.1 完整的生产级 RAG 系统

项目结构

rag_system/
├── config/
│   ├── __init__.py
│   ├── settings.py          # 配置管理
│   └── prompts.py           # 提示模板
├── core/
│   ├── __init__.py
│   ├── embeddings.py        # Embedding 管理
│   ├── vector_store.py      # 向量存储
│   └── llm.py               # LLM 管理
├── retrieval/
│   ├── __init__.py
│   ├── hybrid_retriever.py  # 混合检索
│   ├── grader.py            # 文档评分
│   └── reranker.py          # 重排序
├── agents/
│   ├── __init__.py
│   ├── rag_agent.py         # RAG Agent
│   └── tools.py             # 工具定义
├── middleware/
│   ├── __init__.py
│   ├── crag.py              # CRAG 中间件
│   └── logging.py           # 日志中间件
├── api/
│   ├── __init__.py
│   ├── routes.py            # API 路由
│   └── models.py            # 数据模型
├── utils/
│   ├── __init__.py
│   ├── document_loader.py   # 文档加载
│   └── text_splitter.py     # 文本分割
├── tests/
│   └── test_rag.py
├── main.py                  # 主程序
└── requirements.txt

配置管理(config/settings.py)

from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    """系统配置"""
    
    # OpenAI 配置
    openai_api_key: str
    openai_model: str = "gpt-4"
    embedding_model: str = "text-embedding-3-small"
    
    # OceanBase 配置
    ob_host: str = "localhost"
    ob_port: int = 2881
    ob_user: str = "root@test"
    ob_password: str
    ob_database: str = "rag_db"
    ob_table: str = "documents"
    
    # RAG 配置
    chunk_size: int = 1000
    chunk_overlap: int = 200
    retrieval_k: int = 5
    score_threshold: float = 0.7
    
    # CRAG 配置
    enable_crag: bool = True
    min_relevance: float = 0.5
    
    # API 配置
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    
    class Config:
        env_file = ".env"

settings = Settings()

核心组件(core/vector_store.py)

from langchain_community.vectorstores import OceanBase
from langchain_openai import OpenAIEmbeddings
from config.settings import settings

class VectorStoreManager:
    """向量存储管理器"""
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
        
        self.embeddings = OpenAIEmbeddings(
            model=settings.embedding_model,
            openai_api_key=settings.openai_api_key
        )
        
        self.vector_store = OceanBase(
            embedding_function=self.embeddings,
            connection_args={
                "host": settings.ob_host,
                "port": settings.ob_port,
                "user": settings.ob_user,
                "password": settings.ob_password,
                "database": settings.ob_database
            },
            table_name=settings.ob_table
        )
        
        self._initialized = True
    
    def get_vector_store(self):
        return self.vector_store
    
    def get_retriever(self, **kwargs):
        return self.vector_store.as_retriever(**kwargs)

# 单例实例
vector_store_manager = VectorStoreManager()

RAG Agent(agents/rag_agent.py)

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from agents.tools import get_rag_tools
from config.settings import settings

class RAGAgent:
    """RAG Agent 封装"""
    
    def __init__(self):
        self.llm = ChatOpenAI(
            model=settings.openai_model,
            temperature=0,
            openai_api_key=settings.openai_api_key
        )
        
        self.tools = get_rag_tools()
        
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", self._get_system_prompt()),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        self.agent = create_openai_tools_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )
        
        self.executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            max_iterations=5,
            handle_parsing_errors=True
        )
    
    def _get_system_prompt(self) -> str:
        return """你是一个专业的 AI 助手,具有以下能力:
        
1. 知识库搜索:使用 search_knowledge_base 工具
2. 网络搜索:使用 web_search 工具(当知识库信息不足时)
3. 文档评分:自动评估检索结果的相关性

工作原则:
- 优先使用知识库
- 必要时使用网络搜索补充
- 始终提供信息来源
- 如果无法找到答案,诚实告知

请根据用户问题,选择合适的工具并生成准确的答案。
"""
    
    def invoke(self, question: str) -> dict:
        """执行查询"""
        return self.executor.invoke({"input": question})
    
    async def ainvoke(self, question: str) -> dict:
        """异步执行查询"""
        return await self.executor.ainvoke({"input": question})

FastAPI 接口(api/routes.py)

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from agents.rag_agent import RAGAgent
from utils.document_loader import DocumentLoader
from config.settings import settings

app = FastAPI(title="RAG System API", version="1.0.0")

# CORS 配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 初始化 Agent
rag_agent = RAGAgent()
doc_loader = DocumentLoader()

# 请求模型
class QueryRequest(BaseModel):
    question: str
    k: Optional[int] = 5

class DocumentRequest(BaseModel):
    source: str  # 文件路径或 URL
    source_type: str  # "file", "directory", "url"

class QueryResponse(BaseModel):
    question: str
    answer: str
    sources: List[dict]

# API 路由
@app.get("/")
async def root():
    return {"message": "RAG System API", "version": "1.0.0"}

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """查询接口"""
    try:
        result = rag_agent.invoke(request.question)
        
        return QueryResponse(
            question=request.question,
            answer=result["output"],
            sources=[]  # 可以从 result 中提取来源
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/documents/add")
async def add_documents(request: DocumentRequest):
    """添加文档接口"""
    try:
        count = doc_loader.load_and_store(
            source=request.source,
            source_type=request.source_type
        )
        
        return {
            "message": "Documents added successfully",
            "count": count
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host=settings.api_host,
        port=settings.api_port
    )

文档加载工具(utils/document_loader.py)

from langchain_community.document_loaders import (
    DirectoryLoader,
    TextLoader,
    PyPDFLoader,
    WebBaseLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from core.vector_store import vector_store_manager
from config.settings import settings
import os

class DocumentLoader:
    """文档加载和处理"""
    
    def __init__(self):
        self.vector_store = vector_store_manager.get_vector_store()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap
        )
    
    def load_and_store(self, source: str, source_type: str) -> int:
        """加载并存储文档"""
        
        # 加载文档
        if source_type == "directory":
            documents = self._load_directory(source)
        elif source_type == "file":
            documents = self._load_file(source)
        elif source_type == "url":
            documents = self._load_url(source)
        else:
            raise ValueError(f"Unsupported source type: {source_type}")
        
        # 分割文档
        chunks = self.text_splitter.split_documents(documents)
        
        # 存储到向量数据库
        self.vector_store.add_documents(chunks)
        
        return len(chunks)
    
    def _load_directory(self, directory: str):
        """加载目录"""
        loader = DirectoryLoader(
            directory,
            glob="**/*.{md,txt,pdf}",
            show_progress=True
        )
        return loader.load()
    
    def _load_file(self, file_path: str):
        """加载单个文件"""
        ext = os.path.splitext(file_path)[1].lower()
        
        if ext == ".pdf":
            loader = PyPDFLoader(file_path)
        else:
            loader = TextLoader(file_path)
        
        return loader.load()
    
    def _load_url(self, url: str):
        """加载网页"""
        loader = WebBaseLoader(url)
        return loader.load()

主程序(main.py

import argparse
from api.routes import app
from utils.document_loader import DocumentLoader
from agents.rag_agent import RAGAgent
from config.settings import settings
import uvicorn

def start_api():
    """启动 API 服务"""
    print(f"Starting RAG API on {settings.api_host}:{settings.api_port}")
    uvicorn.run(
        app,
        host=settings.api_host,
        port=settings.api_port
    )

def interactive_mode():
    """交互式模式"""
    print("RAG System - Interactive Mode")
    print("输入 'quit' 退出\n")
    
    agent = RAGAgent()
    
    while True:
        question = input("你: ")
        if question.lower() == 'quit':
            break
        
        try:
            result = agent.invoke(question)
            print(f"AI: {result['output']}\n")
        except Exception as e:
            print(f"错误: {str(e)}\n")

def load_documents(source: str, source_type: str):
    """加载文档"""
    loader = DocumentLoader()
    count = loader.load_and_store(source, source_type)
    print(f"成功加载 {count} 个文档块")

def main():
    parser = argparse.ArgumentParser(description="RAG System")
    parser.add_argument(
        "mode",
        choices=["api", "chat", "load"],
        help="运行模式"
    )
    parser.add_argument(
        "--source",
        help="文档源(用于 load 模式)"
    )
    parser.add_argument(
        "--type",
        choices=["file", "directory", "url"],
        help="源类型(用于 load 模式)"
    )
    
    args = parser.parse_args()
    
    if args.mode == "api":
        start_api()
    elif args.mode == "chat":
        interactive_mode()
    elif args.mode == "load":
        if not args.source or not args.type:
            print("错误: load 模式需要 --source 和 --type 参数")
            return
        load_documents(args.source, args.type)

if __name__ == "__main__":
    main()

使用示例

# 1. 安装依赖
pip install -r requirements.txt

# 2. 配置环境变量(.env 文件)
OPENAI_API_KEY=your_api_key
OB_PASSWORD=your_password

# 3. 加载文档
python main.py load --source ./docs --type directory

# 4. 启动 API 服务
python main.py api

# 5. 或者使用交互式模式
python main.py chat

API 调用示例

import requests

# 查询
response = requests.post(
    "http://localhost:8000/query",
    json={"question": "OceanBase 有哪些特性?"}
)
print(response.json())

# 添加文档
response = requests.post(
    "http://localhost:8000/documents/add",
    json={
        "source": "./new_docs",
        "source_type": "directory"
    }
)
print(response.json())

6.2 性能监控和优化

日志中间件(middleware/logging.py)

import time
import logging
from typing import List
from langchain.schema import BaseMessage

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LoggingMiddleware:
    """日志和性能监控中间件"""
    
    def __init__(self):
        self.metrics = {
            "total_queries": 0,
            "total_time": 0,
            "avg_time": 0
        }
    
    def before_model(self, messages: List[BaseMessage]) -> List[BaseMessage]:
        """记录请求开始"""
        self.start_time = time.time()
        logger.info(f"开始处理查询,消息数: {len(messages)}")
        return messages
    
    def after_model(self, response: BaseMessage) -> BaseMessage:
        """记录请求结束"""
        elapsed = time.time() - self.start_time
        
        self.metrics["total_queries"] += 1
        self.metrics["total_time"] += elapsed
        self.metrics["avg_time"] = (
            self.metrics["total_time"] / self.metrics["total_queries"]
        )
        
        logger.info(f"查询完成,耗时: {elapsed:.2f}s")
        logger.info(f"平均响应时间: {self.metrics['avg_time']:.2f}s")
        
        return response
    
    def get_metrics(self) -> dict:
        """获取性能指标"""
        return self.metrics

# 使用
logging_middleware = LoggingMiddleware()

缓存优化

from functools import lru_cache
import hashlib
import json

class CacheManager:
    """缓存管理器"""
    
    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.max_size = max_size
    
    def _get_cache_key(self, query: str, k: int) -> str:
        """生成缓存键"""
        content = f"{query}_{k}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def get(self, query: str, k: int):
        """获取缓存"""
        key = self._get_cache_key(query, k)
        return self.cache.get(key)
    
    def set(self, query: str, k: int, result):
        """设置缓存"""
        if len(self.cache) >= self.max_size:
            # 简单的 FIFO 策略
            self.cache.pop(next(iter(self.cache)))
        
        key = self._get_cache_key(query, k)
        self.cache[key] = result
    
    def clear(self):
        """清空缓存"""
        self.cache.clear()

# 使用缓存的检索器
class CachedRetriever:
    def __init__(self, retriever, cache_manager: CacheManager):
        self.retriever = retriever
        self.cache = cache_manager
    
    def get_relevant_documents(self, query: str, k: int = 5):
        """带缓存的检索"""
        # 尝试从缓存获取
        cached_result = self.cache.get(query, k)
        if cached_result:
            logger.info("缓存命中")
            return cached_result
        
        # 执行检索
        logger.info("缓存未命中,执行检索")
        result = self.retriever.get_relevant_documents(query)
        
        # 存入缓存
        self.cache.set(query, k, result)
        
        return result

# 使用
cache_manager = CacheManager(max_size=500)
cached_retriever = CachedRetriever(retriever, cache_manager)

批处理优化

from typing import List
import asyncio

class BatchProcessor:
    """批处理器"""
    
    def __init__(self, batch_size: int = 10):
        self.batch_size = batch_size
    
    async def process_batch(
        self,
        queries: List[str],
        processor_func
    ) -> List:
        """批量处理查询"""
        results = []
        
        for i in range(0, len(queries), self.batch_size):
            batch = queries[i:i + self.batch_size]
            
            # 并行处理批次
            batch_results = await asyncio.gather(*[
                processor_func(query) for query in batch
            ])
            
            results.extend(batch_results)
        
        return results

# 使用示例
async def process_query(query: str):
    """异步处理单个查询"""
    result = await rag_agent.ainvoke(query)
    return result

async def main():
    queries = [
        "问题1",
        "问题2",
        "问题3",
        # ... 更多查询
    ]
    
    batch_processor = BatchProcessor(batch_size=5)
    results = await batch_processor.process_batch(
        queries,
        process_query
    )
    
    return results

# 运行
# asyncio.run(main())

第七部分:最佳实践和技巧

7.1 提示词工程

系统提示词模板

SYSTEM_PROMPTS = {
    "technical": """你是一个技术专家,专注于提供准确的技术信息。
    
特点:
- 使用专业术语
- 提供代码示例
- 引用官方文档
- 解释技术原理
""",
    
    "beginner_friendly": """你是一个友好的导师,擅长用简单的语言解释复杂概念。
    
特点:
- 避免过多术语
- 使用类比和比喻
- 循序渐进讲解
- 鼓励提问
""",
    
    "concise": """你是一个简洁的助手,提供精炼的答案。
    
特点:
- 直接回答要点
- 避免冗余信息
- 使用列表和要点
- 控制回答长度
"""
}

def get_prompt_for_user(user_level: str, domain: str) -> str:
    """根据用户特征选择提示词"""
    if user_level == "expert":
        return SYSTEM_PROMPTS["technical"]
    elif user_level == "beginner":
        return SYSTEM_PROMPTS["beginner_friendly"]
    else:
        return SYSTEM_PROMPTS["concise"]

Few-Shot 示例

FEW_SHOT_EXAMPLES = """
示例 1:
问题:什么是向量数据库?
回答:向量数据库是专门用于存储和检索向量数据的数据库系统。它使用向量相似度算法(如余弦相似度)来快速找到相似的数据点,常用于 AI 应用中的语义搜索。

示例 2:
问题:如何优化 RAG 系统的检索性能?
回答:优化 RAG 检索性能的方法包括:
1. 使用混合检索(向量+关键词)
2. 实现查询缓存
3. 优化文档分块策略
4. 使用重排序模型
5. 批量处理查询

现在请回答用户的问题:
"""

# 在提示中使用
prompt_with_examples = f"{FEW_SHOT_EXAMPLES}\n问题:{user_question}"

7.2 错误处理和重试

健壮的错误处理

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import logging

logger = logging.getLogger(__name__)

class RobustRAGAgent:
    """带错误处理的 RAG Agent"""
    
    def __init__(self, agent):
        self.agent = agent
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception)
    )
    def invoke_with_retry(self, question: str) -> dict:
        """带重试的调用"""
        try:
            return self.agent.invoke(question)
        except Exception as e:
            logger.error(f"查询失败: {str(e)}")
            raise
    
    def invoke_safe(self, question: str) -> dict:
        """安全调用(不抛出异常)"""
        try:
            return self.invoke_with_retry(question)
        except Exception as e:
            logger.error(f"查询最终失败: {str(e)}")
            return {
                "output": "抱歉,系统遇到了问题,请稍后重试。",
                "error": str(e)
            }

# 使用
robust_agent = RobustRAGAgent(rag_agent)
result = robust_agent.invoke_safe("查询问题")

降级策略

class FallbackStrategy:
    """降级策略"""
    
    def __init__(self, primary_agent, fallback_agent):
        self.primary = primary_agent
        self.fallback = fallback_agent
    
    def invoke(self, question: str) -> dict:
        """尝试主 Agent,失败则降级"""
        try:
            # 尝试主 Agent
            return self.primary.invoke(question)
        except Exception as e:
            logger.warning(f"主 Agent 失败,使用降级策略: {str(e)}")
            
            try:
                # 使用降级 Agent
                return self.fallback.invoke(question)
            except Exception as e2:
                logger.error(f"降级 Agent 也失败: {str(e2)}")
                return {
                    "output": "系统暂时不可用,请稍后重试。",
                    "error": str(e2)
                }

# 使用
fallback_strategy = FallbackStrategy(
    primary_agent=advanced_rag_agent,
    fallback_agent=simple_rag_agent
)

7.3 评估和测试

RAG 系统评估

from typing import List, Dict
import numpy as np

class RAGEvaluator:
    """RAG 系统评估器"""
    
    def __init__(self, rag_system):
        self.rag_system = rag_system
    
    def evaluate_retrieval(
        self,
        test_cases: List[Dict[str, any]]
    ) -> Dict[str, float]:
        """评估检索质量"""
        
        precision_scores = []
        recall_scores = []
        
        for case in test_cases:
            query = case["query"]
            expected_docs = set(case["expected_doc_ids"])
            
            # 执行检索
            retrieved_docs = self.rag_system.retrieve(query, k=5)
            retrieved_ids = set([doc.metadata["id"] for doc in retrieved_docs])
            
            # 计算精确率和召回率
            true_positives = len(expected_docs & retrieved_ids)
            precision = true_positives / len(retrieved_ids) if retrieved_ids else 0
            recall = true_positives / len(expected_docs) if expected_docs else 0
            
            precision_scores.append(precision)
            recall_scores.append(recall)
        
        return {
            "avg_precision": np.mean(precision_scores),
            "avg_recall": np.mean(recall_scores),
            "f1_score": 2 * np.mean(precision_scores) * np.mean(recall_scores) / 
                       (np.mean(precision_scores) + np.mean(recall_scores))
        }
    
    def evaluate_generation(
        self,
        test_cases: List[Dict[str, str]]
    ) -> Dict[str, float]:
        """评估生成质量"""
        
        from rouge import Rouge
        rouge = Rouge()
        
        rouge_scores = []
        
        for case in test_cases:
            query = case["query"]
            reference = case["reference_answer"]
            
            # 生成答案
            result = self.rag_system.ask(query)
            generated = result["answer"]
            
            # 计算 ROUGE 分数
            scores = rouge.get_scores(generated, reference)[0]
            rouge_scores.append(scores["rouge-l"]["f"])
        
        return {
            "avg_rouge_l": np.mean(rouge_scores)
        }

# 使用示例
test_cases = [
    {
        "query": "什么是向量数据库?",
        "expected_doc_ids": ["doc1", "doc3", "doc5"],
        "reference_answer": "向量数据库是..."
    },
    # 更多测试用例
]

evaluator = RAGEvaluator(rag_system)
retrieval_metrics = evaluator.evaluate_retrieval(test_cases)
generation_metrics = evaluator.evaluate_generation(test_cases)

print(f"检索精确率: {retrieval_metrics['avg_precision']:.2f}")
print(f"检索召回率: {retrieval_metrics['avg_recall']:.2f}")
print(f"生成质量 (ROUGE-L): {generation_metrics['avg_rouge_l']:.2f}")

7.4 常见问题和解决方案

问题 1:检索结果不相关

原因

  • Embedding 模型不适合
  • 文档分块策略不当
  • 查询表述不清晰

解决方案

# 1. 使用更好的 Embedding 模型
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large"  # 更大的模型
)

# 2. 优化分块策略
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,      # 减小块大小
    chunk_overlap=100,   # 增加重叠
    separators=["\n\n", "\n", "。", "!", "?", " ", ""]  # 中文分隔符
)

# 3. 查询重写
def rewrite_query(query: str) -> str:
    """重写查询以提高检索效果"""
    prompt = f"将以下查询重写为更适合检索的形式:{query}"
    return llm.invoke(prompt).content

问题 2:生成答案有幻觉

原因

  • 检索到的文档不相关
  • 模型过度推理
  • 缺乏约束

解决方案

# 1. 使用 CRAG 验证检索结果
# 2. 强化提示词约束

STRICT_PROMPT = """基于以下上下文回答问题。

重要规则:
1. 只使用上下文中的信息
2. 如果上下文中没有答案,明确说"我不知道"
3. 不要推测或编造信息
4. 引用具体的上下文片段

上下文:
{context}

问题:{question}

回答:"""

# 3. 添加置信度评估
class ConfidenceEvaluator:
    def evaluate(self, answer: str, context: str) -> float:
        """评估答案的置信度"""
        prompt = f"""
评估以下答案是否完全基于上下文:

上下文:{context}

答案:{answer}

返回 0-1 之间的置信度分数。
"""
        response = llm.invoke(prompt)
        return float(response.content)

问题 3:响应速度慢

原因

  • Embedding 计算慢
  • 向量检索慢
  • LLM 调用慢

解决方案

# 1. 使用缓存
cache_manager = CacheManager(max_size=1000)

# 2. 批量处理
batch_processor = BatchProcessor(batch_size=10)

# 3. 异步处理
async def async_query(question: str):
    result = await rag_agent.ainvoke(question)
    return result

# 4. 使用更快的模型
fast_llm = ChatOpenAI(
    model="gpt-3.5-turbo",  # 更快的模型
    temperature=0
)

# 5. 优化向量索引
# 在 OceanBase 中创建 HNSW 索引

问题 4:成本过高

原因

  • 频繁调用 API
  • 使用昂贵的模型
  • 没有缓存

解决方案

# 1. 使用本地 Embedding 模型
from langchain_community.embeddings import HuggingFaceEmbeddings

local_embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-zh-v1.5"  # 中文模型
)

# 2. 混合使用模型
class CostOptimizedAgent:
    def __init__(self):
        self.cheap_model = ChatOpenAI(model="gpt-3.5-turbo")
        self.expensive_model = ChatOpenAI(model="gpt-4")
    
    def invoke(self, question: str, complexity: str = "simple"):
        if complexity == "simple":
            return self.cheap_model.invoke(question)
        else:
            return self.expensive_model.invoke(question)

# 3. 实现智能缓存
# 4. 批量处理减少调用次数

7.5 部署建议

Docker 部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "main.py", "api"]
# docker-compose.yml
version: '3.8'

services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OB_HOST=oceanbase
      - OB_PASSWORD=${OB_PASSWORD}
    depends_on:
      - oceanbase
  
  oceanbase:
    image: oceanbase/oceanbase-ce:latest
    ports:
      - "2881:2881"
    environment:
      - MODE=slim
    volumes:
      - ob-data:/root/ob
  
volumes:
  ob-data:

生产环境配置

# config/production.py

PRODUCTION_CONFIG = {
    # 性能优化
    "enable_cache": True,
    "cache_size": 5000,
    "batch_size": 20,
    
    # 可靠性
    "max_retries": 3,
    "timeout": 30,
    "enable_fallback": True,
    
    # 监控
    "enable_logging": True,
    "log_level": "INFO",
    "enable_metrics": True,
    
    # 安全
    "enable_rate_limit": True,
    "rate_limit": "100/minute",
    "enable_auth": True,
}