LangGraph Postgres Checkpoint
langgraph-checkpoint-postgres provides a robust PostgreSQL implementation of LangGraph's checkpoint saver interface. This allows LangGraph agents to persist their conversational state and execution history in a Postgres database, enabling fault tolerance and the ability to resume graphs from previous states. Currently at version 3.0.5, it follows the rapid release cycle of the broader LangChain/LangGraph ecosystem, with frequent updates tied to `langgraph`'s release schedule.
Warnings
- breaking This library (`langgraph-checkpoint-postgres` v3.x) is designed exclusively for `langgraph` v1.0.0 and newer. It relies on the significantly revised `CheckpointSaver` interface introduced in `langgraph` 1.x. Using this checkpoint saver with older versions of `langgraph` (pre-1.0.0) will lead to API mismatches and runtime errors.
- gotcha The `PostgresSaver` does not automatically create the necessary database tables. You must manually initialize the PostgreSQL schema that `langgraph` expects for checkpointing. Failure to do so will result in database errors when the saver attempts to write or read state.
- gotcha For convenience, `langgraph-checkpoint-postgres` depends on `psycopg2-binary`. While suitable for development, `psycopg2-binary` is not always recommended for production due to its pre-compiled nature which can sometimes lead to platform-specific issues or larger container images. For robust production deployments, consider alternatives.
- gotcha Directly hardcoding database credentials or connection strings in your application code is a significant security risk. Exposure of credentials can compromise your entire database.
Install
-
pip install langgraph-checkpoint-postgres
Imports
- PostgresSaver
from langgraph_checkpoint_postgres import PostgresSaver
Quickstart
import os
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.base import Checkpoint
from langgraph_checkpoint_postgres import PostgresSaver
# Define a simple graph state (must be mutable for in-place updates)
class AgentState:
def __init__(self, value: int = 0):
self.value = value
def __repr__(self):
return f"AgentState(value={self.value})"
# Define simple node functions that modify the state
def increment_node(state: AgentState):
state.value += 1
print(f"Node 'increment': value = {state.value}")
return state
def decrement_node(state: AgentState):
state.value -= 1
print(f"Node 'decrement': value = {state.value}")
return state
# Setup PostgresSaver
# Ensure a PostgreSQL database is running and accessible with the connection string.
# The necessary schema for langgraph checkpoints must be initialized in your DB beforehand.
# Example connection string: "postgresql://user:password@host:port/database_name"
pg_connection_string = os.environ.get(
"POSTGRES_CONNECTION_STRING",
"postgresql://user:password@localhost:5432/langgraph_db"
) # Replace with your actual connection string or set ENV var
saver = PostgresSaver(conn_string=pg_connection_string)
# Build the graph
builder = StateGraph(AgentState)
builder.add_node("increment", increment_node)
builder.add_node("decrement", decrement_node)
builder.add_edge(START, "increment")
builder.add_edge("increment", "decrement")
# Create a loop for multiple steps, with a conditional exit
builder.add_conditional_edges(
"decrement",
# If value is less than 3, go back to 'increment', otherwise end
lambda state: "increment" if state.value < 3 else "__end__",
{"increment": "increment", "__end__": "__end__"}
)
# Compile the graph with the checkpointer
# interrupt_after allows viewing state at specific nodes during stream
app = builder.compile(checkpointer=saver, interrupt_after=["increment", "decrement"])
# Example usage:
# A unique thread_id is crucial for checkpointing different conversations
thread_id = "my_unique_conversation_123"
config = {"configurable": {"thread_id": thread_id, "thread_ts": ""}}
print(f"\n--- Starting run for thread: {thread_id} ---")
# Start the graph from an initial state
for step in app.stream(AgentState(value=0), config, stream_mode="updates"):
print(f"Stream update: {step}")
# Simulate a pause or application restart
print(f"\n--- Resuming run for thread: {thread_id} ---")
# Pass None as input to resume from the last saved state for the given thread_id
for step in app.stream(None, config, stream_mode="updates"):
print(f"Stream update: {step}")
# Retrieve the final state directly from the saver
last_checkpoint: Checkpoint = saver.get(config)
if last_checkpoint and '__root__' in last_checkpoint.channel_values:
final_state_value = last_checkpoint.channel_values['__root__'].value
print(f"\nFinal state value retrieved from checkpoint: {final_state_value}")
else:
print("\nNo final state found or __root__ channel missing.")