Skip to content

LangChain 教程 08|短期记忆:会话上下文管理

📖 本篇导读:这是 LangChain 系列教程的第 8 篇。本篇将深入讲解短期记忆的实现原理、添加方法、状态扩展,以及处理长对话的常见策略。读完预计需要 8 分钟。

简单来说

短期记忆是 LangChain 中让 AI 代理能够记住之前交互内容的系统,就像人类对话中的短期记忆一样,它让代理能够在同一会话中保持上下文连贯性。短期记忆主要通过消息历史来实现,但由于模型上下文窗口的限制,我们需要使用各种策略来管理记忆,如修剪、删除或总结消息。

本节目标

  • 理解短期记忆在 LangChain 中的作用和重要性
  • 掌握如何为代理添加短期记忆
  • 学会自定义代理状态和扩展记忆功能
  • 掌握处理长对话的常见策略
  • 了解如何在工具和中间件中访问和修改短期记忆

什么是短期记忆?

问题驱动

在构建对话式 AI 应用时,我们经常遇到以下挑战:

  • 模型无法记住之前的对话内容,导致上下文丢失
  • 长对话可能超出模型的上下文窗口限制
  • 即使在上下文窗口内,长对话也会导致模型性能下降
  • 长对话会增加响应时间和成本

短期记忆正是为了解决这些问题而设计的,它让代理能够在同一会话中保持上下文连贯性,同时通过各种策略管理记忆大小。

AI 对话能力对比:无记忆 vs 有短期记忆

核心概念

短期记忆(也称为线程级持久性)让应用能够在单个线程或对话中记住之前的交互。对话历史是最常见的短期记忆形式。

LangChain 的代理将短期记忆作为代理状态的一部分进行管理,通过将这些状态存储在图的状态中,代理可以访问给定对话的完整上下文,同时保持不同线程之间的分离。

⚠️ 这里的「线程」和操作系统的线程不是一回事!

