redis-pubsub-publish-subscribe
Redis connection URL e.g. "redis://localhost:6379"
import sys
import os
import subprocess
import time
import threading
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
# Fetch the registry entry for redis. One retry is allowed after a 2 s pause;
# a second failure aborts the whole script with exit status 1.
registry_request = urllib.request.Request(
    "https://checklist.day/api/registry/redis",
    headers={"User-Agent": "checklist-agent/1.0"},
)
attempt = 0
while True:
    try:
        with urllib.request.urlopen(registry_request, timeout=10) as response:
            registry = json.loads(response.read())
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)
        attempt += 1
    else:
        break

# Surface any warnings the registry publishes. A single non-list value is
# wrapped so it prints as one entry.
warnings = registry.get("warnings", [])
if warnings:
    print("[redis] WARNINGS:")
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for w in warning_items:
        print(f" ⚠ {w}")
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
# Install the client library with pip before importing it; the import must
# stay below the install call.
pip_command = [sys.executable, "-m", "pip", "install", "-q", "redis>=5.0.0"]
subprocess.check_call(pip_command)
import redis as redis_lib

# The connection URL comes from the environment; without it the run aborts.
REDIS_URL = os.getenv("REDIS_URL")
if not REDIS_URL:
    print("ABORT: REDIS_URL env var not set")
    sys.exit(1)

# Test parameters: channel name, number of data messages, and the sentinel
# payload that tells the listener to stop.
CHANNEL = "checklist:pubsub:test"
MSG_COUNT = 3
STOP_SIGNAL = "__STOP__"

# Two independent clients built from the same URL: one publishes, the other
# subscribes.
publisher, subscriber = (
    redis_lib.Redis.from_url(REDIS_URL, decode_responses=True) for _ in range(2)
)

# Filled in by listen_thread(): payloads in arrival order and their
# perf_counter() arrival timestamps.
received_messages, receive_times = [], []
def listen_thread():
    """Collect messages from CHANNEL until the stop signal or a 15 s deadline.

    Subscribes through the module-level ``subscriber`` client and polls with
    ``get_message()``. For every data message the payload is appended to
    ``received_messages`` and its ``time.perf_counter()`` arrival time to
    ``receive_times``. A payload equal to ``STOP_SIGNAL`` ends the loop and is
    not recorded.

    The PubSub object is closed on every exit path (stop signal, deadline
    expiry, or an exception) so its connection is not leaked.
    """
    # FOOTGUN: listen() blocks and can drop SSL connections — use get_message() polling instead
    # FOOTGUN: first message from subscribe() is a confirmation, not data — filter by type
    pubsub = subscriber.pubsub()
    try:
        pubsub.subscribe(CHANNEL)
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + 15  # max 15s wait
        while time.monotonic() < deadline:
            message = pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
            if message is None:
                # Poll timed out with nothing to read; re-check the deadline.
                continue
            if message["type"] != "message":
                continue
            if message["data"] == STOP_SIGNAL:
                pubsub.unsubscribe(CHANNEL)
                break
            received_messages.append(message["data"])
            receive_times.append(time.perf_counter())
    finally:
        pubsub.close()
# Run the listener in a daemon thread so it cannot keep the process alive.
thread = threading.Thread(target=listen_thread, daemon=True)
thread.start()

# Give subscriber time to connect
time.sleep(0.3)

# Payloads to publish, and the perf_counter() reading taken just before each
# publish call (later paired with the listener's receive times).
messages = [f"msg_{i}" for i in range(MSG_COUNT)]
publish_times = []

try:
    for msg in messages:
        sent_at = time.perf_counter()
        publisher.publish(CHANNEL, msg)
        publish_times.append(sent_at)
        print(f" published: {msg}")
        # Short gap between consecutive publishes.
        time.sleep(0.05)

    # Send stop signal
    publisher.publish(CHANNEL, STOP_SIGNAL)

    # Wait for subscriber to finish
    thread.join(timeout=10)
finally:
    # Both clients are closed whether or not publishing succeeded.
    publisher.close()
    subscriber.close()
# Summarise the run: counts, in-order delivery check, and average latency.
messages_published = len(messages)
messages_received = len(received_messages)
# List equality already implies equal length, so one comparison covers both
# "nothing lost" and "order preserved".
delivery_ok = received_messages == messages

# Average publish-to-receive latency in milliseconds. It is only computed when
# every published message has a matching receive timestamp; otherwise the
# positional pairing below would be meaningless and latency stays None.
latency_ms = None
if publish_times and len(receive_times) == len(publish_times):
    latencies = [
        (received_at - sent_at) * 1000
        for sent_at, received_at in zip(publish_times, receive_times)
    ]
    latency_ms = round(sum(latencies) / len(latencies), 1)

print(f" published={messages_published} received={messages_received} delivery_ok={delivery_ok}")
# Compare against None explicitly: a rounded average of 0.0 ms is a valid
# measurement and must still be reported.
if latency_ms is not None:
    print(f" avg latency: {latency_ms}ms")
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
# Fail the run unless every message arrived in order. An explicit raise is
# used instead of an `assert` statement because asserts are removed under
# `python -O`, which would let a failed run fall through and print PASS.
if not delivery_ok:
    raise AssertionError(
        f"FAIL: published {messages_published} but received {messages_received}: {received_messages}"
    )

# Machine-readable summary of the run, followed by the verdict line.
result = {
    "messages_published": messages_published,
    "messages_received": messages_received,
    "delivery_ok": delivery_ok,
    "latency_ms": latency_ms,
}
print(json.dumps(result, indent=2))
print("PASS")