FlowETL


FlowETL is a collection of Apache Airflow operators and sensors designed for use with FlowKit, a platform for analysing CDR (call detail record) data. The library provides ETL operators specific to FlowDB and FlowMachine. Current version: 1.34.0. Release cadence: irregular, tied to FlowKit releases.

pip install flowetl
error ModuleNotFoundError: No module named 'flowetl'
cause Package not installed or installed in wrong environment.
fix
Run 'pip install flowetl' in the same Python environment as Airflow.
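If the install succeeded but the error persists, Airflow is likely running on a different interpreter. A quick check (the interpreter path is deployment-specific):
python -m pip install flowetl
python -c "import flowetl"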
error airflow.exceptions.AirflowException: The conn_id `flowdb_default` isn't defined
cause Required Airflow connection is missing.
fix
Create an Airflow connection with conn_id='flowdb_default' (type Postgres) via UI or CLI: airflow connections add flowdb_default --conn-type postgres --conn-host ...
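A fuller CLI example with placeholder values (host, port, credentials and database name are deployment-specific assumptions, not FlowETL defaults):
airflow connections add flowdb_default --conn-type postgres --conn-host flowdb --conn-port 5432 --conn-login flowdb_user --conn-password changeme --conn-schema flowdb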
breaking FlowETL 1.33.0+ requires Airflow 2.10.5. 'airflow.providers.postgres.operators.postgres.PostgresOperator' is deprecated; use 'airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator' instead.
fix Update Airflow to 2.10.5 and replace deprecated operators with their new equivalents.
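A minimal sketch of the operator swap, assuming a task that runs plain SQL against FlowDB (the task id and SQL are placeholders):

# Before: deprecated
from airflow.providers.postgres.operators.postgres import PostgresOperator
check = PostgresOperator(
    task_id='check_flowdb',
    postgres_conn_id='flowdb_default',
    sql='SELECT 1;',
)

# After: common-sql equivalent
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
check = SQLExecuteQueryOperator(
    task_id='check_flowdb',
    conn_id='flowdb_default',
    sql='SELECT 1;',
)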
gotcha FlowETL operators expect a specific connection id (e.g., 'flowdb_default') configured in Airflow. If not set, tasks fail with 'Connection not found'.
fix Define the Airflow connection (e.g., via UI or environment variable) matching the conn_id used in the operator.
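The environment-variable route is equivalent; Airflow resolves AIRFLOW_CONN_<CONN_ID> (upper-cased) as a connection URI. With placeholder credentials:
export AIRFLOW_CONN_FLOWDB_DEFAULT='postgres://flowdb_user:changeme@flowdb:5432/flowdb'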
deprecated FlowETL 1.30.0+ requires Python >=3.11. Older versions of Python are no longer supported.
fix Upgrade Python environment to 3.11 or later.
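A quick interpreter check before upgrading the environment (nothing FlowETL-specific):
python -c "import sys; assert sys.version_info >= (3, 11), sys.version"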

A minimal DAG using FlowETL's CSVToFlowDBOperator to load a CSV into FlowDB, then a FlowETLSensor to wait for the data to be available.

from datetime import datetime

from airflow import DAG
from flowetl.operators import CSVToFlowDBOperator
from flowetl.sensors import FlowETLSensor

# Example DAG (requires a running Airflow environment with flowetl installed)
with DAG('flowetl_example', start_date=datetime(2023, 1, 1), schedule=None) as dag:
    ingest = CSVToFlowDBOperator(
        task_id='ingest_csv',
        conn_id='flowdb_default',
        csv_path='/data/sample.csv',
        table_name='events.cdr'
    )
    wait = FlowETLSensor(
        task_id='wait_for_data',
        table_name='events.cdr',
        poke_interval=60
    )
    ingest >> wait
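
To exercise the DAG once outside the scheduler, it can be run from the CLI (the logical date is arbitrary):
airflow dags test flowetl_example 2023-01-01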