Apache Airflow HashiCorp Provider

4.5.2 · active · verified Thu Apr 16

The Apache Airflow HashiCorp Provider package integrates Apache Airflow with HashiCorp Vault. It offers hooks and a secrets backend for retrieving secrets, connections, and variables from Vault. The current version is 4.5.2, and releases follow a regular cadence, often aligning with Apache Airflow's major and minor releases, with independent updates for features and bug fixes.
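As an illustration, Vault can be wired in as an Airflow secrets backend through `airflow.cfg`. The values below (mount point, paths, URL) are placeholders for your own deployment, not defaults this document prescribes:

```ini
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "mount_point": "airflow", "url": "http://127.0.0.1:8200"}
```

With this in place, `Connection` and `Variable` lookups consult Vault before the Airflow metadata database.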


Install

    pip install apache-airflow-providers-hashicorp

Imports

    from airflow.providers.hashicorp.hooks.vault import VaultHook
    from airflow.providers.hashicorp.secrets.vault import VaultBackend

Quickstart

This example DAG demonstrates how to use the `VaultHook` inside a task to read a specific key from a secret stored in HashiCorp Vault. It requires a configured 'vault_default' Airflow connection pointing to your Vault instance with appropriate authentication.

from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.hashicorp.hooks.vault import VaultHook

# Ensure you have a 'vault_default' connection configured in Airflow with the
# appropriate Vault address and authentication details.
# For local testing, you might need a local Vault instance and a token.
# Example: 'vault_default' connection type: 'vault', Host: 'http://localhost:8200', Password: 'your_vault_token'

with DAG(
    dag_id='example_vault_read_secret',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['vault', 'secrets'],
) as dag:

    @task
    def read_my_secret() -> str:
        # VaultHook.get_secret returns the secret's key/value pairs as a dict.
        hook = VaultHook(vault_conn_id='vault_default')  # Ensure this connection is configured
        secret = hook.get_secret(secret_path='my-app/db-creds')  # Replace with your actual secret path
        return secret['username']  # The return value is pushed to XCom automatically

    @task
    def use_secret_value(username: str) -> None:
        print(f"Retrieved DB Username: {username}")

    # The value returned by read_my_secret is passed to the downstream task via XCom.
    use_secret_value(read_my_secret())
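For context on the `secret_path` value: Vault's KV version 2 HTTP API nests the stored key/value pairs under `data.data` in its read response, and the hook unwraps this for you. A minimal sketch of that unwrapping, with a hypothetical helper name and a sample payload of our own:

```python
# Hypothetical helper illustrating the Vault KV v2 read-response shape;
# the function name and sample payload are ours, not part of the provider.
def unwrap_kv2(response: dict) -> dict:
    """Extract the key/value pairs from a Vault KV v2 read response."""
    # KV v2 wraps the stored secret under data.data, with versioning
    # details under data.metadata.
    return response["data"]["data"]

sample_response = {
    "data": {
        "data": {"username": "admin", "password": "s3cr3t"},
        "metadata": {"version": 2},
    }
}

creds = unwrap_kv2(sample_response)
print(creds["username"])  # -> admin
```

This is also why KV v2 paths contain a `/data/` segment at the raw HTTP level (e.g. `secret/data/my-app/db-creds`) while the hook takes the logical path relative to the mount point.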
