MySQL Replication Client
mysql-replication is a pure Python implementation of the MySQL replication protocol, built on top of PyMySQL. It streams binlog events from a MySQL server in real time, which supports tasks such as change data capture (CDC), data synchronization, and auditing. The package is installed as `mysql-replication` and imported as `pymysqlreplication`. The current version is 1.0.15; the project is actively maintained, with releases that fix bugs and add support for newer Python versions.
Common errors
- pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on 'hostname' (111 Connection refused)")
  cause: The Python client could not establish a connection to the MySQL server. This typically indicates an incorrect host or port, a firewall blocking the connection, or a MySQL server that is not running.
  fix: Verify `MYSQL_HOST` and `MYSQL_PORT` in your connection settings. Check that the MySQL server is running and reachable from the client machine, and that port 3306 (or your custom port) is open in any firewall between them.
- pymysql.err.OperationalError: (1045, "Access denied for user 'replication_user'@'%' (using password: YES)")
  cause: The username or password for the MySQL connection is incorrect, or the account does not exist for the host the client connects from.
  fix: Double-check `MYSQL_USER` and `MYSQL_PASSWORD`. Then make sure the user exists and holds the `REPLICATION SLAVE` and `REPLICATION CLIENT` grants needed for binlog access. MySQL 8.0 and later no longer accept `GRANT ... IDENTIFIED BY`, so create the user first: `CREATE USER 'replication_user'@'%' IDENTIFIED BY 'password'; GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replication_user'@'%';`
- pymysql.err.OperationalError: (1290, "The MySQL server is running with the --log-bin option, but is not configured for replication.")
  cause: The MySQL server's `my.cnf` configuration is missing a parameter required for replication, specifically `server-id`.
  fix: Edit your `my.cnf` (or equivalent MySQL configuration file) and ensure `server-id = <unique_id>` is present under the `[mysqld]` section. Restart the MySQL server after making changes.
- mysql.replication.exceptions.BinLogStreamReaderException: Unknown event type: X (or similar low-level protocol error)
  cause: The library encountered an unsupported or malformed binlog event. This can happen with very new MySQL versions, custom event types, or binlog corruption.
  fix: Upgrade to the latest version of `mysql-replication` and check whether your MySQL server version is explicitly supported. If the event type is known but not handled, you may need to extend the library or contribute a fix. For other low-level errors, check the MySQL server logs.
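The two connection errors above carry a numeric code as the first element of the exception's `args`, so a handler can branch on it instead of parsing the message. The following is a minimal sketch of that idea; `HINTS`, `DEFAULT_HINT`, and `hint_for` are hypothetical names introduced here for illustration, not part of `mysql-replication` or PyMySQL.

```python
# Hypothetical lookup table: only the two codes whose meaning is standard
# (2003 = cannot connect, 1045 = access denied) are included here.
HINTS = {
    2003: "Check host and port, that the server is running, and that no firewall blocks the port.",
    1045: "Check user and password, and that REPLICATION SLAVE and REPLICATION CLIENT are granted.",
}
DEFAULT_HINT = "No specific hint; inspect the full error message and the MySQL server logs."


def hint_for(exc):
    """Return a troubleshooting hint based on the numeric code in exc.args[0]."""
    code = exc.args[0] if exc.args else None
    if not isinstance(code, int):
        # Exceptions without a leading integer code get the generic hint.
        return DEFAULT_HINT
    return HINTS.get(code, DEFAULT_HINT)
```

A caller could catch `pymysql.err.OperationalError` around the stream loop and print `hint_for(e)` alongside the original message.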
Warnings
- gotcha Older versions (<=1.0.14) of `mysql-replication` could log sensitive connection information (e.g., passwords) if logging was not carefully configured, especially in verbose modes.
- gotcha The `server_id` parameter in `BinLogStreamReader` must be a unique integer greater than 0 within your entire MySQL replication topology. Using a duplicate `server_id` can lead to unexpected behavior or replication conflicts.
- gotcha Your MySQL server must be explicitly configured for binlog replication. This includes enabling `log_bin`, setting a unique `server-id` in `my.cnf`, and typically `binlog_format = ROW` to get detailed row changes. The replication user also needs `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
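The server-side requirements in the last warning can be checked before opening a stream. The sketch below assumes you have already read the relevant variables into a dict, for example from `SHOW GLOBAL VARIABLES WHERE Variable_name IN ('log_bin', 'server_id', 'binlog_format')`; the function name `check_replication_settings` is hypothetical and only validates the three settings named above.

```python
def check_replication_settings(variables):
    """Return a list of problems found in a {name: value} mapping of MySQL variables."""
    problems = []

    # log_bin is reported as ON/OFF by the server.
    if str(variables.get("log_bin", "")).upper() not in ("ON", "1"):
        problems.append("log_bin is not enabled")

    # server_id must be a positive integer; missing or junk values count as 0.
    try:
        server_id = int(variables.get("server_id", 0))
    except (TypeError, ValueError):
        server_id = 0
    if server_id <= 0:
        problems.append("server_id must be a positive integer")

    # Row-level change details require ROW format.
    if str(variables.get("binlog_format", "")).upper() != "ROW":
        problems.append("binlog_format should be ROW for row-level changes")

    return problems
```

An empty list means the three settings look usable. It does not check uniqueness of `server_id` across the topology or the replication user's privileges.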
Install
- pip install mysql-replication
Imports
- BinLogStreamReader
from pymysqlreplication import BinLogStreamReader
- UpdateRowsEvent
from pymysqlreplication.row_event import UpdateRowsEvent
- DeleteRowsEvent
from pymysqlreplication.row_event import DeleteRowsEvent
- WriteRowsEvent
from pymysqlreplication.row_event import WriteRowsEvent
- QueryEvent
from pymysqlreplication.event import QueryEvent
Quickstart
import os

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent

# Configure your MySQL connection details. Use environment variables for production.
MYSQL_HOST = os.environ.get('MYSQL_HOST', 'localhost')
MYSQL_PORT = int(os.environ.get('MYSQL_PORT', 3306))
MYSQL_USER = os.environ.get('MYSQL_USER', 'replication_user')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'password')  # User needs REPLICATION SLAVE/CLIENT

stream = None
try:
    stream = BinLogStreamReader(
        connection_settings={
            "host": MYSQL_HOST,
            "port": MYSQL_PORT,
            "user": MYSQL_USER,
            "passwd": MYSQL_PASSWORD,
            "charset": "utf8",
        },
        server_id=101,       # IMPORTANT: must be a unique ID > 0 within your replication topology
        blocking=True,       # Wait for new events instead of stopping at the end of the binlog
        resume_stream=True,  # Start at log_file/log_pos if given, otherwise at the current end of the binlog
        log_file=None,       # No explicit binlog file: start from the current position
        log_pos=None,        # No explicit offset: start from the current position
    )
    # The reader connects when the first event is fetched, so nothing has been verified yet.
    print(f"Streaming binlog events from {MYSQL_HOST}:{MYSQL_PORT}...")

    for event in stream:
        if isinstance(event, (UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent)):
            print("\n---")
            print(f"[{type(event).__name__}] Database: {event.schema}, Table: {event.table}")
            for row in event.rows:
                if isinstance(event, WriteRowsEvent):
                    print(f"  Inserted: {row['values']}")
                elif isinstance(event, UpdateRowsEvent):
                    print(f"  Before: {row['before_values']}")
                    print(f"  After: {row['after_values']}")
                elif isinstance(event, DeleteRowsEvent):
                    print(f"  Deleted: {row['values']}")
        # Add more event types (e.g., QueryEvent, RotateEvent, XidEvent) as needed
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Close the stream only if it was actually created.
    if stream is not None:
        stream.close()
        print("\nBinLogStreamReader closed.")
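The quickstart starts from the current binlog position on every run, so events written while the client is down are skipped. One common way to avoid that is to persist the binlog coordinates yourself and pass them back as `log_file` and `log_pos`. The following is a minimal sketch of such a checkpoint file; `save_checkpoint` and `load_checkpoint` are hypothetical helpers, and it assumes the reader exposes its current coordinates as `stream.log_file` and `stream.log_pos`.

```python
import json
import os
import tempfile


def save_checkpoint(path, log_file, log_pos):
    """Atomically write the binlog coordinates to a JSON file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temp file in the same directory, then rename over the target,
    # so a crash never leaves a half-written checkpoint behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as tmp:
        json.dump({"log_file": log_file, "log_pos": log_pos}, tmp)
    os.replace(tmp_path, path)


def load_checkpoint(path):
    """Return (log_file, log_pos), or (None, None) if no checkpoint exists."""
    try:
        with open(path) as fh:
            data = json.load(fh)
    except FileNotFoundError:
        return None, None
    return data["log_file"], int(data["log_pos"])
```

With these helpers, a client would call `load_checkpoint` before constructing `BinLogStreamReader`, pass the result as `log_file` and `log_pos` with `resume_stream=True`, and call `save_checkpoint(path, stream.log_file, stream.log_pos)` after each event it has fully processed. Saving after processing rather than before means an event can be replayed after a crash but is not silently lost.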