Dagster Azure
dagster-azure provides Azure-specific components for the Dagster data orchestration framework, including resources for Blob Storage and Data Lake Storage Gen2, an ADLS2-backed I/O manager, and a Blob Storage compute log manager. The current version is 0.29.0, which aligns with Dagster core 1.13.0. Dagster and its libraries typically follow a monthly release cadence for minor versions.
Warnings
- gotcha The `dagster-azure` library's version (`0.x.x`) is aligned with the core `dagster` version (`1.x.x`). For example, `dagster-azure==0.29.0` is compatible with `dagster==1.13.0`. Always ensure that your `dagster` and `dagster-azure` versions are compatible, typically by installing them together (e.g., `pip install dagster==1.13.0 dagster-azure==0.29.0`).
- gotcha Azure authentication commonly goes through `azure-identity`'s `DefaultAzureCredential`, which tries several methods in turn (environment variables, managed identity, Azure CLI, and others). The ADLS2 resource also accepts a SAS token or a shared key. Misconfigured authentication is a common source of errors, so confirm which method is actually picked up in each environment.
- breaking Major version updates to core `dagster` (e.g., a future `2.x.x`), or significant changes within `1.x.x`, can break the resource definitions, I/O manager interfaces, or configuration schemas that `dagster-azure` components rely on. Such an upgrade may require updating your resource and I/O manager definitions.
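The version alignment described in the first warning can be checked mechanically. The sketch below is a hypothetical helper, not part of `dagster` or `dagster-azure`. It assumes the library minor version is the core minor version plus 16, an offset inferred only from the single pairing quoted above (`dagster==1.13.0` with `dagster-azure==0.29.0`), so treat it as an illustration rather than an official rule.

```python
import re

# Assumption: library minor = core minor + 16, inferred from the one pairing
# quoted in the warnings (dagster 1.13.0 <-> dagster-azure 0.29.0).
MINOR_OFFSET = 16


def expected_library_version(core_version: str) -> str:
    """Map a 1.x.y core version to the 0.(x + offset).y library version."""
    match = re.fullmatch(r"1\.(\d+)\.(\d+)", core_version)
    if match is None:
        raise ValueError(f"unsupported core version: {core_version!r}")
    minor, patch = (int(part) for part in match.groups())
    return f"0.{minor + MINOR_OFFSET}.{patch}"


def versions_match(core_version: str, library_version: str) -> bool:
    """Return True when the two version strings form an aligned pair."""
    return expected_library_version(core_version) == library_version
```

In a real environment the two inputs could come from `importlib.metadata.version("dagster")` and `importlib.metadata.version("dagster-azure")`, for example in a startup check or a CI step.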
Install
pip install dagster-azure
Imports
- AzureBlobStorageResource
from dagster_azure.blob import AzureBlobStorageResource
- ADLS2PickleIOManager
from dagster_azure.adls2 import ADLS2PickleIOManager
- adls2_file_cache
from dagster_azure.adls2 import adls2_file_cache
- ADLS2Resource
from dagster_azure.adls2.resources import ADLS2Resource
Quickstart
import os

from dagster import Definitions, asset, define_asset_job
from dagster_azure.adls2 import (
    ADLS2DefaultAzureCredential,
    ADLS2PickleIOManager,
    ADLS2Resource,
)

# Set these environment variables or replace the defaults with real values.
# DefaultAzureCredential looks for AZURE_TENANT_ID and AZURE_CLIENT_ID together
# with AZURE_CLIENT_SECRET or AZURE_FEDERATED_TOKEN_FILE, and otherwise falls
# back to sources such as managed identity or the Azure CLI.
AZURE_STORAGE_ACCOUNT_NAME = os.environ.get(
    "AZURE_STORAGE_ACCOUNT_NAME", "your_storage_account_name"
)
AZURE_BLOB_CONTAINER_NAME = os.environ.get(
    "AZURE_BLOB_CONTAINER_NAME", "your-dagster-container"
)


@asset
def hello_blob_asset():
    """An asset whose return value is pickled and written to Azure storage."""
    return "Hello, Dagster Azure Blob Storage!"


# Create a job that materializes the asset
hello_blob_job = define_asset_job("hello_blob_job", selection=[hello_blob_asset])

defs = Definitions(
    assets=[hello_blob_asset],
    jobs=[hello_blob_job],
    resources={
        # ADLS2PickleIOManager is the I/O manager shipped with dagster-azure;
        # an ADLS2 file system corresponds to a container in the storage account.
        "io_manager": ADLS2PickleIOManager(
            adls2_file_system=AZURE_BLOB_CONTAINER_NAME,
            adls2_prefix="dagster_output",  # Optional: objects are stored under this prefix
            adls2=ADLS2Resource(
                storage_account=AZURE_STORAGE_ACCOUNT_NAME,
                credential=ADLS2DefaultAzureCredential(kwargs={}),
            ),
        )
    },
)
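Because misconfigured authentication is a common failure mode, a small preflight check before launching a run can save a debugging round trip. The sketch below is a hypothetical helper, not part of `dagster-azure` or `azure-identity`. It inspects only the environment variables named in the quickstart comments and is a simplified approximation of what `DefaultAzureCredential` actually evaluates; it does not contact Azure or validate the values.

```python
import os
from typing import Mapping, Optional


def detect_env_credential(env: Optional[Mapping[str, str]] = None) -> str:
    """Report which environment-based credential the variables would enable.

    Returns "client_secret", "federated_token", or "none". A result of "none"
    means DefaultAzureCredential would have to fall back to another source,
    such as managed identity or the Azure CLI.
    """
    env = os.environ if env is None else env
    # Both environment-based paths need the tenant and client identifiers.
    has_identity = bool(env.get("AZURE_TENANT_ID")) and bool(env.get("AZURE_CLIENT_ID"))
    if has_identity and env.get("AZURE_CLIENT_SECRET"):
        return "client_secret"
    if has_identity and env.get("AZURE_FEDERATED_TOKEN_FILE"):
        return "federated_token"
    return "none"
```

Calling `detect_env_credential()` with no arguments reads the current process environment; passing a plain dictionary makes the check easy to unit test.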