MySQL Replication Client

1.0.15 · active · verified Thu Apr 16

mysql-replication is a pure Python implementation of the MySQL replication protocol, built on top of PyMYSQL. It enables real-time streaming of binlog events from a MySQL server, facilitating tasks such as change data capture (CDC), data synchronization, and auditing. The current version is 1.0.15, and the project maintains an active release cadence, addressing bugs and adding support for newer Python versions.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to connect to a MySQL server's binlog stream and process various row-based replication events (INSERT, UPDATE, DELETE). Ensure your MySQL server is configured for replication, and the provided user has sufficient privileges (`REPLICATION SLAVE`, `REPLICATION CLIENT`). Remember to set a unique `server_id`.

import os
from mysql.replication import BinLogStreamReader
from mysql.replication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent

# Configure your MySQL connection details. Use environment variables for production.
MYSQL_HOST = os.environ.get('MYSQL_HOST', 'localhost')
MYSQL_PORT = int(os.environ.get('MYSQL_PORT', 3306))
MYSQL_USER = os.environ.get('MYSQL_USER', 'replication_user')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'password') # Ensure user has REPLICATION SLAVE/CLIENT

try:
    stream = BinLogStreamReader(
        connection_settings={
            "host": MYSQL_HOST,
            "port": MYSQL_PORT,
            "user": MYSQL_USER,
            "passwd": MYSQL_PASSWORD,
            "charset": "utf8"
        },
        server_id=101, # IMPORTANT: Must be a unique ID > 0 within your replication topology
        blocking=True, # Wait for new events if no more are available
        resume_stream=True, # Resume from last position if possible
        log_file=None, # Start from current position
        log_pos=None   # Start from current position
    )

    print(f"Connected to MySQL binlog stream on {MYSQL_HOST}:{MYSQL_PORT}. Listening for events...")

    for event in stream:
        if isinstance(event, (UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent)):
            print(f"\n---")
            print(f"[{type(event).__name__}] Database: {event.schema}, Table: {event.table}")
            for row in event.rows:
                if isinstance(event, WriteRowsEvent):
                    print(f"  Inserted: {row['values']}")
                elif isinstance(event, UpdateRowsEvent):
                    print(f"  Before: {row['before_values']}")
                    print(f"  After:  {row['after_values']}")
                elif isinstance(event, DeleteRowsEvent):
                    print(f"  Deleted: {row['values']}")
        # Add more event types (e.g., QueryEvent, RotateEvent, XidEvent) as needed

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    if 'stream' in locals() and stream:
        stream.close()
        print("\nBinLogStreamReader closed.")

view raw JSON →