在计算机科学中,我们熟悉的是操作系统层面的概念:

  • 进程(Process):独立运行的程序实例,拥有自己的内存空间
  • 线程(Thread):进程内的执行单元,共享进程的内存,由 CPU 调度
  • 协程(Coroutine):用户态的轻量级线程,由程序自身调度(如 JS 的 async/await

而 LangChain/LangGraph 中的 Thread(线程) 是一个业务概念,含义完全不同:

  • 它指的是一轮独立的对话会话,类似聊天软件中的一个"对话窗口"
  • 每个 Thread 用一个唯一的 thread_id 来标识(如 { thread_id: "1" }
  • 同一个 thread_id 下的所有消息共享同一份上下文记忆
  • 不同 thread_id 之间的记忆是完全隔离

简单来说,LangChain 的 Thread ≈ 一个聊天会话,操作系统的 Thread ≈ 一个 CPU 执行流。两者只是恰好都叫"线程",概念上没有任何关系。

操作系统的线程                    LangChain 的线程
┌──────────────┐                ┌──────────────────┐
│   进程       │                │   Agent          │
│  ┌────────┐  │                │  ┌────────────┐  │
│  │ 线程 1 │  │                │  │ thread_id=1│  │
│  │ 线程 2 │  │   完全不同!    │  │ (用户A对话) │  │
│  │ 线程 3 │  │  ←————————→    │  ├────────────┤  │
│  └────────┘  │                │  │ thread_id=2│  │
│  共享内存    │                │  │ (用户B对话) │  │
│  CPU调度     │                │  └────────────┘  │
└──────────────┘                │  各自独立记忆    │
                                └──────────────────┘

类比教学

想象一下短期记忆就像会议中的记录本:

  • 每个对话线程就像一个独立的会议
  • 会议记录本(短期记忆)记录了会议中的所有讨论内容
  • 当会议时间过长时,记录本可能会写满(超出上下文窗口)
  • 我们需要决定哪些内容重要(保留),哪些内容可以忽略(删除)
  • 对于特别长的会议,我们可能需要做会议摘要(总结消息)

短期记忆类比:会议记录本

使用短期记忆

添加短期记忆

要为代理添加短期记忆(线程级持久性),您需要在创建代理时指定一个 checkpointer

typescript
import { createAgent } from "langchain";
import { MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const agent = createAgent({
    model: "claude-sonnet-4-5-20250929",
    tools: [],
    checkpointer,
});

await agent.invoke(
    { messages: [{ role: "user", content: "hi! i am Bob" }] },
    { configurable: { thread_id: "1" } }
);

💡 为什么叫 Checkpointer?

Checkpoint(检查点) 这个概念来源于计算机科学中的经典术语:

  • 游戏中:检查点就是"存档点"——角色死了可以从存档点重新开始,不用从头来
  • 数据库中:检查点是将内存中的数据刷写到磁盘的时刻,用于崩溃恢复
  • 深度学习训练中:检查点是保存模型权重的快照,训练中断后可以从上次的检查点恢复

LangGraph 借用了同样的思想——在图执行的每一步(super-step)自动保存一份状态快照,这份快照就叫 Checkpoint。而负责执行"保存/读取快照"这个动作的组件,就叫 Checkpointer(检查点管理器)。

具体来说,Checkpointer 做了这些事情:

  1. 自动存档:Agent 每走一步(调用模型、执行工具),Checkpointer 就保存一份当前状态的快照
  2. 恢复记忆:下次用同一个 thread_id 调用时,Checkpointer 自动加载上次的状态,Agent 就"记住"了之前的对话
  3. 时间旅行:你可以回到任意一个历史检查点,查看或回滚状态(类似 Git 的 commit 历史)
  4. 容错恢复:如果某个节点执行失败,可以从最后一个成功的检查点重新开始
对话过程中 Checkpointer 的工作方式:

用户消息1 ──→ [Agent处理] ──→ 💾 Checkpoint 1(保存状态)

用户消息2 ──→ [加载 CP1] ──→ [Agent处理] ──→ 💾 Checkpoint 2

用户消息3 ──→ [加载 CP2] ──→ [Agent处理] ──→ 💾 Checkpoint 3

所以 MemorySaver 是一个将检查点保存在内存中的 Checkpointer 实现,适合开发调试。生产环境需要换成持久化存储(如 PostgreSQL),这样即使服务重启,对话记忆也不会丢失。

⚠️ 每一步都存快照,会不会很浪费存储?

这是一个很自然的担忧。答案是:默认设计确实会产生存储开销,但 LangGraph 提供了多种机制来控制它

首先,单个 Checkpoint 并没有你想象的那么大:

  • 对于一个典型的聊天 Agent,每个 Checkpoint 主要存储的是消息列表(messages)和少量状态字段
  • 一轮普通对话产生的 Checkpoint 大小通常在几 KB 到几十 KB 之间
  • 并不是把整个程序内存都存下来,只存图的 state 部分

其次,存储开销和你获得的能力是对等的:

先理解一个关键点:每个 Checkpoint 都是当前状态的完整快照。也就是说,最新的那一个 Checkpoint 里已经包含了完整的对话历史(messages 数组是累积的),并不是每个 Checkpoint 只存"这一步新增的内容"。

Checkpoint 1 的 messages: [用户消息1, AI回复1]
Checkpoint 2 的 messages: [用户消息1, AI回复1, 用户消息2, AI回复2]
Checkpoint 3 的 messages: [用户消息1, AI回复1, 用户消息2, AI回复2, 用户消息3, AI回复3]
                           ↑ 每个 Checkpoint 都包含完整的对话历史!

所以,多个 Checkpoint 之间存在大量重复数据——这就是存储冗余的来源。但正因为如此,我们才可以安全地只保留最新的一个 Checkpoint,对话记忆一条都不会丢。

能力需要保留多少 Checkpoint说明
对话记忆只需最新的 1 个最新 Checkpoint 里已包含全部对话历史,不会丢失任何记忆
时间旅行 / 调试需要完整历史要回到"第 3 步时的状态",就必须保留第 3 步的 Checkpoint
人工审核(Human-in-the-loop)需要中断点的快照暂停后需要从中断处恢复
故障恢复至少最近一个成功的失败后需要从上一个成功状态重跑

最后,LangGraph 提供了多种策略来控制存储量:

  1. TTL(自动过期):为 Checkpoint 设置生存时间,过期自动删除

    typescript
    // Redis 示例:Checkpoint 60 分钟后自动过期
    const checkpointer = await RedisSaver.fromUrl("redis://localhost:6379", {
      defaultTTL: 60,       // 60 分钟后过期
      refreshOnRead: true   // 读取时重置 TTL
    });
  2. Shallow Saver(只保留最新快照):如果你不需要时间旅行,可以用浅层存储,每个线程只保留最新一个 Checkpoint,旧的自动清理

    typescript
    // 只保留最新的 Checkpoint,自动清理历史
    import { ShallowRedisSaver } from "@langchain/langgraph-checkpoint-redis/shallow";
    const checkpointer = await ShallowRedisSaver.fromUrl("redis://localhost:6379");
  3. 压缩存储:部分 Checkpointer 实现支持对状态数据进行压缩后再存储,降低存储体积

    压缩的工作原理:Checkpoint 在存储前要经过序列化(Serialization)——把内存中的 JavaScript 对象转换成可以存到数据库的字节流。LangGraph 默认使用 JsonPlusSerializer(基于 msgpack + JSON),它能处理 LangChain 原生对象、日期、枚举等复杂类型。压缩就是在序列化之后、写入存储之前,对这段字节流再做一次类似 gzip 的处理:

    内存中的 State 对象
    
    
    ① 序列化(JsonPlusSerializer)
         │  把对象变成字节流(如 JSON / msgpack 格式)
    
    ② 压缩(gzip / snappy 等)
         │  把字节流压缩,体积通常缩小 50%-80%
    
    ③ 写入存储(DynamoDB / S3 / Redis ...)
    
    读取时反向操作:存储 → 解压 → 反序列化 → State 对象

    以 AWS DynamoDB Checkpointer 为例,只需一个配置项即可开启:

    python
    # Python 示例(DynamoDB Checkpointer)
    checkpointer = DynamoDBSaver(
        table_name="my_checkpoints_table",
        region_name="us-east-1",
        ttl_seconds=86400 * 30,              # 30 天后过期
        enable_checkpoint_compression=True,   # 开启压缩
        s3_offload_config={                   # 大于 350KB 的自动转存 S3
            "bucket_name": "my-checkpoint-bucket",
        }
    )

    DynamoDB Checkpointer 还有一个智能设计——大小分流

    • 小于 350 KB 的 Checkpoint → 直接存 DynamoDB(快速读写)
    • 大于 350 KB 的 Checkpoint → 自动转存到 S3(突破 DynamoDB 单条记录大小限制),DynamoDB 里只存一个指针

    压缩 + 分流 + TTL 三管齐下,生产环境的存储成本就可以控制在合理范围内。

总结:默认行为是"宁可多存、确保安全",但在生产环境中你可以根据实际需要选择合适的策略。如果只需要对话记忆,用 Shallow Saver 就够了;如果需要完整历史,加上 TTL 自动清理过期数据即可。

生产环境使用

在生产环境中,您应该使用由数据库支持的检查点:

typescript
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";

const DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable";
const checkpointer = PostgresSaver.fromConnString(DB_URI);

LangChain 支持多种检查点库,包括 SQLite、Postgres 和 Azure Cosmos DB 等。

自定义代理记忆

您可以通过创建带有状态模式的自定义中间件来扩展代理状态:

typescript
import { createAgent, createMiddleware } from "langchain";
import { StateSchema, MemorySaver } from "@langchain/langgraph";
import * as z from "zod";

// 定义自定义状态
const CustomState = new StateSchema({
    userId: z.string(),
    preferences: z.record(z.string(), z.any()),
});

// 创建状态扩展中间件
const stateExtensionMiddleware = createMiddleware({
    name: "StateExtension",
    stateSchema: CustomState,
});

// 创建检查点
const checkpointer = new MemorySaver();

// 创建带有自定义状态的代理
const agent = createAgent({
    model: "gpt-5",
    tools: [],
    middleware: [stateExtensionMiddleware],
    checkpointer,
});

// 在调用时传递自定义状态
const result = await agent.invoke({
    messages: [{ role: "user", content: "Hello" }],
    userId: "user_123",
    preferences: { theme: "dark" },
});

常见模式

启用短期记忆后,长对话可能会超出 LLM 的上下文窗口。以下是处理长对话的常见策略:

长对话管理:三种记忆策略

1. 修剪消息

删除最早或最近的 N 条消息(在调用 LLM 之前):

typescript
import { RemoveMessage } from "@langchain/core/messages";
import { createAgent, createMiddleware } from "langchain";
import { MemorySaver, REMOVE_ALL_MESSAGES } from "@langchain/langgraph";

// 创建消息修剪中间件
const trimMessages = createMiddleware({
  name: "TrimMessages",
  beforeModel: (state) => {
    const messages = state.messages;

    if (messages.length <= 3) {
      return; // 不需要更改
    }

    // 保留第一条消息和最近的几条消息
    const firstMsg = messages[0];
    const recentMessages = 
      messages.length % 2 === 0 ? messages.slice(-3) : messages.slice(-4);
    const newMessages = [firstMsg, ...recentMessages];

    return {
      messages: [
        new RemoveMessage({ id: REMOVE_ALL_MESSAGES }),
        ...newMessages,
      ],
    };
  },
});

// 创建代理
const checkpointer = new MemorySaver();
const agent = createAgent({
  model: "gpt-4.1",
  tools: [],
  middleware: [trimMessages],
  checkpointer,
});

2. 删除消息

从 LangGraph 状态中永久删除消息:

typescript
import { RemoveMessage } from "@langchain/core/messages";
import { createAgent, createMiddleware } from "langchain";
import { MemorySaver } from "@langchain/langgraph";

// 创建消息删除中间件
const deleteOldMessages = createMiddleware({
  name: "DeleteOldMessages",
  afterModel: (state) => {
    const messages = state.messages;
    if (messages.length > 2) {
      // 删除最早的两条消息
      return {
        messages: messages
          .slice(0, 2)
          .map((m) => new RemoveMessage({ id: m.id! })),
      };
    }
    return;
  },
});

// 创建代理
const agent = createAgent({
  model: "gpt-4.1",
  tools: [],
  systemPrompt: "Please be concise and to the point.",
  middleware: [deleteOldMessages],
  checkpointer: new MemorySaver(),
});

3. 总结消息

总结早期消息并将它们替换为摘要:

typescript
import { createAgent, summarizationMiddleware } from "langchain";
import { MemorySaver } from "@langchain/langgraph";

// 创建检查点
const checkpointer = new MemorySaver();

// 创建带有摘要中间件的代理
const agent = createAgent({
  model: "gpt-4.1",
  tools: [],
  middleware: [
    summarizationMiddleware({
      model: "gpt-4.1-mini",
      trigger: { tokens: 4000 }, // 当消息达到 4000 tokens 时触发摘要
      keep: { messages: 20 }, // 保留最近的 20 条消息
    }),
  ],
  checkpointer,
});

// 测试长对话
const config = { configurable: { thread_id: "1" } };
await agent.invoke({ messages: "hi, my name is bob" }, config);
await agent.invoke({ messages: "write a short poem about cats" }, config);
await agent.invoke({ messages: "now do the same but for dogs" }, config);
const finalResponse = await agent.invoke({ messages: "what's my name?" }, config);

console.log(finalResponse.messages.at(-1)?.content);
// Your name is Bob!

4. 自定义策略

根据业务需求创建自定义策略,例如消息过滤、重要性排序等。

访问记忆

您可以通过多种方式访问和修改代理的短期记忆(状态):

短期记忆的访问方式

在工具中访问短期记忆

使用 runtime 参数(类型为 ToolRuntime)在工具中访问短期记忆(状态):

typescript
import { createAgent, tool, type ToolRuntime } from "langchain";
import { StateSchema } from "@langchain/langgraph";
import * as z from "zod";

// 定义自定义状态
const CustomState = new StateSchema({
  userId: z.string(),
});

// 创建访问用户信息的工具
const getUserInfo = tool(
  async (_, config: ToolRuntime<typeof CustomState.State>) => {
    const userId = config.state.userId;
    return userId === "user_123" ? "John Doe" : "Unknown User";
  },
  {
    name: "get_user_info",
    description: "Get user info",
    schema: z.object({}),
  }
);

// 创建代理
const agent = createAgent({
  model: "gpt-5-nano",
  tools: [getUserInfo],
  stateSchema: CustomState,
});

// 测试工具
const result = await agent.invoke(
  {
    messages: [{ role: "user", content: "what's my name?" }],
    userId: "user_123",
  },
  {
    context: {},
  }
);

console.log(result.messages.at(-1)?.content);
// Outputs: "Your name is John Doe."

在工具中修改短期记忆

要在执行过程中修改代理的短期记忆(状态),您可以直接从工具返回状态更新:

typescript
import { tool, createAgent, ToolMessage, type ToolRuntime } from "langchain";
import { Command, StateSchema } from "@langchain/langgraph";
import * as z from "zod";

// 定义自定义状态
const CustomState = new StateSchema({
  userId: z.string().optional(),
});

// 创建更新用户信息的工具
const updateUserInfo = tool(
  async (_, config: ToolRuntime<typeof CustomState.State>) => {
    const userId = config.state.userId;
    const name = userId === "user_123" ? "John Smith" : "Unknown user";
    return new Command({
      update: {
        userName: name,
        // 更新消息历史
        messages: [
          new ToolMessage({
            content: "Successfully looked up user information",
            tool_call_id: config.toolCall?.id ?? "",
          }),
        ],
      },
    });
  },
  {
    name: "update_user_info",
    description: "Look up and update user info.",
    schema: z.object({}),
  }
);

// 创建问候工具
const greet = tool(
  async (_, config) => {
    const userName = config.context?.userName;
    return `Hello ${userName}!`;
  },
  {
    name: "greet",
    description: "Use this to greet the user once you found their info.",
    schema: z.object({}),
  }
);

// 创建代理
const agent = createAgent({
  model: "openai:gpt-5-mini",
  tools: [updateUserInfo, greet],
  stateSchema: CustomState,
});

// 测试代理
const result = await agent.invoke({
  messages: [{ role: "user", content: "greet the user" }],
  userId: "user_123",
});

console.log(result.messages.at(-1)?.content);
// Output: "Hello! I'm here to help — what would you like to do today?"

在提示中访问短期记忆

在中间件中访问短期记忆(状态),以基于对话历史或自定义状态字段创建动态提示:

typescript
import * as z from "zod";
import { createAgent, tool, dynamicSystemPromptMiddleware } from "langchain";

// 定义上下文模式
const contextSchema = z.object({
  userName: z.string(),
});
type ContextSchema = z.infer<typeof contextSchema>;

// 创建天气工具
const getWeather = tool(
  async ({ city }) => {
    return `The weather in ${city} is always sunny!`;
  },
  {
    name: "get_weather",
    description: "Get user info",
    schema: z.object({
      city: z.string(),
    }),
  }
);

// 创建代理
const agent = createAgent({
  model: "gpt-5-nano",
  tools: [getWeather],
  contextSchema,
  middleware: [
    dynamicSystemPromptMiddleware<ContextSchema>((_, config) => {
      return `You are a helpful assistant. Address the user as ${config.context?.userName}.`;
    }),
  ],
});

// 测试代理
const result = await agent.invoke(
  {
    messages: [{ role: "user", content: "What is the weather in SF?" }],
  },
  {
    context: {
      userName: "John Smith",
    },
  }
);

// 输出结果
for (const message of result.messages) {
  console.log(message);
}

在模型调用前访问记忆

使用 beforeModel 钩子在模型调用前访问和修改记忆:

typescript
import { RemoveMessage } from "@langchain/core/messages";
import { createAgent, createMiddleware, trimMessages } from "langchain";
import { MemorySaver } from "@langchain/langgraph";
import { REMOVE_ALL_MESSAGES } from "@langchain/langgraph";

// 创建消息修剪中间件
const trimMessageHistory = createMiddleware({
  name: "TrimMessages",
  beforeModel: async (state) => {
    // 修剪消息
    const trimmed = await trimMessages(state.messages, {
      maxTokens: 384,
      strategy: "last",
      startOn: "human",
      endOn: ["human", "tool"],
      tokenCounter: (msgs) => msgs.length,
    });
    
    // 返回修剪后的消息
    return {
      messages: [new RemoveMessage({ id: REMOVE_ALL_MESSAGES }), ...trimmed],
    };
  },
});

// 创建代理
const checkpointer = new MemorySaver();
const agent = createAgent({
  model: "gpt-5-nano",
  tools: [],
  middleware: [trimMessageHistory],
  checkpointer,
});

在模型调用后访问记忆

使用 afterModel 钩子在模型调用后访问和修改记忆:

typescript
import { RemoveMessage } from "@langchain/core/messages";
import { createAgent, createMiddleware } from "langchain";
import { REMOVE_ALL_MESSAGES } from "@langchain/langgraph";

// 创建响应验证中间件
const validateResponse = createMiddleware({
  name: "ValidateResponse",
  afterModel: (state) => {
    const lastMessage = state.messages.at(-1)?.content;
    if (
      typeof lastMessage === "string" &&
      lastMessage.toLowerCase().includes("confidential")
    ) {
      // 如果响应包含机密信息,删除所有消息
      return {
        messages: [
          new RemoveMessage({ id: REMOVE_ALL_MESSAGES }),
        ],
      };
    }
    return;
  },
});

// 创建代理
const agent = createAgent({
  model: "gpt-5-nano",
  tools: [],
  middleware: [validateResponse],
});

业务场景

短期记忆的典型应用场景

场景一:客户支持助手

问题:如何构建一个能够记住客户之前问题和解决方案的客户支持助手?

解决方案:使用短期记忆存储客户对话历史,并在长对话时使用摘要策略

typescript
import { createAgent, summarizationMiddleware } from "langchain";
import { MemorySaver } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";

// 创建检查点
const checkpointer = new MemorySaver();

// 创建客户支持助手
const supportAgent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4.1" }),
  tools: [], // 可以添加知识库查询等工具
  systemPrompt: `你是一位专业的客户支持助手,能够帮助用户解决各种问题。
请保持友好、耐心的态度,提供详细、准确的解答。
记住用户之前提到的问题和解决方案,避免重复提问。`,
  middleware: [
    summarizationMiddleware({
      model: new ChatOpenAI({ model: "gpt-4.1-mini" }),
      trigger: { tokens: 3000 },
      keep: { messages: 15 },
    }),
  ],
  checkpointer,
});

// 测试客户支持对话
const config = { configurable: { thread_id: "support_1" } };

// 第一次交互
await supportAgent.invoke(
  { messages: [{ role: "user", content: "你好,我的账户无法登录,提示密码错误,但我确定密码是正确的。" }] },
  config
);

// 第二次交互
await supportAgent.invoke(
  { messages: [{ role: "user", content: "我已经尝试了多次,还是无法登录。我上次登录是在昨天。" }] },
  config
);

// 第三次交互
await supportAgent.invoke(
  { messages: [{ role: "user", content: "我记得我上周修改过密码,但现在忘记了新密码。" }] },
  config
);

// 第四次交互(测试记忆)
const finalResponse = await supportAgent.invoke(
  { messages: [{ role: "user", content: "你能帮我总结一下我遇到的问题以及解决方案吗?" }] },
  config
);

console.log(finalResponse.messages.at(-1)?.content);

场景二:个性化购物助手

问题:如何构建一个能够记住用户偏好和购买历史的购物助手?

解决方案:使用自定义状态存储用户偏好,并在对话中使用这些信息

typescript
import * as z from "zod";
import { createAgent, createMiddleware, tool } from "langchain";
import { StateSchema, MemorySaver } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";

// 定义用户偏好状态
const UserPreferenceState = new StateSchema({
  userId: z.string(),
  preferences: z.object({
    favoriteCategories: z.array(z.string()),
    budget: z.number().optional(),
    size: z.string().optional(),
    color: z.string().optional(),
  }),
  purchaseHistory: z.array(z.object({
    productId: z.string(),
    productName: z.string(),
    date: z.string(),
  })),
});

// 创建状态扩展中间件
const userPreferenceMiddleware = createMiddleware({
  name: "UserPreference",
  stateSchema: UserPreferenceState,
});

// 创建产品推荐工具
const recommendProducts = tool(
  ({ category }, config) => {
    const preferences = config.state.preferences;
    const history = config.state.purchaseHistory;
    
    // 基于用户偏好和购买历史推荐产品
    const recommendations = [
      `根据您对 ${category} 的兴趣,推荐以下产品:`,
      `- 产品 A: 符合您喜欢的 ${preferences.color || "颜色"}`,
      `- 产品 B: 在您的预算范围内`,
      `- 产品 C: 与您之前购买的 ${history[0]?.productName || "产品"} 类似`,
    ];
    
    return recommendations.join("\n");
  },
  {
    name: "recommend_products",
    description: "根据用户偏好推荐产品",
    schema: z.object({
      category: z.string().describe("产品类别"),
    }),
  }
);

// 创建检查点
const checkpointer = new MemorySaver();

// 创建购物助手
const shoppingAgent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4.1" }),
  tools: [recommendProducts],
  systemPrompt: `你是一位个性化购物助手,能够根据用户的偏好和购买历史推荐产品。
请使用友好、专业的语气,提供符合用户需求的建议。
记住用户之前提到的偏好和购买记录。`,
  middleware: [userPreferenceMiddleware],
  checkpointer,
});

