Apache Airflow JDBC Provider

5.4.2 · active · verified Sat Apr 11

The Apache Airflow JDBC Provider lets Airflow interact with JDBC-compatible databases through dedicated hooks and operators. It is part of the wider Apache Airflow provider ecosystem and is released frequently, tracking new Airflow versions and community contributions. The current version is 5.4.2.

Warnings

Install

Imports

Quickstart

This quickstart uses `SQLExecuteQueryOperator` to run SQL against a JDBC-compatible database. It requires a pre-configured Airflow connection, an installed JVM with the `JAVA_HOME` environment variable set, and the JDBC driver JAR file for your database. The operator accepts a single SQL statement or a list of statements; to fetch results, pass a `handler` callable that receives the cursor. Replace the placeholder JDBC connection details and SQL queries with your own values.
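
One way to provide the pre-configured connection mentioned above is through an environment variable rather than the Airflow UI. The sketch below assumes Airflow 2.3 or later, which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables. It only builds and prints the variable. All values are the same placeholders used in the DAG further down, and the names `connection`, `env_name`, and `env_value` are illustrative, not part of any Airflow API.

```python
import json

# Placeholder values (assumptions); replace them with your real connection details.
conn_id = "my_jdbc_connection"
connection = {
    "conn_type": "jdbc",
    "host": "jdbc:<vendor>://<host>:<port>/<database>",  # the full JDBC URL
    "login": "<username>",
    "password": "<password>",
    "extra": {
        "driver_path": "/path/to/your/driver.jar",
        "driver_class": "com.vendor.DriverClass",
    },
}

# Airflow resolves a connection id from AIRFLOW_CONN_<CONN_ID in upper case>.
env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
env_value = json.dumps(connection)

# Print a shell line that could be added to the scheduler/worker environment.
print(f"export {env_name}='{env_value}'")
```

The DAG below refers to this connection through `conn_id='my_jdbc_connection'`. A secrets backend is usually a better home for real credentials than a plain environment variable.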

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id='example_jdbc_sql_query',
    # days_ago() is deprecated; use a fixed start date (placeholder value) instead.
    start_date=datetime(2024, 1, 1),
    # `schedule` replaces `schedule_interval`, which is deprecated since Airflow 2.4.
    schedule=None,
    catchup=False,
    tags=['jdbc', 'example', 'sql'],
) as dag:
    # To run this DAG, ensure you have:
    # 1. A JVM installed and the JAVA_HOME environment variable set.
    # 2. The JDBC driver JAR file for your database available (e.g., /path/to/your/driver.jar).
    # 3. An Airflow JDBC connection configured with:
    #    - Conn Id: 'my_jdbc_connection'
    #    - Conn Type: 'JDBC Connection'
    #    - Host: 'jdbc:<vendor>://<host>:<port>/<database>' (the full JDBC URL)
    #    - Login: '<username>'
    #    - Password: '<password>'
    #    - Extra: {"driver_path": "/path/to/your/driver.jar", "driver_class": "com.vendor.DriverClass"}
    #      Extra must be valid JSON (double quotes). By default, recent provider versions only
    #      honor these two keys when allow_driver_path_in_extra / allow_driver_class_in_extra
    #      are enabled in the [providers.jdbc] section of the Airflow configuration.
    #    Refer to the official documentation for your database's driver_class and driver_path values.

    # Example 1: Execute a SELECT query
    execute_select_query = SQLExecuteQueryOperator(
        task_id='execute_select_query',
        conn_id='my_jdbc_connection',
        sql="SELECT * FROM example_table WHERE status = 'active';",
        # The handler receives the DB-API cursor; fetchall() returns every row of the result set.
        handler=lambda cursor: cursor.fetchall(),
    )

    # Example 2: Execute an INSERT statement
    # CURRENT_TIMESTAMP is standard SQL; NOW() is not available in every database.
    execute_insert_statement = SQLExecuteQueryOperator(
        task_id='execute_insert_statement',
        conn_id='my_jdbc_connection',
        sql="INSERT INTO log_table (event_time, message) VALUES (CURRENT_TIMESTAMP, 'Data processed successfully');",
        autocommit=True,
    )

    execute_select_query >> execute_insert_statement
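
The `handler` in Example 1 is just a callable that takes a DB-API cursor and returns whatever the task should produce. The following is a minimal sketch of a named handler that returns rows as dictionaries. The function name `fetch_rows_as_dicts` is hypothetical, and `sqlite3` stands in for a JDBC connection only so the sketch runs without a JVM or driver. It assumes the real cursor exposes the standard `description` and `fetchall()` members.

```python
import sqlite3


def fetch_rows_as_dicts(cursor):
    """Hypothetical handler: turn each fetched row into a dict keyed by column name."""
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# sqlite3 is used here purely as a stand-in DB-API cursor for demonstration.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE example_table (id INTEGER, status TEXT)")
cursor.executemany(
    "INSERT INTO example_table VALUES (?, ?)",
    [(1, "active"), (2, "inactive"), (3, "active")],
)

# Same shape of query as Example 1, with explicit columns and ordering.
cursor.execute(
    "SELECT id, status FROM example_table WHERE status = 'active' ORDER BY id"
)
rows = fetch_rows_as_dicts(cursor)
print(rows)  # [{'id': 1, 'status': 'active'}, {'id': 3, 'status': 'active'}]
conn.close()
```

In the DAG, such a function would be passed as `handler=fetch_rows_as_dicts`. A named function is easier to test and reuse than an inline lambda.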
