LangChain Expression Language (LCEL)
Every time you used the pipe operator (|) in the previous topics, you were using LangChain Expression Language — LCEL. It is the standard syntax for composing LangChain components. Understanding LCEL deeply lets you build complex pipelines that are clean, readable, and production-ready. This topic explains how LCEL works under the hood, all the ways to compose components, and advanced patterns that make real applications more powerful.
The Plumbing Analogy
Think of LCEL like water pipes in a building. Each pipe section is one component (a pump, a filter, a valve, a tap). You connect them in a sequence — water flows from left to right, each section doing its job before passing the flow forward. LCEL's pipe operator works identically: data flows from left to right through each component, transforming at each step.
Water system: Reservoir → [Filter] → [Pump] → [Valve] → [Tap] → Drinking water
LCEL chain:   Input → [Template] → [Model] → [Parser] → [Transform] → Output
The Runnable Interface
Every component in LCEL implements the Runnable interface. This means every component has the same set of methods — invoke, batch, stream, ainvoke, abatch, astream. Because every component speaks the same language, any component can connect to any other component.
Components that are Runnable:
✓ ChatPromptTemplate
✓ ChatOpenAI (and all models)
✓ StrOutputParser (and all parsers)
✓ RunnableLambda (any Python function)
✓ RunnablePassthrough
✓ RunnableParallel
✓ RunnableBranch
✓ Retriever
✓ Tools (BaseTool subclasses are Runnables themselves)

All support:
.invoke(input)            → single call, returns output
.batch([input1, input2])  → multiple calls, returns list
.stream(input)            → returns an iterator of chunks
.ainvoke(input)           → async version of invoke
.abatch([...])            → async version of batch
.astream(input)           → async version of stream
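One reason a shared interface is practical is that a component only has to define how to handle a single input; batch and stream can be derived from that. The sketch below is a toy model of that idea, not LangChain's code. ToyRunnable and word_count are hypothetical names, and the thread pool is an assumption chosen to mirror how a default batch can fan out over invoke.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterator


class ToyRunnable:
    """Hypothetical mini-Runnable: only invoke() is component-specific."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

    def batch(self, values: list) -> list:
        # Run invoke() for each input on a thread pool; map() preserves order.
        with ThreadPoolExecutor() as pool:
            return list(pool.map(self.invoke, values))

    def stream(self, value: Any) -> Iterator[Any]:
        # Without native streaming, yield the whole result as a single chunk.
        yield self.invoke(value)


word_count = ToyRunnable(lambda text: len(text.split()))

single = word_count.invoke("pipes carry data")
many = word_count.batch(["one", "two words", "three short words"])
chunks = list(word_count.stream("a b"))
print(single, many, chunks)
```

Because every method is built on invoke, any object with this shape can be swapped into a pipeline without its neighbours noticing.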
How the Pipe Operator Works
When you write A | B, Python calls A.__or__(B). LangChain's Runnable class overrides the | operator to create a new RunnableSequence that calls A, takes its output, and passes it as input to B.
chain = prompt | model | parser

# This is equivalent to:
from langchain_core.runnables import RunnableSequence
chain = RunnableSequence(first=prompt, middle=[model], last=parser)

# All three are identical:
chain.invoke(input)
RunnableSequence(prompt, model, parser).invoke(input)
parser.invoke(model.invoke(prompt.invoke(input)))
The pipe syntax is just a cleaner way to write nested function calls. It reads left-to-right, matching how data flows, making the code much easier to understand than deeply nested calls.
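To make the operator overloading concrete, here is a minimal pure-Python sketch of the mechanism. Step and Sequence are hypothetical toy classes standing in for Runnable and RunnableSequence; they illustrate the idea only and omit everything else the real classes do (config, tracing, async, type coercion).

```python
from typing import Any, Callable


class Step:
    """Toy stand-in for a Runnable: wraps one function and supports `|`."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

    def __or__(self, other: "Step") -> "Sequence":
        # `a | b` calls a.__or__(b) and returns a new composite object.
        return Sequence([self, other])


class Sequence(Step):
    """Toy stand-in for RunnableSequence: runs its steps left to right."""

    def __init__(self, steps: list) -> None:
        self.steps = steps

    def invoke(self, value: Any) -> Any:
        for step in self.steps:
            value = step.invoke(value)
        return value

    def __or__(self, other: Step) -> "Sequence":
        # Extending a sequence keeps it flat instead of nesting sequences.
        return Sequence([*self.steps, other])


strip = Step(str.strip)
upper = Step(str.upper)
exclaim = Step(lambda s: s + "!")

chain = strip | upper | exclaim
piped = chain.invoke("  hello ")
nested = exclaim.invoke(upper.invoke(strip.invoke("  hello ")))
print(piped, piped == nested)
```

The piped and nested forms give the same result; the pipe form simply records the steps in an object that can be inspected and reused.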
Input and Output Types
Each component in a chain has an expected input type and an output type. When you connect components, the output of one must match the input format of the next. Understanding these types prevents the most common LCEL errors.
Component             Input Type                            Output Type
─────────────────────────────────────────────────────────────────────────────────────────────────────
ChatPromptTemplate    dict (variable values)                ChatPromptValue (wraps list[BaseMessage])
ChatOpenAI            PromptValue, list[BaseMessage], str   AIMessage
StrOutputParser       AIMessage                             str
JsonOutputParser      AIMessage                             dict
RunnableLambda(fn)    anything fn accepts                   anything fn returns
RunnablePassthrough   anything                              same as input (unchanged)
Retriever             str (query)                           list[Document]
RunnablePassthrough in Depth
RunnablePassthrough passes its input to its output unchanged. Its main use is inside RunnableParallel dictionaries where you want to pass the original input forward alongside processed versions.
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
# Build a chain that keeps the original question AND adds context
setup = RunnableParallel({
    "question": RunnablePassthrough(),   # Pass original question unchanged
    "context": retriever | format_docs   # Retrieve and format context
})
# Then feed both into the prompt
chain = setup | prompt | model | parser
result = chain.invoke("What is the refund policy?")
# setup produces: {"question": "What is the refund policy?",
#                  "context": "Returns accepted within 30 days..."}
# prompt uses both keys to fill in the template
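The fan-out step can be pictured with a small stand-alone sketch: every entry in the dictionary receives the same input, and the results are collected under the same keys. run_parallel, passthrough, and fake_retrieve are hypothetical helpers written for this illustration; fake_retrieve only stands in for retriever | format_docs and does no real search.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Call every step with the same input and collect the results by key."""
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(fn, value) for key, fn in steps.items()}
        return {key: future.result() for key, future in futures.items()}


def passthrough(value: Any) -> Any:
    # Returns the input unchanged, like RunnablePassthrough.
    return value


def fake_retrieve(question: str) -> str:
    # Hypothetical stand-in for `retriever | format_docs`.
    return f"(documents matching: {question})"


setup_output = run_parallel(
    {"question": passthrough, "context": fake_retrieve},
    "What is the refund policy?",
)
print(setup_output)
```

The resulting dict has exactly the keys the prompt template needs, which is why this shape sits naturally in front of a prompt.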
RunnableLambda: Functions as Chain Steps
RunnableLambda wraps any Python function as a chain component. The function receives the previous component's output and returns a value for the next component.
from langchain_core.runnables import RunnableLambda
# Convert list of documents to string
format_docs = RunnableLambda(lambda docs: "\n\n".join(d.page_content for d in docs))
# Validate input before sending to model
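def validate_input(data: dict) -> dict:
    # Strip first so a whitespace-only question is also rejected
    question = (data.get("question") or "").strip()
    if not question:
        raise ValueError("Question cannot be empty")
    return {**data, "question": question}
validator = RunnableLambda(validate_input)
# Chain with validation and formatting.
# setup expects the question string, so extract it from the validated dict first
chain = validator | RunnableLambda(lambda d: d["question"]) | setup | prompt | model | parser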
itemgetter: Extracting a Value from a Dict
from operator import itemgetter
# itemgetter("key") extracts a single value from a dict
# This is a common LCEL pattern when a chain step only needs one field
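chain = (
    {
        "context": itemgetter("question") | retriever | format_docs,
        "question": itemgetter("question"),
    }
    | prompt
    | model
    | parser
)
# The input is a dict here, so each branch picks out the field it needs
result = chain.invoke({"question": "What is the refund policy?"})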
Async Chains with ainvoke
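Web applications need to handle many requests simultaneously without blocking. Python's async/await syntax lets a single thread make progress on many I/O-bound operations at once. Every LCEL component exposes async methods; components without a native async implementation fall back to running their sync code in a worker thread.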
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_messages([
    ("human", "Write one sentence about {topic}")
])
chain = prompt | model | StrOutputParser()
async def main():
    # Single async call
    result = await chain.ainvoke({"topic": "quantum computing"})
    print(result)
    # Multiple concurrent async calls, much faster than sequential
    topics = ["AI", "blockchain", "robotics", "biotechnology"]
    tasks = [chain.ainvoke({"topic": t}) for t in topics]
    results = await asyncio.gather(*tasks)
    for topic, result in zip(topics, results):
        print(f"{topic}: {result}")
asyncio.run(main())
Running four API calls concurrently with asyncio.gather takes roughly as long as the slowest single call rather than the sum of all four, because all four network requests are in flight at the same time. Provider rate limits can reduce this gain.
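The timing difference can be checked without any API key. In the sketch below, fake_model_call is a hypothetical stand-in for chain.ainvoke that simply sleeps for a fixed delay; the 0.1 second figure is an arbitrary assumption, and real latencies vary per request.

```python
import asyncio
import time


async def fake_model_call(topic: str, delay: float = 0.1) -> str:
    """Hypothetical stand-in for chain.ainvoke: waits, then returns text."""
    await asyncio.sleep(delay)
    return f"One sentence about {topic}."


async def compare(topics: list) -> tuple:
    # Sequential: each call starts only after the previous one finishes.
    start = time.perf_counter()
    for topic in topics:
        await fake_model_call(topic)
    sequential = time.perf_counter() - start

    # Concurrent: all calls are started, then awaited together.
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_model_call(t) for t in topics))
    concurrent = time.perf_counter() - start
    return sequential, concurrent, results


sequential_s, concurrent_s, results = asyncio.run(
    compare(["AI", "blockchain", "robotics", "biotechnology"])
)
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

asyncio.gather returns results in the order the awaitables were passed, so zipping them with the topic list stays correct even if responses arrive out of order.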
Streaming in Depth
The .stream() method returns a generator that yields chunks of the output as they arrive. For text generation, each chunk typically holds one or a few tokens. StrOutputParser does not wait for the full response: it converts each message chunk to a string as it arrives, so the chain streams end to end.
chain = prompt | model | StrOutputParser()
# Stream and display token by token
print("Response: ", end="")
for chunk in chain.stream({"topic": "the Amazon rainforest"}):
    print(chunk, end="", flush=True)
print()  # Newline at end
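Streaming through several steps works because each step can consume and produce chunks lazily. The following sketch models that with plain Python generators; fake_model_stream and str_parser_stream are hypothetical stand-ins for a model and a string parser, and the dict-shaped chunks are an assumption made for the illustration.

```python
from typing import Iterator


def fake_model_stream(text: str) -> Iterator[dict]:
    """Hypothetical model: yields message-like chunks, one word at a time."""
    for word in text.split(" "):
        yield {"content": word + " "}


def str_parser_stream(chunks: Iterator[dict]) -> Iterator[str]:
    """Parser step: turn each chunk into a string as soon as it arrives."""
    for chunk in chunks:
        yield chunk["content"]


pieces = []
for piece in str_parser_stream(fake_model_stream("Rainforests recycle their own rain")):
    pieces.append(piece)  # a UI would display each piece immediately

full_text = "".join(pieces).strip()
print(len(pieces), repr(full_text))
```

A step that needs the complete input before it can produce anything would collect all chunks first, and everything downstream of it would then receive the output in one piece.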
Async Streaming
async def stream_response(topic: str):
    print("Response: ", end="")
    async for chunk in chain.astream({"topic": topic}):
        print(chunk, end="", flush=True)
    print()
asyncio.run(stream_response("climate change"))
Configurable Chains: Runtime Parameters
Sometimes you want to change a component's settings at runtime — use a different model for paying users, increase token limits for premium plans, or switch language based on user preference. ConfigurableField makes specific parameters adjustable at invocation time.
from langchain_core.runnables import ConfigurableField
configurable_model = ChatOpenAI(model="gpt-3.5-turbo").configurable_fields(
    # The field is named model_name on ChatOpenAI; "model" is its constructor alias
    model_name=ConfigurableField(
        id="llm_model",
        name="LLM Model",
        description="The language model to use"
    ),
    temperature=ConfigurableField(
        id="temperature",
        name="Temperature",
        description="Controls creativity (0=focused, 1=creative)"
    )
)
chain = prompt | configurable_model | parser
# Standard user
result = chain.invoke({"topic": "coffee"})
# Premium user — upgrade model at runtime, no code change needed
result = chain.with_config(configurable={"llm_model": "gpt-4o"}).invoke({"topic": "coffee"})
# Creative mode
result = chain.with_config(configurable={"temperature": 0.9}).invoke({"topic": "coffee"})
Binding Runtime Arguments
.bind() returns a copy of a component with fixed keyword arguments attached; they are passed to the component on every call. Use it for parameters that you want to set once rather than supply with each request.
from langchain_core.output_parsers import JsonOutputParser
# Force JSON response format; applies to every call on this chain
# (OpenAI's JSON mode also expects the prompt itself to mention JSON)
json_model = model.bind(response_format={"type": "json_object"})
# Bind stop sequences; the model stops generating when it reaches these strings
limited_model = model.bind(stop=["END", "\n\n---"])
chain = prompt | json_model | JsonOutputParser()
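A useful way to think about binding is functools.partial from the standard library: arguments are filled in once, and callers only pass what changes. This is an analogy, not how LangChain implements .bind() (which returns a new Runnable). fake_generate is a hypothetical function that imitates a model honouring stop sequences.

```python
from functools import partial
from typing import List, Optional


def fake_generate(prompt: str, *, stop: Optional[List[str]] = None) -> str:
    """Hypothetical model call that truncates output at the first stop string."""
    text = f"Answer to {prompt} END trailing notes"
    for marker in stop or []:
        text = text.split(marker)[0]
    return text.strip()


# Pre-fill the keyword argument once; callers only pass the prompt.
limited_generate = partial(fake_generate, stop=["END"])

unbound = fake_generate("Q1")
bound = limited_generate("Q1")
print(unbound)
print(bound)
```

The original function is left untouched, so bound and unbound variants can be used side by side in different chains.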
Composing Chains: Chains Calling Other Chains
LCEL chains are themselves Runnable objects. You compose them exactly like individual components — pipe them together, use them inside RunnableParallel, or call them inside RunnableLambda functions.
# Chain 1: Generate a title
title_chain = (
    ChatPromptTemplate.from_messages([
        ("human", "Write a catchy blog post title about: {topic}")
    ])
    | model
    | StrOutputParser()
)
# Chain 2: Write an intro based on the title
intro_chain = (
    ChatPromptTemplate.from_messages([
        ("human", "Write a 3-sentence introduction for a blog post titled: {title}")
    ])
    | model
    | StrOutputParser()
)
# Compose: pipe title output into intro input
def title_to_intro_input(title: str) -> dict:
    return {"title": title}
full_chain = (
    title_chain
    | RunnableLambda(title_to_intro_input)
    | intro_chain
)
result = full_chain.invoke({"topic": "the future of remote work"})
print(result)  # Full introduction paragraph
Error Handling: Fallbacks and Retries
from langchain_openai import ChatOpenAI
# Primary: expensive, capable model
primary_model = ChatOpenAI(model="gpt-4o")
# Fallback: cheaper, faster model
fallback_model = ChatOpenAI(model="gpt-3.5-turbo")
# Try primary, fall back to secondary on any failure
resilient_model = primary_model.with_fallbacks(
    [fallback_model],
    exceptions_to_handle=(Exception,)
)
# Make up to 3 attempts in total before failing
retrying_model = primary_model.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True  # Add jitter to avoid thundering herd
)
chain = prompt | resilient_model | parser
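The behaviour of these two wrappers can be sketched in plain Python. The code below is an illustration of the general retry-with-backoff and fallback techniques, not LangChain's implementation; the helper names, the delay values, and the three simulated models are all hypothetical.

```python
import random
import time
from typing import Callable, List


def with_retry(fn: Callable[[str], str], attempts: int = 3, base_delay: float = 0.01):
    """Return a wrapper that retries fn with exponential backoff plus jitter."""
    def wrapper(prompt: str) -> str:
        for attempt in range(1, attempts + 1):
            try:
                return fn(prompt)
            except Exception:
                if attempt == attempts:
                    raise  # out of attempts: surface the last error
                delay = base_delay * 2 ** (attempt - 1)
                # Jitter spreads out retries from many clients.
                time.sleep(delay + random.uniform(0, base_delay))
    return wrapper


def with_fallbacks(primary: Callable[[str], str], fallbacks: List[Callable[[str], str]]):
    """Return a wrapper that tries primary, then each fallback in order."""
    def wrapper(prompt: str) -> str:
        last_error = None
        for candidate in [primary, *fallbacks]:
            try:
                return candidate(prompt)
            except Exception as error:
                last_error = error
        raise last_error
    return wrapper


calls = {"flaky": 0}

def flaky_model(prompt: str) -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise TimeoutError("simulated timeout")
    return f"primary: {prompt}"

def broken_model(prompt: str) -> str:
    raise RuntimeError("simulated outage")

def cheap_model(prompt: str) -> str:
    return f"fallback: {prompt}"

retried = with_retry(flaky_model)("hi")
rescued = with_fallbacks(broken_model, [cheap_model])("hi")
print(retried, "|", rescued, "|", calls["flaky"])
```

Retries suit transient errors such as timeouts, while fallbacks suit failures that repeating the same call is unlikely to fix; the two can be combined by retrying the primary before falling back.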
Inspecting a Chain
LCEL chains expose their structure so you can inspect, debug, and document them.
chain = prompt | model | parser
# See what input the chain expects
print(chain.input_schema.schema())
# Shows (abbreviated): {"properties": {"topic": {"type": "string"}}, "required": ["topic"]}
# See what output the chain produces
print(chain.output_schema.schema())
# Shows (abbreviated): {"type": "string"}
# Print the full chain structure (print_ascii requires the optional grandalf package)
chain.get_graph().print_ascii()
# Visual representation of all components and connections
Complete Advanced Chain Example
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
load_dotenv()
# Components
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
parser = StrOutputParser()
embeddings = OpenAIEmbeddings()
vector_store = FAISS.load_local("knowledge_base", embeddings,
                                allow_dangerous_deserialization=True)
retriever = vector_store.as_retriever(search_kwargs={"k": 4})
def format_docs(docs):
    return "\n\n---\n\n".join(
        f"[Source: {d.metadata.get('source', 'unknown')}, "
        f"Page: {d.metadata.get('page', 'N/A')}]\n{d.page_content}"
        for d in docs
    )
prompt = ChatPromptTemplate.from_messages([
    ("system",
     "You are a knowledgeable assistant. Answer using only the context provided. "
     "Cite the source when possible.\n\nContext:\n{context}"),
    ("human", "{question}")
])
# Full RAG chain with citations
rag_chain = (
    RunnableParallel({
        "context": retriever | RunnableLambda(format_docs),
        "question": RunnablePassthrough()
    })
    | prompt
    | model
    | parser
)
# Run it
answer = rag_chain.invoke("What is the company's expense reimbursement process?")
print(answer)
LCEL Summary Reference
Pattern                           Code
──────────────────────────────────────────────────────────────────────────
Sequential pipe                   A | B | C
Pass input unchanged              RunnablePassthrough()
Custom Python logic               RunnableLambda(fn)
Run multiple chains in parallel   RunnableParallel({...})
Conditional routing               RunnableBranch(...)
Retry on failure                  component.with_retry(...)
Fallback on failure               component.with_fallbacks([...])
Runtime configuration             component.with_config(...)
Async single call                 await chain.ainvoke(input)
Async batch                       await chain.abatch([...])
Async stream                      async for chunk in chain.astream(input)
Bind fixed parameters             component.bind(param=value)
Summary
LCEL is the modern, composable backbone of every LangChain application. Every component implements the Runnable interface, giving them identical invoke, batch, and stream methods. The pipe operator creates RunnableSequences where data flows left to right. RunnableParallel runs multiple components on the same input simultaneously. RunnableLambda wraps any Python function as a chain step. Async methods (ainvoke, astream) enable high-throughput web applications. Configurable fields and binding make chains flexible without code changes.
