# Agent的上下文和记忆

# 上下文

上下文包括消息列表之外的任何数据,这些数据可以影响代理行为活工具执行。这可以是:

  • 运行时传入的消息,如user_id活API凭证
  • 多步推理过程中更新的内部状态
  • 来自之前交互的持久记忆活事实

LangGraph提供了三种上下文的主要方式

类型 描述 可变 生命周期
Configurable 在运行开始时传入的数据 不可 每次运行
AgentState 执行期间可更改的动态数据 每次运行或对话
长期记忆(存储) 可在对话之间共享的数据 跨对话

# Configurable(智能体在运行时的配置)

这个配置在其他组件都可以获取到,适合传参

configurable 允许在调用 invokestreambatch 时,传入一个 config 字典。这个字典里包含的参数可以被下层的各种组件读取。

在不修改代码逻辑(Chain/Graph 定义)的情况下,通过外部配置来改变组件运行时行为的一种机制。

在用户调用智能体时通过输入传进来的

配置适用于不可变数据,如用户元数据 API密钥,会话id等等。

使用示例:

agent.invoke(
    {"message": [{"role": "user", "content": "hi"}]},
    config={"configurable": {"user_id": "123"}}
)

# 多用户session隔离

在 LangGraph 或 RunnableWithMessageHistory 中使用记忆功能时,系统需要知道“当前是谁在说话”

from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv('../.env')
model = ChatOpenAI(
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY'),
    model=os.getenv('OPENAI_MODEL'),
    temperature=0.5
)
# 定义agent逻辑
memory = MemorySaver()
agent_executor = create_react_agent(model=model, tools=[],checkpointer=memory)
# 通过configurable区分不同用户(线程)
config_user_A = {"configurable": {"thread_id": "user_id"}}
config_user_B = {"configurable": {"thread_id": "user_456"}}

# A 说他叫小明
agent_executor.invoke({"messages": [("user", "我叫小明")]}, config=config_user_A)

# B 说他叫小红
agent_executor.invoke({"messages": [("user", "我叫小红")]}, config=config_user_B)

# 再次访问 A,它依然记得自己是小明,而不会弄混成小红
response = agent_executor.invoke({"messages": [("user", "我叫什么?")]}, config=config_user_A)
print(response["messages"][-1].content)  # 输出 “你叫小明”

# 运行时动态切换组件

configurable 允许在运行时决定使用哪个模型、哪个 Prompt 或哪个参数。这在 A/B 测试或提供“模型切换”功能时非常有用。

# 示例一:prompt版

方案A 在 Prompt 运行前,先执行一个函数从 config 中获取参数,并将其“assign”到数据流中

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import ConfigurableField, RunnablePassthrough, RunnableConfig
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv('../.env')
model = ChatOpenAI(
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY'),
    model=os.getenv('OPENAI_MODEL'),
    temperature=0.5
)
# 1.定义prompt,因为configurable里传递的参数是 lang,所以prompt里有两个参数 lang和input
prompt = ChatPromptTemplate.from_template("你是一个客服机器人,使用{lang}回答用户的问题,用户的问题是{input}")
# 2. 定义提取配置的函数(显式定义,替代 lambda)
def get_lang_from_config(input_dict: dict, config: RunnableConfig) -> str:
    """从运行时配置中获取语言设置,默认为中文"""
    return config.get("configurable", {}).get("lang", "中文")
# 3. 构建 Chain
# 使用 assign 动态地向输入字典中增加 "lang" 字段
chain = (
    RunnablePassthrough.assign(lang=get_lang_from_config) 
    | prompt 
    | model
)
config = {"configurable":{"lang": "zh"}}
response = chain.invoke({"input":"怎么退货"}, config=config)
print(response.content)

方案B 使用 configurable_alternatives 做替代方案,切换整个 Prompt(生产级推荐)

