Trino Provider for Apache Airflow

6.5.1 verified Fri May 15 auth: no python

The `apache-airflow-providers-trino` package provides Apache Airflow users with hooks, operators, and transfers to interact with Trino (formerly PrestoSQL). It enables programmatic orchestration of Trino SQL queries within Airflow DAGs, facilitating ETL and data pipeline workflows. The current version is 6.5.1, and the provider follows a strict Semantic Versioning policy with frequent releases independent of Airflow core.

pip install apache-airflow-providers-trino
cli airflow
error ImportError: cannot import name 'TrinoHook'
cause This error occurs when attempting to import 'TrinoHook' from an incorrect module path.
fix
Ensure you are importing 'TrinoHook' from the correct module: 'from airflow.providers.trino.hooks.trino import TrinoHook'.
error ModuleNotFoundError: No module named 'airflow.providers.trino'
cause This error occurs when the 'apache-airflow-providers-trino' package is not installed.
fix
Install the package using pip: 'pip install apache-airflow-providers-trino'.
error AttributeError: module 'airflow.providers.trino.hooks.trino' has no attribute 'TrinoHook'
cause This error occurs when the 'TrinoHook' class is not found in the specified module, possibly due to version incompatibility.
fix
Verify that you are using a compatible version of 'apache-airflow-providers-trino' and 'apache-airflow'.
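
The three import errors above can usually be told apart by checking which distributions are installed and whether the hook module actually exposes `TrinoHook`. The following is a minimal diagnostic sketch using only the standard library. The helper names `installed_version` and `diagnose_trino_provider` are hypothetical and not part of the provider. Note that importing `airflow` may initialise its configuration as a side effect.

```python
from importlib import import_module
from importlib.metadata import PackageNotFoundError, version


def installed_version(distribution: str):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


def diagnose_trino_provider() -> dict:
    """Collect the facts needed to tell the three import errors apart."""
    report = {
        "apache-airflow": installed_version("apache-airflow"),
        "apache-airflow-providers-trino": installed_version(
            "apache-airflow-providers-trino"
        ),
        "hook_importable": False,
        "import_error": None,
    }
    try:
        module = import_module("airflow.providers.trino.hooks.trino")
        # False here corresponds to the AttributeError case above.
        report["hook_importable"] = hasattr(module, "TrinoHook")
    except Exception as exc:  # includes ImportError and ModuleNotFoundError
        report["import_error"] = f"{type(exc).__name__}: {exc}"
    return report


if __name__ == "__main__":
    for key, value in diagnose_trino_provider().items():
        print(f"{key}: {value}")
```

A `None` version for `apache-airflow-providers-trino` points at the missing-package case. An installed package with `hook_importable` left `False` suggests a version mismatch worth checking against the provider's changelog.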
breaking The minimum required Apache Airflow version for `apache-airflow-providers-trino` increases regularly with new provider releases. Version 6.5.1 requires Airflow 2.11.0 or newer. Installing this provider alongside an older Airflow (<2.11.0) may cause pip to upgrade Airflow core automatically, after which you must run the database migration yourself (`airflow db migrate`, or `airflow db upgrade` on Airflow releases before 2.7).
fix Always check the provider's documentation or PyPI page for the exact `apache-airflow` version requirement before installing or upgrading. Upgrade Airflow core deliberately if necessary, then run `airflow db migrate`.
gotcha When configuring a Trino connection in Airflow, ensure that only one authentication method (e.g., password, JWT, Kerberos) is set in the connection details. Attempting to use multiple authentication methods simultaneously can lead to task failures.
fix Review your Airflow Trino connection configuration and specify only one authentication method in the 'Extra' field or dedicated fields.
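
A pre-flight check along these lines can catch a doubly configured connection before a task fails. This is a sketch under assumptions: it treats a non-empty password as one method and an `auth` key in the connection's Extra (for example `jwt`, `certs`, or `kerberos`) as another. The exact keys the hook inspects should be confirmed against the provider documentation for your version, and the function names here are hypothetical.

```python
def configured_auth_methods(password, extra: dict) -> list:
    """List the authentication methods a connection appears to configure.

    Assumption: a non-empty password counts as "password", and the Extra
    key "auth" names any alternative method.
    """
    methods = []
    if password:
        methods.append("password")
    auth = extra.get("auth")
    if auth:
        methods.append(str(auth))
    return methods


def check_single_auth(password, extra: dict):
    """Return the single configured method (or None), raising if several are set."""
    methods = configured_auth_methods(password, extra)
    if len(methods) > 1:
        raise ValueError(
            "Multiple authentication methods configured: " + ", ".join(methods)
        )
    return methods[0] if methods else None
```

For example, `check_single_auth(None, {"auth": "jwt"})` returns `"jwt"`, while passing both a password and `{"auth": "kerberos"}` raises `ValueError`.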
breaking In provider version 5.0.0, the deprecated `delegate_to` parameter was removed from `GCSToTrinoOperator` (and related Google Cloud operators/hooks). Impersonation should now be achieved using the `impersonation_chain` parameter.
fix Migrate any usage of `delegate_to` in `GCSToTrinoOperator` to `impersonation_chain`. For older versions, consider `impersonation_chain` as best practice.
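
To find leftover `delegate_to` arguments before upgrading, DAG files can be scanned statically without importing Airflow. The sketch below uses the standard `ast` module. The function name `find_delegate_to_calls` is hypothetical, and it only detects direct calls whose callee is named `GCSToTrinoOperator` (aliases and keyword dictionaries passed via `**kwargs` are not followed).

```python
import ast


def find_delegate_to_calls(source: str, operator_name: str = "GCSToTrinoOperator") -> list:
    """Return the line numbers of calls to `operator_name` that pass `delegate_to`."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handle both `GCSToTrinoOperator(...)` and `module.GCSToTrinoOperator(...)`.
        if isinstance(func, ast.Name):
            name = func.id
        elif isinstance(func, ast.Attribute):
            name = func.attr
        else:
            name = None
        if name == operator_name and any(
            keyword.arg == "delegate_to" for keyword in node.keywords
        ):
            hits.append(node.lineno)
    return sorted(hits)
```

Each reported line is a call that needs its `delegate_to` argument replaced with an appropriate `impersonation_chain` value. The two parameters are not interchangeable, so the new value has to be chosen by hand.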
deprecated The `apply_defaults` decorator was removed from the provider's operators once Airflow began applying it automatically, which is why older versions of this provider required Airflow 2.1.0+. The issue is long resolved, but it shows why the provider and Airflow core versions need to be kept in step.
fix Ensure your Airflow core version meets the minimum required by the specific provider version you are using (2.1.0 for those older releases).
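
The version requirements above can be checked before installing or upgrading the provider. The sketch below compares the installed `apache-airflow` version against a minimum using only the standard library. The helper names are hypothetical, and the comparison deliberately ignores pre-release suffixes (so `2.11.0rc1` is treated as `2.11.0`); `packaging.version`, if available, is the more rigorous choice.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def parse_release(version_string: str) -> tuple:
    """Extract the leading numeric release, e.g. '2.11.0rc1' gives (2, 11, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version_string)
    if match is None:
        raise ValueError(f"Cannot parse version: {version_string!r}")
    return tuple(int(part) for part in match.group().split("."))


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare two versions numerically, padding the shorter one with zeros."""
    a, b = parse_release(installed), parse_release(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def airflow_meets_minimum(minimum: str = "2.11.0"):
    """Return True/False for the installed Airflow, or None if it is not installed."""
    try:
        installed = version("apache-airflow")
    except PackageNotFoundError:
        return None
    return meets_minimum(installed, minimum)
```

The default of `2.11.0` matches the requirement stated for provider 6.5.1; pass a different minimum when checking another provider release.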
python os / libc status wheel install import disk mem side effects
3.10 alpine (musl) build_error - - - - - -
3.10 slim (glibc) wheel 31.2s 3.54s 407M 70.5M clean
3.11 alpine (musl) build_error - - - - - -
3.11 slim (glibc) wheel 30.1s 5.71s 443M 76.6M clean
3.12 alpine (musl) build_error - - - - - -
3.12 slim (glibc) wheel 25.3s 5.65s 427M 75.2M clean
3.13 alpine (musl) build_error - - - - - -
3.13 slim (glibc) wheel 24.6s 5.27s 429M 75.9M clean
3.9 alpine (musl) build_error - - - - - -
3.9 slim (glibc) wheel 30.7s 4.96s 239M 64.5M noisy

This quickstart demonstrates a basic Airflow DAG that runs SQL against a Trino cluster. Because `TrinoOperator` was deprecated and later removed from the provider, the DAG uses `SQLExecuteQueryOperator` from the common SQL provider (installed as a dependency of this package) with a Trino connection. It includes tasks for creating a table, inserting data, and selecting data. Configure a Trino connection in the Airflow UI with the ID `trino_default` before running this DAG.

import pendulum

from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Configure Trino Connection in Airflow UI:
# Conn Id: 'trino_default'
# Conn Type: Trino
# Host: <your_trino_host>
# Port: <your_trino_port>
# Schema: <your_trino_schema>
# User: <your_trino_user>
# Password: <your_trino_password> (optional)
# Extra: {"protocol": "https", "verify": false} (example for HTTPS with a self-signed certificate)

with DAG(
    dag_id='trino_example_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    catchup=False,
    schedule=None,
    tags=['trino', 'example'],
) as dag:
    # Create the target table in the in-memory catalog if it is missing.
    create_test_table = SQLExecuteQueryOperator(
        task_id='create_test_table',
        conn_id='trino_default',
        sql="""
            CREATE TABLE IF NOT EXISTS memory.default.airflow_test (
                id INT,
                name VARCHAR
            )
        """,
        handler=lambda _: None,  # no-op handler: the DDL result is not needed
    )

    # Insert two sample rows.
    insert_data = SQLExecuteQueryOperator(
        task_id='insert_data',
        conn_id='trino_default',
        sql="""
            INSERT INTO memory.default.airflow_test (id, name)
            VALUES (1, 'Airflow'), (2, 'Trino')
        """,
        handler=lambda _: None,  # no-op handler: the row count is not needed
    )

    # Read the rows back; the default handler fetches all results.
    select_data = SQLExecuteQueryOperator(
        task_id='select_data',
        conn_id='trino_default',
        sql="SELECT * FROM memory.default.airflow_test",
    )

    create_test_table >> insert_data >> select_data