Apache Airflow Vertica Provider

4.3.2 · active · verified Thu Apr 16

The `apache-airflow-providers-vertica` package is an Apache Airflow provider that integrates Vertica databases with Airflow. It supplies hooks and operators for connecting to Vertica and executing SQL queries within Airflow DAGs. The current version is 4.3.2; the provider is versioned independently of Airflow core under semantic versioning, so updates may introduce new features, bug fixes, or compatibility changes.
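As a sketch of the hook side, the snippet below uses `VerticaHook` to run a query directly from Python. The helper name `count_rows` is illustrative, and it assumes a configured `vertica_default` connection; `get_first` comes from the `DbApiHook` base class that `VerticaHook` inherits.

```python
from airflow.providers.vertica.hooks.vertica import VerticaHook


def count_rows(table: str) -> int:
    # VerticaHook inherits DbApiHook, so generic helpers such as
    # get_first() and get_records() are available
    hook = VerticaHook(vertica_conn_id="vertica_default")
    row = hook.get_first(f"SELECT COUNT(*) FROM {table}")
    return row[0]
```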

Install
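The provider installs from PyPI into an existing Airflow 2.x environment; it pulls in the `vertica-python` client as a dependency:

```shell
pip install apache-airflow-providers-vertica
```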

Imports
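Typical imports are shown below. `VerticaHook` is the provider's hook; queries are usually run through the generic SQL operator from the `common.sql` provider (the older `VerticaOperator` is deprecated in recent provider versions):

```python
# Hook for direct connections to Vertica from Python code
from airflow.providers.vertica.hooks.vertica import VerticaHook

# Generic SQL operator, used with a Vertica connection
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
```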

Quickstart

This quickstart defines a simple Airflow DAG that interacts with Vertica: it creates a table, inserts a row, and selects it back, printing the results to the task logs. An Airflow connection with Connection Id `vertica_default` and Connection Type Vertica must be configured (for example in the Airflow UI), providing the host, port, schema, login, and password for your Vertica instance.

import pendulum

from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Ensure a Vertica connection named 'vertica_default' is configured in Airflow UI
# with host, port, schema, login, and password.

with DAG(
    dag_id="vertica_quickstart_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["vertica", "example"],
) as dag:
    create_table_task = SQLExecuteQueryOperator(
        task_id="create_vertica_table",
        conn_id="vertica_default",
        sql="""CREATE TABLE IF NOT EXISTS public.test_airflow (id INT, name VARCHAR(50));""",
    )

    insert_data_task = SQLExecuteQueryOperator(
        task_id="insert_vertica_data",
        conn_id="vertica_default",
        sql="""INSERT INTO public.test_airflow (id, name) VALUES (1, 'Airflow Test');""",
    )

    select_data_task = SQLExecuteQueryOperator(
        task_id="select_vertica_data",
        conn_id="vertica_default",
        sql="""SELECT * FROM public.test_airflow;""",
        handler=lambda cursor: print(cursor.fetchall()), # Print results to task logs
    )

    create_table_task >> insert_data_task >> select_data_task
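Instead of configuring the connection through the UI, the `vertica_default` connection can also be supplied via Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention. The host and credentials below are placeholders:

```shell
# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> env vars;
# the URI scheme selects the Vertica connection type
export AIRFLOW_CONN_VERTICA_DEFAULT='vertica://my_user:my_password@my-vertica-host:5433/my_schema'
```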
