FlowETL
FlowETL is a collection of Apache Airflow operators and sensors designed for use with FlowKit, a platform for analysing CDR data. The library provides ETL operators specific to FlowDB and FlowMachine. Current version: 1.34.0. Release cadence: irregular, tied to FlowKit releases.
Install: pip install flowetl

Common errors
error: ModuleNotFoundError: No module named 'flowetl'
cause: Package not installed, or installed in a different environment from Airflow.
fix: Run 'pip install flowetl' in the same Python environment that runs Airflow.
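A quick way to confirm the package landed in the environment Airflow actually uses (a minimal sketch; run it with the same interpreter that runs Airflow):

```python
# Check whether 'flowetl' is importable from this interpreter, and which
# interpreter this is -- a mismatch here usually explains the error above.
import importlib.util
import sys

def is_importable(package: str) -> bool:
    """Return True if `package` can be found on this interpreter's path."""
    return importlib.util.find_spec(package) is not None

print("interpreter:", sys.executable)
print("flowetl importable:", is_importable("flowetl"))
```

If the interpreter path printed here differs from the one Airflow runs under, reinstall with that interpreter's pip (e.g., /path/to/python -m pip install flowetl).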
error: airflow.exceptions.AirflowException: The conn_id `flowdb_default` isn't defined
cause: The required Airflow connection is missing.
fix: Create an Airflow connection with conn_id='flowdb_default' (type Postgres) via the UI or CLI:
    airflow connections add flowdb_default --conn-type postgres --conn-host ...

Warnings
breaking: FlowETL 1.33.0+ requires Airflow 2.10.5. 'airflow.providers.postgres.operators.postgres.PostgresOperator' is deprecated; use 'airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator' instead.
fix: Update Airflow to 2.10.5 and replace deprecated operators with their new equivalents.
gotcha: FlowETL operators expect a specific connection id (e.g., 'flowdb_default') configured in Airflow. If it is not set, tasks fail with 'Connection not found'.
fix: Define the Airflow connection (via the UI or an environment variable) matching the conn_id used in the operator.
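The environment-variable route can be sketched as follows. Airflow resolves a variable named AIRFLOW_CONN_<CONN_ID> (conn_id upper-cased) as a connection URI; the host, credentials, and database name below are placeholders, not values from this document:

```shell
# Define the 'flowdb_default' connection without touching the Airflow UI or CLI.
# Airflow reads AIRFLOW_CONN_<CONN_ID> (conn_id upper-cased) as a connection URI.
# All host/credential/database values below are placeholders.
export AIRFLOW_CONN_FLOWDB_DEFAULT='postgresql://flowdb_user:secret@flowdb-host:5432/flowdb'
```

Set this in the environment of the Airflow scheduler and workers, since connections defined this way do not appear in the Airflow UI.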
deprecated: FlowETL 1.30.0+ requires Python >=3.11. Older versions of Python are no longer supported.
fix: Upgrade the Python environment to 3.11 or later.
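A minimal runtime guard matching that floor (the names here are illustrative, not part of FlowETL's API):

```python
# Fail fast with a clear message when the interpreter is too old for FlowETL.
import sys

MINIMUM = (3, 11)  # FlowETL 1.30.0+ supports Python 3.11 and later

def python_ok(version: tuple[int, int] = sys.version_info[:2]) -> bool:
    """True if `version` meets FlowETL's minimum Python requirement."""
    return version >= MINIMUM

if not python_ok():
    print(f"Python {sys.version.split()[0]} is too old for FlowETL; need >= 3.11")
```

Placing a check like this at the top of a DAG file surfaces the version problem directly instead of as a confusing import failure.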
Imports
- CSVToFlowDBOperator
  wrong:   from airflow.operators import CSVToFlowDBOperator
  correct: from flowetl.operators import CSVToFlowDBOperator
- FlowETLSensor
  wrong:   from airflow.sensors import FlowETLSensor
  correct: from flowetl.sensors import FlowETLSensor
Quickstart
from datetime import datetime

from airflow import DAG

from flowetl.operators import CSVToFlowDBOperator
from flowetl.sensors import FlowETLSensor

# Example DAG (requires an Airflow environment)
with DAG('flowetl_example', start_date=datetime(2023, 1, 1), schedule=None) as dag:
    ingest = CSVToFlowDBOperator(
        task_id='ingest_csv',
        conn_id='flowdb_default',
        csv_path='/data/sample.csv',
        table_name='events.cdr',
    )
    wait = FlowETLSensor(
        task_id='wait_for_data',
        table_name='events.cdr',
        poke_interval=60,
    )
    ingest >> wait