// 测试购物助手
const config = { configurable: { thread_id: "shopping_1" } };

// 初始化用户信息
const initialState = {
  messages: [{ role: "user", content: "你好,我想购买一些新衣服。" }],
  userId: "user_456",
  preferences: {
    favoriteCategories: ["T恤", "牛仔裤", "运动鞋"],
    budget: 500,
    size: "M",
    color: "蓝色",
  },
  purchaseHistory: [
    { productId: "p1", productName: "蓝色牛仔裤", date: "2024-01-15" },
    { productId: "p2", productName: "白色T恤", date: "2024-02-10" },
  ],
};

// 第一次交互
await shoppingAgent.invoke(initialState, config);

// 第二次交互(测试个性化推荐)
const response = await shoppingAgent.invoke(
  { messages: [{ role: "user", content: "你能推荐一些T恤吗?" }] },
  config
);

console.log(response.messages.at(-1)?.content);

场景三:项目管理助手

问题:如何构建一个能够记住项目进展和任务状态的项目管理助手?

解决方案:使用短期记忆跟踪项目进展,并在对话中更新任务状态

typescript
import * as z from "zod";
import { createAgent, createMiddleware, tool, Command } from "langchain";
import { StateSchema, MemorySaver } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";

// 定义任务状态
const TaskState = z.object({
  id: z.string(),
  title: z.string(),
  status: z.enum(["todo", "in_progress", "done"]),
  assignee: z.string().optional(),
  deadline: z.string().optional(),
});

