LangGraph Redis Checkpoint
langgraph-checkpoint-redis is a Python library providing Redis implementations for LangGraph's checkpoint savers and stores, enabling persistence for AI agent workflows. It supports both full history (`RedisSaver`) and memory-optimized shallow (`ShallowRedisSaver`) saving, as well as optional vector search capabilities via `RedisStore`. The current version is 0.4.0, with frequent patch releases addressing bug fixes and minor features.
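The difference between full-history and shallow saving can be pictured with a toy in-memory model. This is only an illustration, not the library's implementation: `ToyFullHistory` and `ToyShallow` are hypothetical classes, and the sketch assumes that a shallow saver retains just the most recent checkpoint per thread.

```python
from collections import defaultdict


class ToyFullHistory:
    """Hypothetical stand-in: keeps every checkpoint written for a thread."""

    def __init__(self):
        self._by_thread = defaultdict(list)

    def put(self, thread_id, checkpoint):
        self._by_thread[thread_id].append(checkpoint)

    def history(self, thread_id):
        return list(self._by_thread[thread_id])


class ToyShallow:
    """Hypothetical stand-in: keeps only the latest checkpoint for a thread."""

    def __init__(self):
        self._latest = {}

    def put(self, thread_id, checkpoint):
        # Overwrites whatever was stored for this thread before.
        self._latest[thread_id] = checkpoint

    def history(self, thread_id):
        return [self._latest[thread_id]] if thread_id in self._latest else []


full, shallow = ToyFullHistory(), ToyShallow()
for step in range(3):
    checkpoint = {"step": step}
    full.put("thread-1", checkpoint)
    shallow.put("thread-1", checkpoint)

print(len(full.history("thread-1")))  # 3
print(shallow.history("thread-1"))    # [{'step': 2}]
```

Under that assumption, the shallow variant trades the ability to replay earlier steps for a storage footprint that stays constant per thread.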
Warnings
- breaking Version 0.1.0 introduced breaking changes to the internal storage format. Checkpoints created with pre-0.1.0 versions are not readable by 0.1.0+ without manual migration.
- gotcha `langgraph-checkpoint-redis` requires the RedisJSON and RediSearch modules to be enabled on the Redis server; it relies on them for index creation and efficient data access. Redis 8.0+ includes both by default.
- gotcha The `.setup()` method must be called on RedisSaver/AsyncRedisSaver instances upon initial setup or application start to create necessary RediSearch indices. Failure to do so will lead to runtime errors.
- gotcha Checkpoints can be lost depending on Redis persistence settings. A default Redis configuration may not guarantee durability if Redis crashes unexpectedly before an RDB snapshot or AOF write completes.
- gotcha Incorrect message serialization can lead to `MESSAGE_COERCION_FAILURE` errors when using LangGraph's checkpointers. This often happens when `message.to_dict()` is stored instead of `BaseMessage` objects or simple `{role, content}` dicts.
- gotcha High memory usage in Redis deployments can lead to latency, failed runs, or Out of Memory (OOM) errors, especially in high-load or limited-resource environments.
- gotcha Compatibility issues have been reported with Valkey (a Redis fork), which may cause hanging requests and connection problems.
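The persistence and memory warnings above could be turned into a small preflight check. The sketch below works on plain dicts so it runs without a server; with redis-py, similar dicts would typically come from `client.config_get()` and `client.info("memory")`. The function names and warning texts are hypothetical, and the rules are a rough heuristic rather than anything the library enforces.

```python
def durability_warnings(config):
    """Return warnings for a dict of Redis CONFIG GET values (all strings)."""
    warnings = []
    aof_on = config.get("appendonly", "no") == "yes"
    # An empty "save" value means RDB snapshots are disabled.
    rdb_on = bool(config.get("save", "").strip())
    if not aof_on and not rdb_on:
        warnings.append("no persistence: checkpoints are lost on restart")
    elif not aof_on:
        warnings.append("RDB only: writes since the last snapshot can be lost")
    elif config.get("appendfsync", "everysec") == "no":
        warnings.append("AOF fsync left to the OS: recent writes can be lost")
    return warnings


def memory_usage_ratio(info):
    """Return used_memory / maxmemory from INFO memory, or None if unlimited."""
    maxmemory = int(info.get("maxmemory", 0))
    if maxmemory == 0:  # 0 means no configured limit
        return None
    return int(info["used_memory"]) / maxmemory


# Sample values for illustration only, not measurements from a real server.
sample_config = {"appendonly": "no", "save": "3600 1 300 100", "appendfsync": "everysec"}
sample_info = {"used_memory": 900, "maxmemory": 1000}

print(durability_warnings(sample_config))
print(memory_usage_ratio(sample_info))  # 0.9
```

Running such a check at application start, next to `.setup()`, would surface a risky configuration before any checkpoints depend on it.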
Install
pip install langgraph-checkpoint-redis langgraph redis
Imports
- RedisSaver
from langgraph.checkpoint.redis import RedisSaver
- AsyncRedisSaver
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
- ShallowRedisSaver
from langgraph.checkpoint.redis.shallow import ShallowRedisSaver
- AsyncShallowRedisSaver
from langgraph.checkpoint.redis.ashallow import AsyncShallowRedisSaver
- RedisStore
from langgraph.store.redis import RedisStore
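The async saver is used in much the same way as the sync one. The following is a minimal sketch, assuming that `AsyncRedisSaver.from_conn_string` works as an async context manager, that `asetup()` is the async counterpart of `setup()`, and that `alist()` yields checkpoints asynchronously; `list_checkpoints` is a hypothetical helper name. Check these calls against the version you have installed.

```python
async def list_checkpoints(redis_url, thread_id):
    """Open an AsyncRedisSaver, create its indices, and collect a thread's checkpoints."""
    # Imported lazily so this module loads even when the package is not installed.
    from langgraph.checkpoint.redis.aio import AsyncRedisSaver

    config = {"configurable": {"thread_id": thread_id}}
    async with AsyncRedisSaver.from_conn_string(redis_url) as checkpointer:
        # Assumption: asetup() creates the required RediSearch indices.
        await checkpointer.asetup()
        return [item async for item in checkpointer.alist(config)]


# Usage (requires a reachable Redis server):
#   import asyncio
#   asyncio.run(list_checkpoints("redis://localhost:6379/0", "test_conversation_123"))
```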
Quickstart
import os
from typing import Annotated, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.checkpoint.redis import RedisSaver
from langgraph.graph import START, StateGraph

# Redis connection string, e.g. 'redis://localhost:6379/0'
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')

# Define your graph state as a TypedDict so nodes receive it as a dict
class AgentState(TypedDict):
    # The reducer appends each node's new messages to the existing list
    messages: Annotated[list[BaseMessage], lambda x, y: x + y]

# Define a simple node that echoes the most recent message
def simple_agent_node(state: AgentState) -> dict:
    new_message = AIMessage(content=f"Echo: {state['messages'][-1].content}")
    return {"messages": [new_message]}

# Create the Redis Checkpoint Saver
# Make sure Redis is running and accessible at REDIS_URL
# For production, ensure RedisJSON and RediSearch modules are enabled (Redis 8.0+ includes them)
with RedisSaver.from_conn_string(REDIS_URL) as checkpointer:
    # Important: Call setup() to initialize required RediSearch indices
    checkpointer.setup()

    # Build the graph: START -> agent -> END
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", simple_agent_node)
    workflow.add_edge(START, "agent")
    workflow.set_finish_point("agent")

    # Compile the graph with the checkpointer
    app = workflow.compile(checkpointer=checkpointer)

    # Example usage with a specific thread_id for persistence
    thread_id = "test_conversation_123"
    config = {"configurable": {"thread_id": thread_id}}

    print(f"\n--- Invoking agent for thread: {thread_id} ---")
    inputs = {"messages": [HumanMessage(content="Hello LangGraph with Redis!")]}
    result = app.invoke(inputs, config)
    print("Result 1:", result)

    # The second call reuses the thread_id, so the saved messages are restored first
    print(f"\n--- Invoking agent again for the same thread: {thread_id} ---")
    inputs = {"messages": [HumanMessage(content="How are you doing?")]}
    result = app.invoke(inputs, config)
    print("Result 2:", result)

    # List checkpoints (optional)
    print(f"\n--- Checkpoints for thread {thread_id} ---")
    for checkpoint in checkpointer.list(config):
        print(checkpoint)