Redis Integration for Microsoft Agent Framework

1.0.0b260409 · active · verified Wed Apr 15

agent-framework-redis integrates Redis with the Microsoft Agent Framework. It provides persistent storage for conversation history and a context provider for long-term memory, with optional vector search. Chat messages are stored in Redis in a thread-safe manner. The library is part of the broader Microsoft Agent Framework ecosystem and is actively developed; its release cadence follows the main framework.

Warnings

Install

Imports

Quickstart

This quickstart initializes a `RedisChatMessageStore`, adds messages, retrieves them, and shows how older messages are trimmed when a `max_messages` limit is set. It ends by clearing the session history. For local development, a Redis server must be running (e.g., via Docker: `docker run -d -p 6379:6379 redis:latest`).

import asyncio
import os
from agent_framework.redis import RedisChatMessageStore
from agent_framework.core.message import Message, Role

async def main():
    # Connect to Redis. Replace with your Redis URL or environment variable.
    # For local Redis, use redis://localhost:6379
    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')
    session_id = 'my-unique-conversation-123'

    # Initialize the Redis chat message store
    # max_messages can be set to limit history, None for unlimited
    store = RedisChatMessageStore(
        redis_url=redis_url,
        thread_id=session_id,
        max_messages=10,  # Example: retain the last 10 messages
    )

    print(f"Storing messages for session: {session_id}")

    # Add messages to the store
    await store.add_messages([
        Message(role=Role.USER, content='Hello, agent!', author_name='User'),
        Message(role=Role.ASSISTANT, content='Hi there! How can I help you?', author_name='Agent')
    ])

    # Retrieve messages from the store
    retrieved_messages = await store.get_messages()
    print("\nRetrieved messages:")
    for msg in retrieved_messages:
        print(f"[{msg.author_name} ({msg.role.value})]: {msg.content}")

    # Add 14 more messages (16 in total) to exceed max_messages and trigger trimming
    for i in range(1, 15):
        await store.add_messages([
            Message(role=Role.USER, content=f'Message {i}', author_name=f'User{i}')
        ])
    
    print(f"\nRetrieved messages after adding more (max_messages={store.max_messages}):")
    trimmed_messages = await store.get_messages()
    for msg in trimmed_messages:
        print(f"[{msg.author_name} ({msg.role.value})]: {msg.content}")

    # Clear the session history
    await store.clear()
    print("\nMessages cleared. Current messages:")
    print(await store.get_messages())

if __name__ == '__main__':
    asyncio.run(main())
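
To see what the trimming step in the quickstart should produce without running a Redis server, the following is a minimal in-memory sketch. It assumes the store keeps only the most recent `max_messages` entries, as the quickstart comment suggests. `SketchMessage` and `InMemoryTrimmedStore` are hypothetical names made up for illustration; they are not part of agent-framework-redis and do not reflect how the library is implemented.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a chat message; not the framework's class."""
    role: str
    content: str


class InMemoryTrimmedStore:
    """Illustrative in-memory analogue of a store with a max_messages cap."""

    def __init__(self, max_messages=None):
        # deque(maxlen=None) is unbounded; with a maxlen, appending past the
        # limit silently drops the oldest entries.
        self._messages = deque(maxlen=max_messages)

    def add_messages(self, messages):
        self._messages.extend(messages)

    def get_messages(self):
        return list(self._messages)

    def clear(self):
        self._messages.clear()


# Replay the same sequence as the quickstart: 2 greetings, then 14 more messages.
store = InMemoryTrimmedStore(max_messages=10)
store.add_messages([
    SketchMessage("user", "Hello, agent!"),
    SketchMessage("assistant", "Hi there! How can I help you?"),
])
for i in range(1, 15):
    store.add_messages([SketchMessage("user", f"Message {i}")])

kept = store.get_messages()
print(len(kept))         # 10
print(kept[0].content)   # Message 5
print(kept[-1].content)  # Message 14
```

Of the 16 messages added, the six oldest (the two greetings and `Message 1` through `Message 4`) fall outside the limit. If the Redis-backed store trims the same way, the second retrieval in the quickstart would list `Message 5` through `Message 14`.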
