redis-pubsub-publish-subscribe

code_execution · unverified · null · json · download .py

Redis connection URL e.g. "redis://localhost:6379"

import sys
import os
import subprocess
import time
import threading
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

for attempt in range(2):
    try:
        req = urllib.request.Request(
            "https://checklist.day/api/registry/redis",
            headers={"User-Agent": "checklist-agent/1.0"}
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            registry = json.loads(resp.read())
            break
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)

warnings = registry.get("warnings", [])
if warnings:
    print("[redis] WARNINGS:")
    for w in warnings if isinstance(warnings, list) else [warnings]:
        print(f"  ⚠ {w}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "redis>=5.0.0"])

import redis as redis_lib

REDIS_URL = os.environ.get("REDIS_URL")
if not REDIS_URL:
    print("ABORT: REDIS_URL env var not set")
    sys.exit(1)

CHANNEL     = "checklist:pubsub:test"
MSG_COUNT   = 3
STOP_SIGNAL = "__STOP__"

publisher  = redis_lib.Redis.from_url(REDIS_URL, decode_responses=True)
subscriber = redis_lib.Redis.from_url(REDIS_URL, decode_responses=True)

received_messages = []
receive_times = []

def listen_thread():
    # FOOTGUN: listen() blocks and can drop SSL connections — use get_message() polling instead
    # FOOTGUN: first message from subscribe() is a confirmation, not data — filter by type
    pubsub = subscriber.pubsub()
    pubsub.subscribe(CHANNEL)
    deadline = time.time() + 15  # max 15s wait
    while time.time() < deadline:
        message = pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
        if message is None:
            continue
        if message["type"] != "message":
            continue
        if message["data"] == STOP_SIGNAL:
            pubsub.unsubscribe(CHANNEL)
            break
        received_messages.append(message["data"])
        receive_times.append(time.perf_counter())

thread = threading.Thread(target=listen_thread, daemon=True)
thread.start()

# Give subscriber time to connect
time.sleep(0.3)

messages = [f"msg_{i}" for i in range(MSG_COUNT)]
publish_times = []

try:
    for msg in messages:
        t0 = time.perf_counter()
        publisher.publish(CHANNEL, msg)
        publish_times.append(t0)
        print(f"  published: {msg}")
        time.sleep(0.05)

    # Send stop signal
    publisher.publish(CHANNEL, STOP_SIGNAL)

    # Wait for subscriber to finish
    thread.join(timeout=10)

finally:
    publisher.close()
    subscriber.close()

messages_published = len(messages)
messages_received  = len(received_messages)
delivery_ok        = messages_received == messages_published and received_messages == messages

latency_ms = None
if publish_times and receive_times and len(receive_times) == len(publish_times):
    latencies = [(receive_times[i] - publish_times[i]) * 1000 for i in range(len(publish_times))]
    latency_ms = round(sum(latencies) / len(latencies), 1)

print(f"  published={messages_published} received={messages_received} delivery_ok={delivery_ok}")
if latency_ms:
    print(f"  avg latency: {latency_ms}ms")

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

assert delivery_ok, f"FAIL: published {messages_published} but received {messages_received}: {received_messages}"

result = {
    "messages_published": messages_published,
    "messages_received":  messages_received,
    "delivery_ok":        delivery_ok,
    "latency_ms":         latency_ms,
}
print(json.dumps(result, indent=2))
print("PASS")