XMODEM protocol implementation
The `xmodem` library provides an implementation of the XMODEM, YMODEM, and ZMODEM protocols for Python. It focuses on a minimalistic implementation while adhering to protocol specifications. All modem classes require `getc` and `putc` callback functions to handle character data for communication. The current version is 0.5.0, with releases occurring periodically to address bugs and introduce enhancements.
Warnings
- breaking Version 0.4.0 introduced a critical bug that caused `recv()` to raise an `AssertionError` due to a bogus `assert False` statement. This was fixed in version 0.4.5.
- gotcha The `retry` parameter in `send()` and `recv()` methods was incorrectly implemented in versions prior to 0.4.4. For `send()`, it was treated as total failures instead of failures per block, and `retry=n` would only retry `n-1` times instead of `n` times. This could lead to premature transfer failures, especially for large files or when `retry=1` was used.
- gotcha Prior to version 0.4.7, `recv()` could stall under certain error conditions or when receiving empty files. This could cause programs to hang indefinitely.
- gotcha In versions prior to 0.4.3, the `putc()` callback was invoked multiple times for each part of an XMODEM block's header, data, and checksum. This behavior could cause issues when integrating with microcontrollers or hardware sensitive to timing at stream boundaries.
- gotcha In version 0.5.0, a bug was fixed where `retry_limit` was not correctly triggered during the data transfer phase because errors were not properly accumulated. This means retry mechanisms might not have functioned as expected in versions where this issue was present.
Install
-
pip install xmodem
Imports
- XMODEM
from xmodem import XMODEM
Quickstart
import serial
from xmodem import XMODEM
# Configure your serial port here
# For example, using a dummy serial port for demonstration
# In a real application, replace with an actual serial port like '/dev/ttyUSB0'
class DummySerial:
def __init__(self, timeout=0):
self.buffer = b''
def read(self, size):
if not self.buffer:
return b''
data = self.buffer[:size]
self.buffer = self.buffer[size:]
return data
def write(self, data):
# In a real scenario, this would send data over serial
# For this dummy example, we just 'receive' it instantly
# Or, you could print it to simulate output.
# print(f"DummySerial sent: {data!r}")
self.buffer += data # Simulate loopback or immediate reception
return len(data)
# Replace DummySerial() with serial.Serial('/dev/ttyUSB0', timeout=0) for actual use
ser = DummySerial(timeout=0)
def getc(size, timeout=1):
return ser.read(size) or None
def putc(data, timeout=1):
return ser.write(data)
modem = XMODEM(getc, putc)
# --- Example: Sending a file ---
print("Attempting to send data...")
# Create a dummy stream for demonstration
import io
stream_to_send = io.BytesIO(b"Hello, XMODEM world! This is a test file.\n")
# In a real scenario, this would be `open('/path/to/file', 'rb')`
# status = modem.send(stream_to_send)
# print(f"Send status: {status}")
# Due to the complexity of XMODEM handshakes in a simple script
# without a cooperating receiver, the send/recv calls are commented out.
# A successful transfer requires a matching XMODEM receiver on the other end.
print("To send a file: modem.send(file_stream_object)")
print("To receive a file: modem.recv(file_stream_object)")
print("Note: A full XMODEM transfer requires a corresponding receiver/sender.")
# --- Example: Receiving a file ---
# stream_to_receive = io.BytesIO()
# received_bytes = modem.recv(stream_to_receive)
# if received_bytes is not None:
# print(f"Received {received_bytes} bytes: {stream_to_receive.getvalue()!r}")
# else:
# print("Failed to receive data.")