from langchain_core.runnables import ConfigurableField
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv('../.env')
model = ChatOpenAI(
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY'),
    model=os.getenv('OPENAI_MODEL'),
    temperature=0.5
)
# 定义两套prompt
zh_prompt = ChatPromptTemplate.from_template("你是一个客服机器人,使用中文回答:{input}")
en_prompt = ChatPromptTemplate.from_template("You are a customer service bot, answer in English: {input}")
# 2使用 configurable_alternatives 为 Prompt 对象设置“别名”
# 把 zh_prompt 设置为默认
dynamic_prompt = zh_prompt.configurable_alternatives(
    ConfigurableField(id="user_lang"),  # 配置字段的id
    default_key='zh',  # 配置字段默认值
    en=en_prompt  # 添加替代选项 如果配置是en,就切换到en_prompt
    # 更多语言
)

chain = dynamic_prompt | model
config = {"configurable": {"user_lang": "zh"}}
print(chain.invoke({"input": "How to return goods?"}, config=config).content)
# 示例二 configurable_fields将参数变成可配置的
from langchain_openai import ChatOpenAI
from langchain_core.runnables import ConfigurableField

# # 定义一个支持动态配置的模型对象
# model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0).configurable_fields(
#     model_name=ConfigurableField(
#         id="llm_model",  # 配置里使用的key
#         name="LLM Model",
#         description="选择要使用的 LLM 模型",
#     )
# )

# 修改多个参数
model = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0
).configurable_fields(
    model_name=ConfigurableField(id="llm_model"),
    temperature=ConfigurableField(id="llm_temp")
)

# 同时修改多个参数
config = {
    "configurable": {
        "llm_model": "gpt-4o",
        "llm_temp": 0.7
    }
}

# 运行时通过 configurable 动态切换
model.invoke("你好", config=config)

# AgentState(可变上下文)

智能体中叫agentstate 工作流中就叫state

它是所有环节共享的。Agent 做的每一件事(思考、调用工具、观察结果)都是在不断地读取和更新这个“状态”。状态在运行期间充当Agent的记忆,可以短期存储也可以长期存储。它保存可在执行期间演变的动态数据,例如从工具或LLM输出派生的的值。

是存放智能体运行过程中产生个各种message

用户可以自定义要让state记录并存放哪些信息

from langgraph.prebuilt.chat_agent_executor import AgentState

完整示例:LangChain create_agent(底层 LangGraph)+ calculator 工具 + 自定义 AgentState
重点:工具通过返回 Command(update=...) 来同时更新:

  • messages(追加 ToolMessage)
  • calc_count(+1)
  • last_result(覆盖)
  • stage(覆盖)
# state.py
import operator
from typing import Annotated

from langgraph.prebuilt.chat_agent_executor import AgentState


def last_value(left, right):
    """Reducer: 并发更新时取最后一个值"""
    return right[-1] if isinstance(right, list) else right


# 自定义agentstate
class MyState(AgentState):
    # 使用 operator.add 作为 reducer,支持并发更新时累加
    calc_count: Annotated[int, operator.add]
    # 使用 last_value reducer,并发更新时取最后一个
    stage: Annotated[str, last_value]
    last_result: Annotated[str | None, last_value]

# mytools.py
import os
import re

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.tools.base import InjectedToolCallId
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from langgraph.prebuilt import InjectedState
from typing import Annotated

# 一个“安全一点”的计算器:只允许数字、空格、小数点、括号、+ - * / // % **
_ALLOWED = re.compile(r"^[0-9\.\s\(\)\+\-\*\/%]+$")

@tool
def calculator(
    expression: str,
    tool_call_id: Annotated[str, InjectedToolCallId],
    state: Annotated[dict, InjectedState]
) -> Command:
    """Evaluate a simple arithmetic expression. Supports + - * / // % and parentheses."""
    expr = expression.strip()

    if not _ALLOWED.match(expr):
        return Command(
            update = {
                "stage": "thinking",
                "calc_count": 1,  # 增量,会被 reducer 累加
                "last_result": "Error unsupported characters",
                "messages":[
                    ToolMessage(
                        content="Error expression contains unsupported characters.",
                        tool_call_id=tool_call_id,
                    )
                ]
            }
        )
    try:
    	# eval 执行字符串表达式 "__builtins__": {} 表示不给他任何内置函数,只能用表达式本身    {}是全局变量作用域,没有任何变量
    	# 整体而言就是一个很干净的,什么都没有的沙箱环境来执行这个字符串
        result = str(eval(expr, {"__builtins__": {}}, {}))
    except Exception as e:
        result = f"Error {e}"
    
    return Command(
        update={
            "stage": "thinking",
            "calc_count": 1,  # 增量,会被 reducer 累加
            "last_result": result,
            "messages": [ToolMessage(content=result, tool_call_id=tool_call_id)]
        }
    )

