Azure Kusto Ingest Client

6.0.3 · active · verified Thu Apr 09

The `azure-kusto-ingest` library provides a client for ingesting data into Azure Data Explorer (Kusto) clusters. It supports queued ingestion, which batches data for high throughput, and streaming ingestion, which trades throughput for low latency. The current version is 6.0.3; bug-fix releases are frequent, and occasional major versions track changes in Python and the Azure SDK ecosystem. Version 6.0.3 allows transformation functions to be used with the CSV and SCSV formats.
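The two ingestion modes normally talk to different endpoints of the same cluster: queued ingestion goes to the data management endpoint, whose host name carries an `ingest-` prefix, while streaming ingestion goes to the engine endpoint without it. The sketch below shows that naming convention with the standard library only. The helper names `to_ingest_endpoint` and `to_engine_endpoint` are hypothetical and not part of `azure-kusto-ingest`; recent client versions may normalize the URL themselves, so treat this as an illustration rather than a required step.

```python
from urllib.parse import urlsplit, urlunsplit

INGEST_PREFIX = "ingest-"  # host prefix of the data management (queued ingestion) endpoint


def to_ingest_endpoint(cluster_url: str) -> str:
    """Return the queued-ingestion URL for a cluster URL (hypothetical helper)."""
    parts = urlsplit(cluster_url)
    if not parts.netloc:
        raise ValueError(f"Not an absolute URL: {cluster_url!r}")
    if parts.netloc.lower().startswith(INGEST_PREFIX):
        return cluster_url  # already an ingest endpoint
    return urlunsplit(parts._replace(netloc=INGEST_PREFIX + parts.netloc))


def to_engine_endpoint(cluster_url: str) -> str:
    """Return the engine (query / streaming) URL for a cluster URL (hypothetical helper)."""
    parts = urlsplit(cluster_url)
    if not parts.netloc:
        raise ValueError(f"Not an absolute URL: {cluster_url!r}")
    if not parts.netloc.lower().startswith(INGEST_PREFIX):
        return cluster_url  # already an engine endpoint
    return urlunsplit(parts._replace(netloc=parts.netloc[len(INGEST_PREFIX):]))
```

For example, `to_ingest_endpoint('https://yourcluster.region.kusto.windows.net')` yields `https://ingest-yourcluster.region.kusto.windows.net`, and `to_engine_endpoint` reverses the mapping.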

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to perform queued ingestion of in-memory CSV data into an Azure Data Explorer table. It uses Azure CLI authentication for simplicity but can be adapted for other authentication methods. The example sets up the Kusto client, defines ingestion properties, and sends a small CSV string as a stream. Remember to set `KUSTO_CLUSTER_URL`, `KUSTO_DATABASE`, and `KUSTO_TABLE` environment variables.
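Because the quickstart falls back to placeholder values when a variable is unset, a missing setting tends to surface only later as a connection error. The following is a minimal sketch of a fail-fast check, assuming that behavior is preferred; `REQUIRED_VARS` and `load_kusto_settings` are hypothetical names used for illustration and are not part of the library.

```python
import os
from typing import Dict, Mapping

# The three variables named in the quickstart description.
REQUIRED_VARS = ("KUSTO_CLUSTER_URL", "KUSTO_DATABASE", "KUSTO_TABLE")


def load_kusto_settings(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

Calling `load_kusto_settings()` at the top of a script reports every missing variable at once instead of silently using a placeholder cluster URL.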

import os
import io
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties

# Configuration from environment variables (replace with your actual values)
KUSTO_CLUSTER_URL = os.environ.get('KUSTO_CLUSTER_URL', 'https://yourcluster.region.kusto.windows.net')
KUSTO_DATABASE = os.environ.get('KUSTO_DATABASE', 'yourdatabase')
KUSTO_TABLE = os.environ.get('KUSTO_TABLE', 'yourtable')

# The defaults above are placeholders; set the environment variables or edit the values.
# For AAD application (client ID and secret) authentication:
# KCSB = KustoConnectionStringBuilder.with_aad_application_key_authentication(
#     KUSTO_CLUSTER_URL,
#     os.environ.get('KUSTO_CLIENT_ID', ''),
#     os.environ.get('KUSTO_CLIENT_SECRET', ''),
#     os.environ.get('KUSTO_TENANT_ID', ''),
# )
# For Azure CLI authentication (requires a prior `az login`):
KCSB = KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER_URL)

# Create a queued ingest client (QueuedIngestClient replaces the older KustoIngestClient name)
ingest_client = QueuedIngestClient(KCSB)

# Define Ingestion Properties
ingestion_properties = IngestionProperties(
    database=KUSTO_DATABASE,
    table=KUSTO_TABLE,
    data_format=DataFormat.CSV,
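    # flush_immediately=True skips batching so the data is ingested sooner. It does not
    # report ingestion status, and overusing it for many small ingestions hurts performance.
    flush_immediately=False,
    # The sample data below starts with a header row, so skip the first record.
    ignore_first_record=True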
)

# Sample data as a CSV string
data_rows = [
    "id,name,value",
    "1,TestItemA,100",
    "2,TestItemB,200"
]
data_csv = "\n".join(data_rows)

# Ingest data from an in-memory stream
print(f"Attempting to ingest data into {KUSTO_DATABASE}.{KUSTO_TABLE}...")
with io.StringIO(data_csv) as stream:
    result = ingest_client.ingest_from_stream(stream, ingestion_properties)

# For queued ingestion, the result only confirms that the data was queued, not that it was ingested.
# Check the outcome in Azure Data Explorer, for example with `.show ingestion failures`.
print("Ingestion job submitted. Check Azure Data Explorer for status details.")
# print(f"Ingestion result object: {result}") # Uncomment to see the result object
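Since queued ingestion only reports submission, failures have to be looked up afterwards. The sketch below builds a `.show ingestion failures` management command filtered by table and a lookback window. The function name `build_ingestion_failures_query` and its parameters are hypothetical; only the command itself and its `Table` and `FailedOn` columns come from Azure Data Explorer.

```python
import re


def build_ingestion_failures_query(table: str, lookback: str = "1h") -> str:
    """Build a management command listing recent ingestion failures for one table.

    `lookback` is a simple KQL timespan literal such as '30m', '1h' or '2d'.
    """
    if not re.fullmatch(r"\d+[dhms]", lookback):
        raise ValueError(f"Unsupported lookback value: {lookback!r}")
    # Escape backslashes and double quotes so the table name is a valid KQL string literal.
    escaped = table.replace("\\", "\\\\").replace('"', '\\"')
    return (
        ".show ingestion failures "
        f'| where Table == "{escaped}" and FailedOn > ago({lookback})'
    )
```

The resulting string can be passed to `KustoClient.execute_mgmt(database, query)` from `azure.kusto.data`, using a connection to the cluster's engine endpoint rather than the `ingest-` endpoint.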
