Azure Kusto Ingest Client
The `azure-kusto-ingest` library provides a client for ingesting data into Azure Data Explorer (Kusto) clusters. It supports queued ingestion (batched, for high throughput) and streaming ingestion (for low latency). The current version is 6.0.3. Bug-fix releases are frequent, and major versions appear occasionally to track changes in Python and the Azure SDK ecosystem. Version 6.0.3 added support for transformation functions with the CSV and SCSV formats.
Warnings
- breaking Starting with version 6.0.0, `azure-kusto-ingest` requires Python 3.9 or later, in line with other Azure SDKs. Earlier major versions supported older interpreters (e.g., v5.x supported Python 3.8 and v4.x supported Python 3.7).
- breaking Version 5.0.0 introduced breaking changes to `KustoConnectionStringBuilder` keywords, aligning them with other Azure SDKs. Keywords like `msi_auth`, `msi_authentication`, `msi_params`, and `msi_type` were removed from direct parsing.
- gotcha Ingesting Pandas DataFrames with `ingest_from_dataframe` has needed bug fixes in several releases, covering datetime columns, null values, and specific Pandas versions (e.g., Pandas 3.0). Verify that your installed Pandas version and library version work together before relying on it.
- gotcha Managed Streaming Ingestion can encounter throttling events. While version 6.0.1 included a fix to handle these events more gracefully, users on older versions might experience issues with managed streaming stability under high load.
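For users who cannot yet upgrade past the throttling fix, one common workaround is to retry a failed call with exponential backoff. The following is a minimal sketch using only the standard library. `call_with_backoff` is a hypothetical helper, not part of `azure-kusto-ingest`. This page does not state which exception the SDK raises on throttling, so the exception classes to retry on are left as a parameter.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation(), retrying with exponential backoff and jitter.

    Hypothetical helper: pass the exception classes that signal throttling
    in your SDK version via retry_on.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # The delay doubles on each attempt and is capped at max_delay;
            # the random factor in [0.5, 1.0] spreads out concurrent retries.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay * random.uniform(0.5, 1.0))
    raise AssertionError("unreachable")
```

A call site would wrap the ingestion call in a zero-argument callable, for example `call_with_backoff(lambda: do_ingest(), retry_on=(SomeThrottlingError,))`, where `do_ingest` and `SomeThrottlingError` are placeholders. If the operation reads from a stream, rewind the stream inside the callable so each retry sends the full payload.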
Install
- pip install azure-kusto-ingest
- pip install azure-kusto-ingest[pandas]
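After installing, it can help to confirm that the environment matches the constraints listed under Warnings. The sketch below uses only the standard library. The helper names are hypothetical, and the version table copies the Python versions quoted in the Warnings section, so treat it as approximate and check the release notes for anything not listed.

```python
import importlib.util
import sys
from importlib import metadata
from typing import Optional, Tuple

# Python versions per major release, as quoted in the Warnings section.
MIN_PYTHON_BY_MAJOR = {4: (3, 7), 5: (3, 8), 6: (3, 9)}


def installed_version(dist_name: str = "azure-kusto-ingest") -> Optional[str]:
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def python_is_supported(package_version: str, python: Tuple[int, int]) -> bool:
    """Check a Python (major, minor) pair against the table above."""
    major = int(package_version.split(".")[0])
    minimum = MIN_PYTHON_BY_MAJOR.get(major)
    # Majors missing from the table are not judged here.
    return True if minimum is None else python >= minimum


def has_pandas() -> bool:
    """True if pandas is importable (needed for ingest_from_dataframe)."""
    return importlib.util.find_spec("pandas") is not None


if __name__ == "__main__":
    version = installed_version()
    print("azure-kusto-ingest:", version or "not installed")
    if version:
        print("Python supported:", python_is_supported(version, sys.version_info[:2]))
    print("pandas available:", has_pandas())
```

Running the script prints the detected library version, whether the current interpreter satisfies the table, and whether the optional pandas dependency is present.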
Imports
- KustoConnectionStringBuilder
from azure.kusto.data import KustoConnectionStringBuilder
- QueuedIngestClient (named KustoIngestClient in older releases)
from azure.kusto.ingest import QueuedIngestClient
- KustoStreamingIngestClient
from azure.kusto.ingest import KustoStreamingIngestClient
- IngestionProperties
from azure.kusto.ingest import IngestionProperties
- DataFormat
from azure.kusto.data import DataFormat
- BlobDescriptor
from azure.kusto.ingest import BlobDescriptor
Quickstart
import io
import os

from azure.kusto.data import DataFormat, KustoConnectionStringBuilder
# QueuedIngestClient is the queued client (named KustoIngestClient in older releases)
from azure.kusto.ingest import IngestionProperties, QueuedIngestClient

# Configuration from environment variables (the fallbacks are placeholders; replace them)
KUSTO_CLUSTER_URL = os.environ.get('KUSTO_CLUSTER_URL', 'https://yourcluster.region.kusto.windows.net')
KUSTO_DATABASE = os.environ.get('KUSTO_DATABASE', 'yourdatabase')
KUSTO_TABLE = os.environ.get('KUSTO_TABLE', 'yourtable')

# For AAD application (client ID + secret) authentication:
# KCSB = KustoConnectionStringBuilder.with_aad_application_key_authentication(
#     KUSTO_CLUSTER_URL,
#     os.environ.get('KUSTO_CLIENT_ID', ''),
#     os.environ.get('KUSTO_CLIENT_SECRET', ''),
#     os.environ.get('KUSTO_TENANT_ID', ''),
# )
# For Azure CLI authentication (requires a prior `az login`):
KCSB = KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER_URL)

# Create a queued ingest client
ingest_client = QueuedIngestClient(KCSB)

# Define ingestion properties
ingestion_properties = IngestionProperties(
    database=KUSTO_DATABASE,
    table=KUSTO_TABLE,
    data_format=DataFormat.CSV,
    # The sample data starts with a header row, so skip the first record.
    ignore_first_record=True,
    # flush_immediately=True bypasses batching so data lands sooner, but it
    # hurts performance when used for many small ingestions. Keep it False
    # unless you need it.
    flush_immediately=False,
)

# Sample data as a CSV string
data_rows = [
    "id,name,value",
    "1,TestItemA,100",
    "2,TestItemB,200",
]
data_csv = "\n".join(data_rows)

# Ingest data from an in-memory stream
print(f"Attempting to ingest data into {KUSTO_DATABASE}.{KUSTO_TABLE}...")
with io.StringIO(data_csv) as stream:
    result = ingest_client.ingest_from_stream(stream, ingestion_properties)

# For queued ingestion, the result indicates submission, not completion.
# Check the actual outcome in Azure Data Explorer, e.g. with `.show ingestion failures`.
print("Ingestion job submitted. Check Azure Data Explorer for status details.")
# print(f"Ingestion result object: {result}")  # Uncomment to see the result object
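The quickstart builds its CSV payload by joining strings, which breaks as soon as a value contains a comma, a quote, or a newline. A minimal sketch of a safer approach uses Python's standard `csv` module to handle quoting. `rows_to_csv_stream` is a hypothetical helper name, and the sketch assumes the target table accepts standard double-quote escaping.

```python
import csv
import io
from typing import Iterable, Sequence


def rows_to_csv_stream(rows: Iterable[Sequence[object]]) -> io.StringIO:
    """Serialize rows to CSV text and return a stream rewound to the start."""
    buffer = io.StringIO()
    # csv.writer quotes fields containing commas, quotes, or newlines.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    buffer.seek(0)  # rewind so a consumer reads from the beginning
    return buffer


stream = rows_to_csv_stream([
    (1, "TestItemA", 100),
    (2, 'Item, with "quotes"', 200),
])
print(stream.getvalue())
```

The returned `io.StringIO` is the same kind of object the quickstart passes to `ingest_from_stream`. Because this sketch writes no header row, `ignore_first_record` should stay at its default if the stream is used that way.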