# agent.py
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langgraph.prebuilt import create_react_agent
from mystate import MyState
import os
from mytools import calculator
from dotenv import load_dotenv
load_dotenv('D:\Desktop\LLM-LE\.env')
model = ChatOpenAI(
    model=os.getenv('OPENAI_MODEL'),
    temperature=0,
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY')
)
agent = create_react_agent(
        model=model,
        tools=[calculator],
        state_schema=MyState,
        prompt=(
            "You are a helpful assistant. "
            "When math is needed, call the calculator tool with a valid arithmetic expression."
        ),
    )

inputs = {
    "messages": [("user", "依次计算:(12+8)*3,然后 100/4。每一步都用工具。")],
    "calc_count": 0,
    "stage": "thinking",
    "last_result": None,
}

print("\n--- Streaming state snapshots (values) ---\n")
for chunk in agent.stream(inputs, stream_mode="values"):
    stage = chunk.get("stage")
    calc_count = chunk.get("calc_count")
    last_result = chunk.get("last_result")

    last_msg = chunk["messages"][-1]
    msg_type = type(last_msg).__name__
    content = getattr(last_msg, "content", "")

    print(f"[stage={stage!r} calc_count={calc_count} last_result={last_result!r}]")
    print(f"  last_message: {msg_type}: {content[:120]!r}\n")

print("--- Done ---\n")
--- Streaming state snapshots (values) ---

[stage='thinking' calc_count=0 last_result=None]
  last_message: HumanMessage: '依次计算:(12+8)*3,然后 100/4。每一步都用工具。'

[stage='thinking' calc_count=0 last_result=None]
  last_message: AIMessage: ''

[stage='thinking' calc_count=2 last_result='25.0']
  last_message: ToolMessage: '25.0'

[stage='thinking' calc_count=2 last_result='25.0']
  last_message: AIMessage: '第一步计算 (12+8)*3 的结果是 60。\n第二步计算 100/4 的结果是 25.0。'

--- Done ---

# 知识点

# 1. 核心骨架:自定义 State (AgentState)

State 是 Agent 的“大脑内存”,定义了它能记住什么。

  • 定义方式:继承 AgentState
  • 更新策略 (Reducer):这是关键!决定了当新数据进来时,旧数据如何变化。
    • 默认行为(Overwrite):如果不加 Annotated,新值直接覆盖旧值(如 stagelast_result)。
    • 追加行为(Append):用于列表。Annotated[list, add_messages] 会将新消息追加到历史记录中。
    • 累加行为(Accumulate):用于计数器。Annotated[int, operator.add] 会执行 old + new,解决并发计数问题。
# 2. 核心魔法:工具即状态修改器 (Command)

这是 LangGraph 最新的“高光特性”。以前工具只负责“算数”,现在工具可以“改变世界”。

  • 传统模式:工具返回 str -> 图节点接收 -> 图节点决定如何更新 State。
  • 新模式 (Command):工具直接返回 Command(update={...})
    • 内聚性:业务逻辑(如“计算一次就计数+1”、“出错时改变阶段”)全部封装在工具内部,不需要污染外部的图定义。
    • 多字段更新:一个工具调用可以同时:
      1. 回传 ToolMessage(给 LLM 看)。
      2. 更新 calc_count(系统计数)。
      3. 更新 stage(控制流程)。
# 3. 关键参数注入 (Injected)

为了让工具能“感知上下文”并“正确回话”,我们需要在工具参数里注入特殊对象:

  • tool_call_id: Annotated[str, InjectedToolCallId]
    • 作用:必填。用来构建 ToolMessage(tool_call_id=...)
    • 为什么:LLM 可能一次发起了多个工具调用,它需要通过 ID 知道哪个结果对应哪个问题。
  • state: Annotated[dict, InjectedState]
    • 作用:选填。让工具能读取当前的全局状态。
    • 场景:比如工具需要判断 if state['calc_count'] > 10: return "超过限额"