// 定义项目状态
const ProjectState = new StateSchema({
  projectId: z.string(),
  projectName: z.string(),
  tasks: z.array(TaskState),
  milestones: z.array(z.object({
    id: z.string(),
    title: z.string(),
    date: z.string(),
  })),
});

// 创建状态扩展中间件
const projectMiddleware = createMiddleware({
  name: "ProjectManagement",
  stateSchema: ProjectState,
});

// 创建添加任务工具
const addTask = tool(
  ({ title, assignee, deadline }, config) => {
    const newTask = {
      id: `task_${Date.now()}`,
      title,
      status: "todo" as const,
      assignee,
      deadline,
    };
    
    // 更新任务列表
    const updatedTasks = [...config.state.tasks, newTask];
    
    return new Command({
      update: {
        tasks: updatedTasks,
      },
      content: `任务已添加:${title}`,
    });
  },
  {
    name: "add_task",
    description: "添加新任务",
    schema: z.object({
      title: z.string().describe("任务标题"),
      assignee: z.string().optional().describe("负责人"),
      deadline: z.string().optional().describe("截止日期"),
    }),
  }
);

// 创建更新任务状态工具
const updateTaskStatus = tool(
  ({ taskId, status }, config) => {
    const updatedTasks = config.state.tasks.map(task => 
      task.id === taskId ? { ...task, status } : task
    );
    
    return new Command({
      update: {
        tasks: updatedTasks,
      },
      content: `任务状态已更新:${status}`,
    });
  },
  {
    name: "update_task_status",
    description: "更新任务状态",
    schema: z.object({
      taskId: z.string().describe("任务ID"),
      status: z.enum(["todo", "in_progress", "done"]).describe("新状态"),
    }),
  }
);

