Redis Provider for Apache Airflow
This is a provider package for Apache Airflow that enables seamless interaction with Redis. It offers a hook (`RedisHook`), an operator (`RedisPublishOperator`), and sensors (`RedisKeySensor`, `RedisPubSubSensor`) to integrate Redis operations, such as setting/getting keys, publishing/subscribing to channels, and waiting on keys or messages, directly into Airflow DAGs. The current version is 4.4.3; provider packages are released independently of Airflow core, on their own cadence, for features and bug fixes.
Warnings
- breaking The `apache-airflow-providers-redis` package has a rising minimum Airflow version requirement. Version `4.4.3` requires `apache-airflow>=2.11.0`. Installing it alongside an older Airflow (<2.11.0) might trigger an automatic Airflow upgrade, after which the metadata database must be migrated with `airflow db migrate`.
- breaking Python version support has changed. Provider version `4.1.1` dropped support for Python 3.9. Current provider version `4.4.3` requires Python `>=3.10`. Ensure your Python environment is compatible.
- gotcha `RedisHook.get_conn()` returns a client from the `redis` Python library, which the provider declares as a dependency and installs automatically. If another package in your environment pins a conflicting `redis` version, hook calls can fail at import time, so verify the installed `redis` version in your Airflow environment.
- gotcha The hook, operator, and sensors resolve their Redis connection through Airflow connections (configured via the Airflow UI, environment variables, or a secrets backend). The default connection ID is `redis_default`. Misconfigured connection parameters (host, port, password, DB, SSL settings) or a missing `redis_default` connection lead to connection errors and task failures.
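One way to supply the `redis_default` connection is an Airflow connection URI in an environment variable; the host, password, and DB number below are placeholders, not values from this package:

```shell
# Hypothetical values -- substitute your own host, password, and DB number.
export AIRFLOW_CONN_REDIS_DEFAULT='redis://:mypassword@redis-host:6379/0'
```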
Install
pip install apache-airflow-providers-redis
Imports
- RedisHook
from airflow.providers.redis.hooks.redis import RedisHook
- RedisPublishOperator
from airflow.providers.redis.operators.redis_publish import RedisPublishOperator
- RedisKeySensor
from airflow.providers.redis.sensors.redis_key import RedisKeySensor
- RedisPubSubSensor
from airflow.providers.redis.sensors.redis_pub_sub import RedisPubSubSensor
Quickstart
from __future__ import annotations

import pendulum

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.providers.redis.operators.redis_publish import RedisPublishOperator

with DAG(
    dag_id="redis_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["redis", "example"],
) as dag:
    # The provider has no generic command operator; run key/value commands
    # through RedisHook inside a Python task (default connection: redis_default)
    @task
    def set_and_increment() -> str:
        client = RedisHook(redis_conn_id="redis_default").get_conn()
        client.set("my_key", "Hello from Airflow!")
        client.incr("counter")
        # Decode bytes so the value is JSON-serializable for XCom
        return client.get("my_key").decode()

    # Publish a message to a channel with the provider's operator
    publish_task = RedisPublishOperator(
        task_id="publish_message",
        channel="my_channel",
        message="Hello from Airflow!",
        redis_conn_id="redis_default",
    )

    set_and_increment() >> publish_task
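The provider's sensors can gate downstream work on Redis state. A minimal sketch, assuming the provider is installed and `redis_default` is configured; the DAG id, key, and channel names are illustrative:

```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.redis.sensors.redis_key import RedisKeySensor
from airflow.providers.redis.sensors.redis_pub_sub import RedisPubSubSensor

with DAG(
    dag_id="redis_sensor_example_dag",  # illustrative name
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:
    # Succeeds once the key exists in Redis
    wait_for_key = RedisKeySensor(
        task_id="wait_for_my_key",
        key="my_key",
        redis_conn_id="redis_default",
        poke_interval=10,
    )

    # Succeeds once a message arrives on the subscribed channel
    wait_for_message = RedisPubSubSensor(
        task_id="wait_for_message",
        channels="my_channel",
        redis_conn_id="redis_default",
    )

    wait_for_key >> wait_for_message
```

This is a DAG-definition fragment; it only does useful work inside a running Airflow deployment with a reachable Redis server.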