State 定义了 Agent 的记忆结构,Reducer 定义了记忆如何更新,而 Command 让工具拥有了直接重写这份记忆的权力。

# 记忆存储

能够让agent保持上下文信息,并在多轮对话中提供一致性。否则,状态仅限于单此运行。

# 短期存储:线程级存储

短期存储使agent能够跟踪同一个会话里的多轮对话。但只保存当前会话中的信息,通常存储在messages或类似的数据结构中,便于对话上下文及时更新。

要使用必须做两件事:

  1. 在创建代理时提供checkpointer。checkpointer可以实现代理状态的持久性。
  2. 在运行代理时在配置中提供thread_id。thread_id是对话会话的唯一标识。(就是会话id conversation_id)
# 内存:开发环境
checkpointer = InMemorySaver()

# 生产环境中,使用数据库支持的checkpointer
DB_URL = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
with PostgresSaver,from_conn_string(DB_URL) as checkpointer:
# 必须安装 pip install -U "psycopg[binary, pool]" langgraph langgraph-checkpoint-postgres

# 生产环境: Redis
DB_URL = "redis://:6379"
with RedisSaver.from_conn_string(DB_URL) as checkpointer:
    
# 必须安装 pip install -U langgraph langgraph-checkpoint-redis

首次使用数据库存储时,需要调用store.setup(),checkpointer.setup()。

# 内存存储案例
import asyncio
import os

from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver
from mytools import web_search
from dotenv import load_dotenv
load_dotenv('D:\Desktop\LLM-LE\.env')

checkpointer = InMemorySaver()

model = ChatOpenAI(
    model=os.getenv('OPENAI_MODEL'),
    temperature=0,
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY')
)

agent = create_react_agent(
        model=model,
        tools=[web_search],
        prompt=(
            "You are a helpful assistant. "
        ),
        checkpointer=checkpointer
    )

config = {
    "configurable": {
        "thread_id": "11223"  # 会话id
    }
}

async def main():
    resp1 = await agent.ainvoke({"messages": [{"role": "user", "content": "今天,北京天气怎样"}]}, config)
    print(resp1['messages'][-1].content)

    resp2 = await agent.ainvoke({"messages": [{"role": "user", "content": "长沙呐"}]}, config)
    print(resp2['messages'][-1].content)

asyncio.run(main())
"""
今天北京的天气是小雪,气温大约在-11℃到-3℃之间,湿度约为71%,风速约为14公里每小时(约9英里每小时)。降水概
率为20%。总体来说,天气较冷,有小雪,出门请注意保暖。
今天长沙的天气是晴朗,气温大约在5℃到17℃之间,湿度约为74%,风速约为14公里每小时(约9英里每小时)。总体来说
,天气较为温暖,适合外出。
"""
# postgre短期存储案例
import asyncio
# 这是为了解决 Windows 下某些库(特别是 asyncpg / 网络 IO)在 ProactorEventLoop 上可能出现的兼容问题
from asyncio import WindowsSelectorEventLoopPolicy
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

import os
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from mytools import web_search
from dotenv import load_dotenv

load_dotenv(r'D:\Desktop\LLM-LE\.env')

DB_URL = "postgresql://agent_short_memory:[email protected]:5432/agent_short_memory"

model = ChatOpenAI(
    model=os.getenv('OPENAI_MODEL'),
    temperature=0,
    base_url=os.getenv('OPENAI_BASE_URL'),
    api_key=os.getenv('OPENAI_API_KEY')
)

config = {
    "configurable": {
        "thread_id": "11223"
    }
}

async def main():
    async with AsyncPostgresSaver.from_conn_string(DB_URL) as checkpointer:
        await checkpointer.setup()
        
        agent = create_react_agent(
            model=model,
            tools=[web_search],
            prompt="You are a helpful assistant.",
            checkpointer=checkpointer
        )
        print(await agent.aget_state(config))  # 获取短期存储的上下文记忆
        # resp1 = await agent.ainvoke({"messages": [{"role": "user", "content": "今天,北京天气怎样"}]}, config)
        # print(resp1['messages'][-1].content)

        # resp2 = await agent.ainvoke({"messages": [{"role": "user", "content": "长沙呐"}]}, config)
        # print(resp2['messages'][-1].content)