// 创建获取项目概览工具
const getProjectOverview = tool(
  (_, config) => {
    const project = config.state;
    const todoTasks = project.tasks.filter(t => t.status === "todo").length;
    const inProgressTasks = project.tasks.filter(t => t.status === "in_progress").length;
    const doneTasks = project.tasks.filter(t => t.status === "done").length;
    
    return `项目概览:${project.projectName}\n\n` +
           `任务状态:\n` +
           `- 待办:${todoTasks}\n` +
           `- 进行中:${inProgressTasks}\n` +
           `- 已完成:${doneTasks}\n\n` +
           `最近里程碑:\n` +
           `${project.milestones.slice(0, 3).map(m => `- ${m.title} (${m.date})`).join("\n")}`;
  },
  {
    name: "get_project_overview",
    description: "获取项目概览",
    schema: z.object({}),
  }
);

// 创建检查点
const checkpointer = new MemorySaver();

// 创建项目管理助手
const projectAgent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4.1" }),
  tools: [addTask, updateTaskStatus, getProjectOverview],
  systemPrompt: `你是一位专业的项目管理助手,能够帮助用户跟踪项目进展和任务状态。\n请保持清晰、简洁的沟通风格,提供准确的项目信息。\n记住项目的任务状态和里程碑,帮助用户有效地管理项目。`,
  middleware: [projectMiddleware],
  checkpointer,
});

