Prefect Azure Integration
prefect-azure provides a collection of Prefect integrations for orchestrating workflows with Microsoft Azure services. It enables interaction with Azure Blob Storage, Azure Key Vault, and Azure Container Instances (ACI), and it ships optional extras for Cosmos DB and Azure ML datastores. The library is actively maintained, its releases typically align with the Prefect core development cycle, and the current version is 0.4.9.
Warnings
- breaking Prefect 2.x (Orion) introduced significant architectural changes, including a new API, flow/task definition patterns using decorators (`@flow`, `@task`), and the 'Block' system for external integrations and credentials. Users migrating from Prefect 1.x will need to refactor flows and task definitions, and adopt the Block pattern for Azure resource configuration.
- gotcha After installing `prefect-azure`, register its Block types (e.g., `AzureBlobStorageContainer`, `AzureKeyVaultSecret`) with the Prefect server by running `prefect block register -m prefect_azure`. Until they are registered, they are not discoverable in the UI or by workers and cannot be used.
- gotcha For secure and robust authentication with Azure services, avoid hardcoding connection strings or secrets. `prefect-azure` components often leverage `azure-identity`'s `DefaultAzureCredential`, which can authenticate via environment variables, managed identities, the Azure CLI, and other sources. The identity it resolves also needs Azure RBAC role assignments on the target resource that cover the operations the flow performs.
- gotcha When using `AzureBlobStorageContainer` (or similar `prefect-azure` Blocks) as `result_storage` for Prefect flows or tasks, the specific instance of the Block must be saved to the Prefect server (e.g., `block_instance.save("my-block-name")`) before it can be referenced by name in `@flow` or `@task` decorators.
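The authentication and block-saving gotchas above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the library's prescribed setup: the helper names `credentials_kwargs_from_env` and `save_blocks`, the environment variable `AZURE_STORAGE_ACCOUNT_URL`, and the block names are hypothetical, and the sketch assumes that the installed version of `AzureBlobStorageCredentials` accepts an `account_url` argument for identity-based authentication.

```python
import os
from typing import Mapping, Optional


def credentials_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Choose keyword arguments for AzureBlobStorageCredentials from the environment.

    An account URL (identity-based auth) is preferred over a connection string.
    """
    env = os.environ if env is None else env
    # AZURE_STORAGE_ACCOUNT_URL is a hypothetical variable name used for this sketch.
    account_url = env.get("AZURE_STORAGE_ACCOUNT_URL", "")
    connection_string = env.get("AZURE_STORAGE_CONNECTION_STRING", "")
    if account_url:
        return {"account_url": account_url}
    if connection_string:
        return {"connection_string": connection_string}
    raise ValueError(
        "Set AZURE_STORAGE_ACCOUNT_URL or AZURE_STORAGE_CONNECTION_STRING."
    )


def save_blocks() -> None:
    """Create and save the credentials and container blocks (needs a reachable Prefect API)."""
    # Imported lazily so the helper above works without prefect-azure installed.
    from prefect_azure import AzureBlobStorageCredentials
    from prefect_azure.blob_storage import AzureBlobStorageContainer

    creds = AzureBlobStorageCredentials(**credentials_kwargs_from_env())
    creds.save("my-blob-creds", overwrite=True)

    container = AzureBlobStorageContainer(
        container_name="my-container",  # assumed container name
        credentials=creds,
    )
    container.save("my-blob-container", overwrite=True)
```

Calling `save_blocks()` once, after the Block types have been registered, should leave a saved `my-blob-container` block that flows can then load or reference by name.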
Install
- pip install prefect-azure
- pip install "prefect-azure[blob_storage]"
- pip install "prefect-azure[cosmos_db]"
- pip install "prefect-azure[ml_datastore]"
- pip install "prefect-azure[all_extras]"
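After installing, it can be useful to confirm which extras are actually available in the current environment. The sketch below uses only the standard library. The mapping from extra names to SDK modules is an assumption based on the usual Azure SDK package layout, and the helper names are hypothetical.

```python
import importlib.util
from typing import Dict, Iterable, List

# Assumed mapping from each extra to a module that the extra is expected to install.
EXTRA_MODULES: Dict[str, str] = {
    "blob_storage": "azure.storage.blob",
    "cosmos_db": "azure.cosmos",
    "ml_datastore": "azureml.core",
}


def module_available(name: str) -> bool:
    """Return True if the module can be located in the current environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # ImportError is raised when a parent package (e.g. `azure`) is missing.
        return False


def missing_extras(extras: Iterable[str] = tuple(EXTRA_MODULES)) -> List[str]:
    """List the extras whose assumed module cannot be found."""
    return [extra for extra in extras if not module_available(EXTRA_MODULES[extra])]
```

For example, `missing_extras()` returning `["cosmos_db"]` would suggest that `pip install "prefect-azure[cosmos_db]"` has not been run in this environment.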
Imports
- AzureBlobStorageContainer
from prefect_azure.blob_storage import AzureBlobStorageContainer
- AzureBlobStorageCredentials
from prefect_azure import AzureBlobStorageCredentials
- blob_storage_download
from prefect_azure.blob_storage import blob_storage_download
- AzureContainerInstanceJob
from prefect_azure.container_instance import AzureContainerInstanceJob
Quickstart
import os
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download, AzureBlobStorageContainer
# Before running the flow, ensure the block types are registered and the blocks are saved (e.g., via CLI or a separate script):
#   prefect block register -m prefect_azure
# From Python:
#   blob_creds = AzureBlobStorageCredentials(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING', ''))
#   blob_creds.save(name="my-blob-creds")
#   blob_container_block = AzureBlobStorageContainer(
#       container_name="my-container",
#       credentials=blob_creds
#   )
#   blob_container_block.save(name="my-blob-container")
@flow
def example_blob_storage_download_flow():
    # Build credentials from the environment. If a credentials block was saved as 'my-blob-creds',
    # AzureBlobStorageCredentials.load("my-blob-creds") could be used instead.
    # In a real scenario, connection_string should be fetched securely (e.g., from Azure Key Vault or a Prefect Secret Block)
    # or derived from environment variables/managed identity via DefaultAzureCredential.
    connection_string = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
    if not connection_string:
        print("Warning: AZURE_STORAGE_CONNECTION_STRING environment variable not set. Creating credentials without a connection string.")
        # Fallback for the example; in the real world this would likely fail or use DefaultAzureCredential
        blob_storage_credentials = AzureBlobStorageCredentials()
    else:
        blob_storage_credentials = AzureBlobStorageCredentials(connection_string=connection_string)
    # If the AzureBlobStorageContainer block was saved via the UI or another script:
    # my_blob_container_block = AzureBlobStorageContainer.load("my-blob-container")
    # Instead, we create an ad-hoc one for this example's simplicity.
    # It is shown for illustration only; the download call below does not use it.
    my_blob_container_block = AzureBlobStorageContainer(
        container_name="prefect",  # Replace with your container name
        credentials=blob_storage_credentials
    )
    print("Attempting to download 'prefect.txt' from container 'prefect'...")
    data = blob_storage_download(
        blob="prefect.txt",  # Replace with your blob name
        container="prefect",  # Replace with your container name
        blob_storage_credentials=blob_storage_credentials,
    )
    print(f"Downloaded data (first 100 bytes): {data[:100].decode()}")
    return data
if __name__ == "__main__":
    # For local testing, export AZURE_STORAGE_CONNECTION_STRING beforehand or uncomment the line below.
    # For actual Azure access, replace with a real connection string or use Azure identity management
    # os.environ['AZURE_STORAGE_CONNECTION_STRING'] = 'DefaultEndpointsProtocol=https;AccountName=youraccount;AccountKey=yourkey;EndpointSuffix=core.windows.net'
    example_blob_storage_download_flow()
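One detail in the quickstart is worth a closer look: `data[:100].decode()` slices bytes, not characters, so it can raise `UnicodeDecodeError` if the blob is not UTF-8 text or if the slice ends in the middle of a multi-byte character. A minimal sketch of a more forgiving preview, using a hypothetical helper name `preview_bytes`, might look like this:

```python
def preview_bytes(data: bytes, limit: int = 100, encoding: str = "utf-8") -> str:
    """Decode at most `limit` bytes for logging, replacing undecodable bytes."""
    # errors="replace" substitutes U+FFFD instead of raising UnicodeDecodeError.
    return data[:limit].decode(encoding, errors="replace")
```

Inside the flow, the final print could then use `preview_bytes(data)` in place of `data[:100].decode()`, at the cost of showing replacement characters for binary content.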