if __name__ == "__main__":
    asyncio.run(main())

# 长期存储

保存跨多个会话的信息,通常存储在数据库或文件系统中,允许agent在长期交互中记住用户的历史信息和行为。

要使用必须做三件事:

  1. 配置一个存储器
  2. 短期存储的checkpointer一定要有
  3. 使用get_store函数从工具或提示中访问存储

短期存储使用saver 长期存储使用store

(ReAct agent + PostgresSaver 短期 + PostgresStore 长期 + 每 N 次用户发言触发一次抽取 + 抽取输入用最近 K 条 messages)。

它做了这几件事:

  • checkpointer:用 thread_id 保存对话 state(短期)
  • store:用 user_id 保存长期记忆(跨 thread)
  • state_modifier 注入长期记忆(每次 LLM 思考前)
  • 每 N 次用户发言 才触发一次 extractor(省钱)
  • ✅ extractor 输入用 最近 K 条 messages
import os
import json
import time
import uuid
from typing import Any, Dict, List

from dotenv import load_dotenv
from psycopg_pool import ConnectionPool

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage

from langchain.agents import create_agent  # ✅ 按你的版本提示迁移到这里

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore


# =========================
# 0) 环境变量
# =========================
load_dotenv(r"D:\Desktop\LLM-LE\.env")

MODEL_NAME = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
BASE_URL = os.getenv("OPENAI_BASE_URL")
API_KEY = os.getenv("OPENAI_API_KEY")

if not API_KEY:
    raise RuntimeError("请在 .env 设置 OPENAI_API_KEY(以及可选 OPENAI_BASE_URL / OPENAI_MODEL)")


# =========================
# 1) 数据库连接
# =========================
DB_URI = "postgresql://agent_long_memory:[email protected]:5432/agent_long_memory"
pool = ConnectionPool(conninfo=DB_URI, max_size=10, kwargs={"autocommit": True})
# 短期(checkpoint)
checkpointer = PostgresSaver(pool)
checkpointer.setup()

# 长期(store)
store = PostgresStore(pool)
store.setup()


# =========================
# 2) LLM:agent + extractor
# =========================
llm = ChatOpenAI(model=MODEL_NAME, temperature=0, base_url=BASE_URL, api_key=API_KEY)
extractor_llm = ChatOpenAI(model=MODEL_NAME, temperature=0, base_url=BASE_URL, api_key=API_KEY)


# =========================
# 3) Store 兼容读取(不同版本可能是 dict 或对象)
# =========================
def as_item_dict(item: Any) -> Dict[str, Any]:
    if item is None:
        return {}
    if isinstance(item, dict):
        return item
    d = {}
    for k in ["key", "value", "namespace", "created_at", "updated_at"]:
        if hasattr(item, k):
            d[k] = getattr(item, k)
    return d


# =========================
# 4) 长期记忆:计数器(跨 thread,按 user_id)
# =========================
def get_user_turn_count(user_id: str) -> int:
    item = store.get(("counters", user_id), "user_turn_count")
    item = as_item_dict(item)
    if not item:
        return 0
    val = item.get("value") or {}
    return int(val.get("n", 0))

def set_user_turn_count(user_id: str, n: int) -> None:
    store.put(("counters", user_id), "user_turn_count", {"n": int(n)})


# =========================
# 5) 长期记忆:构造系统提示(每次调用前手动注入)
# =========================
def build_system_prompt(user_id: str, limit: int = 50) -> str:
    memories = store.search(("memories", user_id), limit=limit)

    lines = []
    for m in memories:
        m = as_item_dict(m)
        key = m.get("key")
        val = m.get("value") or {}
        typ = val.get("type", "fact")
        data = val.get("data", "")
        if data:
            lines.append(f"- [{typ}] {data} (key={key})")

    memory_str = "\n".join(lines) if lines else "暂无数据"

    return f"""你是一个智能助手。

【长期记忆 (Long-term Memory)】
关于该用户,你目前记得以下信息:
{memory_str}

回答用户时请参考这些长期记忆。
如果发现用户给出新的稳定信息(姓名、城市、长期偏好、硬性约束、过敏/禁忌等),请调用 remember_fact 保存。
"""


