Trino Provider for Apache Airflow
The `apache-airflow-providers-trino` package provides Apache Airflow users with hooks, operators, and transfers to interact with Trino (formerly PrestoSQL). It enables programmatic orchestration of Trino SQL queries within Airflow DAGs, facilitating ETL and data pipeline workflows. The current version is 6.5.1, and the provider follows a strict Semantic Versioning policy with frequent releases independent of Airflow core.
Warnings
- breaking The minimum required Apache Airflow version for `apache-airflow-providers-trino` frequently increases with new provider releases; version 6.5.1 requires Airflow 2.11.0 or newer. Installing this provider on an older Airflow version (e.g., <2.11.0) may pull in an upgraded Airflow core as a dependency, which can then require running `airflow db migrate` manually.
- gotcha When configuring a Trino connection in Airflow, ensure that only one authentication method (e.g., password, JWT, Kerberos) is set in the connection details. Attempting to use multiple authentication methods simultaneously can lead to task failures.
- breaking In provider version 5.0.0, the deprecated `delegate_to` parameter was removed from `GCSToTrinoOperator` (and related Google Cloud operators/hooks). Impersonation should now be achieved using the `impersonation_chain` parameter.
- deprecated The `apply_default` decorator was removed from Airflow core, which is why earlier releases of this provider required Airflow 2.1.0 or newer. The issue is long resolved, but it underscores the need to keep provider and Airflow core versions in step.
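As a concrete illustration of the single-auth-method rule above, a connection's Extra field for JWT authentication might look like the following. This is a sketch: the token value is a placeholder, and the exact extra field names should be checked against the provider's Trino connection documentation for your installed version.

```json
{
  "protocol": "https",
  "jwt__token": "<your-jwt-token>"
}
```

Note that password, JWT, and Kerberos settings should never be combined in one connection; pick exactly one.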
Install
pip install apache-airflow-providers-trino
Imports
- TrinoHook
from airflow.providers.trino.hooks.trino import TrinoHook
- TrinoOperator
from airflow.providers.trino.operators.trino import TrinoOperator
- TrinoToS3Operator
from airflow.providers.trino.transfers.trino_to_s3 import TrinoToS3Operator
Quickstart
import pendulum
from airflow.models.dag import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
# Configure Trino Connection in Airflow UI:
# Conn Id: 'trino_default'
# Conn Type: Trino
# Host: <your_trino_host>
# Port: <your_trino_port>
# Schema: <your_trino_schema>
# User: <your_trino_user>
# Password: <your_trino_password> (optional)
# Extra: {"protocol": "https", "verify": "false"} (example for HTTPS/self-signed)
with DAG(
    dag_id='trino_example_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    catchup=False,
    schedule=None,
    tags=['trino', 'example'],
) as dag:
    create_test_table = TrinoOperator(
        task_id='create_test_table',
        trino_conn_id='trino_default',
        sql="""
            CREATE TABLE IF NOT EXISTS memory.default.airflow_test (
                id INT,
                name VARCHAR
            )
        """,
        handler=lambda _: None,  # DDL returns no rows; a no-op handler is sufficient
    )
    insert_data = TrinoOperator(
        task_id='insert_data',
        trino_conn_id='trino_default',
        sql="""
            INSERT INTO memory.default.airflow_test (id, name)
            VALUES (1, 'Airflow'), (2, 'Trino')
        """,
        handler=lambda _: None,
    )
    select_data = TrinoOperator(
        task_id='select_data',
        trino_conn_id='trino_default',
        sql="SELECT * FROM memory.default.airflow_test",
    )

    create_test_table >> insert_data >> select_data
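The connection described in the comments above can also be supplied without the UI, via Airflow's standard `AIRFLOW_CONN_<CONN_ID>` environment variable, where extras become URI query parameters. A minimal sketch that builds such a URI using only the standard library (the host, port, and credential values are placeholders, and `trino_conn_uri` is a hypothetical helper, not part of the provider):

```python
from urllib.parse import quote, urlencode


def trino_conn_uri(host, port, schema, user, password=None, extra=None):
    """Build an Airflow connection URI: trino://user[:pass]@host:port/schema[?k=v...].

    Extra key/value pairs become URI query parameters, which Airflow parses
    back into the connection's extra dict when reading AIRFLOW_CONN_* variables.
    """
    auth = quote(user, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    uri = f"trino://{auth}@{host}:{port}/{schema}"
    if extra:
        uri += "?" + urlencode(extra)
    return uri


# Placeholder values; export the result as AIRFLOW_CONN_TRINO_DEFAULT
uri = trino_conn_uri(
    "trino.example.com", 8443, "default", "airflow",
    extra={"protocol": "https", "verify": "false"},
)
print(uri)  # trino://airflow@trino.example.com:8443/default?protocol=https&verify=false
```

Exporting `AIRFLOW_CONN_TRINO_DEFAULT` with this value makes the `trino_default` connection available to all tasks without touching the metadata database or UI.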