Trino Provider for Apache Airflow

6.5.1 · active · verified Sun Apr 12

The `apache-airflow-providers-trino` package provides Apache Airflow users with hooks, operators, and transfers to interact with Trino (formerly PrestoSQL). It enables programmatic orchestration of Trino SQL queries within Airflow DAGs, facilitating ETL and data pipeline workflows. The current version is 6.5.1, and the provider follows a strict Semantic Versioning policy with frequent releases independent of Airflow core.

Install

pip install apache-airflow-providers-trino

Imports

from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.providers.trino.operators.trino import TrinoOperator

Quickstart

This quickstart demonstrates a basic Airflow DAG that uses the `TrinoOperator` to interact with a Trino cluster. It includes tasks for creating a table, inserting data, and selecting data. Ensure you configure a 'Trino' connection in the Airflow UI with the ID `trino_default` before running this DAG.
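As an alternative to the UI, Airflow can read connections from `AIRFLOW_CONN_<CONN_ID>` environment variables in URI form. A minimal sketch of building such a URI for the `trino_default` connection (the host, user, and password below are placeholders, not values from this provider):

```python
from urllib.parse import quote

# Placeholder cluster details -- substitute your own.
user = "analyst"
password = "s3cret/pwd"   # special characters must be percent-encoded
host = "trino.example.com"
port = 8080
schema = "default"

# Airflow resolves AIRFLOW_CONN_TRINO_DEFAULT into the 'trino_default' connection.
uri = f"trino://{quote(user)}:{quote(password, safe='')}@{host}:{port}/{schema}"
print(uri)  # -> trino://analyst:s3cret%2Fpwd@trino.example.com:8080/default
```

Exporting this value as `AIRFLOW_CONN_TRINO_DEFAULT` makes the connection available without touching the metadata database or the UI.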

import pendulum

from airflow.models.dag import DAG
from airflow.providers.trino.operators.trino import TrinoOperator

# Configure Trino Connection in Airflow UI:
# Conn Id: 'trino_default'
# Conn Type: Trino
# Host: <your_trino_host>
# Port: <your_trino_port>
# Schema: <your_trino_schema>
# User: <your_trino_user>
# Password: <your_trino_password> (optional)
# Extra: {"protocol": "https", "verify": "false"} (example for HTTPS with a self-signed certificate)

with DAG(
    dag_id='trino_example_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    catchup=False,
    schedule=None,
    tags=['trino', 'example'],
) as dag:
    create_test_table = TrinoOperator(
        task_id='create_test_table',
        trino_conn_id='trino_default',
        sql="""
            CREATE TABLE IF NOT EXISTS memory.default.airflow_test (
                id INT,
                name VARCHAR
            )
        """,
        handler=lambda _: None, # For DDL, handler is often not needed or can be a no-op
    )

    insert_data = TrinoOperator(
        task_id='insert_data',
        trino_conn_id='trino_default',
        sql="""
            INSERT INTO memory.default.airflow_test (id, name)
            VALUES (1, 'Airflow'), (2, 'Trino')
        """,
        handler=lambda _: None,
    )

    select_data = TrinoOperator(
        task_id='select_data',
        trino_conn_id='trino_default',
        sql="SELECT * FROM memory.default.airflow_test",
    )

    create_test_table >> insert_data >> select_data
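The `handler` argument above is called with the DB-API cursor after the SQL runs, and its return value becomes the task's result. A sketch of that contract using a stand-in cursor class (the `FakeCursor` here is purely illustrative, not part of the provider):

```python
# Stand-in for the DB-API cursor a handler receives after execution.
class FakeCursor:
    def fetchall(self):
        return [(1, "Airflow"), (2, "Trino")]

def fetch_all_rows(cursor):
    """Handler: return every row produced by the executed statement."""
    return cursor.fetchall()

def ignore_result(_cursor):
    """No-op handler, suitable for DDL/DML where no rows are expected."""
    return None

rows = fetch_all_rows(FakeCursor())
print(rows)  # -> [(1, 'Airflow'), (2, 'Trino')]
```

This is why the DDL and INSERT tasks pass a no-op lambda as `handler`, while the SELECT task omits it and lets the operator's default result handling return the fetched rows.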