# =========================
# 6) 工具:写入长期记忆
# =========================
@tool
def remember_fact(user_id: str, fact_type: str, value: str) -> str:
    """
    保存长期信息到 PostgresStore。
    user_id: 用户ID(用于区分不同用户)
    fact_type: 类型 (name/city/preference/constraint/allergy/goal...)
    value: 内容(简短、稳定)
    """
    key = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
    store.put(("memories", user_id), key, {"type": fact_type, "data": value, "ts": int(time.time())})
    return f"已保存长期记忆: [{fact_type}] {value}"


# =========================
# 7) 自动抽取:最近 K 条 messages(你选 C)
# =========================
MEMORY_EXTRACT_PROMPT = """你是记忆抽取器。给定最近对话消息(可能包含系统/用户/助手/工具消息),提取适合长期保存的用户信息。

规则:
- 只保存稳定且对未来有用的信息(姓名、城市、长期偏好、硬性约束、重要目标、过敏/禁忌等)。
- 不保存一次性临时信息(例如“我今天很累”、“我现在在地铁上”)。
- 信息不确定就不要保存。
- 输出必须是 JSON 数组;没有可保存信息就输出 []。
- 每个元素必须包含:
  - type: 记忆类型 (name, city, preference, constraint, goal, allergy, etc.)
  - data: 内容(简短)
  - confidence: 0 到 1

只输出 JSON,不要输出任何额外文字。
"""

def format_last_k_messages(msgs: List[Any], k: int) -> str:
    tail = msgs[-k:] if k > 0 else msgs
    out = []
    for m in tail:
        # m 可能是 BaseMessage 或 tuple
        role = getattr(m, "type", None) or m.__class__.__name__
        content = getattr(m, "content", None)
        if content is None and isinstance(m, (tuple, list)) and len(m) == 2:
            role, content = m
        out.append(f"{role}: {content}")
    return "\n".join(out)

def extract_memories_from_messages(msgs: List[Any], k: int) -> List[Dict[str, Any]]:
    convo = format_last_k_messages(msgs, k)
    prompt = MEMORY_EXTRACT_PROMPT + "\n\n最近对话:\n" + convo
    resp = extractor_llm.invoke(prompt)
    try:
        data = json.loads(resp.content)
        return data if isinstance(data, list) else []
    except Exception:
        return []

def save_extracted_memories(user_id: str, mems: List[Dict[str, Any]]) -> int:
    saved = 0
    for m in mems:
        if not isinstance(m, dict):
            continue
        if float(m.get("confidence", 0)) < 0.6:
            continue
        typ = str(m.get("type", "fact"))[:50]
        dat = str(m.get("data", "")).strip()
        if not dat:
            continue
        key = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
        store.put(("memories", user_id), key, {
            "type": typ, "data": dat, "confidence": float(m.get("confidence", 0)), "ts": int(time.time())
        })
        saved += 1
    return saved


# =========================
# 8) 创建 agent(底层仍是 LangGraph)
# =========================
agent = create_agent(
    model=llm,
    tools=[remember_fact],
    # 关键:把 checkpointer 传进去,让 thread_id 生效(短期)
    checkpointer=checkpointer,
    # 注:不使用 state_modifier,改为“我们自己在 messages 前塞 SystemMessage”
)


# =========================
# 9) 运行:每 N 次用户发言抽取一次(C:最近K条messages)
# =========================
N = 3     # 每3次用户发言抽取一次
K = 12    # 抽取输入最近12条 messages

# 我们自己维护一个 recent_messages(跨 thread 也可以按需维护)
recent_messages: Dict[str, List[Any]] = {}  # key=user_id

