# LCEL简介

全称LangChain Expression Language。用处是使用“|”运算符链接langchain的各个组件

是一种声明式的方法来链接langchian组件。LCEL从第一天起就被设计为支持将原型投入生产,无需代码更改。

# LCEL语法

  • Runnable组件
  • 组件调用、批量、流式运行
  • 组合成chain
  • 并行调用运行
  • 合并输入和输出字典
  • 后备选项
  • 重复多次执行Runnable组件
  • 条件构建chain
  • map高阶处理
  • 打印chain图形
  • 生命周期管理

# 组件

标准:。可以是函数、工具。。。但必须是Runnable接口。

chain本身就是组件 可以把一个小 Chain 当作一个积木,去搭建一个更大的 Chain。

# 一切皆 Runnable

在 LCEL 中,不要把对象看作“类”,要把它们看作“可以被调用的函数”。

# 1. Runnable 协议(The Protocol)RunnableLambda

LangChain 中的核心组件(Model, Prompt, Retriever, Tool)都实现了 Runnable 接口。这意味着它们都可以直接被“执行”。

  • 标准接口
    • invoke(input): 单次调用。
    • stream(input): 流式返回(返回迭代器)。
    • batch([inputs]): 批量并行处理。
    • ainvoke, astream, abatch: 对应的异步方法

使用 RunnableLambda 封装成runnable接口

导入方式:

from langchain_core.runnables import RunnableLambda

定义一个函数作为组件-三种调用方式

# 定义一个函数做组件
from langchain_core.runnables import RunnableLambda, RunnableParallel

# def test1(x:int):
#     return x + 1

# # 封装成runnable组件
# r1 = RunnableLambda(test1)

# # 1、调用组件 看函数情况传参
# res = r1.invoke(3)
# print(res)  # 4

# # 2、批量调用 调用多次
# res2 = r1.batch([4, 5])
# print(res2)  # [5, 6]

# # 3、流式调用
# def test2(prompt: str):
#     for item in prompt.split(' '):
#         yield item
# r2 = RunnableLambda(test2)
# # res3 = r2.stream("tihe is a cat")
# for chunk in res3:
#     print(chunk)

# 2. 管道符 | 的本质

a | b 的意思是:a 的输出,作为 b 的输入

  • 这就要求:a 的输出数据结构,必须完全符合 b 的输入数据结构要求(通常是 strdict)。
from langchain_core.runnables import RunnableLambda
# 4、组件组合成链 两个以上的组件
def test3(x: int):
    return x + 2
def test4(x:int):
    return x**2
r3 = RunnableLambda(test3)
r4 = RunnableLambda(test4)
chain = r3|r4  # r3运行的结果作为参数传到r4
print(chain.invoke(3))  # 25

# 3.链的并行运算 RunnableParallel

调用时可以加一些配置

max_concurrency:最大并发数

chain.invoke(args, **kwargs, config={'max_concurrency':2})  # 两个并发
from langchain_core.runnables import RunnableLambda, RunnableParallel
# 4、组件组合成链 两个以上的组件
def test3(x: int):
    return x + 2
def test4(x:int):
    return x**2
r3 = RunnableLambda(test3)
r4 = RunnableLambda(test4)
# 5、链并行运行  输出是一个字典 r3=r3, r4=r4里的第一个r3\r4是输出的字典里结果的key. 两个组件同时运行
chain2 = RunnableParallel(r3=r3, r4=r4)
print(chain2.invoke(3, config={'max_concurrency':2}))  # {'r3': 5, 'r4': 9}

# 4.查看链的图像描述

pip install grandalf
print(chain.get_graph().print_ascii())

# 5.透传 - 合并输入并处理中间数据 RunnablePassthrough

RunnablePassthrough: 允许传递输入数据,可以保持不变或添加额外的键。还可以过滤数据。

使用 RunnablePassthrough.assign 可以保留原始输入并添加新数据

使用 RunnablePassthrough().pick 可以过滤数据,支持传入字符串或字符串列表

RunnablePassthrough是恒等函数f(x) = xRunnablePassthrough.assign是字典的update方法,所以x必须是dict

导入方式

from langchain_core.runnables import RunnablePassthrough

保持不变

from langchain_core.runnables import RunnablePassthrough, RunnableLambda
r1 = RunnableLambda(lambda x:x)
r2 = RunnableLambda(lambda x:x + 10)
chain = r1 | RunnablePassthrough() | r2
print(chain.invoke(2))  # 12

添加数据 RunnablePassthrough.assign必须接收字典

from langchain_core.runnables import RunnablePassthrough, RunnableLambda
r3 = RunnableLambda(lambda x:{'key1': x})
r4 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r3 | RunnablePassthrough.assign(new_key=r4)  # new_key是输出字典的key
print(chain.invoke(2))
'''
{'key1': 2, 'new_key': 12}
'''

扩展:并行+透传

from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
r1 = RunnableLambda(lambda x:{'key1': x})
r2 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key= r2))
print(chain.invoke(2))

第一步:r1 (数据初始化)

  • 输入: 2 (整数)
  • 逻辑: lambda x: {'key1': x}
  • 输出: {'key1': 2} (字典)
    • 注意:这一步至关重要,因为后续的 .assign() 要求输入必须是字典。

第二步:RunnableParallel (并行分流)

上一步的输出 {'key1': 2} 同时进入 b1b2 两个分支。

  • 分支 b1: RunnablePassthrough()
    • 含义: “什么都不做,给我什么,我就吐出什么”。
    • 输入: {'key1': 2}
    • 输出: {'key1': 2} (原样保留)
  • 分支 b2: RunnablePassthrough.assign(new_key=r2)
    • 含义: “拿着原始数据,计算出 new_key,然后把它到原始数据上”。
    • 内部计算过程:
      1. 系统把输入 {'key1': 2} 传给 r2
      2. r2 执行逻辑 x['key1'] + 10 -> 2 + 10 = 12
      3. assignnew_key=12 合并回原始字典。
    • 输出: {'key1': 2, 'new_key': 12}

第三步:最终合并

RunnableParallel 将两个分支的结果合并到一个大字典中,Key 就是定义的 b1b2

过滤

r1 = RunnableLambda(lambda x:{'key1': x})
r2 = RunnableLambda(lambda x:x['key1'] + 10)
chain = r1 | RunnableParallel(b1=RunnablePassthrough(),b2=RunnablePassthrough.assign(new_key=r2))
print(chain.invoke(2))  # {'b1': {'key1': 2}, 'b2': {'key1': 2, 'new_key': 12}}
chain2 = chain.pick('b2').pick('new_key')
print(chain2.invoke(2))  # 12

场景

from langchain_core.runnables import RunnablePassthrough

# 场景:Prompt 需要 "question",但我们想直接传字符串给 chain.invoke("什么是LCEL")
# 使用 RunnablePassthrough.assign 可以保留原始输入并添加新数据
chain = (
    {"question": RunnablePassthrough()} 
    | prompt 
    | model
)
chain.invoke('什么是LCEL')