// 初始化项目
const initialState = {
  messages: [{ role: "user", content: "你好,我需要帮助管理一个新的项目。" }],
  projectId: "project_1",
  projectName: "网站重构项目",
  tasks: [
    { id: "task_1", title: "需求分析", status: "done", assignee: "张三", deadline: "2024-03-01" },
    { id: "task_2", title: "设计原型", status: "done", assignee: "李四", deadline: "2024-03-15" },
    { id: "task_3", title: "前端开发", status: "in_progress", assignee: "王五", deadline: "2024-04-15" },
  ],
  milestones: [
    { id: "milestone_1", title: "需求确认", date: "2024-03-01" },
    { id: "milestone_2", title: "设计完成", date: "2024-03-15" },
    { id: "milestone_3", title: "开发完成", date: "2024-04-30" },
    { id: "milestone_4", title: "项目上线", date: "2024-05-15" },
  ],
};

// 测试项目管理助手
const config = { configurable: { thread_id: "project_1" } };

// 初始化对话
await projectAgent.invoke(initialState, config);

// 添加新任务
await projectAgent.invoke(
  { messages: [{ role: "user", content: "请添加一个新任务:后端API开发,负责人是赵六,截止日期是2024-04-20。" }] },
  config
);

// 更新任务状态
await projectAgent.invoke(
  { messages: [{ role: "user", content: "请将前端开发任务标记为已完成。" }] },
  config
);

