Streaming: Watching a Graph Run

This chapter covers LangGraph's stream modes and shows you how to expose a graph's progress to a user in real time.

Why Stream

invoke runs to completion and returns final state. For short graphs, fine. For anything the user is watching, bad:

  • They stare at a spinner for 30 seconds.
  • They don't know if it's working or stuck.
  • They can't interrupt.

Streaming emits progress as the graph runs. You forward that to the UI over SSE or websockets. The user sees the graph thinking, word by word or step by step.

stream vs astream

stream is sync, returns an iterator. astream is async, returns an async iterator.

# Sync
for chunk in graph.stream(input, config=config, stream_mode="updates"):
    print(chunk)

# Async
async for chunk in graph.astream(input, config=config, stream_mode="updates"):
    print(chunk)

Use astream in any real application; web servers and most I/O frameworks want async.

Stream Modes

stream_mode controls what gets emitted. Four values cover most work; a fifth, "custom", comes up later in this chapter.

"updates": Just the Diffs

After each node runs, emit the partial update it returned.

for chunk in graph.stream(input, stream_mode="updates"):
    print(chunk)
# {'parse': {'parsed': {...}}}
# {'fetch': {'data': [...]}}
# {'summarize': {'summary': '...'}}

Each dict has one key (the node that ran) and its update. Compact, easy to route to the UI as "node X finished".
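Routing an updates chunk to the UI can be a one-liner. A sketch (update_to_event and the event shape are illustrative, not a LangGraph API):

```python
def update_to_event(chunk: dict) -> dict:
    # An "updates" chunk has a single key: the node that just ran.
    node, update = next(iter(chunk.items()))
    return {"type": "node_finished", "node": node, "fields": sorted(update)}

print(update_to_event({"parse": {"parsed": {"query": "q"}}}))
# {'type': 'node_finished', 'node': 'parse', 'fields': ['parsed']}
```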

"values": Full State

After each step, emit the entire current state.

for chunk in graph.stream(input, stream_mode="values"):
    print(chunk)
# {'parsed': None, 'data': None, 'summary': None}
# {'parsed': {...}, 'data': None, 'summary': None}
# {'parsed': {...}, 'data': [...], 'summary': None}
# {'parsed': {...}, 'data': [...], 'summary': '...'}

Useful for UIs that re-render full state each tick. Larger payloads.
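If the UI only needs to know what changed each tick, you can diff consecutive snapshots yourself. A plain-Python sketch, assuming the state shape above:

```python
def changed_keys(prev: dict, curr: dict) -> list:
    # Keys whose value differs between two consecutive "values" snapshots.
    return [k for k in curr if curr[k] != prev.get(k)]

snapshots = [
    {"parsed": None, "data": None, "summary": None},
    {"parsed": {"q": 1}, "data": None, "summary": None},
    {"parsed": {"q": 1}, "data": [1, 2], "summary": None},
]
for prev, curr in zip(snapshots, snapshots[1:]):
    print(changed_keys(prev, curr))
# ['parsed']
# ['data']
```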

"messages": LLM Tokens

Stream messages and (when supported) LLM token chunks.

for chunk in graph.stream(input, stream_mode="messages"):
    msg, metadata = chunk
    if msg.content:
        print(msg.content, end="", flush=True)

You get (message, metadata) tuples. For streaming LLM output, messages arrive in chunks as Claude generates tokens. This is how you get the word-by-word UI.
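One wrinkle worth guarding against: .content is not always a plain string. A small normalizing helper (a sketch; the list-of-blocks shape is one way Anthropic content can arrive):

```python
def chunk_text(content) -> str:
    # .content is usually a string, but some providers (including
    # Anthropic models) can emit a list of content blocks instead.
    if isinstance(content, str):
        return content
    return "".join(
        block.get("text", "") for block in content if isinstance(block, dict)
    )

print(chunk_text("Hello"))                           # Hello
print(chunk_text([{"type": "text", "text": "Hi"}]))  # Hi
```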

"debug": Everything

Every internal event: task started, task completed, step entered. Noisy but exhaustive.

for chunk in graph.stream(input, stream_mode="debug"):
    print(chunk)

Useful for diagnosing weird behavior. Not for end-user UX.

Multiple Modes at Once

Pass a list to receive multiple modes in one stream.

for chunk in graph.stream(input, stream_mode=["updates", "messages"]):
    mode, payload = chunk
    if mode == "messages":
        msg, metadata = payload
        print(msg.content, end="", flush=True)
    elif mode == "updates":
        print(f"\n[step: {list(payload.keys())[0]}]")

Common pattern: stream tokens for the user, and stream updates for internal logging or progress indicators.
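The routing logic generalizes into a small dispatcher. A sketch; on_token and on_update are whatever callbacks your app provides:

```python
from types import SimpleNamespace

def route(chunk, on_token, on_update):
    # Multi-mode streams yield (mode, payload) tuples.
    mode, payload = chunk
    if mode == "messages":
        msg, metadata = payload
        if msg.content:
            on_token(msg.content)
    elif mode == "updates":
        for node in payload:
            on_update(node)

# Exercise it with stand-in chunks shaped like the real stream's output.
tokens, steps = [], []
route(("messages", (SimpleNamespace(content="Hi"), {})), tokens.append, steps.append)
route(("updates", {"summarize": {"summary": "..."}}), tokens.append, steps.append)
print(tokens, steps)  # ['Hi'] ['summarize']
```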

Streaming LLM Tokens

Claude's streaming is per-token. To surface tokens to the user:

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-5", streaming=True)

for chunk in graph.stream(input, stream_mode="messages"):
    msg, metadata = chunk
    if metadata.get("langgraph_node") == "agent" and msg.content:
        print(msg.content, end="", flush=True)

Tokens arrive over the stream as AIMessageChunk objects. Concatenate their .content to build up the reply; the graph's state still receives the node's complete message when it finishes, so you don't have to merge chunks into state yourself.

astream_events: Fine-Grained Events

astream_events yields typed events: on_chat_model_stream, on_tool_start, on_chain_end, and many more. Verbose but powerful.

async for event in graph.astream_events(input, config=config, version="v2"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        token = event["data"]["chunk"].content
        print(token, end="", flush=True)
    elif kind == "on_tool_start":
        print(f"\n[tool: {event['name']}]")
    elif kind == "on_tool_end":
        print("\n[tool done]")

Use astream_events when you need structured event routing, not just raw state.
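A dictionary dispatch keeps the handler chain flat as the event list grows. A sketch, not a LangGraph API:

```python
def make_router(handlers: dict):
    # Map event names ("on_chat_model_stream", ...) to callables;
    # events with no registered handler are ignored.
    def dispatch(event: dict) -> None:
        handler = handlers.get(event["event"])
        if handler is not None:
            handler(event)
    return dispatch

seen = []
dispatch = make_router({"on_tool_start": lambda e: seen.append(e["name"])})
dispatch({"event": "on_tool_start", "name": "search"})
dispatch({"event": "on_chain_end"})  # no handler: silently skipped
print(seen)  # ['search']
```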

Surfacing Progress in a UI

A FastAPI route that streams graph progress over Server-Sent Events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
import json

app = FastAPI()

@app.post("/chat")
async def chat(payload: dict):
    thread_id = payload["thread_id"]
    user_message = payload["message"]

    config = {"configurable": {"thread_id": thread_id}}
    input = {"messages": [HumanMessage(content=user_message)]}

    async def stream():
        async for chunk in graph.astream(input, config=config, stream_mode="messages"):
            msg, metadata = chunk
            if msg.content:
                data = json.dumps({"type": "token", "content": msg.content})
                yield f"data: {data}\n\n"
        yield "data: {\"type\": \"done\"}\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Client-side JavaScript consumes text/event-stream. Note that the built-in EventSource API only issues GET requests; for a POST route like this one, read the streamed body with fetch instead. Either way, each token lands in the UI as it arrives.
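The SSE wire format is simple enough to frame by hand; a helper keeps the route body clean (plain-Python sketch):

```python
import json

def sse_event(payload: dict) -> str:
    # One SSE message: a "data:" line followed by a blank line.
    return f"data: {json.dumps(payload)}\n\n"

print(repr(sse_event({"type": "token", "content": "Hi"})))
# 'data: {"type": "token", "content": "Hi"}\n\n'
```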

Custom Events

Nodes can emit custom events via get_stream_writer.

from langgraph.config import get_stream_writer

def long_task(state: State) -> State:
    writer = get_stream_writer()

    for i in range(10):
        writer({"type": "progress", "step": i, "total": 10})
        do_work(i)

    return {"done": True}

In the caller:

async for chunk in graph.astream(input, stream_mode="custom"):
    print(chunk)
# {'type': 'progress', 'step': 0, 'total': 10}
# {'type': 'progress', 'step': 1, 'total': 10}
# ...

Good for progress bars and custom telemetry.
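Converting those events into a fraction for a progress bar (sketch, matching the event shape above):

```python
def progress_fraction(event: dict):
    # Returns a 0..1 fraction for progress events, None otherwise.
    if event.get("type") != "progress":
        return None
    return (event["step"] + 1) / event["total"]

print(progress_fraction({"type": "progress", "step": 4, "total": 10}))  # 0.5
print(progress_fraction({"type": "log", "msg": "hi"}))                  # None
```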

Backpressure and Cancellation

If the consumer is slow, stream can buffer. For long-running graphs or unreliable clients:

  • Set a timeout on the HTTP response.
  • Catch asyncio.CancelledError in the server (propagates from client disconnect).
  • Let the graph continue (checkpointed) so the user can reconnect and pick up where they left off.

Cancellation mid-stream doesn't rewind the graph; it just stops emitting. Work that completed before the cancellation is already checkpointed, which is what makes the reconnect-and-resume flow above possible.
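The disconnect-handling pattern, simulated without a web framework. The sleeps and token list are stand-ins; the shape, catch asyncio.CancelledError, record the disconnect, re-raise, is the part that transfers:

```python
import asyncio

state = {"disconnected": False}

async def stream_tokens(out: list):
    try:
        for token in ["Hel", "lo", "!"]:
            out.append(token)          # stand-in for yielding an SSE event
            await asyncio.sleep(0.2)   # stand-in for waiting on the graph
    except asyncio.CancelledError:
        state["disconnected"] = True   # log it, flush telemetry, etc.
        raise                          # let the framework finish teardown

async def main():
    out = []
    task = asyncio.create_task(stream_tokens(out))
    await asyncio.sleep(0.3)           # client hangs up mid-stream
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return out

out = asyncio.run(main())
print(out, state["disconnected"])  # ['Hel', 'lo'] True
```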

Common Pitfalls

Choosing the wrong mode. "messages" streams tokens but not node completions. "updates" streams node completions but not tokens. Combine them if you need both.

Blocking the event loop. A sync node (e.g. time.sleep) in an async graph blocks everything. Use async I/O in async graphs.

Forwarding the raw Python objects to JS. The frontend wants JSON. Serialize in the server.

No error handling in streams. If a node raises, the stream ends with an exception. Wrap your generator and translate errors into structured events the client can render.
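A wrapper that does the translation (sketch; the event shapes are whatever your client expects):

```python
import json

def safe_stream(chunks):
    # Turn exceptions into structured error events instead of letting
    # the connection die with no explanation.
    try:
        for chunk in chunks:
            yield json.dumps({"type": "token", "content": chunk})
    except Exception as exc:
        yield json.dumps({"type": "error", "message": str(exc)})
    else:
        yield json.dumps({"type": "done"})

def flaky():
    yield "ok"
    raise RuntimeError("node failed")

events = list(safe_stream(flaky()))
print(events)
# ['{"type": "token", "content": "ok"}', '{"type": "error", "message": "node failed"}']
```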

Rendering every chunk. Frontends re-rendering on every token can be slow. Batch at the UI layer (every 50ms) if you see jank.
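Batching can also happen server-side, before the bytes hit the wire. A count-based sketch (a time-based flush is the usual production choice):

```python
def batch_tokens(tokens, max_batch: int = 4):
    # Coalesce single tokens so the client re-renders once per batch.
    buf = []
    for tok in tokens:
        buf.append(tok)
        if len(buf) >= max_batch:
            yield "".join(buf)
            buf = []
    if buf:  # flush the remainder
        yield "".join(buf)

print(list(batch_tokens(list("streaming!"), 4)))  # ['stre', 'amin', 'g!']
```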

Next Steps

Continue to 08-subgraphs.md to compose graphs from smaller graphs.