LangChain Expression Language (LCEL)

Every time you used the pipe operator (|) in the previous topics, you were using LangChain Expression Language — LCEL. It is the standard syntax for composing LangChain components. Understanding LCEL deeply lets you build complex pipelines that are clean, readable, and production-ready. This topic explains how LCEL works under the hood, all the ways to compose components, and advanced patterns that make real applications more powerful.

The Plumbing Analogy

Think of LCEL like water pipes in a building. Each pipe section is one component (a pump, a filter, a valve, a tap). You connect them in a sequence — water flows from left to right, each section doing its job before passing the flow forward. LCEL's pipe operator works identically: data flows from left to right through each component, transforming at each step.

Water system:
  Reservoir → [Filter] → [Pump] → [Valve] → [Tap] → Drinking water

LCEL chain:
  Input → [Template] → [Model] → [Parser] → [Transform] → Output

The Runnable Interface

Every component in LCEL implements the Runnable interface. This means every component has the same set of methods: invoke, batch, stream, ainvoke, abatch, and astream. Because every component speaks the same language, any component can be piped into any other, provided the output of one matches the input the next one expects.

Components that are Runnable:
  ✓ ChatPromptTemplate
  ✓ ChatOpenAI (and all models)
  ✓ StrOutputParser (and all parsers)
  ✓ RunnableLambda (any Python function)
  ✓ RunnablePassthrough
  ✓ RunnableParallel
  ✓ RunnableBranch
  ✓ Retriever
  ✓ Tools (BaseTool subclasses and @tool functions are already Runnables)

All support:
  .invoke(input)           → single call, returns output
  .batch([input1, input2]) → multiple calls, returns list
  .stream(input)           → returns generator of chunks
  .ainvoke(input)          → async version of invoke
  .abatch([...])           → async version of batch
  .astream(input)          → async version of stream

How the Pipe Operator Works

When you write A | B, Python calls A.__or__(B). LangChain's Runnable class overrides the | operator to create a new RunnableSequence that calls A, takes its output, and passes it as input to B.

chain = prompt | model | parser

# This is equivalent to:
from langchain_core.runnables import RunnableSequence
chain = RunnableSequence(first=prompt, middle=[model], last=parser)

# All three calls produce the same result:
chain.invoke(input)
RunnableSequence(prompt, model, parser).invoke(input)
parser.invoke(model.invoke(prompt.invoke(input)))

The pipe syntax is just a cleaner way to write nested function calls. It reads left-to-right, matching how data flows, making the code much easier to understand than deeply nested calls.
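
To make the mechanism concrete, here is a toy model of operator overloading with __or__. The Step class is a hypothetical illustration written for this article, not LangChain's actual implementation; it only shows how A | B can build a new object that runs A and then B.

```python
from typing import Any, Callable

class Step:
    """Toy stand-in for a Runnable: wraps a function and supports `|`."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

    def __or__(self, other: "Step") -> "Step":
        # A | B builds a new step that runs A, then feeds A's output to B
        return Step(lambda value: other.invoke(self.invoke(value)))

strip = Step(str.strip)
upper = Step(str.upper)
exclaim = Step(lambda s: s + "!")

pipeline = strip | upper | exclaim
piped = pipeline.invoke("  hello ")
nested = exclaim.invoke(upper.invoke(strip.invoke("  hello ")))

print(piped, nested)
```

Both calls return the same string; the piped form simply reads in the order the data moves.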

Input and Output Types

Each component in a chain has an expected input type and an output type. When you connect components, the output of one must match the input format of the next. Understanding these types prevents the most common LCEL errors.

Component              Input Type                                Output Type
─────────────────────────────────────────────────────────────────────────────────────────────────
ChatPromptTemplate     dict (variable values)                    ChatPromptValue (wraps list[BaseMessage])
ChatOpenAI             PromptValue, list[BaseMessage], or str    AIMessage
StrOutputParser        AIMessage or str                          str
JsonOutputParser       AIMessage or str                          parsed JSON (usually dict)
RunnableLambda(fn)     anything fn accepts                       anything fn returns
RunnablePassthrough    anything                                  same as input (unchanged)
Retriever              str (query)                               list[Document]

RunnablePassthrough in Depth

RunnablePassthrough passes its input to its output unchanged. Its main use is inside RunnableParallel dictionaries where you want to pass the original input forward alongside processed versions.

from langchain_core.runnables import RunnablePassthrough, RunnableParallel

# Build a chain that keeps the original question AND adds context
setup = RunnableParallel({
    "question": RunnablePassthrough(),    # Pass original question unchanged
    "context": retriever | format_docs    # Retrieve and format context
})

# Then feed both into the prompt
chain = setup | prompt | model | parser

result = chain.invoke("What is the refund policy?")
# setup produces: {"question": "What is the refund policy?",
#                  "context": "Returns accepted within 30 days..."}
# prompt uses both keys to fill in the template

RunnableLambda: Functions as Chain Steps

RunnableLambda wraps any Python function as a chain component. The function receives the previous component's output and returns a value for the next component.

from langchain_core.runnables import RunnableLambda

# Convert list of documents to string
format_docs = RunnableLambda(lambda docs: "\n\n".join(d.page_content for d in docs))

# Validate input before it reaches the retriever and the model
def validate_input(question: str) -> str:
    question = question.strip()
    if not question:
        raise ValueError("Question cannot be empty")
    return question

validator = RunnableLambda(validate_input)

# Chain with validation and formatting; setup expects the question as a plain string
chain = validator | setup | prompt | model | parser

itemgetter: Extracting a Value from a Dict

from operator import itemgetter

# itemgetter("key") extracts a single value from a dict.
# This is a common LCEL pattern when the chain input is a dict
# and a step only needs one field from it.

chain = (
    {
        "context": itemgetter("question") | retriever | format_docs,
        "question": itemgetter("question"),
    }
    | prompt
    | model
    | parser
)

result = chain.invoke({"question": "What is the refund policy?"})
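
To see what itemgetter does on its own, outside any chain, the standard-library sketch below applies it to a plain dict. The payload keys are illustrative.

```python
from operator import itemgetter

payload = {"question": "What is the refund policy?", "language": "English"}

get_question = itemgetter("question")
get_both = itemgetter("question", "language")

question = get_question(payload)   # a single key returns the value itself
both = get_both(payload)           # several keys return a tuple of values

print(question)
print(both)
```

A missing key raises KeyError, so a chain built this way fails fast when a caller omits a required field.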

Async Chains with ainvoke

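Web applications need to handle many requests simultaneously without blocking. Python's async/await syntax lets multiple I/O-bound operations run concurrently. Every LCEL component exposes async methods; components without a native async implementation fall back to running their synchronous code in a thread pool.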

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_messages([
    ("human", "Write one sentence about {topic}")
])
chain = prompt | model | StrOutputParser()

async def main():
    # Single async call
    result = await chain.ainvoke({"topic": "quantum computing"})
    print(result)

    # Multiple concurrent async calls — much faster than sequential
    topics = ["AI", "blockchain", "robotics", "biotechnology"]
    tasks = [chain.ainvoke({"topic": t}) for t in topics]
    results = await asyncio.gather(*tasks)

    for topic, result in zip(topics, results):
        print(f"{topic}: {result}")

asyncio.run(main())

Running four API calls concurrently with asyncio.gather takes roughly as long as the slowest single call, rather than the sum of all four, because the requests are in flight at the same time. Provider rate limits can reduce the gain when you launch many calls at once.
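
The timing difference can be observed without an API key. In this sketch, fake_ainvoke is a hypothetical coroutine that sleeps for 0.2 seconds to imitate network latency; it stands in for chain.ainvoke.

```python
import asyncio
import time

# Hypothetical stand-in for chain.ainvoke: waits, then returns a string
async def fake_ainvoke(topic: str, latency: float = 0.2) -> str:
    await asyncio.sleep(latency)
    return f"One sentence about {topic}."

async def run_sequential(topics):
    # Each call starts only after the previous one has finished
    return [await fake_ainvoke(t) for t in topics]

async def run_concurrent(topics):
    # All calls are started together and awaited as a group
    return await asyncio.gather(*(fake_ainvoke(t) for t in topics))

topics = ["AI", "blockchain", "robotics", "biotechnology"]

start = time.perf_counter()
sequential_results = asyncio.run(run_sequential(topics))
sequential_time = time.perf_counter() - start

start = time.perf_counter()
concurrent_results = asyncio.run(run_concurrent(topics))
concurrent_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

asyncio.gather returns results in the order the awaitables were passed, so zipping them with the input list is safe.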

Streaming in Depth

The .stream() method returns an iterator that yields chunks of the output as they arrive. For chat models, each chunk is a message chunk carrying a small piece of text, often a single token. StrOutputParser converts each chunk to a string as it arrives, so the chain as a whole still streams.

chain = prompt | model | StrOutputParser()

# Stream and display token by token
print("Response: ", end="")
for chunk in chain.stream({"topic": "the Amazon rainforest"}):
    print(chunk, end="", flush=True)
print()  # Newline at end
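
Streaming through a chain works because each step can handle chunks one at a time instead of waiting for the full output. The sketch below models that idea with plain Python generators; fake_model_stream and to_text are hypothetical stand-ins for a model and a parser, not LangChain code.

```python
from typing import Iterator

# Hypothetical stand-in for a model's token stream
def fake_model_stream(text: str) -> Iterator[dict]:
    for word in text.split(" "):
        yield {"content": word + " "}

# Parser-like step: converts each chunk as it arrives
def to_text(chunks: Iterator[dict]) -> Iterator[str]:
    for chunk in chunks:
        yield chunk["content"]

received = []
for piece in to_text(fake_model_stream("The Amazon is vast")):
    received.append(piece)
    print(piece, end="", flush=True)
print()

full_text = "".join(received)
```

A step that needs the whole output before it can produce anything breaks this flow, which is worth keeping in mind when adding custom functions to a chain you intend to stream.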

Async Streaming

async def stream_response(topic: str):
    print("Response: ", end="")
    async for chunk in chain.astream({"topic": topic}):
        print(chunk, end="", flush=True)
    print()

asyncio.run(stream_response("climate change"))

Configurable Chains: Runtime Parameters

Sometimes you want to change a component's settings at runtime — use a different model for paying users, increase token limits for premium plans, or switch language based on user preference. ConfigurableField makes specific parameters adjustable at invocation time.

from langchain_core.runnables import ConfigurableField

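# configurable_fields() expects the field name; on ChatOpenAI that is model_name ("model" is its alias)
configurable_model = ChatOpenAI(model="gpt-3.5-turbo").configurable_fields(
    model_name=ConfigurableField(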
        id="llm_model",
        name="LLM Model",
        description="The language model to use"
    ),
    temperature=ConfigurableField(
        id="temperature",
        name="Temperature",
        description="Controls creativity (0=focused, 1=creative)"
    )
)

chain = prompt | configurable_model | parser

# Standard user
result = chain.invoke({"topic": "coffee"})

# Premium user — upgrade model at runtime, no code change needed
result = chain.with_config(configurable={"llm_model": "gpt-4o"}).invoke({"topic": "coffee"})

# Creative mode
result = chain.with_config(configurable={"temperature": 0.9}).invoke({"topic": "coffee"})

Binding Runtime Arguments

.bind() attaches fixed keyword arguments to a component and returns a new Runnable that passes them on every call. Use it for settings that stay the same across requests, so you configure them once.

from langchain_core.output_parsers import JsonOutputParser

# Force JSON response format for every call through json_model
# (OpenAI's JSON mode expects the prompt itself to mention JSON)
json_model = model.bind(response_format={"type": "json_object"})

# Bind stop sequences: the model stops generating when it reaches these strings
limited_model = model.bind(stop=["END", "\n\n---"])

chain = prompt | json_model | JsonOutputParser()

Composing Chains: Chains Calling Other Chains

LCEL chains are themselves Runnable objects. You compose them exactly like individual components — pipe them together, use them inside RunnableParallel, or call them inside RunnableLambda functions.

# Chain 1: Generate a title
title_chain = (
    ChatPromptTemplate.from_messages([
        ("human", "Write a catchy blog post title about: {topic}")
    ])
    | model
    | StrOutputParser()
)

# Chain 2: Write an intro based on the title
intro_chain = (
    ChatPromptTemplate.from_messages([
        ("human", "Write a 3-sentence introduction for a blog post titled: {title}")
    ])
    | model
    | StrOutputParser()
)

# Compose: pipe title output into intro input
def title_to_intro_input(title: str) -> dict:
    return {"title": title}

full_chain = (
    title_chain
    | RunnableLambda(title_to_intro_input)
    | intro_chain
)

result = full_chain.invoke({"topic": "the future of remote work"})
print(result)  # Full introduction paragraph

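Error Handling: Fallbacks and Retries

Model APIs fail in practice: rate limits, timeouts, and outages all happen. with_fallbacks() switches to an alternative component when the primary one raises, and with_retry() repeats the same component a limited number of times before giving up.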

from langchain_openai import ChatOpenAI

# Primary: expensive, capable model
primary_model = ChatOpenAI(model="gpt-4o")

# Fallback: cheaper, faster model
fallback_model = ChatOpenAI(model="gpt-3.5-turbo")

# Try primary, fall back to secondary on any failure
resilient_model = primary_model.with_fallbacks(
    [fallback_model],
    exceptions_to_handle=(Exception,)
)

# Retry up to 3 times before failing
retrying_model = primary_model.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True  # Add jitter to avoid thundering herd
)

chain = prompt | resilient_model | parser

Inspecting a Chain

LCEL chains expose their structure so you can inspect, debug, and document them.

chain = prompt | model | parser

# See what input the chain expects
# (on langchain-core 0.3+ with Pydantic v2, prefer .model_json_schema())
print(chain.input_schema.schema())
# Output includes: "properties": {"topic": {"type": "string"}}, "required": ["topic"]

# See what output the chain produces
print(chain.output_schema.schema())
# Output includes: "type": "string"

# Print the full chain structure (requires the grandalf package)
chain.get_graph().print_ascii()
# Visual representation of all components and connections

Complete Advanced Chain Example

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda

load_dotenv()

# Components
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
parser = StrOutputParser()
embeddings = OpenAIEmbeddings()
vector_store = FAISS.load_local("knowledge_base", embeddings,
                                allow_dangerous_deserialization=True)
retriever = vector_store.as_retriever(search_kwargs={"k": 4})

def format_docs(docs):
    return "\n\n---\n\n".join(
        f"[Source: {d.metadata.get('source', 'unknown')}, "
        f"Page: {d.metadata.get('page', 'N/A')}]\n{d.page_content}"
        for d in docs
    )

prompt = ChatPromptTemplate.from_messages([
    ("system",
     "You are a knowledgeable assistant. Answer using only the context provided. "
     "Cite the source when possible.\n\nContext:\n{context}"),
    ("human", "{question}")
])

# Full RAG chain with citations
rag_chain = (
    RunnableParallel({
        "context": retriever | RunnableLambda(format_docs),
        "question": RunnablePassthrough()
    })
    | prompt
    | model
    | parser
)

# Run it
answer = rag_chain.invoke("What is the company's expense reimbursement process?")
print(answer)

LCEL Summary Reference

Pattern                         Code
──────────────────────────────────────────────────────────────
Sequential pipe                 A | B | C
Pass input unchanged            RunnablePassthrough()
Custom Python logic             RunnableLambda(fn)
Run multiple chains in parallel RunnableParallel({...})
Conditional routing             RunnableBranch(...)
Retry on failure                component.with_retry(...)
Fallback on failure             component.with_fallbacks([...])
Runtime configuration           component.with_config(...)
Async single call               await chain.ainvoke(input)
Async batch                     await chain.abatch([...])
Async stream                    async for chunk in chain.astream(input)
Bind fixed parameters           component.bind(param=value)

Summary

LCEL is the modern, composable backbone of every LangChain application. Every component implements the Runnable interface, giving them identical invoke, batch, and stream methods. The pipe operator creates RunnableSequences where data flows left to right. RunnableParallel runs multiple components on the same input simultaneously. RunnableLambda wraps any Python function as a chain step. Async methods (ainvoke, astream) enable high-throughput web applications. Configurable fields and binding make chains flexible without code changes.
