Apache Airflow Presto Provider
5.11.2 | verified Thu Apr 16 | auth: no | python
The `apache-airflow-providers-presto` package provides Apache Airflow hooks and operators for working with Presto or Trino. It uses the `trino` Python client for database connectivity, so users can run SQL queries and manage data in these distributed query engines. The current version is 5.11.2. Provider packages are versioned and released independently of Airflow core, so bug-fix and feature updates can arrive between Airflow releases.
pip install apache-airflow-providers-presto
Common errors
error ModuleNotFoundError: No module named 'airflow.providers.presto'
cause The `apache-airflow-providers-presto` package has not been installed in your Airflow environment.
fix
Install the provider package:
pip install apache-airflow-providers-presto
error airflow.exceptions.AirflowException: The conn_id `presto_default` isn't defined
cause The Airflow connection with the specified ID (e.g., 'presto_default') has not been configured in your Airflow instance.
fix
Define the connection in the Airflow UI (Admin -> Connections -> Create) or set the corresponding environment variable, e.g. AIRFLOW_CONN_PRESTO_DEFAULT='presto://user:password@host:port/catalog/schema'.
error trino.exceptions.TrinoUserError: Catalog '...' not found
cause The specified catalog or schema in the SQL query or connection parameters does not exist or is inaccessible on the Presto/Trino server.
fix
Verify the 'catalog' and 'schema' parameters in your connection string or PrestoOperator arguments. Ensure the Presto/Trino server is running and configured with the necessary catalogs.
error trino.exceptions.TrinoConnectionError: Connection refused
cause The Airflow worker cannot establish a connection to the specified Presto/Trino server. This could be due to an incorrect host/port, network issues, or the server being down.
fix
Check the host and port in your Airflow connection configuration. Verify network connectivity between the Airflow worker and the Presto/Trino server, and ensure the Presto/Trino server is running.
Warnings
breaking The Presto provider was moved from `airflow.contrib` to its own provider package `apache-airflow-providers-presto` with Airflow 2.0.
fix Update import paths from `airflow.contrib.hooks.presto_hook` or `airflow.contrib.operators.presto_operator` to `airflow.providers.presto.hooks.presto` and `airflow.providers.presto.operators.presto` respectively. Install the `apache-airflow-providers-presto` package.
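The import-path update described above can be checked mechanically. The following is a minimal sketch, using only the standard library, that flags legacy imports in DAG source text and rewrites them. The helper names `find_legacy_imports` and `rewrite_legacy_imports` are hypothetical, and the path mapping is copied from the warning above rather than verified against every Airflow release.

```python
import re

# Legacy module paths and their provider-package replacements,
# taken from the migration warning above (not independently verified).
LEGACY_TO_PROVIDER = {
    "airflow.contrib.hooks.presto_hook": "airflow.providers.presto.hooks.presto",
    "airflow.contrib.operators.presto_operator": "airflow.providers.presto.operators.presto",
}

# Matches "from <module> import ..." and "import <module>" at the start of a line.
_IMPORT_RE = re.compile(r"^\s*(?:from|import)\s+([\w.]+)", re.MULTILINE)


def find_legacy_imports(source: str) -> list[tuple[str, str]]:
    """Return (legacy_module, replacement_module) pairs found in the source text."""
    hits = []
    for match in _IMPORT_RE.finditer(source):
        module = match.group(1)
        if module in LEGACY_TO_PROVIDER:
            hits.append((module, LEGACY_TO_PROVIDER[module]))
    return hits


def rewrite_legacy_imports(source: str) -> str:
    """Replace legacy module paths with provider paths, leaving other text untouched."""
    for old, new in LEGACY_TO_PROVIDER.items():
        source = re.sub(rf"\b{re.escape(old)}\b", new, source)
    return source
```

Running `find_legacy_imports` over each file in the DAGs folder before upgrading gives a list of files to touch; `rewrite_legacy_imports` only swaps the module path, so imported class names stay as they were.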
gotcha The provider package is named 'presto', but it uses the 'trino' Python client library and is fully compatible with Trino (formerly PrestoSQL). While the hook/operator classes retain 'Presto' in their names, connection parameters and client behavior align with Trino.
fix Be aware that connection parameters (e.g., `catalog`, `schema`, authentication methods) should align with Trino client conventions. When troubleshooting, search for 'Trino' client errors or documentation.
gotcha Incorrectly formatted Presto/Trino connection strings are a common source of errors, especially when including catalog, schema, or advanced authentication (e.g., Kerberos, OAuth) parameters.
fix Refer to the `trino` Python client documentation for the exact connection string format, especially for URL parameters and authentication options. Test connections outside Airflow using the `trino` client directly if issues persist to isolate the problem.
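One frequent formatting mistake is leaving special characters in the username or password unescaped, so that `@`, `:` or `/` are read as URI delimiters. The sketch below builds a connection URI with percent-encoded credentials and derives the matching environment variable name. It assumes the `presto://user:password@host:port/catalog/schema` shape shown earlier on this page. The helpers `build_conn_uri` and `conn_env_var` and the sample credentials are hypothetical.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(user, password, host, port, catalog, schema, scheme="presto"):
    """Build a connection URI, percent-encoding every user-supplied component."""
    # safe="" forces '@', ':' and '/' inside credentials to be encoded.
    userinfo = quote(user, safe="")
    if password:
        userinfo += ":" + quote(password, safe="")
    # Path layout (catalog/schema) follows the example on this page; it is an assumption.
    path = "/".join(quote(part, safe="") for part in (catalog, schema))
    return f"{scheme}://{userinfo}@{host}:{port}/{path}"


def conn_env_var(conn_id):
    """Airflow looks up connections in AIRFLOW_CONN_<upper-cased conn_id>."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Sample values are made up for illustration.
uri = build_conn_uri("etl", "p@ss:w/rd", "localhost", 8080, "hive", "default")
parts = urlsplit(uri)  # parse it back to confirm host, port and password survive
print(f"export {conn_env_var('presto_default')}='{uri}'")
```

Parsing the URI back with `urlsplit` is a cheap way to confirm that the host, port and decoded password are what was intended before handing the string to Airflow.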
Imports
- PrestoHook
wrong: from airflow.contrib.hooks.presto_hook import PrestoHook
correct: from airflow.providers.presto.hooks.presto import PrestoHook
- PrestoOperator
wrong: from airflow.contrib.operators.presto_operator import PrestoOperator
correct: from airflow.providers.presto.operators.presto import PrestoOperator
Quickstart
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.presto.operators.presto import PrestoOperator

# For local testing, ensure the 'presto_default' connection is set up in the Airflow UI,
# or define it via an environment variable (e.g., in a .env file or shell):
#   export AIRFLOW_CONN_PRESTO_DEFAULT='presto://user:password@localhost:8080/hive/default'
# For Trino-specific parameters (e.g., auth, TLS):
#   export AIRFLOW_CONN_PRESTO_DEFAULT='trino://user@localhost:8080/?catalog=hive&schema=default&auth=NONE'

with DAG(
    dag_id="presto_example_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # no schedule: the DAG runs only when triggered manually
    catchup=False,
    tags=["presto", "trino", "example"],
) as dag:
    # Smoke test: confirms the connection works end to end.
    run_simple_query = PrestoOperator(
        task_id="execute_select_one",
        presto_conn_id="presto_default",  # Ensure this connection ID exists in Airflow
        sql="SELECT 1",
    )

    # The sql field is rendered with Jinja before execution.
    run_templated_query = PrestoOperator(
        task_id="execute_templated_query",
        presto_conn_id="presto_default",
        sql="SELECT '{{ ds }}' as current_date_string, '{{ macros.uuid.uuid4() }}' as random_uuid",
    )
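Before triggering the DAG, it can help to confirm that the worker can open a TCP connection to the coordinator, since a refused connection is one of the common errors listed on this page. The following is a minimal standard-library sketch. The helper name `can_reach` is hypothetical, and it checks only network reachability, not authentication or catalog configuration.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False
```

For the example connection in the comments above, `can_reach("localhost", 8080)` run from the worker machine distinguishes a network or server problem from a misconfigured Airflow connection.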