Apache Airflow Neo4j Provider

3.11.5 · active · verified Fri Apr 17

This provider package integrates Apache Airflow with Neo4j so that DAGs can interact with Neo4j graph databases. It provides hooks, operators, and sensors for executing Cypher queries and managing graph data, which lets a pipeline read from or write to Neo4j alongside other systems. It is actively maintained as part of the Apache Airflow ecosystem, with releases typically aligning with Airflow's own release schedule or shipping as needed for bug fixes and new features.

Common errors

Warnings

Install

pip install apache-airflow-providers-neo4j

Imports

from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

Quickstart

This example DAG demonstrates how to use the `Neo4jOperator` to execute Cypher queries against a Neo4j database. It creates a node, reads its property, and then cleans it up. Before running, ensure you have configured a 'Neo4j' connection in the Airflow UI with the `conn_id` set to `neo4j_default` (or your chosen ID) and correct credentials and host.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

# Ensure a Neo4j connection is configured in the Airflow UI.
# Conn Id: neo4j_default, Conn Type: Neo4j
# Host: localhost (hostname only; the hook prepends bolt:// or neo4j://)
# Port: 7687
# Login: neo4j (or your username)
# Password: your_password
# Schema: neo4j (optional; name of the database to query)
# Extra: {"neo4j_scheme": false} (optional JSON; true selects neo4j:// instead of bolt://)

with DAG(
    dag_id="neo4j_simple_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["neo4j", "example"],
) as dag:
    # Neo4jOperator takes the Cypher statement through its `sql` argument.
    create_node = Neo4jOperator(
        task_id="create_test_node",
        neo4j_conn_id="neo4j_default",
        sql="CREATE (n:TestNode {name: 'Airflow Test'}) RETURN n",
    )

    read_node = Neo4jOperator(
        task_id="read_test_node",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:TestNode {name: 'Airflow Test'}) RETURN n.name",
    )

    clean_up = Neo4jOperator(
        task_id="clean_up_node",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:TestNode {name: 'Airflow Test'}) DELETE n",
    )

    create_node >> read_node >> clean_up
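
The connection fields listed in the DAG's comments end up as a single driver URI. The sketch below illustrates how that assembly appears to work; it is not the provider's code. `build_neo4j_uri` is a hypothetical helper, and the option names `neo4j_scheme`, `certs_self_signed`, and `certs_trusted_ca` are assumed to match the keys the hook reads from the connection's Extra field. The `+s` and `+ssc` suffixes are the Neo4j driver's URI schemes for encrypted connections.

```python
from __future__ import annotations


def build_neo4j_uri(
    host: str,
    port: int | None = None,
    *,
    neo4j_scheme: bool = False,
    certs_self_signed: bool = False,
    certs_trusted_ca: bool = False,
) -> str:
    """Assemble a Neo4j driver URI from Airflow-style connection fields.

    Hypothetical helper for illustration; argument names are assumptions.
    """
    # neo4j:// enables client-side routing (clusters); bolt:// is a direct connection.
    scheme = "neo4j" if neo4j_scheme else "bolt"

    # "+ssc" accepts self-signed certificates; "+s" requires CA-signed certificates.
    if certs_self_signed:
        suffix = "+ssc"
    elif certs_trusted_ca:
        suffix = "+s"
    else:
        suffix = ""

    # Fall back to Neo4j's default Bolt port when none is configured.
    return f"{scheme}{suffix}://{host}:{7687 if port is None else port}"


if __name__ == "__main__":
    print(build_neo4j_uri("localhost"))
    print(build_neo4j_uri("db.example.com", 7688, neo4j_scheme=True, certs_trusted_ca=True))
```

Under these assumptions the Host field holds only a hostname, and the scheme is chosen through the Extra options rather than typed into Host.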
