Prefect Azure Integration

0.4.9 · active · verified Wed Apr 15

prefect-azure provides a collection of Prefect integrations for orchestrating workflows with Microsoft Azure services. It enables interaction with Azure Blob Storage, Azure Key Vault, Azure Container Instances (ACI), and more. The library is actively maintained, with releases typically aligning with the Prefect core development cycle, and the current version is 0.4.9.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to download a file from Azure Blob Storage using the `prefect-azure` integration. It highlights the use of `AzureBlobStorageCredentials` and the `blob_storage_download` task. For actual usage, `AZURE_STORAGE_CONNECTION_STRING` should be set as an environment variable or credentials should be managed via Prefect Blocks and Azure's identity features. It also emphasizes the importance of registering Prefect Blocks for discovery.

import os
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download, AzureBlobStorageContainer

# Before running the flow, ensure the block is registered and saved (e.g., via CLI or a separate script):
# prefect block register -m prefect_azure
# From Python:
# blob_creds = AzureBlobStorageCredentials(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING', ''))
# blob_creds.save(name="my-blob-creds")
# blob_container_block = AzureBlobStorageContainer(
#    container_name="my-container", 
#    credentials=blob_creds
# )
# blob_container_block.save(name="my-blob-container")

@flow
def example_blob_storage_download_flow():
    # Load credentials block (assuming it was saved with 'my-blob-creds')
    # In a real scenario, connection_string should be fetched securely (e.g., from Azure Key Vault or Prefect Secret Block)
    # or derived from environment variables/managed identity via DefaultAzureCredential.
    connection_string = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
    
    if not connection_string:
        print("Warning: AZURE_STORAGE_CONNECTION_STRING environment variable not set. Using dummy string.")
        # Fallback for example, in real world this would likely fail or use DefaultAzureCredential
        blob_storage_credentials = AzureBlobStorageCredentials()
    else:
        blob_storage_credentials = AzureBlobStorageCredentials(connection_string=connection_string)
    
    # If the AzureBlobStorageContainer block was saved via the UI or another script:
    # my_blob_container_block = AzureBlobStorageContainer.load("my-blob-container")
    # Instead, we create an ad-hoc one for this example's simplicity
    my_blob_container_block = AzureBlobStorageContainer(
        container_name="prefect", # Replace with your container name
        credentials=blob_storage_credentials
    )

    print(f"Attempting to download 'prefect.txt' from container 'prefect'...")
    data = blob_storage_download(
        blob="prefect.txt", # Replace with your blob name
        container="prefect", # Or use my_blob_container_block directly if it defines the container
        blob_storage_credentials=blob_storage_credentials,
    )
    print(f"Downloaded data (first 100 chars): {data[:100].decode()}")
    return data

if __name__ == "__main__":
    # Set a dummy connection string for local testing if not already set
    # For actual Azure access, replace with a real connection string or use Azure identity management
    # os.environ['AZURE_STORAGE_CONNECTION_STRING'] = 'DefaultEndpointsProtocol=https;AccountName=youraccount;AccountKey=yourkey;EndpointSuffix=core.windows.net'
    example_blob_storage_download_flow()

view raw JSON →