def run_turn(user_id: str, thread_id: str, user_text: str):
    # 1) 组装本轮 messages:System(长期记忆) + 近期对话 + 本轮 user
    sys_prompt = build_system_prompt(user_id)
    sys_msg = SystemMessage(content=sys_prompt)

    buf = recent_messages.get(user_id, [])
    buf = (buf + [("user", user_text)])  # 先把用户话加进缓冲
    buf = buf[-K:]  # 控制缓存大小
    recent_messages[user_id] = buf

    config = {"configurable": {"thread_id": thread_id}}

    # 2) 调用 agent:注意 remember_fact 需要 user_id,所以我们把 user_id 放在用户内容里也行
    # 更稳:让模型在调用工具时显式传 user_id(工具签名要求它传)
    result = agent.invoke(
        {"messages": [sys_msg, ("user", f"[user_id={user_id}]\n{user_text}")]} ,
        config=config
    )

    # 3) 把 assistant 回复也放进 recent buffer(用于 C)
    assistant_text = result["messages"][-1].content
    buf = recent_messages.get(user_id, [])
    buf = (buf + [("assistant", assistant_text)])[-K:]
    recent_messages[user_id] = buf

    # 4) 计数 + 到点抽取(用 recent_messages[user_id])
    turn = get_user_turn_count(user_id) + 1
    set_user_turn_count(user_id, turn)

    extract_info = {"turn": turn, "extracted": False, "saved": 0}
    if turn % N == 0:
        mems = extract_memories_from_messages(recent_messages[user_id], K)
        saved = save_extracted_memories(user_id, mems)
        extract_info = {"turn": turn, "extracted": True, "candidates": len(mems), "saved": saved}

    return assistant_text, extract_info


def demo():
    user_id = "user_999"

    print("=" * 60)
    print("Round 1 (thread_A)")
    a1, e1 = run_turn(user_id, "thread_A", "你好,我叫迈克,我特别喜欢吃川菜。")
    print("Agent:", a1)
    print("Extractor:", e1)

    print("-" * 60)
    print("Round 2 (thread_A)")
    a2, e2 = run_turn(user_id, "thread_A", "我对花生过敏。")
    print("Agent:", a2)
    print("Extractor:", e2)

    print("-" * 60)
    print("Round 3 (thread_A) 触发抽取")
    a3, e3 = run_turn(user_id, "thread_A", "我住在长沙。")
    print("Agent:", a3)
    print("Extractor:", e3)

    print("=" * 60)
    print("Round 4 (thread_B) 换线程测试长期记忆")
    a4, e4 = run_turn(user_id, "thread_B", "我饿了,推荐今晚吃什么?")
    print("Agent:", a4)
    print("Extractor:", e4)
    print("=" * 60)


if __name__ == "__main__":
    demo()
"""
============================================================
Round 1 (thread_A)
Agent: 你好,迈克!很高兴认识你。你喜欢吃川菜,有没有特别喜欢的川菜菜品呢?我可以帮你推荐一些好吃的川菜或
者川菜馆。
Extractor: {'turn': 1, 'extracted': False, 'saved': 0}
------------------------------------------------------------
Round 2 (thread_A)
饮食偏好或禁忌吗?
Extractor: {'turn': 2, 'extracted': False, 'saved': 0}
------------------------------------------------------------
Round 3 (thread_A) 触发抽取
Agent: 好的,迈克,我知道你住在长沙了。如果你需要,我可以帮你推荐长沙当地的川菜馆,或者帮你找一些适合你口味且不含花生的川菜菜谱。你有什么具体需求吗?
Extractor: {'turn': 3, 'extracted': True, 'candidates': 4, 'saved': 4}
============================================================
Round 4 (thread_B) 换线程测试长期记忆
Agent: 迈克,你喜欢吃川菜,而且你对花生过敏。考虑到这些,我推荐你今晚可以尝试一些经典的川菜,比如宫保鸡丁(去掉花生)、水煮鱼或者麻婆豆腐。这些菜既符合你的口味 ,又避免了花生过敏的风险。你觉得怎么样?需要我帮你找附近的川菜馆吗?
Extractor: {'turn': 4, 'extracted': False, 'saved': 0}
============================================================
"""

# LangChain中的记忆

  • ConversationBufferMemory:在会话期间动态保存对话。

  • ConversationSummaryMemory:自动总结会话内容以减少存储大小。

  • SQLDatabaseMemory:将记忆存储在数据库中。

  • CustomMemory:允许定义自己的记忆模型,存储任何自定义数据。