GZIP Stream Compression
gzip-stream is a lightweight Python library (version 1.2.0 at the time of writing) for compressing and decompressing data on the fly in the GZIP format. It provides both synchronous and asynchronous stream interfaces, so large payloads can be processed without loading them entirely into memory. Releases are infrequent and tend to add features or compatibility updates rather than breaking changes.
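The underlying technique — emitting valid GZIP output incrementally instead of buffering everything — can be sketched with the standard library alone. This illustrates the approach, not gzip-stream's actual implementation:

```python
import gzip
import zlib

def gzip_chunks(chunks):
    """Incrementally compress an iterable of bytes chunks into GZIP output."""
    # wbits=31 (16 + 15) tells zlib to emit GZIP framing (header + footer).
    compressor = zlib.compressobj(wbits=31)
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:                 # the compressor may buffer and return b""
            yield data
    yield compressor.flush()     # flush emits the footer (CRC32 + length)

payload = b"hello streaming world " * 100
pieces = (payload[i:i + 64] for i in range(0, len(payload), 64))
compressed = b"".join(gzip_chunks(pieces))
assert gzip.decompress(compressed) == payload  # stdlib can read it back
```

Only one 64-byte slice is in flight at a time, yet the joined output is a complete, standard GZIP document.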
Common errors
- TypeError: a bytes-like object is required, not 'str'
  cause: string data was passed directly to `GZIPCompressedStream` or `GZIPDecompressedStream`; both expect `bytes`.
  fix: encode string data to bytes before passing it to the stream, e.g. `my_string.encode('utf-8')`.
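The bytes-only requirement is not specific to gzip-stream; the standard library behaves the same way, which makes the fix easy to demonstrate without the library installed:

```python
import gzip

text = "héllo, wörld"            # str: must be encoded before compressing
data = text.encode("utf-8")      # bytes: safe to pass to a compressor

compressed = gzip.compress(data)
assert gzip.decompress(compressed).decode("utf-8") == text

# Passing the str directly reproduces the error above:
try:
    gzip.compress(text)  # type: ignore[arg-type]
    raise AssertionError("expected TypeError")
except TypeError:
    pass
```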
- TypeError: 'AsyncGZIPCompressedStream' object is not iterable
  cause: an asynchronous stream object (`AsyncGZIPCompressedStream` or `AsyncGZIPDecompressedStream`) was iterated with a synchronous `for` loop.
  fix: use `async for chunk in stream:` inside an `async def` function, and run the coroutine with `asyncio.run()`.
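The consumption pattern is the same for any async iterable, so it can be shown with a stand-in async generator (the generator below is a placeholder for the library's async stream classes, not part of gzip-stream):

```python
import asyncio

async def fake_async_stream():
    # Stand-in for an async byte stream: yields chunks as real I/O would.
    for chunk in (b"part-1 ", b"part-2 ", b"part-3"):
        await asyncio.sleep(0)   # yield control to the event loop
        yield chunk

async def consume():
    chunks = []
    async for chunk in fake_async_stream():  # NOT a plain `for` loop
        chunks.append(chunk)
    return b"".join(chunks)

result = asyncio.run(consume())
print(result)  # b'part-1 part-2 part-3'
```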
Warnings
- gotcha The library exclusively handles byte streams. Providing strings directly to `GZIPCompressedStream` or expecting strings from `GZIPDecompressedStream` without explicit encoding/decoding will lead to `TypeError` or `UnicodeDecodeError`.
- gotcha Synchronous and asynchronous stream classes (e.g., `GZIPCompressedStream` vs `AsyncGZIPCompressedStream`) are distinct and not interchangeable: synchronous iteration (`for`) over an async stream fails, and vice versa.
- gotcha When compressing, it's crucial to fully iterate through the `GZIPCompressedStream` (or `AsyncGZIPCompressedStream`) to ensure the GZIP footer, which contains checksums and length information, is properly written. If you stop iterating prematurely, the resulting compressed data may be truncated or corrupted.
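The truncation failure mode in the last gotcha is easy to reproduce with the standard library: dropping the final bytes of a GZIP payload (where the footer lives) makes decompression fail.

```python
import gzip

payload = b"incomplete streams corrupt data" * 20
compressed = gzip.compress(payload)

# Simulate stopping iteration early: drop the last 8 bytes, which hold
# the GZIP footer (CRC32 + uncompressed length).
truncated = compressed[:-8]

try:
    gzip.decompress(truncated)
except EOFError as exc:
    print("Truncated stream rejected:", exc)
```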
Install
- pip install gzip-stream
Imports
- from gzip_stream import GZIPCompressedStream
- from gzip_stream import GZIPDecompressedStream
- from gzip_stream import AsyncGZIPCompressedStream
- from gzip_stream import AsyncGZIPDecompressedStream
Quickstart
import io
from gzip_stream import GZIPCompressedStream, GZIPDecompressedStream

# Original data (must be bytes)
original_data = b"This is some data that will be compressed and then decompressed using gzip-stream." * 5

# --- Compression Example (Synchronous) ---
# GZIPCompressedStream wraps a readable binary file-like object (here an
# in-memory BytesIO; in real apps, an open file or network stream) and is
# itself file-like: read() returns GZIP-compressed bytes.
compressed_stream = GZIPCompressedStream(
    io.BytesIO(original_data),
    compression_level=7,
)
compressed_chunks = []
while True:
    chunk = compressed_stream.read(1024)  # pull compressed data in chunks
    if not chunk:
        break
    compressed_chunks.append(chunk)
compressed_data = b"".join(compressed_chunks)
print(f"Original size: {len(original_data)} bytes")
print(f"Compressed size: {len(compressed_data)} bytes")

# --- Decompression Example (Synchronous) ---
# GZIPDecompressedStream mirrors this interface: it wraps a readable stream
# of compressed bytes, and read() returns decompressed bytes.
decompressed_stream = GZIPDecompressedStream(io.BytesIO(compressed_data))
decompressed_chunks = []
while True:
    chunk = decompressed_stream.read(1024)
    if not chunk:
        break
    decompressed_chunks.append(chunk)
decompressed_data = b"".join(decompressed_chunks)
print(f"Decompressed data (first 50 bytes): {decompressed_data[:50].decode()}")

# Verify data integrity
assert original_data == decompressed_data
print("\nSuccess: Original and decompressed data match!")
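As a cross-check that works without gzip-stream installed, the same chunked decompression can be done with the standard library's zlib (`wbits=31` accepts the GZIP container format):

```python
import gzip
import zlib

payload = b"streamed gzip data " * 50
compressed = gzip.compress(payload)

# Feed the compressed bytes to the decompressor in small chunks.
decompressor = zlib.decompressobj(wbits=31)  # 31 = accept GZIP framing
out = []
for i in range(0, len(compressed), 32):
    out.append(decompressor.decompress(compressed[i:i + 32]))
out.append(decompressor.flush())
assert b"".join(out) == payload
```

Any valid GZIP stream, whichever library produced it, should decompress this way.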