Apache Airflow MongoDB Provider

5.3.4 · active · verified Sat Apr 11

The `apache-airflow-providers-mongo` package extends Apache Airflow with a hook (`MongoHook`) and a sensor (`MongoSensor`) for interacting with MongoDB databases. Through the hook, DAG tasks can insert, update, replace, delete, and query documents. The sensor waits until a document matching a given query exists in a collection. The provider is actively maintained, with frequent releases that add support for new Airflow versions and new features. The current version is 5.3.4.
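As a rough sketch of how those operations map onto `MongoHook`, the hypothetical function `refresh_user` below calls the hook's `insert_one`, `find`, `update_one`, and `delete_one` methods in turn, using the provider's keyword names (`mongo_collection`, `doc`, `query`, `find_one`, `filter_doc`, `update_doc`). So that it runs without Airflow or a MongoDB server, it is exercised against `InMemoryHook`, a hypothetical stand-in that mimics only those method signatures and the result attributes used here (`modified_count`, `deleted_count`). It supports only exact-match filters and `$set` updates and ignores collection and database names. In a real task you would pass `MongoHook(mongo_conn_id=...)` instead.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def refresh_user(hook: Any, name: str) -> dict[str, Any]:
    """Hypothetical example: insert, query, update, and delete one document.

    `hook` is assumed to expose MongoHook-style insert_one/find/update_one/delete_one
    methods. mongo_db is omitted, so a real MongoHook would fall back to the
    database stored in the connection's schema field.
    """
    hook.insert_one(mongo_collection="users", doc={"name": name, "age": 30})
    found = hook.find(mongo_collection="users", query={"name": name}, find_one=True)
    updated = hook.update_one(
        mongo_collection="users",
        filter_doc={"name": name},
        update_doc={"$set": {"age": 31}},
    )
    deleted = hook.delete_one(mongo_collection="users", filter_doc={"name": name})
    return {
        "found": found is not None,
        "modified": updated.modified_count,
        "deleted": deleted.deleted_count,
    }


class InMemoryHook:
    """Hypothetical stand-in for MongoHook so the sketch runs without a server.

    Supports exact-match filters and $set updates only; collection and database
    names are ignored and all documents live in a single list.
    """

    def __init__(self) -> None:
        self.docs: list[dict[str, Any]] = []

    @staticmethod
    def _matches(doc: dict[str, Any], query: dict[str, Any]) -> bool:
        # Exact equality on every queried field; no MongoDB query operators.
        return all(doc.get(key) == value for key, value in query.items())

    def insert_one(self, mongo_collection, doc, mongo_db=None):
        self.docs.append(dict(doc))
        # Sequential integer ID, unlike the ObjectId a real server returns.
        return SimpleNamespace(inserted_id=len(self.docs))

    def find(self, mongo_collection, query, find_one=False, mongo_db=None):
        hits = [d for d in self.docs if self._matches(d, query)]
        if find_one:
            return hits[0] if hits else None
        return hits

    def update_one(self, mongo_collection, filter_doc, update_doc, mongo_db=None):
        for d in self.docs:
            if self._matches(d, filter_doc):
                d.update(update_doc["$set"])
                return SimpleNamespace(modified_count=1)
        return SimpleNamespace(modified_count=0)

    def delete_one(self, mongo_collection, filter_doc, mongo_db=None):
        for i, d in enumerate(self.docs):
            if self._matches(d, filter_doc):
                del self.docs[i]
                return SimpleNamespace(deleted_count=1)
        return SimpleNamespace(deleted_count=0)


if __name__ == "__main__":
    print(refresh_user(InMemoryHook(), "Alice"))
    # {'found': True, 'modified': 1, 'deleted': 1}
```

The stand-in exists only to make the call sequence runnable; the real hook returns PyMongo result objects (`InsertOneResult`, `UpdateResult`, `DeleteResult`) that carry the same attribute names used above.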

Install

pip install apache-airflow-providers-mongo

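Imports

from airflow.providers.mongo.hooks.mongo import MongoHook
from airflow.providers.mongo.sensors.mongo import MongoSensor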

Quickstart

This quickstart demonstrates how to insert a single document into a MongoDB collection. The provider does not include a `MongoOperator`, so the example calls `MongoHook.insert_one` from a TaskFlow task. It assumes an Airflow connection named `mongo_default` (the hook's default connection ID) has been configured in the Airflow UI or via an environment variable. For MongoDB Atlas connections, set `srv` and `ssl` to true in the connection's Extra field.

from __future__ import annotations

from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.mongo.hooks.mongo import MongoHook

# Configure the MongoDB connection in the Airflow UI or via an environment variable.
# Example environment variable (the URI path becomes the connection's default database):
# export AIRFLOW_CONN_MONGO_DEFAULT='mongo://username:password@host:27017/mydatabase?authSource=admin'
# For MongoDB Atlas (SRV record), set the Host to your cluster address,
# e.g. 'cluster0.abcde.mongodb.net', leave the Port empty, and put
# {"srv": true, "ssl": true} in the connection's Extra field.

with DAG(
    dag_id='mongo_insert_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['mongodb', 'example'],
    doc_md="""### MongoDB Insert Example DAG
    This DAG demonstrates how to use MongoHook to insert a document into a MongoDB collection.
    Ensure you have an Airflow connection named 'mongo_default' configured for your MongoDB instance.
    """,
) as dag:

    @task(task_id='insert_sample_document')
    def insert_sample_document() -> str:
        # 'mongo_default' is the Airflow connection ID (also MongoHook's default)
        hook = MongoHook(mongo_conn_id='mongo_default')
        result = hook.insert_one(
            mongo_collection='mycollection',
            doc={'name': 'Alice', 'age': 30, 'city': 'New York'},
            mongo_db='mydatabase',  # overrides the database set on the connection
        )
        # ObjectId is not JSON-serializable, so push the ID to XCom as a string
        return str(result.inserted_id)

    insert_sample_document()
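Instead of creating the connection first and then editing its Extra field in the UI, Airflow 2.3 and later also accept a JSON-serialized connection in an `AIRFLOW_CONN_<CONN_ID>` environment variable (the connection ID upper-cased), so the Atlas `srv`/`ssl` extras can be set in one step. The helper below, `mongo_conn_env_value`, is a hypothetical sketch, not part of Airflow or the provider. It assumes the standard JSON connection fields (`conn_type`, `host`, `login`, `password`, `port`, `schema`, `extra`) and that `schema` holds the default database the hook uses when no `mongo_db` is passed.

```python
from __future__ import annotations

import json
from typing import Any


def mongo_conn_env_value(
    host: str,
    login: str,
    password: str,
    database: str | None = None,
    port: int | None = None,
    atlas_srv: bool = False,
) -> str:
    """Hypothetical helper: build a JSON value for an AIRFLOW_CONN_* variable."""
    conn: dict[str, Any] = {
        "conn_type": "mongo",
        "host": host,
        "login": login,
        "password": password,
    }
    if database:
        # The connection's schema field holds the default MongoDB database.
        conn["schema"] = database
    if port is not None and not atlas_srv:
        # SRV records supply ports via DNS, so a port is only set for plain hosts.
        conn["port"] = port
    if atlas_srv:
        # Extras read by MongoHook for Atlas: SRV lookup plus TLS/SSL.
        conn["extra"] = {"srv": True, "ssl": True}
    return json.dumps(conn)


if __name__ == "__main__":
    value = mongo_conn_env_value(
        "cluster0.abcde.mongodb.net",
        "username",
        "password",
        database="mydatabase",
        atlas_srv=True,
    )
    # Variable name assumes a connection ID of 'mongo_default'.
    print(f"export AIRFLOW_CONN_MONGO_DEFAULT='{value}'")
```

The printed `export` line wraps the JSON in single quotes, so a password that itself contains a single quote would need different shell quoting.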