// 获取项目概览
const overviewResponse = await projectAgent.invoke(
  { messages: [{ role: "user", content: "请给我项目的当前概览。" }] },
  config
);

console.log(overviewResponse.messages.at(-1)?.content);

技术要点

1. 记忆管理策略

  • 上下文窗口限制:了解并尊重模型的上下文窗口限制
  • 记忆修剪:使用适当的策略修剪或删除旧消息
  • 记忆总结:对长对话进行总结,保留关键信息
  • 记忆优先级:根据信息重要性调整记忆策略

2. 状态管理

  • 自定义状态:根据业务需求定义自定义状态结构
  • 状态扩展:使用中间件扩展代理状态
  • 状态持久化:在生产环境中使用数据库存储状态
  • 状态一致性:确保状态更新的一致性和可靠性

3. 性能优化

  • 记忆大小:控制记忆大小,避免超出上下文窗口
  • 总结时机:选择合适的时机触发记忆总结
  • 中间件顺序:合理安排中间件顺序,优化处理流程
  • 并行处理:在适当情况下使用并行处理提高性能

4. 安全性

  • 敏感信息:避免在记忆中存储敏感信息
  • 状态验证:验证状态更新的合法性
  • 访问控制:控制对状态的访问权限
  • 数据加密:在存储敏感状态时使用加密

5. 最佳实践

明确的系统提示:指导代理如何使用记忆

通过系统提示告诉模型它拥有对话记忆、应该如何利用上下文,避免模型"忘记"自己已经知道的信息:

typescript
const agent = createAgent({
  model: "gpt-4.1",
  tools: [],
  prompt: `你是一个客户支持助手。

记忆使用规则:
1. 用户首次提到姓名时,在后续对话中主动使用该姓名称呼用户
2. 用户提到过的偏好(如语言、格式要求)应在整个对话中保持
3. 如果用户之前已经描述过问题,不要重复询问,直接引用之前的描述
4. 对话中出现的关键信息(订单号、产品名称等)应在需要时主动引用`,
  checkpointer,
});

