Deployment: From Notebook to Production
This chapter covers going from a notebook to production: LangGraph Cloud, self-hosting, and wiring into a FastAPI service.
What "Deployed" Means for a Graph
A deployed graph is:
- Accessible over the network (HTTP, likely).
- Running with a persistent checkpointer (not MemorySaver).
- Scaled to handle concurrent requests.
- Monitored and logged.
- Authenticated: not everyone can call it.
Three routes to get there.
Option 1: LangGraph Cloud / Platform
Hosted by LangChain. You push your graph repo; they run it.
What you get:
- HTTP API out of the box.
- Persistent state (Postgres-backed).
- Scaling.
- Studio integration.
- Observability via LangSmith.
What it costs: money (paid product past the free tier), some lock-in.
When it fits: you want someone else to run the infrastructure, you're already on LangSmith, you're building fast and want to ship.
Setup (abbreviated):
- Add `langgraph.json` to your repo.
- Run `langgraph up` locally (to test via the CLI).
- Deploy via the LangGraph Platform UI, pointing at your repo.
A minimal `langgraph.json`:

```json
{
  "dependencies": ["."],
  "graphs": {
    "my_agent": "./src/agent.py:graph"
  },
  "env": ".env",
  "python_version": "3.11"
}
```
The platform exposes `POST /threads/{thread_id}/runs/stream` and related endpoints. Your client hits those.
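A rough client-side sketch of what "hitting those endpoints" looks like. The URL shape follows the streaming-run route above; the payload keys (`assistant_id`, `input`, `stream_mode`) follow the platform's run API but may differ by version, so treat this as illustrative:

```python
import json
from urllib.parse import quote


def run_stream_request(base_url: str, thread_id: str, assistant_id: str, message: str):
    """Build the URL and JSON body for a streaming run against the platform."""
    url = f"{base_url}/threads/{quote(thread_id)}/runs/stream"
    body = json.dumps({
        "assistant_id": assistant_id,  # the graph name from langgraph.json
        "input": {"messages": [{"role": "user", "content": message}]},
        "stream_mode": "messages",
    })
    return url, body


# Usage, with any HTTP client that supports streaming responses:
# url, body = run_stream_request("https://my-deploy.example.com", "t-123", "my_agent", "hi")
# POST `body` to `url` with Content-Type: application/json, then read SSE lines.
```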
Option 2: Self-Host with langgraph-cli
`langgraph-cli` packages your graph as a container and runs it locally (or wherever you run containers).
```bash
pip install langgraph-cli
langgraph up  # runs locally with docker-compose
```
This spins up:
- Your graph as an HTTP server.
- Postgres for state.
- Redis for pub/sub (streaming).
For real production, build the image and deploy it:
```bash
langgraph build -t myorg/my-agent:v1
docker push myorg/my-agent:v1
```
Then run on Kubernetes, ECS, whatever platform you use. Point the container at a production Postgres and Redis.
When it fits: you already run containers, you want full control, you don't want a hosted dependency.
Option 3: Custom (FastAPI)
Write the HTTP layer yourself. Wrap the graph in your own endpoints.
```python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_core.messages import HumanMessage
import json
import os

app = FastAPI()

# Build the graph once at startup
DB_URI = os.environ["POSTGRES_URI"]
memory_cm = PostgresSaver.from_conn_string(DB_URI)
memory = memory_cm.__enter__()  # keep the checkpointer open for the app's lifetime
memory.setup()
graph = build_graph().compile(checkpointer=memory)  # build_graph: your graph factory


class ChatRequest(BaseModel):
    thread_id: str
    message: str


@app.post("/chat")
async def chat(req: ChatRequest):
    config = {"configurable": {"thread_id": req.thread_id}}
    inputs = {"messages": [HumanMessage(content=req.message)]}
    result = await graph.ainvoke(inputs, config=config)
    return {"reply": result["messages"][-1].content}


@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    config = {"configurable": {"thread_id": req.thread_id}}
    inputs = {"messages": [HumanMessage(content=req.message)]}

    async def generator():
        async for chunk in graph.astream(inputs, config=config, stream_mode="messages"):
            msg, _metadata = chunk
            if msg.content:
                payload = json.dumps({"type": "token", "content": msg.content})
                yield f"data: {payload}\n\n"
        yield 'data: {"type": "done"}\n\n'

    return StreamingResponse(generator(), media_type="text/event-stream")


@app.get("/history/{thread_id}")
async def history(thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    state = graph.get_state(config)
    if not state.values:
        raise HTTPException(404, "Thread not found")
    return {
        "messages": [
            {"role": type(m).__name__, "content": getattr(m, "content", "")}
            for m in state.values.get("messages", [])
        ],
    }
```
When it fits: you already run FastAPI (or Flask, or similar), you want to embed the graph in an existing service, you need custom HTTP shapes.
Authentication
Never expose an agent endpoint without auth. Ways to do it:
API Key
Simple. Clients send an `Authorization: Bearer <key>` header; the server validates it.
```python
import os

from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer

security = HTTPBearer()


async def check_key(credentials=Depends(security)):
    # In production, prefer secrets.compare_digest for constant-time comparison
    if credentials.credentials != os.environ["API_KEY"]:
        raise HTTPException(401, "Invalid API key")
    return credentials


@app.post("/chat", dependencies=[Depends(check_key)])
async def chat(req: ChatRequest):
    ...
```
JWT / OAuth
If your app already has user auth, use it. Validate tokens in a dependency; extract the user ID for `thread_id` scoping.
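What token validation looks like underneath: check the signature, then read the claims. A stdlib sketch of HS256 signing and verification for illustration; a real dependency should use a maintained library (e.g. PyJWT), which also checks `exp`, `aud`, and issuer:

```python
import base64
import hashlib
import hmac
import json


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Mint an HS256 JWT (handy for tests)."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = _b64(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Verify the signature and return the claims, or raise ValueError."""
    header_b64, payload_b64, sig = token.split(".")
    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected = _b64(hmac.new(secret, signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(expected, sig):
        raise ValueError("invalid signature")
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# In a FastAPI dependency: claims = verify_hs256(credentials.credentials, SECRET)
# then scope threads with f"{claims['sub']}:{session_id}".
```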
Rate Limiting
Every agent endpoint should rate-limit. One bad client can exhaust your LLM credits.
```python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/chat")
@limiter.limit("10/minute")
async def chat(request: Request, req: ChatRequest):
    ...
```
More durable: per-user rate limits in Redis, tied to authenticated identity.
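A per-user limiter in Redis can be a few lines: count requests in fixed windows with `INCR` and let each window's key expire. A sketch; `redis_client` only needs `incr` and `expire`, which matches redis-py's client, so any object with that shape works:

```python
import time


def allow_request(redis_client, user_id: str, limit: int = 10, window_s: int = 60) -> bool:
    """Fixed-window rate limit keyed on the authenticated user."""
    window = int(time.time()) // window_s
    key = f"ratelimit:{user_id}:{window}"
    count = redis_client.incr(key)
    if count == 1:
        # First hit in this window: let the key expire with the window.
        redis_client.expire(key, window_s)
    return count <= limit
```

Call it in the auth dependency, after you know who the user is, and return 429 when it says no.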
Thread ID Hygiene
Thread IDs are keys to state. Rules:
- Scope per user. `thread_id = f"{user_id}:{session_id}"` (or similar). Never let user A read user B's thread.
- Validate on access. Before you read a `thread_id` from state, check the authenticated user owns it.
- Don't expose raw thread IDs in URLs without auth. Someone could guess or scrape them.
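The ownership check is a one-liner worth centralizing. A minimal sketch, assuming the `"{user_id}:{session_id}"` convention above:

```python
def owns_thread(thread_id: str, user_id: str) -> bool:
    """True iff the thread follows the "user:session" convention and
    belongs to `user_id`. Call this before any get_state or invoke."""
    owner, sep, _session = thread_id.partition(":")
    return bool(sep) and owner == user_id


# In an endpoint: if not owns_thread(thread_id, current_user.id):
#     raise HTTPException(403, "Not your thread")
```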
Scaling
The graph itself is stateless per worker; state lives in the checkpointer. So:
- Horizontal scaling works. Run many workers behind a load balancer; they all hit the same Postgres.
- LLM is the bottleneck. The graph is I/O-bound waiting for Anthropic. Concurrency (async) matters more than CPU.
- Postgres connections. Every worker opens connections. Use PgBouncer.
- Per-thread contention. Two concurrent requests on the same thread will conflict. Serialize per thread (e.g. a user-level queue).
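The per-thread serialization point can be sketched with one `asyncio.Lock` per thread ID. This only coordinates within a single worker process; across workers you need shared coordination (e.g. Postgres advisory locks or a per-thread queue), since these locks live in process memory:

```python
import asyncio
from collections import defaultdict

# One lock per thread_id, created lazily on first access.
_thread_locks: defaultdict = defaultdict(asyncio.Lock)


async def run_serialized(thread_id: str, coro_factory):
    """Run at most one graph invocation per thread_id at a time."""
    async with _thread_locks[thread_id]:
        return await coro_factory()


# In the /chat endpoint:
# result = await run_serialized(req.thread_id, lambda: graph.ainvoke(inputs, config=config))
```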
For very high throughput, put a queue in front:
```
Client → Frontend API → Queue (SQS, RabbitMQ)
                           ↓
                    Worker (LangGraph)
                           ↓
          Response to client (polling or pub/sub)
```
The queue decouples request rate from worker capacity, letting you buffer spikes without dropping requests.
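The worker side of that diagram is a drain loop. An in-memory sketch using `asyncio.Queue`; in production the queue is SQS or RabbitMQ and `results` is a database or pub/sub channel the client polls:

```python
import asyncio


async def worker(queue: asyncio.Queue, results: dict) -> None:
    """Consume jobs at worker speed, independent of arrival rate."""
    while True:
        job = await queue.get()
        if job is None:  # sentinel: shut down
            queue.task_done()
            return
        thread_id, message = job
        # Real worker: results[thread_id] = await graph.ainvoke(..., config=...)
        results[thread_id] = f"echo: {message}"
        queue.task_done()
```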
Cost Monitoring
LLM calls cost money. Every deployed agent should track:
- Tokens per request (in / out).
- Tokens per thread.
- Monthly spend per project.
Anthropic's dashboard shows account-level usage. LangSmith adds per-run detail. Set billing alerts.
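Per-thread tracking can come straight from the checkpointed messages. LangChain AI messages expose `usage_metadata` with `input_tokens` and `output_tokens` when the provider reports usage; provider support varies, so this sketch treats missing metadata as zero:

```python
def usage_for_thread(messages) -> dict:
    """Sum token usage across a thread's messages."""
    totals = {"input_tokens": 0, "output_tokens": 0}
    for m in messages:
        usage = getattr(m, "usage_metadata", None) or {}
        totals["input_tokens"] += usage.get("input_tokens", 0)
        totals["output_tokens"] += usage.get("output_tokens", 0)
    return totals


# e.g. usage_for_thread(graph.get_state(config).values["messages"])
```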
Cost regressions often come from unbounded message history. Trim aggressively in production.
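A blunt but effective trim: keep the system prompt (if any) plus the newest N messages. A message-count sketch; for token-aware trimming, `langchain_core` ships a `trim_messages` helper:

```python
def trim_history(messages, keep_last: int = 20):
    """Keep the system prompt (if present) plus the most recent messages."""
    if messages and getattr(messages[0], "type", "") == "system":
        head, tail = messages[:1], messages[1:]
    else:
        head, tail = [], messages
    return head + tail[-keep_last:]
```

Run this in the node that prepares the LLM call, so the full history still lives in the checkpointer while each request pays only for the window.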
Rollouts
Agents drift. A new prompt may be better on average and catastrophic on edge cases.
Safe rollout practices:
- Version your graph. Tag each deploy with a version.
- Feature flag new prompts. Route 10% of traffic to v2 first.
- Run evals before merge. A regression suite in CI (Chapter 10).
- Keep the old version deployed for rollback. Versioned image; easy switch.
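The 10% routing above is easiest with deterministic hash bucketing, so a given user always lands on the same version (no flapping between prompts mid-conversation). A minimal sketch:

```python
import hashlib


def in_canary(user_id: str, percent: int) -> bool:
    """Deterministically route `percent`% of users to the new version."""
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return bucket < percent


# prompt = PROMPT_V2 if in_canary(user_id, 10) else PROMPT_V1
```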
Agent behavior is harder to test than regular code; lean more on gradual rollouts.
Health Checks
An HTTP `/healthz` endpoint that:
- Confirms the graph compiled.
- Confirms the checkpointer is reachable (Postgres ping).
- Confirms the LLM provider is reachable (optional; a failed dependency shouldn't fail the health check unless you decide it should).
```python
@app.get("/healthz")
async def healthz():
    if graph is None:
        raise HTTPException(503, "Graph not initialized")
    try:
        # Any cheap checkpointer read works as a Postgres ping
        memory.get({"configurable": {"thread_id": "healthcheck"}})
    except Exception as exc:
        raise HTTPException(503, f"DB unreachable: {exc}")
    return {"status": "ok"}
```
Common Pitfalls
Using `MemorySaver` in production. No state survives a restart. Use Postgres.
One Postgres connection per request. Workers melt. Connection pool plus PgBouncer.
No auth on the endpoint. Expect strangers to find it. They will.
No rate limits. One client can drain your LLM budget overnight.
Treating the agent like a stateless API. The graph is stateful per thread. Requests on the same thread must serialize.
Forgetting to set `ANTHROPIC_API_KEY` in the container. The error message is clear enough, but it's a surprisingly common first-deploy bug.
Next Steps
Continue to 12-best-practices.md for the habits that keep a LangGraph codebase healthy.