Apache Airflow MySQL Provider
The Apache Airflow MySQL Provider enables seamless interaction with MySQL databases from within Airflow DAGs. It provides hooks and operators to execute SQL queries, transfer data, and manage connections. As part of the Airflow ecosystem it is updated regularly; the version covered here is 6.5.1, which requires Python >= 3.10.
Warnings
- breaking The `MySqlOperator` was removed from the provider package. Users should migrate to `SQLExecuteQueryOperator` from `airflow.providers.common.sql.operators.sql` for executing SQL queries. This removal applies to provider versions 6.3.2 and later.
- breaking Provider versions 2.0.0 and later require Apache Airflow 2.1.0+ due to the removal of the `apply_default` decorator, and provider version 3.0.0+ requires Airflow 2.2.0+. Installing an incompatible provider version may trigger an automatic Airflow upgrade and require a manual database migration.
- gotcha Attempting to import MySQL-related components like `MySqlOperator` or `MySqlHook` without explicitly installing `apache-airflow-providers-mysql` will result in a `ModuleNotFoundError`.
- gotcha Installation of the `mysqlclient` dependency can fail if required system-level development packages (e.g., `libmysqlclient-dev` on Debian/Ubuntu) are not present, leading to build errors.
- gotcha Users have reported `AttributeError: property 'connection' of 'MySqlHook' object has no setter` when `apache-airflow-providers-mysql` has no upper version constraint on `apache-airflow-providers-common-sql`, leading to incompatible versions being installed.
- gotcha Long-running MySQL connections via Airflow can experience 'Server has gone away' errors, typically due to the MySQL server restarting or connection timeouts.
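The 'Server has gone away' gotcha can often be mitigated by retrying the query on a fresh connection. Below is a minimal, dependency-free sketch of that pattern; `run_with_retry` is an illustrative helper, and `ConnectionError` stands in for `MySQLdb.OperationalError` (error 2006), which is what a real hook call would raise.

```python
import time


def run_with_retry(query_fn, retries=3, delay=1.0, retryable=(ConnectionError,)):
    """Re-run query_fn when a retryable connection error occurs.

    In a real DAG, retryable would include MySQLdb.OperationalError
    (2006, 'MySQL server has gone away'); ConnectionError is used here
    so the sketch stays dependency-free. query_fn should open a fresh
    connection on each call (e.g. via MySqlHook.get_conn()).
    """
    for attempt in range(1, retries + 1):
        try:
            return query_fn()
        except retryable:
            if attempt == retries:
                raise
            time.sleep(delay)
```

For Airflow tasks, the built-in task-level `retries`/`retry_delay` parameters achieve a similar effect at coarser granularity.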
Install
- pip install apache-airflow-providers-mysql
- sudo apt-get install default-libmysqlclient-dev  # Debian/Ubuntu; on older Debian: sudo apt-get install libmysqlclient-dev
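To fail fast on the `ModuleNotFoundError` gotcha above, you can probe for the provider before importing its hooks. A small sketch (`provider_installed` is an illustrative helper, not part of the provider):

```python
import importlib.util


def provider_installed(module: str = "airflow.providers.mysql") -> bool:
    """Return True if the module (and all of its parent packages) can be found."""
    try:
        return importlib.util.find_spec(module) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. `airflow`) is missing entirely.
        return False
```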
Imports
- MySqlHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
- SQLExecuteQueryOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
- S3ToMySqlOperator
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
Quickstart
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# Ensure you have a MySQL connection named 'mysql_default' configured in Airflow UI
# Host: localhost, Schema: airflow_db, User: airflow, Pass: airflow
with DAG(
    dag_id="mysql_quickstart_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["mysql", "example"],
) as dag:
    create_table = SQLExecuteQueryOperator(
        task_id="create_test_table",
        conn_id="mysql_default",
        sql="""
            CREATE TABLE IF NOT EXISTS test_table (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(255)
            );
        """,
    )

    insert_data = SQLExecuteQueryOperator(
        task_id="insert_test_data",
        conn_id="mysql_default",
        sql="INSERT INTO test_table (name) VALUES ('Airflow User 1'), ('Airflow User 2');",
    )

    select_data = SQLExecuteQueryOperator(
        task_id="select_test_data",
        conn_id="mysql_default",
        sql="SELECT * FROM test_table;",
    )

    create_table >> insert_data >> select_data
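The DAG above assumes a `mysql_default` connection. Besides the UI, Airflow can read connections from `AIRFLOW_CONN_<CONN_ID>` environment variables in URI form. A sketch that builds such a URI (the helper name and defaults are illustrative; the credential values are placeholders matching the comment above):

```python
from urllib.parse import quote


def mysql_conn_uri(host, schema, login, password, port=3306, **extras):
    """Build an Airflow-style connection URI: mysql://login:pass@host:port/schema[?extras]."""
    uri = f"mysql://{quote(login)}:{quote(password)}@{host}:{port}/{schema}"
    if extras:
        # Extra key/value pairs become query parameters (e.g. connect_timeout).
        query = "&".join(f"{k}={quote(str(v))}" for k, v in extras.items())
        uri += f"?{query}"
    return uri


# e.g. export AIRFLOW_CONN_MYSQL_DEFAULT="mysql://airflow:airflow@localhost:3306/airflow_db"
uri = mysql_conn_uri("localhost", "airflow_db", "airflow", "airflow")
```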