Apache Airflow Neo4j Provider
This Apache Airflow provider package integrates Apache Airflow with Neo4j, allowing users to define DAGs that interact with Neo4j graph databases. It provides hooks, operators, and sensors for executing Cypher queries against a Neo4j instance. It is actively maintained as part of the Apache Airflow project; provider packages are versioned and released separately from Airflow core, as needed for bug fixes and new features. Current version: 3.11.5.
Common errors
-
ModuleNotFoundError: No module named 'airflow.providers.neo4j.hooks.neo4j'
cause The `apache-airflow-providers-neo4j` package is not installed in your Airflow environment, or the import path is incorrect.
fix Run `pip install apache-airflow-providers-neo4j` in the environment where your Airflow worker/scheduler/webserver runs. Verify your import statement matches `from airflow.providers.neo4j.hooks.neo4j import Neo4jHook`.
-
neo4j.exceptions.AuthError: The client is unauthorized to perform this request.
cause Incorrect username or password configured in the Airflow Neo4j connection details.
fix Navigate to `Admin -> Connections` in the Airflow UI. Edit your Neo4j connection (e.g., `neo4j_default`) and carefully verify the 'Login' and 'Password' fields against your Neo4j database credentials.
-
AttributeError: module 'airflow.contrib.operators.neo4j_operator' has no attribute 'Neo4jOperator'
cause You are attempting to use a deprecated and removed import path from `airflow.contrib` which was used in Airflow 1.x.
fix Update your import statement to `from airflow.providers.neo4j.operators.neo4j import Neo4jOperator` and ensure the `apache-airflow-providers-neo4j` package is installed and Airflow is version 2.0 or newer.
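Before changing import paths, it can help to confirm what the interpreter that runs your tasks can actually see. The following is a minimal sketch using only the standard library; the helper names `module_available` and `installed_version` are hypothetical and not part of Airflow.

```python
from __future__ import annotations

from importlib import metadata, util


def module_available(dotted_name: str) -> bool:
    """Return True if `dotted_name` can be located on the current import path.

    find_spec imports parent packages but not the target module itself.
    """
    try:
        return util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example `airflow`) is missing.
        return False


def installed_version(distribution: str) -> str | None:
    """Return the installed version of a pip distribution, or None if absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("hook importable:", module_available("airflow.providers.neo4j.hooks.neo4j"))
    print("provider version:", installed_version("apache-airflow-providers-neo4j"))
    print("driver version:", installed_version("neo4j"))
```

Running this with the same Python executable that the scheduler and workers use shows whether the provider is missing or simply installed into a different environment.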
Warnings
- breaking Airflow 2.0 moved integrations into separately installed provider packages. Imports from `airflow.contrib` were deprecated in Airflow 2.0 and are not available in current releases; import from `airflow.providers.neo4j` instead.
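- gotcha Neo4j connection configuration in the Airflow UI is easy to get wrong, especially for Neo4j AuraDB or TLS-enabled servers. The hostname and port go in separate fields, and the URI scheme (`bolt`, `neo4j`, and their encrypted variants) is normally selected through the connection's Extra field rather than typed into Host.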
- gotcha The underlying `neo4j` Python driver (installed as a dependency of this provider) must be compatible with your Neo4j database server version.
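To make the connection gotcha concrete, here is a sketch of how the Airflow connection fields are understood to map onto a driver URI. It is a standalone approximation, not the provider's own code: the function `build_neo4j_uri` is hypothetical, and the extra keys `neo4j_scheme`, `certs_self_signed`, and `certs_trusted_ca` are assumptions based on the provider's connection documentation, so verify them against your installed version.

```python
from __future__ import annotations


def build_neo4j_uri(host: str, port: int | None = None, extra: dict | None = None) -> str:
    """Compose a Neo4j URI from Airflow-style connection fields (illustrative)."""
    extra = extra or {}

    # Assumed extra key: switch from the default bolt scheme to neo4j (routing).
    scheme = "neo4j" if extra.get("neo4j_scheme", False) else "bolt"

    # Assumed extra keys: choose the TLS variant of the scheme.
    if extra.get("certs_self_signed", False):
        suffix = "+ssc"  # encrypted, self-signed certificates accepted
    elif extra.get("certs_trusted_ca", False):
        suffix = "+s"  # encrypted, CA-signed certificates only
    else:
        suffix = ""

    # Fall back to the standard Bolt port when none is configured.
    return f"{scheme}{suffix}://{host}:{7687 if port is None else port}"
```

Under these assumptions, an AuraDB instance, which is reached over `neo4j+s://`, would use its hostname alone in Host (the placeholder `example.databases.neo4j.io` below) and `{"neo4j_scheme": true, "certs_trusted_ca": true}` in Extra.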
Install
-
pip install apache-airflow-providers-neo4j
Imports
- Neo4jHook
wrong: from airflow.contrib.hooks.neo4j_hook import Neo4jHook
correct: from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
- Neo4jOperator
wrong: from airflow.contrib.operators.neo4j_operator import Neo4jOperator
correct: from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
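- Neo4jToNeo4jOperator (unverified: this class is not listed in the provider's published API reference; confirm it exists in your installed version before importing)
from airflow.providers.neo4j.operators.neo4j import Neo4jToNeo4jOperator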
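The hook can also be called directly from Python tasks. The sketch below assumes that `Neo4jHook` accepts a `conn_id` argument and that `run(query)` returns a list of records as dictionaries; the helper names, the `CypherRunner` protocol, and the `TestNode` label are illustrative. The hook is passed in as an argument so the helper can be exercised without a running database.

```python
from __future__ import annotations

from typing import Any, Protocol


class CypherRunner(Protocol):
    """Anything with a hook-like run(query) method (hypothetical protocol)."""

    def run(self, query: str) -> list[Any]: ...


def fetch_node_names(hook: CypherRunner, limit: int = 10) -> list[str]:
    """Return the `name` property of up to `limit` TestNode nodes."""
    # int() guards the interpolated value; the label is a fixed literal.
    query = f"MATCH (n:TestNode) RETURN n.name AS name LIMIT {int(limit)}"
    rows = hook.run(query)
    return [row["name"] for row in rows]


def fetch_with_airflow_hook(conn_id: str = "neo4j_default") -> list[str]:
    """Run the helper against a real Airflow connection."""
    # Imported lazily so this module still loads where Airflow is absent.
    from airflow.providers.neo4j.hooks.neo4j import Neo4jHook

    return fetch_node_names(Neo4jHook(conn_id=conn_id))
```

Inside a DAG, `fetch_with_airflow_hook` could be wrapped in a Python task; in unit tests, `fetch_node_names` can be given a stub object instead of a live hook.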
Quickstart
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
# Ensure you have a Neo4j connection configured in the Airflow UI.
# Conn Id: neo4j_default, Conn Type: Neo4j
# Host: localhost (hostname only; the hook builds the bolt:// or neo4j:// URI)
# Port: 7687
# Login: neo4j (or your username)
# Password: your_password
# Schema: neo4j (optional; the database to open sessions against)
# Extra: {"neo4j_scheme": false} (optional; set to true for the neo4j:// scheme)
with DAG(
    dag_id="neo4j_simple_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["neo4j", "example"],
) as dag:
    # Neo4jOperator receives the Cypher statement through its `sql` argument.
    create_node = Neo4jOperator(
        task_id="create_test_node",
        neo4j_conn_id="neo4j_default",
        sql="CREATE (n:TestNode {name: 'Airflow Test'}) RETURN n",
    )
    read_node = Neo4jOperator(
        task_id="read_test_node",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:TestNode {name: 'Airflow Test'}) RETURN n.name",
    )
    clean_up = Neo4jOperator(
        task_id="clean_up_node",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:TestNode {name: 'Airflow Test'}) DELETE n",
    )
    # Run the tasks in order: create, read, then delete the test node.
    create_node >> read_node >> clean_up
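The connection used by the quickstart does not have to be created in the UI. Airflow also reads connections from environment variables named `AIRFLOW_CONN_<CONN_ID>`, and Airflow 2.3 and later accept a JSON document as the value. The sketch below builds such a value; the helper `neo4j_conn_env` is hypothetical, the credentials are placeholders, and carrying the database name in the `schema` field is an assumption about how the hook picks its database.

```python
from __future__ import annotations

import json


def neo4j_conn_env(
    conn_id: str,
    host: str,
    login: str,
    password: str,
    port: int = 7687,
    database: str | None = None,
    extra: dict | None = None,
) -> tuple[str, str]:
    """Return (variable name, JSON value) describing a Neo4j connection."""
    conn: dict = {
        "conn_type": "neo4j",
        "host": host,
        "login": login,
        "password": password,
        "port": port,
    }
    if database:
        # Assumption: the hook opens sessions against the connection's schema.
        conn["schema"] = database
    if extra:
        conn["extra"] = extra
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(conn)


if __name__ == "__main__":
    name, value = neo4j_conn_env("neo4j_default", "localhost", "neo4j", "your_password")
    print(f"export {name}='{value}'")
```

Defining the connection this way keeps credentials out of the metadata database, which can be convenient for containerized or CI deployments.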