PyIceberg
PyIceberg is the official Python implementation of Apache Iceberg, an open table format designed for huge analytic datasets. It offers a pure-Python experience: DML operations and queries on Iceberg tables without a JVM, plus integration with popular Python data tools such as Polars, Pandas, and DuckDB. Currently at version 0.11.1, the library maintains a regular release cadence of minor feature releases and necessary patch updates.
Warnings
- breaking The behavior of `Table.name` changed in version 0.8.1 to return the table name *without* the catalog name. Previously it could include the catalog name, which was inconsistent with the broader effort to decouple tables from a specific catalog reference.
- breaking Several AWS-related catalog properties (`profile_name`, `region_name`, `aws_access_key_id`, `aws_secret_access_key`, `aws_session_token`) were deprecated and subsequently removed in version 0.8.0. The unified `client.*` credential properties (e.g. `client.access-key-id`, `client.secret-access-key`, `client.region`) should be used instead.
- breaking Version 0.11.0 included the removal of several previously deprecated features and APIs. While specific removals were not detailed, users upgrading from older minor versions should review release notes for a comprehensive list of removed APIs.
- gotcha PyIceberg is designed for programmatic interaction with Iceberg table metadata and data, but it is not intended for heavy compute or large-scale ETL operations. For such tasks, it's recommended to integrate with dedicated query engines like Spark or Flink.
- gotcha While PyIceberg can read data files in several formats (Parquet, ORC, Avro), it writes new data files only as Parquet (via `append`/`overwrite`). Producing data files in other formats still requires other Iceberg writers, often from engines such as Spark.
- gotcha Deletion operations in PyIceberg primarily use a Copy-on-Write (CoW) strategy by default, rewriting data files. Merge-on-Read (MoR) deletions are more nuanced and work is ongoing to enhance their efficiency, especially for frequent, small updates.
Install
- pip install pyiceberg
- pip install "pyiceberg[s3fs,pyarrow]"
Imports
- load_catalog
from pyiceberg.catalog import load_catalog
- Schema
from pyiceberg.schema import Schema
- Table
from pyiceberg.table import Table
Quickstart
import datetime
import os

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType

# Configure a local SQL catalog. For simplicity, use a temporary local directory;
# production setups would point at a REST, Hive, or Glue catalog instead.
warehouse_path = "/tmp/pyiceberg_warehouse"
os.makedirs(warehouse_path, exist_ok=True)
catalog = load_catalog(
    "default",
    type="sql",
    uri=f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
    warehouse=f"file://{warehouse_path}",
)
print(f"✓ Successfully loaded catalog: {catalog.name}")

# Define a simple schema. TimestampType is timezone-naive; use TimestamptzType
# for timezone-aware values.
schema = Schema(
    NestedField(1, "id", LongType(), required=True),
    NestedField(2, "name", StringType(), required=False),
    NestedField(3, "event_time", TimestampType(), required=False),
)

namespace = "default"
table_name = "my_sample_table"

# Create the namespace and table idempotently
catalog.create_namespace_if_not_exists(namespace)
table = catalog.create_table_if_not_exists(f"{namespace}.{table_name}", schema)
print(f"✓ Table ready: {table.name()}")

# Prepare some data using PyArrow. The Arrow schema must be compatible with the
# Iceberg schema, including nullability: "id" is required, so mark it non-nullable.
arrow_schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("name", pa.string()),
    pa.field("event_time", pa.timestamp("us")),
])
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "event_time": [datetime.datetime.now() - datetime.timedelta(days=i) for i in range(3)],
}, schema=arrow_schema)

# Append the data to the table as a new snapshot (written as Parquet)
table.append(data)
print(f"✓ Appended {len(data)} rows to the table.")

# Read data from the table
scan_result = table.scan().to_arrow()
print(f"\nTotal rows read: {len(scan_result)}")
print("Sample data:")
print(scan_result.to_pandas())
# Clean up (optional: uncomment to drop the table and namespace)
# catalog.drop_table(f"{namespace}.{table_name}")
# catalog.drop_namespace(namespace)
# print(f"Cleaned up table {table_name} and namespace {namespace}.")