Apache Airflow MongoDB Provider
The `apache-airflow-providers-mongo` package connects Apache Airflow to MongoDB databases through a hook (`MongoHook`) and a sensor (`MongoSensor`). Tasks in your DAGs can use the hook to insert, update, delete, and query documents. The sensor waits until a document matching a query exists. The provider is actively maintained, with regular releases that track new Airflow versions and add features. At the time of writing, the latest version is 5.3.4.
Warnings
- breaking The `conn_id` parameter in `MongoHook` was removed and replaced by `mongo_conn_id`. Direct usage of `conn_id` will result in errors.
- breaking The minimum supported Apache Airflow version rises with successive releases of `apache-airflow-providers-mongo`: 5.3.4 requires Airflow >=2.11.0, 5.1.0 requires 2.10+, 4.2.0 requires 2.8+, and 4.0.0 requires 2.7+.
- gotcha Since provider version 4.0.0, the `allow_insecure` flag in the MongoDB connection's 'Extra' field defaults to `False` when SSL encryption (`ssl=True`) is enabled. This means insecure SSL connections are not permitted by default.
- gotcha Setting `ssl=False` in the connection 'Extra' field did not always disable SSL/TLS. This was especially likely when combined with `srv=True` for DNS seedlists, and it could cause connection errors. It was reported as fixed in a 4.x patch release (related to #37214).
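The Extra-field flags in these warnings (`srv`, `ssl`, `allow_insecure`) can be set without the UI. Airflow accepts a JSON connection definition in an `AIRFLOW_CONN_<CONN_ID>` environment variable. The helper below is a hypothetical sketch, not part of the provider, and it only assembles that JSON. It writes the flags as JSON booleans because query parameters in a connection URI are parsed into strings.

```python
from __future__ import annotations

import json


def mongo_conn_json(
    host: str,
    login: str,
    password: str,
    *,
    schema: str | None = None,
    srv: bool = False,
    ssl: bool = False,
    allow_insecure: bool = False,
) -> str:
    """Build a JSON-format Airflow connection for a 'mongo' connection (hypothetical helper)."""
    # JSON keeps srv/ssl as real booleans; URI query parameters would arrive as strings.
    extra: dict[str, bool] = {"srv": srv, "ssl": ssl}
    if ssl and allow_insecure:
        # Explicit opt-in only: with ssl enabled, allow_insecure defaults to False (provider >= 4.0.0).
        extra["allow_insecure"] = True
    conn: dict[str, object] = {
        "conn_type": "mongo",
        "host": host,
        "login": login,
        "password": password,
        "extra": extra,
    }
    if schema is not None:
        # Airflow's connection 'schema' field; assumed here to name the default database.
        conn["schema"] = schema
    return json.dumps(conn)


if __name__ == "__main__":
    # Atlas-style SRV connection with TLS enabled.
    print(mongo_conn_json("cluster0.abcde.mongodb.net", "username", "password", srv=True, ssl=True))
```

For example, suppose the sketch is saved as a hypothetical `mongo_conn.py`. Then `export AIRFLOW_CONN_MONGODB_DEFAULT="$(python mongo_conn.py)"` would define the `mongodb_default` connection.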
Install
pip install apache-airflow-providers-mongo
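After installing, you can check the installed provider and Airflow versions against the compatibility notes in Warnings. The sketch below uses hypothetical helpers, not part of the provider. It copies the four provider/Airflow pairs listed above into a table and reads installed versions with the standard library's `importlib.metadata`. Its version parsing is deliberately simple and keeps only numeric major.minor.patch. For anything more elaborate, the `packaging` library's version parsing is the more robust choice.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version

# Minimum Airflow version for the provider releases named in the Warnings section.
# Releases not listed here are unknown to this sketch.
MIN_AIRFLOW_FOR_PROVIDER: dict[str, tuple[int, int, int]] = {
    "5.3.4": (2, 11, 0),
    "5.1.0": (2, 10, 0),
    "4.2.0": (2, 8, 0),
    "4.0.0": (2, 7, 0),
}


def parse_version(text: str) -> tuple[int, int, int]:
    """Turn '2.10.5' or '3.0.0rc1' into (major, minor, patch), padding missing parts with 0."""
    numbers: list[int] = []
    for piece in text.split(".")[:3]:
        # Keep only the leading digits of each piece, so '0rc1' -> 0.
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        numbers.append(int(digits) if digits else 0)
    numbers += [0] * (3 - len(numbers))
    return (numbers[0], numbers[1], numbers[2])


def installed_version(dist_name: str) -> str | None:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def is_compatible(provider_version: str, airflow_version: str) -> bool | None:
    """True/False for releases in the table; None if the provider release is not listed."""
    required = MIN_AIRFLOW_FOR_PROVIDER.get(provider_version)
    if required is None:
        return None
    return parse_version(airflow_version) >= required


if __name__ == "__main__":
    provider = installed_version("apache-airflow-providers-mongo")
    airflow = installed_version("apache-airflow")
    print(f"provider={provider} airflow={airflow}")
    if provider and airflow:
        print("compatible:", is_compatible(provider, airflow))
```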
Imports
- MongoHook
from airflow.providers.mongo.hooks.mongo import MongoHook
- MongoSensor
from airflow.providers.mongo.sensors.mongo import MongoSensor
Quickstart
from __future__ import annotations
from datetime import datetime
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.mongo.hooks.mongo import MongoHook
# The provider has no MongoOperator; run MongoDB operations through MongoHook inside a task.
# Configure your MongoDB connection in the Airflow UI or via an environment variable, e.g.:
# export AIRFLOW_CONN_MONGODB_DEFAULT='mongo://username:password@host:port/database?authSource=admin'
# For MongoDB Atlas (SRV record), set 'Host' to your cluster name, e.g. 'cluster0.abcde.mongodb.net',
# and add {"srv": true, "ssl": true} to the 'Extra' field. Connections defined through environment
# variables do not appear in the Airflow UI and cannot be edited there, so set the extras directly,
# e.g. with Airflow's JSON connection format:
# export AIRFLOW_CONN_MONGODB_DEFAULT='{"conn_type": "mongo", "host": "cluster0.abcde.mongodb.net", "login": "username", "password": "password", "extra": {"srv": true, "ssl": true}}'
with DAG(
    dag_id='mongo_insert_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['mongodb', 'example'],
    doc_md="""### MongoDB Insert Example DAG
This DAG demonstrates how to use MongoHook inside a task to insert a document into a MongoDB collection.
Ensure you have an Airflow connection named 'mongodb_default' configured for your MongoDB instance.
""",
) as dag:
    @task(task_id='insert_sample_document')
    def insert_sample_document() -> str:
        # mongo_conn_id refers to the Airflow connection ID
        hook = MongoHook(mongo_conn_id='mongodb_default')
        result = hook.insert_one(
            mongo_collection='mycollection',
            doc={'name': 'Alice', 'age': 30, 'city': 'New York'},
            mongo_db='mydatabase',
        )
        # insert_one returns a pymongo InsertOneResult; return the new _id as a string so it fits in XCom
        return str(result.inserted_id)
    insert_sample_document()