Apache Airflow JDBC Provider
The Apache Airflow JDBC Provider extends Airflow's functionality by enabling interaction with JDBC-compatible databases through specialized hooks and operators. It is part of the larger Apache Airflow ecosystem, with frequent releases that align with new Airflow versions and community contributions. The current version is 5.4.2.
Warnings
- breaking The `JdbcOperator` has been deprecated and subsequently removed from the `apache-airflow-providers-jdbc` package. Users should migrate to `airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator` for executing SQL commands via JDBC.
- breaking Minimum Apache Airflow version requirements have increased across provider versions. Ensure your Airflow installation meets the minimum version for the installed provider version to avoid issues related to API changes and decorator removals (e.g., `apply_default`).
- gotcha Connecting to JDBC databases requires a properly configured Java environment, including a Java Virtual Machine (JVM), the `JAVA_HOME` environment variable set, and the specific JDBC driver `.jar` file for your database.
- gotcha For security reasons, `allow_driver_class_in_extra` and `allow_driver_path_in_extra` configuration options in `airflow.cfg` (under `[providers.jdbc]`) are `False` by default. If you need to specify `driver_class` or `driver_path` in the Airflow Connection's 'Extra' field, these options must be explicitly set to `True`.
- gotcha The deprecated `JdbcOperator` did not return query results to logs or XComs. `SQLExecuteQueryOperator` passes the DB-API cursor to its `handler` callable after execution (by default `fetch_all_handler`) and pushes the handler's return value to XCom; to fetch `SELECT` results with custom processing, supply your own `handler`, or use the `JdbcHook` directly.
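The Java and configuration prerequisites above can be sketched as environment settings. The JDK path below is a placeholder for your own installation, and the variables use Airflow's `AIRFLOW__{SECTION}__{KEY}` environment-variable form of the `[providers.jdbc]` options:

```shell
# Point the JVM bridge used by the JDBC provider at your Java installation.
# Placeholder path; substitute your actual JDK location.
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk

# Allow driver_class / driver_path in a Connection's Extra field.
# Equivalent to setting these options under [providers.jdbc] in airflow.cfg.
export AIRFLOW__PROVIDERS_JDBC__ALLOW_DRIVER_CLASS_IN_EXTRA=true
export AIRFLOW__PROVIDERS_JDBC__ALLOW_DRIVER_PATH_IN_EXTRA=true
```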
Install
- `pip install apache-airflow-providers-jdbc`
- `pip install apache-airflow[jdbc]`
Imports
- JdbcHook
from airflow.providers.jdbc.hooks.jdbc import JdbcHook
- SQLExecuteQueryOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
Quickstart
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id='example_jdbc_sql_query',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    schedule=None,
    catchup=False,
    tags=['jdbc', 'example', 'sql'],
) as dag:
    # To run this DAG, ensure you have:
    # 1. A JVM installed and the JAVA_HOME environment variable set.
    # 2. The JDBC driver JAR file for your database available (e.g., /path/to/your/driver.jar).
    # 3. An Airflow JDBC connection configured with:
    #    - Conn Id: 'my_jdbc_connection'
    #    - Conn Type: 'JDBC Connection'
    #    - Host: 'jdbc:<vendor>://<host>:<port>/<database>' (the full JDBC URL)
    #    - Login: '<username>'
    #    - Password: '<password>'
    #    - Extra (JSON): {"driver_path": "/path/to/your/driver.jar", "driver_class": "com.vendor.DriverClass"}
    # Refer to your database vendor's documentation for the correct driver_class and driver_path values.

    # Example 1: Execute a SELECT query and fetch the results via a handler
    execute_select_query = SQLExecuteQueryOperator(
        task_id='execute_select_query',
        conn_id='my_jdbc_connection',
        sql="SELECT * FROM example_table WHERE status = 'active';",
        handler=lambda cursor: cursor.fetchall(),  # handler's return value is pushed to XCom
    )

    # Example 2: Execute an INSERT statement
    execute_insert_statement = SQLExecuteQueryOperator(
        task_id='execute_insert_statement',
        conn_id='my_jdbc_connection',
        sql="INSERT INTO log_table (event_time, message) VALUES (NOW(), 'Data processed successfully');",
        autocommit=True,
    )

    execute_select_query >> execute_insert_statement