Apache Airflow Presto Provider
The `apache-airflow-providers-presto` package provides Apache Airflow hooks and operators for interacting with Presto and Trino. It uses the `trino` Python client for database connectivity, letting users execute SQL queries and manage data in these distributed query engines. The current version is 5.11.2; provider packages follow their own release cadence, sometimes aligning with Apache Airflow's major releases and otherwise shipping independent bug-fix and feature updates.
Common errors
- ModuleNotFoundError: No module named 'airflow.providers.presto'
  - Cause: the `apache-airflow-providers-presto` package is not installed in your Airflow environment.
  - Fix: install the provider package: `pip install apache-airflow-providers-presto`
- airflow.exceptions.AirflowException: The conn_id `presto_default` isn't defined
  - Cause: the Airflow connection with the specified ID (e.g., `presto_default`) has not been configured in your Airflow instance.
  - Fix: define the connection in the Airflow UI (Admin -> Connections -> Create) or via an environment variable, e.g. `AIRFLOW_CONN_PRESTO_DEFAULT='presto://user:password@host:port/catalog/schema'`.
- trino.exceptions.TrinoUserError: Catalog '...' not found
  - Cause: the catalog or schema named in the SQL query or connection parameters does not exist, or is inaccessible, on the Presto/Trino server.
  - Fix: verify the `catalog` and `schema` parameters in your connection string or `PrestoOperator` arguments, and ensure the Presto/Trino server is running with the necessary catalogs configured.
- trino.exceptions.TrinoConnectionError: Connection refused
  - Cause: the Airflow worker cannot establish a connection to the Presto/Trino server. The host or port may be wrong, the network may be blocking traffic, or the server may be down.
  - Fix: check the `host` and `port` in your Airflow connection configuration, verify network connectivity between the Airflow worker and the Presto/Trino server, and confirm the server is running.
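Several of the fixes above boil down to checking the pieces of the connection URI. A minimal sketch using only the standard library to pull host, port, catalog, and schema out of a URI in the shape Airflow expects (the credentials and host below are placeholders, not a real endpoint):

```python
from urllib.parse import urlsplit

# Placeholder connection URI in the form used by AIRFLOW_CONN_PRESTO_DEFAULT.
uri = "presto://user:password@localhost:8080/hive/default"

parts = urlsplit(uri)
host, port = parts.hostname, parts.port
# The path component carries catalog and schema: "/hive/default".
catalog, schema = parts.path.strip("/").split("/")

print(host, port, catalog, schema)  # localhost 8080 hive default
```

Printing the parsed pieces this way is a quick sanity check before blaming the server for a "Catalog not found" or "Connection refused" error.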
Warnings
- breaking The Presto provider was moved from `airflow.contrib` to its own provider package `apache-airflow-providers-presto` with Airflow 2.0.
- gotcha The provider package is named 'presto', but it uses the 'trino' Python client library and is fully compatible with Trino (formerly PrestoSQL). While the hook/operator classes retain 'Presto' in their names, connection parameters and client behavior align with Trino.
- gotcha Incorrectly formatted Presto/Trino connection strings are a common source of errors, especially when including catalog, schema, or advanced authentication (e.g., Kerberos, OAuth) parameters.
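Since malformed connection strings are called out above, here is a hedged sketch of building one safely with the standard library. Reserved characters in credentials (`@`, `:`, `/`) must be percent-encoded or URI parsing will split the string in the wrong place; the user, password, and host values below are made-up placeholders:

```python
from urllib.parse import quote, urlencode

user = "alice"
password = "p@ss:word/1"  # contains URI-reserved characters
host, port = "trino.example.com", 8080
params = {"catalog": "hive", "schema": "default", "auth": "NONE"}

# Percent-encode the credentials so reserved characters survive parsing,
# and build the query string with urlencode.
uri = (
    f"trino://{quote(user, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/?{urlencode(params)}"
)
print(uri)
```

The resulting string can be assigned to an `AIRFLOW_CONN_*` environment variable as shown in the error list above.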
Install
- pip install apache-airflow-providers-presto
Imports
- PrestoHook
from airflow.contrib.hooks.presto_hook import PrestoHook  # Airflow 1.x (deprecated)
from airflow.providers.presto.hooks.presto import PrestoHook  # Airflow 2.0+
- PrestoOperator
from airflow.contrib.operators.presto_operator import PrestoOperator  # Airflow 1.x (deprecated)
from airflow.providers.presto.operators.presto import PrestoOperator  # Airflow 2.0+
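Besides driving the operator, the hook can be called directly from Python code, e.g. inside a Python callable. A sketch under the assumption that a `presto_default` connection exists and the server is reachable (the import is deferred so the snippet parses without a full Airflow installation):

```python
def fetch_one(conn_id: str = "presto_default"):
    """Run a trivial query through PrestoHook and return the result rows.

    Assumes `conn_id` is defined in Airflow and the Presto/Trino
    server it points at is reachable.
    """
    # Deferred import: only needed when the function is actually called.
    from airflow.providers.presto.hooks.presto import PrestoHook

    hook = PrestoHook(presto_conn_id=conn_id)
    return hook.get_records("SELECT 1")
```

`get_records` comes from Airflow's common DB-API hook base class and returns the query result as a list of rows.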
Quickstart
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.presto.operators.presto import PrestoOperator

# For local testing, ensure the 'presto_default' connection is set up in the
# Airflow UI, or define it via an environment variable (e.g., in a .env file
# or your shell):
# export AIRFLOW_CONN_PRESTO_DEFAULT='presto://user:password@localhost:8080/hive/default'
# For Trino-specific parameters (e.g., auth, TLS):
# export AIRFLOW_CONN_PRESTO_DEFAULT='trino://user@localhost:8080/?catalog=hive&schema=default&auth=NONE'

with DAG(
    dag_id="presto_example_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["presto", "trino", "example"],
) as dag:
    run_simple_query = PrestoOperator(
        task_id="execute_select_one",
        presto_conn_id="presto_default",  # Ensure this connection ID exists in Airflow
        sql="SELECT 1",
    )

    run_templated_query = PrestoOperator(
        task_id="execute_templated_query",
        presto_conn_id="presto_default",
        sql="SELECT '{{ ds }}' as current_date_string, '{{ macros.uuid.uuid4() }}' as random_uuid",
    )