pySerial-asyncio

0.6 · maintenance · verified Mon Apr 13

pySerial-asyncio is an asynchronous I/O extension for the Python Serial Port package (pySerial). It provides support for working with serial ports through asyncio Transports, Protocols, and Streams. Compatible with Python 3.5 and later, it depends on pySerial. The current version is 0.6. The project appears to be in maintenance mode, with no new releases since 2021, and a fork `pyserial-asyncio-fast` addressing some critical issues.

Warnings

Install

Imports

Quickstart

This example demonstrates how to open an asynchronous serial connection using `open_serial_connection`, send data, and read a response. It uses environment variables for port and baudrate to allow easy testing with virtual or real serial ports. The `writer.drain()` call is essential to ensure buffered data is transmitted.

import asyncio
import os
import serial_asyncio

async def main():
    # For testing, use a virtual serial port like 'loop://'.
    # On Linux, you can create virtual ports using 'socat -d -d pty,raw,echo=0 pty,raw,echo=0'.
    # Replace 'loop://' with your actual port (e.g., '/dev/ttyUSB0' or 'COM1').
    port = os.environ.get('SERIAL_PORT', 'loop://')
    baudrate = int(os.environ.get('SERIAL_BAUDRATE', '115200'))

    print(f"Attempting to open serial port {port} at {baudrate} baud...")
    try:
        # open_serial_connection returns (StreamReader, StreamWriter)
        reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate)
        print("Serial port opened successfully.")
    except Exception as e:
        print(f"Failed to open serial port: {e}")
        return

    try:
        message = b"Hello pySerial-asyncio!\n"
        print(f"Sending: {message.decode().strip()}")
        writer.write(message)
        await writer.drain() # Crucial to ensure data is sent

        print("Waiting for response...")
        response = await reader.readline() # Read until newline
        print(f"Received: {response.decode().strip()}")

    except Exception as e:
        print(f"Error during serial communication: {e}")
    finally:
        print("Closing serial port...")
        writer.close()
        await writer.wait_closed()
        print("Serial port closed.")

if __name__ == '__main__':
    asyncio.run(main())

view raw JSON →