Pymodbus
Pymodbus is a fully featured Modbus protocol stack implemented in Python, offering client and server capabilities for TCP, UDP, and serial communication. It primarily leverages `asyncio` for modern asynchronous operations and is actively maintained with frequent minor releases focusing on bug fixes and incremental improvements. The current version is 3.12.1.
Warnings
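- breaking Pymodbus v3.0.0 reorganized the package around `asyncio`. The `pymodbus.client.sync` and `pymodbus.server.sync` modules from v2.x no longer exist, so v2.x imports fail under v3.x. Clients are now imported from `pymodbus.client`: `ModbusTcpClient` is the synchronous (blocking) client, and `AsyncModbusTcpClient` is its `asyncio` counterpart, whose request methods must be awaited. Servers are built on `asyncio` (e.g., `StartAsyncTcpServer`), so v2.x server code needs to be ported to the new entry points.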
- breaking The traditional datastore classes (e.g., `ModbusSlaveContext`, which v3.10 renamed to `ModbusDeviceContext`, and `ModbusSequentialDataBlock`) are slated for deprecation and eventual removal in Pymodbus v4.0.0. New `SimData` and `SimDevice` classes have been introduced as the preferred modern approach for defining server data storage.
- gotcha Pymodbus version `3.10.0` was officially marked as 'DO NOT USE THIS RELEASE it is broken' by the maintainers. It contained critical bugs that caused unexpected behavior.
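- gotcha Incorrect byte or word order is a common issue when communicating with Modbus devices from different manufacturers. Modbus transmits each 16-bit register big-endian, but the specification does not define how multi-register values (32-bit integers, floats) are split across registers, so devices differ in word order. Pymodbus applies a default order when converting registers, which may not match what a given device expects (e.g., Big-Endian vs. Little-Endian, or swapped words).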
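The word-order pitfall in the last warning can be illustrated without any Modbus library. The sketch below uses only the standard `struct` module. `registers_to_float32` is a hypothetical helper (not part of Pymodbus), and it assumes the device stores an IEEE 754 32-bit float in two consecutive 16-bit registers.

```python
import struct


def registers_to_float32(registers, word_order="big"):
    """Combine two 16-bit registers into one float (hypothetical helper)."""
    if len(registers) != 2:
        raise ValueError("a 32-bit float occupies exactly two registers")
    if word_order == "big":
        high, low = registers        # most significant word arrives first
    else:
        low, high = registers        # least significant word arrives first
    # Each register is itself big-endian, so pack both as unsigned 16-bit words.
    raw = struct.pack(">HH", high, low)
    return struct.unpack(">f", raw)[0]


# 1.0 is encoded as 0x3F800000, so a big-word-first device reports [0x3F80, 0x0000].
regs = [0x3F80, 0x0000]
print(registers_to_float32(regs, word_order="big"))     # the intended value
print(registers_to_float32(regs, word_order="little"))  # a tiny, wrong value
```

If a reading looks implausibly small or large, trying the other word order is a quick diagnostic. Recent Pymodbus releases also provide register conversion helpers on the client classes; check the documentation of the installed version for their exact names and arguments.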
Install
pip install pymodbus
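Because release 3.10.0 is flagged as broken in the warnings, a deployment may want to fail fast if that version is installed. The following is a minimal sketch using only the standard library. `KNOWN_BAD`, `check_version`, and `installed_pymodbus_version` are hypothetical names chosen for illustration.

```python
from importlib import metadata

KNOWN_BAD = {"3.10.0"}  # hypothetical deny-list, taken from the warning above


def check_version(version, known_bad=KNOWN_BAD):
    """Return `version` unchanged, or raise if it is on the deny-list."""
    if version in known_bad:
        raise RuntimeError(f"pymodbus {version} is a known-bad release")
    return version


def installed_pymodbus_version():
    """Return the installed pymodbus version string, or None if absent."""
    try:
        return metadata.version("pymodbus")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_pymodbus_version()
    if found is None:
        print("pymodbus is not installed")
    else:
        print("pymodbus", check_version(found), "passed the version check")
```

The same exclusion can be expressed at install time with a version specifier such as `pip install "pymodbus!=3.10.0"`.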
Imports
- ModbusTcpClient (synchronous) / AsyncModbusTcpClient (asyncio)
from pymodbus.client import ModbusTcpClient, AsyncModbusTcpClient
- StartAsyncTcpServer
from pymodbus.server import StartAsyncTcpServer
- ModbusDeviceContext (named ModbusSlaveContext before v3.10)
from pymodbus.datastore import ModbusDeviceContext, ModbusSequentialDataBlock, ModbusServerContext
Quickstart
import asyncio
import logging

# Names below follow pymodbus >= 3.10. On 3.0-3.9, use ModbusSlaveContext,
# slaves=... and slave=... in place of ModbusDeviceContext, devices=... and device_id=...
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.datastore import (
    ModbusDeviceContext,
    ModbusSequentialDataBlock,
    ModbusServerContext,
)
from pymodbus.server import StartAsyncTcpServer

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


async def run_modbus_server():
    # Set up a simple Modbus datastore for device ID 1
    store = ModbusDeviceContext(
        di=ModbusSequentialDataBlock(0, [17] * 10),
        co=ModbusSequentialDataBlock(0, [17] * 10),
        hr=ModbusSequentialDataBlock(0, [17] * 10),
        ir=ModbusSequentialDataBlock(0, [17] * 10),
    )
    # The server-level context maps device IDs to their datastores
    context = ModbusServerContext(devices={0x01: store}, single=False)
    log.info("Modbus TCP Server starting on localhost:5020")
    # Serves requests until the surrounding task is cancelled
    await StartAsyncTcpServer(context=context, address=("localhost", 5020))


async def run_modbus_client():
    await asyncio.sleep(1)  # Give server a moment to start
    log.info("Modbus TCP Client connecting to localhost:5020")
    # The async client is required here: the synchronous ModbusTcpClient cannot be awaited
    client = AsyncModbusTcpClient("localhost", port=5020)
    if not await client.connect():
        log.error("Client failed to connect.")
        return
    log.info("Client connected successfully.")
    try:
        # Read holding registers (address 0, count 5, device ID 1)
        result = await client.read_holding_registers(address=0, count=5, device_id=1)
        if result.isError():
            log.error(f"Failed to read holding registers: {result}")
        else:
            log.info(f"Read holding registers: {result.registers}")

        # Write to holding registers (address 0, values [99, 98, 97], device ID 1)
        write_result = await client.write_registers(address=0, values=[99, 98, 97], device_id=1)
        if write_result.isError():
            log.error(f"Failed to write holding registers: {write_result}")
        else:
            log.info("Wrote to holding registers.")

        # Read again to verify write
        result_after_write = await client.read_holding_registers(address=0, count=5, device_id=1)
        if result_after_write.isError():
            log.error(f"Failed to read holding registers after write: {result_after_write}")
        else:
            log.info(f"Read holding registers after write: {result_after_write.registers}")
    finally:
        client.close()
        log.info("Client disconnected.")


async def main():
    server_task = asyncio.create_task(run_modbus_server())
    try:
        await run_modbus_client()
    finally:
        server_task.cancel()  # Signal server to shut down
        try:
            await server_task
        except asyncio.CancelledError:
            log.info("Server task cancelled successfully.")
        except Exception as e:
            log.error(f"Server task ended with unexpected error: {e}")


if __name__ == "__main__":
    asyncio.run(main())
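To make the quickstart's read request less opaque, the sketch below builds the bytes that a Modbus TCP "read holding registers" request carries on the wire: a 7-byte MBAP header followed by the PDU. It uses only the standard `struct` module. `build_read_holding_request` is a hypothetical helper for illustration and is not how Pymodbus is implemented internally.

```python
import struct

READ_HOLDING_REGISTERS = 0x03  # Modbus function code


def build_read_holding_request(transaction_id, unit_id, address, count):
    """Return a raw Modbus TCP request frame (hypothetical helper)."""
    # PDU: function code, starting address, number of registers
    pdu = struct.pack(">BHH", READ_HOLDING_REGISTERS, address, count)
    # MBAP header: transaction id, protocol id (always 0), length, unit id.
    # The length field counts the unit id byte plus the PDU.
    mbap = struct.pack(">HHHB", transaction_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu


# Same request as the quickstart: address 0, count 5, device (unit) ID 1
frame = build_read_holding_request(transaction_id=1, unit_id=1, address=0, count=5)
print(frame.hex(" "))  # 00 01 00 00 00 06 01 03 00 00 00 05
```

Comparing such a frame against a packet capture can help separate addressing mistakes (wrong unit ID or register offset) from connection problems.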