Streaming: Watching a Graph Run
This chapter covers LangGraph's stream modes and shows you how to expose a graph's progress to a user in real time.
Why Stream
invoke runs to completion and returns the final state. For short graphs, that's fine. For anything the user is watching, it's bad:
- They stare at a spinner for 30 seconds.
- They don't know if it's working or stuck.
- They can't interrupt.
Streaming emits progress as the graph runs. You forward that to the UI over SSE or websockets. The user sees the graph thinking, word by word or step by step.
stream vs astream
stream is sync, returns an iterator. astream is async, returns an async iterator.
# Sync
for chunk in graph.stream(input, config=config, stream_mode="updates"):
print(chunk)
# Async
async for chunk in graph.astream(input, config=config, stream_mode="updates"):
print(chunk)
Use astream in any real application; web servers and most I/O frameworks want async.
Stream Modes
stream_mode controls what gets emitted. Four values you'll use most often, plus a fifth, "custom", covered later in this chapter.
"updates": Just the Diffs
After each node runs, emit the partial update it returned.
for chunk in graph.stream(input, stream_mode="updates"):
print(chunk)
# {'parse': {'parsed': {...}}}
# {'fetch': {'data': [...]}}
# {'summarize': {'summary': '...'}}
Each dict has one key (the node that ran) and its update. Compact, easy to route to the UI as "node X finished".
"values": Full State
After each step, emit the entire current state.
for chunk in graph.stream(input, stream_mode="values"):
print(chunk)
# {'parsed': None, 'data': None, 'summary': None}
# {'parsed': {...}, 'data': None, 'summary': None}
# {'parsed': {...}, 'data': [...], 'summary': None}
# {'parsed': {...}, 'data': [...], 'summary': '...'}
Useful for UIs that re-render full state each tick. Larger payloads.
"messages": LLM Tokens
Stream messages and (when supported) LLM token chunks.
for chunk in graph.stream(input, stream_mode="messages"):
msg, metadata = chunk
if msg.content:
print(msg.content, end="", flush=True)
You get (message, metadata) tuples. For streaming LLM output, messages arrive in chunks as Claude generates tokens. This is how you get the word-by-word UI.
"debug": Everything
Every internal event: task started, task completed, step entered. Noisy but exhaustive.
for chunk in graph.stream(input, stream_mode="debug"):
print(chunk)
Useful for diagnosing weird behavior. Not for end-user UX.
Multiple Modes at Once
Pass a list to receive multiple modes in one stream.
for chunk in graph.stream(input, stream_mode=["updates", "messages"]):
mode, payload = chunk
if mode == "messages":
msg, metadata = payload
print(msg.content, end="", flush=True)
elif mode == "updates":
print(f"\n[step: {list(payload.keys())[0]}]")
Common pattern: stream tokens for the user, and stream updates for internal logging or progress indicators.
Streaming LLM Tokens
Claude's streaming is per-token. To surface tokens to the user:
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-5", streaming=True)
for chunk in graph.stream(input, stream_mode="messages"):
msg, metadata = chunk
if metadata.get("langgraph_node") == "agent" and msg.content:
print(msg.content, end="", flush=True)
Tokens arrive over the stream as AIMessageChunk objects; concatenate their .content to build up the reply. The chunks are only a streaming view: the node still returns the complete message at the end, and that full message is what gets written to state.
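One gotcha: with Anthropic models, a chunk's .content can be either a plain string or a list of content blocks (e.g. {"type": "text", "text": "..."}). A small helper keeps the token loop simple. This is a sketch; the text_of helper is our own, not a LangChain API:

```python
def text_of(content) -> str:
    """Extract printable text from a message chunk's content.

    Handles both plain strings and Anthropic-style lists of
    content blocks like {"type": "text", "text": "..."}.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, dict) and block.get("type") == "text":
                parts.append(block.get("text", ""))
            elif isinstance(block, str):
                parts.append(block)
        return "".join(parts)
    return ""
```

In the token loop above, you would print text_of(msg.content) instead of msg.content, so tool-use blocks and other non-text content don't leak into the UI.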
astream_events: Fine-Grained Events
astream_events yields typed events: on_chat_model_stream, on_tool_start, on_chain_end, and many more. Verbose but powerful.
async for event in graph.astream_events(input, config=config, version="v2"):
kind = event["event"]
if kind == "on_chat_model_stream":
token = event["data"]["chunk"].content
print(token, end="", flush=True)
elif kind == "on_tool_start":
print(f"\n[tool: {event['name']}]")
elif kind == "on_tool_end":
print(f"\n[tool done]")
Use astream_events when you need structured event routing, not just raw state.
Surfacing Progress in a UI
A FastAPI route that streams graph progress over Server-Sent Events:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
import json
app = FastAPI()
@app.post("/chat")
async def chat(payload: dict):
thread_id = payload["thread_id"]
user_message = payload["message"]
config = {"configurable": {"thread_id": thread_id}}
input = {"messages": [HumanMessage(content=user_message)]}
async def stream():
async for chunk in graph.astream(input, config=config, stream_mode="messages"):
msg, metadata = chunk
if msg.content:
data = json.dumps({"type": "token", "content": msg.content})
yield f"data: {data}\n\n"
yield "data: {\"type\": \"done\"}\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
On the client, consume the text/event-stream response with fetch and a ReadableStream reader (EventSource is the classic SSE client, but it only supports GET, and this route is a POST). Each token lands in the UI as it arrives.
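For quick testing from Python rather than a browser, SSE frames are easy to parse by hand: each event is a data: line followed by a blank line. A minimal parser sketch, assuming the server emits one JSON payload per data: line as the route above does:

```python
import json

def parse_sse(lines):
    """Yield decoded JSON payloads from an iterable of SSE text lines."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())

# Frames as the /chat route above would emit them
frames = [
    'data: {"type": "token", "content": "Hel"}',
    "",
    'data: {"type": "token", "content": "lo"}',
    "",
    'data: {"type": "done"}',
    "",
]
events = list(parse_sse(frames))
text = "".join(e["content"] for e in events if e["type"] == "token")
# text == "Hello"
```

In a real client you would feed this the response body line by line instead of a hardcoded list.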
Custom Events
Nodes can emit custom events via get_stream_writer.
from langgraph.config import get_stream_writer
def long_task(state: State) -> State:
writer = get_stream_writer()
for i in range(10):
writer({"type": "progress", "step": i, "total": 10})
do_work(i)
return {"done": True}
In the caller:
async for chunk in graph.astream(input, stream_mode="custom"):
print(chunk)
# {'type': 'progress', 'step': 0, 'total': 10}
# {'type': 'progress', 'step': 1, 'total': 10}
# ...
Good for progress bars and custom telemetry.
Backpressure and Cancellation
If the consumer is slow, the stream can buffer in memory. For long-running graphs or unreliable clients:
- Set a timeout on the HTTP response.
- Catch asyncio.CancelledError in the server (it propagates from a client disconnect).
- Rely on checkpointing so the user can reconnect and pick up where they left off.
Cancellation mid-stream doesn't rewind the graph; it just stops emitting. Steps that already completed are saved at the last checkpoint, so a reconnecting client can resume the thread from there.
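The disconnect handling can be sketched with a plain async generator (the event shapes here are our own convention, not a LangGraph API):

```python
import asyncio
import json

async def sse_frames(events):
    """Wrap an async iterable of event dicts as SSE frames.

    A client disconnect surfaces inside the generator as
    asyncio.CancelledError; we stop emitting and let the
    checkpointed thread be resumed on reconnect.
    """
    try:
        async for event in events:
            yield f"data: {json.dumps(event)}\n\n"
        yield 'data: {"type": "done"}\n\n'
    except asyncio.CancelledError:
        # Stop cleanly; completed steps are already checkpointed.
        raise

async def demo():
    # Stand-in for graph.astream(...) output
    async def fake_graph():
        yield {"type": "token", "content": "hi"}
    return [frame async for frame in sse_frames(fake_graph())]

frames = asyncio.run(demo())
# ['data: {"type": "token", "content": "hi"}\n\n', 'data: {"type": "done"}\n\n']
```

In the FastAPI route above, you would pass sse_frames(...) to StreamingResponse instead of the inline generator.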
Common Pitfalls
Choosing the wrong mode. "messages" streams tokens but not node completions. "updates" streams node completions but not tokens. Combine them if you need both.
Blocking the event loop. A sync node (e.g. time.sleep) in an async graph blocks everything. Use async I/O in async graphs.
Forwarding the raw Python objects to JS. The frontend wants JSON. Serialize in the server.
No error handling in streams. If a node raises, the stream ends with an exception. Wrap your generator and translate errors into structured events the client can render.
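The error-translation fix can be sketched like this (the event shapes are our own convention; a real server would also log the exception):

```python
def safe_events(chunks):
    """Translate exceptions from a chunk iterator into structured
    error events instead of killing the stream mid-response."""
    try:
        for chunk in chunks:
            yield {"type": "token", "content": chunk}
        yield {"type": "done"}
    except Exception as exc:
        # The client gets a renderable error event, not a dropped connection.
        yield {"type": "error", "message": str(exc)}

# Stand-in for a graph stream whose node raises partway through
def boom():
    yield "partial "
    raise RuntimeError("node failed")

events = list(safe_events(boom()))
# [{'type': 'token', 'content': 'partial '}, {'type': 'error', 'message': 'node failed'}]
```

The client renders the error event like any other message, and the partial output it already received stays on screen.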
Rendering every chunk. Frontends re-rendering on every token can be slow. Batch at the UI layer (every 50ms) if you see jank.
Next Steps
Continue to 08-subgraphs.md to compose graphs from smaller graphs.