Python ZSTD Bindings

1.5.7.3 · active · verified Thu Apr 09

The 'zstd' library provides fast Python bindings to Yann Collet's Zstandard (zstd) lossless compression algorithm. It offers a compelling balance of speed and compression ratio, making it a popular choice for real-time compression and large-scale data processing. The library is actively maintained with frequent releases, currently at version 1.5.7.3, and focuses on performance optimizations and bug fixes.

Warnings

Install
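The bindings are published on PyPI under the package name `zstd`; a typical installation (assuming pip is available) looks like:

```shell
# Install the zstd bindings from PyPI
pip install zstd
```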

Imports

Quickstart

Demonstrates basic one-shot compression and decompression of bytes using `zstd.compress()` and `zstd.decompress()`, followed by a streaming example for larger datasets built on `ZstdCompressor` and `ZstdDecompressor` (provided by the companion `zstandard` package), with attention to flushing the compressor to finalize the frame.

import zstd

original_data = b"This is some data that will be compressed using Zstandard. It's a fairly long string to demonstrate compression efficiency." * 100

# Compress data
# level ranges from -100 (ultra-fast, lower ratio) to 22 (slowest, best compression); 3 is the default
# threads can be 0 (choose automatically) or a specific thread count
compressed_data = zstd.compress(original_data, level=3, threads=0)

print(f"Original size: {len(original_data)} bytes")
print(f"Compressed size: {len(compressed_data)} bytes")
print(f"Compression ratio: {len(original_data) / len(compressed_data):.2f}x")

# Decompress data
decompressed_data = zstd.decompress(compressed_data)

assert original_data == decompressed_data
print("Decompression successful! Data matches original.")

# Example of streaming compression for large data.
# ZstdCompressor/ZstdDecompressor come from the companion `zstandard`
# package (`pip install zstandard`); the `zstd` module itself only
# exposes one-shot compress/decompress.
import zstandard

cctx = zstandard.ZstdCompressor(level=1)
dctx = zstandard.ZstdDecompressor()

chunk_size = len(original_data) // 5
compressed_chunks = []

# Stream compress: compressobj() returns an incremental compressor
cobj = cctx.compressobj()
for i in range(0, len(original_data), chunk_size):
    chunk = original_data[i:i + chunk_size]
    compressed_chunks.append(cobj.compress(chunk))
# Important: flush the compressor to finalize the frame
compressed_chunks.append(cobj.flush())

streaming_compressed_data = b''.join(compressed_chunks)

# Stream decompress: decompressobj() handles frames whose content size
# is not recorded in the frame header
dobj = dctx.decompressobj()
streaming_decompressed_data = dobj.decompress(streaming_compressed_data)

assert original_data == streaming_decompressed_data
print("Streaming decompression successful! Data matches original.")