一致的状态结构:保持可预测性

定义 state schema 时使用明确的类型和默认值,避免出现 undefined 或结构不一致的情况:

typescript
import { z } from "zod";

const stateSchema = z.object({
  messages: z.array(z.any()).default([]),
  userName: z.string().default(""),
  language: z.enum(["zh", "en", "ja"]).default("zh"),
  issueResolved: z.boolean().default(false),
  mentionedOrderIds: z.array(z.string()).default([]),
});

const middleware = createMiddleware({
  name: "TrackUserInfo",
  stateSchema,
  beforeModel: (state) => {
    return state;
  },
});

💡 为什么要这样做? 如果状态字段有时候存在有时候不存在(比如有的 Checkpoint 有 userName,有的没有),代码里就需要到处写 if (state.userName) 这样的防御性判断,容易出 bug。给每个字段设置 .default() 能确保状态结构始终一致。

合理的触发条件:为记忆管理策略设置阈值

不要等到上下文爆炸了才处理,提前设置合理的触发阈值:

typescript
import { createMiddleware } from "langchain";
import { trimMessages } from "@langchain/core/messages";
import { summarizeMessages } from "./utils";

const memoryManagement = createMiddleware({
  name: "MemoryManagement",
  beforeModel: async (state) => {
    const messageCount = state.messages.length;

    if (messageCount > 50) {
      const summary = await summarizeMessages(state.messages.slice(0, -10));
      return {
        ...state,
        messages: [
          { role: "system", content: `之前的对话摘要:${summary}` },
          ...state.messages.slice(-10),
        ],
      };
    }

    if (messageCount > 20) {
      return {
        ...state,
        messages: trimMessages(state.messages, {
          maxTokens: 4000,
          strategy: "last",
        }),
      };
    }

    return state;
  },
});

💡 分级策略:消息少(< 20 条)→ 全部保留;消息中等(20-50 条)→ 修剪保留最近的;消息很多(> 50 条)→ 摘要 + 最近 10 条。这样既节省 token 又不会丢失重要上下文。

监控和调试:观测记忆管理的实际表现

利用 getStategetStateHistory 检查记忆状态,在开发阶段提前发现问题:

typescript
const config = { configurable: { thread_id: "debug-session-1" } };

const snapshot = await agent.getState(config);
console.log("当前消息数量:", snapshot.values.messages.length);
console.log("下一步节点:", snapshot.next);
console.log("最新消息:", snapshot.values.messages.at(-1)?.content);

for await (const state of agent.getStateHistory(config)) {
  console.log(
    `Step ${state.metadata.step}: ` +
    `${state.values.messages.length} messages, ` +
    `next: [${state.next.join(", ")}]`
  );
}

在生产环境中,可以添加指标监控来追踪记忆的健康状况:

typescript
const memoryMonitor = createMiddleware({
  name: "MemoryMonitor",
  beforeModel: (state) => {
    const msgCount = state.messages.length;
    const estimatedTokens = JSON.stringify(state.messages).length / 4;

    if (estimatedTokens > 100_000) {
      console.warn(
        `[MemoryMonitor] thread token estimate: ${estimatedTokens}, ` +
        `consider triggering summarization`
      );
    }

    console.log(
      `[MemoryMonitor] messages: ${msgCount}, ~tokens: ${estimatedTokens}`
    );
    return state;
  },
});

💡 关注什么指标?

  • 消息数量:是否接近触发修剪/摘要的阈值
  • 估算 token 数:是否接近模型上下文窗口上限
  • Checkpoint 大小:存储是否增长过快
  • 响应延迟:上下文过长时模型响应会明显变慢,这也是需要介入记忆管理的信号

总结

短期记忆是 LangChain 中构建有效对话式 AI 应用的关键组件,它让代理能够在同一会话中保持上下文连贯性,提供更加个性化、连贯的用户体验。通过合理的记忆管理策略,我们可以在模型上下文窗口限制和对话连贯性之间取得平衡。

核心优势

  • 上下文连贯性:保持对话的上下文连贯性,避免上下文丢失
  • 个性化体验:根据用户历史交互提供个性化服务
  • 效率提升:减少重复提问,提高对话效率
  • 智能决策:基于历史信息做出更智能的决策

应用前景

短期记忆的应用前景非常广阔,从客户支持、购物助手到项目管理、教育辅导,几乎所有对话式 AI 应用都可以受益于有效的短期记忆管理。随着模型能力的不断提升和上下文窗口的不断扩大,短期记忆管理策略也将不断演进,为用户提供更加自然、智能的对话体验。

通过掌握短期记忆的使用和管理技巧,开发者可以构建更加智能、高效、个性化的 AI 应用,为用户创造更大的价值。

读文档、看源码、写代码,理解 AI Agent 本质 🤖