Pymodbus

3.12.1 · active · verified Sat Apr 11

Pymodbus is a fully featured Modbus protocol stack implemented in Python, offering client and server capabilities for TCP, UDP, and serial communication. It primarily leverages `asyncio` for modern asynchronous operations and is actively maintained with frequent minor releases focusing on bug fixes and incremental improvements. The current version is 3.12.1.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates a basic asynchronous Modbus TCP server and client running in the same event loop. The server listens on localhost:5020 and serves a simple in-memory datastore. The client connects, reads the initial holding registers, writes new values, and reads the registers back to verify the write; the server is then shut down by cancelling its task.

"""Quickstart: an asyncio Modbus TCP server and client in one process.

Targets the pymodbus >= 3.10 API, where the per-device datastore is
``ModbusDeviceContext`` and requests are addressed with ``device_id=``
(earlier 3.x releases used ``ModbusSlaveContext`` and ``slave=``).
"""
import asyncio
import logging

from pymodbus.client import AsyncModbusTcpClient
from pymodbus.datastore import (
    ModbusDeviceContext,
    ModbusSequentialDataBlock,
    ModbusServerContext,
)
from pymodbus.exceptions import ModbusException
from pymodbus.server import StartAsyncTcpServer

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

HOST = "localhost"
PORT = 5020
DEVICE_ID = 0x01


async def run_modbus_server() -> None:
    """Serve a small in-memory datastore for device ``DEVICE_ID`` over TCP.

    Runs until the surrounding task is cancelled.
    """
    # One data block per Modbus table, each holding ten entries set to 17.
    device = ModbusDeviceContext(
        di=ModbusSequentialDataBlock(0, [17] * 10),
        co=ModbusSequentialDataBlock(0, [17] * 10),
        hr=ModbusSequentialDataBlock(0, [17] * 10),
        ir=ModbusSequentialDataBlock(0, [17] * 10),
    )
    # The server context maps device ids to their datastores; single=False
    # means requests are routed by the device id they carry.
    context = ModbusServerContext(devices={DEVICE_ID: device}, single=False)

    log.info("Modbus TCP Server starting on %s:%d", HOST, PORT)
    # Blocks here for as long as the server is running.
    await StartAsyncTcpServer(context=context, address=(HOST, PORT))


async def _log_holding_registers(client: AsyncModbusTcpClient, suffix: str = "") -> None:
    """Read five holding registers from address 0 and log the outcome."""
    response = await client.read_holding_registers(
        address=0, count=5, device_id=DEVICE_ID
    )
    if response.isError():
        log.error("Failed to read holding registers%s: %s", suffix, response)
    else:
        log.info("Read holding registers%s: %s", suffix, response.registers)


async def run_modbus_client() -> None:
    """Connect to the server, read, write, and read back holding registers.

    The connection is always closed, even when a request raises.
    """
    await asyncio.sleep(1)  # Give the server a moment to start listening.
    log.info("Modbus TCP Client connecting to %s:%d", HOST, PORT)
    client = AsyncModbusTcpClient(HOST, port=PORT)
    try:
        if not await client.connect():
            log.error("Client failed to connect.")
            return
        log.info("Client connected successfully.")

        # Initial state of the holding registers.
        await _log_holding_registers(client)

        # Overwrite the first three holding registers.
        write_result = await client.write_registers(
            address=0, values=[99, 98, 97], device_id=DEVICE_ID
        )
        if write_result.isError():
            log.error("Failed to write holding registers: %s", write_result)
        else:
            log.info("Wrote to holding registers.")

        # Read again to verify the write.
        await _log_holding_registers(client, " after write")
        log.info("Client disconnected.")
    except ModbusException as exc:
        # Raised by the client for transport-level failures such as a missing
        # response; reported here so the demo still shuts the server down.
        log.error("Modbus request failed: %s", exc)
    finally:
        client.close()


async def main() -> None:
    """Run the server in a background task, run the client, then stop the server."""
    server_task = asyncio.create_task(run_modbus_server())
    try:
        await run_modbus_client()
    finally:
        server_task.cancel()  # Signal the server to shut down.
        try:
            await server_task
        except asyncio.CancelledError:
            # NOTE(review): some pymodbus versions may absorb the cancellation
            # inside StartAsyncTcpServer, in which case this is not reached.
            log.info("Server task cancelled successfully.")
        except Exception as e:
            log.error("Server task ended with unexpected error: %s", e)


if __name__ == "__main__":
    asyncio.run(main())

view raw JSON →