pySerial-asyncio
pySerial-asyncio is an asynchronous I/O extension for the Python Serial Port package (pySerial). It provides support for working with serial ports through asyncio Transports, Protocols, and Streams. Compatible with Python 3.5 and later, it depends on pySerial. The current version is 0.6. The project appears to be in maintenance mode, with no new releases since 2021, and a fork `pyserial-asyncio-fast` addressing some critical issues.
Warnings
- breaking The `pyserial-asyncio` library is known to block the asyncio event loop due to blocking sleep calls, especially on Windows, which can lead to performance issues and unresponsiveness in asynchronous applications. This library is also not actively maintained, with the last release being in 2021.
- gotcha When using `StreamWriter` objects obtained from `open_serial_connection`, it is critical to call `await writer.drain()` after `writer.write()` to ensure that all buffered data is flushed and sent over the serial port. Forgetting to call `drain()` can result in data not being transmitted, especially in tight loops or when the buffer fills up.
- gotcha On Windows, `pyserial-asyncio` uses a polling-based implementation for serial I/O, which may be less performant and slower compared to the native event-driven implementations on POSIX systems (Linux, macOS, BSD).
- gotcha When working with `asyncio`, creating `asyncio.Lock` objects (or similar synchronization primitives) outside of an actively running event loop can lead to subtle bugs and exceptions when the lock is later acquired or released in a different event loop (e.g., one started by `asyncio.run()`).
Install
-
pip install pyserial-asyncio
Imports
- serial_asyncio
import serial_asyncio
- open_serial_connection
from serial_asyncio import open_serial_connection
- create_serial_connection
from serial_asyncio import create_serial_connection
Quickstart
import asyncio
import os
import serial_asyncio
async def main():
# For testing, use a virtual serial port like 'loop://'.
# On Linux, you can create virtual ports using 'socat -d -d pty,raw,echo=0 pty,raw,echo=0'.
# Replace 'loop://' with your actual port (e.g., '/dev/ttyUSB0' or 'COM1').
port = os.environ.get('SERIAL_PORT', 'loop://')
baudrate = int(os.environ.get('SERIAL_BAUDRATE', '115200'))
print(f"Attempting to open serial port {port} at {baudrate} baud...")
try:
# open_serial_connection returns (StreamReader, StreamWriter)
reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate)
print("Serial port opened successfully.")
except Exception as e:
print(f"Failed to open serial port: {e}")
return
try:
message = b"Hello pySerial-asyncio!\n"
print(f"Sending: {message.decode().strip()}")
writer.write(message)
await writer.drain() # Crucial to ensure data is sent
print("Waiting for response...")
response = await reader.readline() # Read until newline
print(f"Received: {response.decode().strip()}")
except Exception as e:
print(f"Error during serial communication: {e}")
finally:
print("Closing serial port...")
writer.close()
await writer.wait_closed()
print("Serial port closed.")
if __name__ == '__main__':
asyncio.run(main())