{"id":"redis-pubsub-publish-subscribe","version":"1.0.0","primitive":"code_execution","description":"Publish and subscribe to Redis Pub/Sub channels using a background thread subscriber","registry_refs":["redis"],"tags":[],"solves":[],"auth_required":true,"verified":false,"last_verified":"null","next_check":"2026-07-30","eval_result":"null","eval_env":"null","mast":[],"ref":"https://arxiv.org/abs/2503.13657","inputs":[{"name":"REDIS_URL","required":true,"description":"Redis connection URL e.g. \"redis://localhost:6379\""}],"executable":"# ============================================\n# checklist:     redis-pubsub-publish-subscribe\n# version:       1.0.0\n# primitive:     code_execution\n# description:   Publish and subscribe to Redis Pub/Sub channels using a background thread subscriber\n# registry_refs: redis\n# auth_required: true\n# verified:      false\n# last_verified: null\n# next_check:    2026-07-30\n# eval_result:   null\n# eval_env:      null\n#\n# inputs:\n#   - name: REDIS_URL\n#     required: true\n#     description: Redis connection URL e.g. 
\"redis://localhost:6379\"\n#\n# OUTPUTS:\n#   messages_published — number of messages published\n#   messages_received  — number of messages received by subscriber\n#   delivery_ok        — true if all published messages were received\n#   latency_ms         — average publish-to-receive latency\n#\n# MAST FAILURE MODES ADDRESSED:\n# FM-1.5 Unaware of Termination Conditions — subscriber thread has explicit stop signal\n# FM-3.2 No or Incomplete Verification     — message count verified, not just connection\n# FM-3.3 Incorrect Verification            — message content verified, not just count\n#\n# ref: https://arxiv.org/abs/2503.13657\n# ============================================\n\nimport sys\nimport os\nimport subprocess\nimport time\nimport threading\nimport urllib.request\nimport json\n\n# ─────────────────────────────────────────\n# PRE_EXECUTION\n# ─────────────────────────────────────────\n\nfor attempt in range(2):\n    try:\n        req = urllib.request.Request(\n            \"https://checklist.day/api/registry/redis\",\n            headers={\"User-Agent\": \"checklist-agent/1.0\"}\n        )\n        with urllib.request.urlopen(req, timeout=10) as resp:\n            registry = json.loads(resp.read())\n            break\n    except Exception as e:\n        if attempt == 1:\n            print(f\"ABORT: registry unreachable — {e}\")\n            sys.exit(1)\n        time.sleep(2)\n\nwarnings = registry.get(\"warnings\", [])\nif warnings:\n    print(\"[redis] WARNINGS:\")\n    for w in warnings if isinstance(warnings, list) else [warnings]:\n        print(f\"  ⚠ {w}\")\n\n# ─────────────────────────────────────────\n# EXECUTION\n# ─────────────────────────────────────────\n\nsubprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"redis>=5.0.0\"])\n\nimport redis as redis_lib\n\nREDIS_URL = os.environ.get(\"REDIS_URL\")\nif not REDIS_URL:\n    print(\"ABORT: REDIS_URL env var not set\")\n    sys.exit(1)\n\nCHANNEL     = 
\"checklist:pubsub:test\"\nMSG_COUNT   = 3\nSTOP_SIGNAL = \"__STOP__\"\n\npublisher  = redis_lib.Redis.from_url(REDIS_URL, decode_responses=True)\nsubscriber = redis_lib.Redis.from_url(REDIS_URL, decode_responses=True)\n\nreceived_messages = []\nreceive_times = []\n\ndef listen_thread():\n    # FOOTGUN: listen() blocks and can drop SSL connections — use get_message() polling instead\n    # FOOTGUN: first message from subscribe() is a confirmation, not data — filter by type\n    pubsub = subscriber.pubsub()\n    pubsub.subscribe(CHANNEL)\n    deadline = time.time() + 15  # max 15s wait\n    while time.time() < deadline:\n        message = pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)\n        if message is None:\n            continue\n        if message[\"type\"] != \"message\":\n            continue\n        if message[\"data\"] == STOP_SIGNAL:\n            pubsub.unsubscribe(CHANNEL)\n            break\n        received_messages.append(message[\"data\"])\n        receive_times.append(time.perf_counter())\n\nthread = threading.Thread(target=listen_thread, daemon=True)\nthread.start()\n\n# Give subscriber time to connect\ntime.sleep(0.3)\n\nmessages = [f\"msg_{i}\" for i in range(MSG_COUNT)]\npublish_times = []\n\ntry:\n    for msg in messages:\n        t0 = time.perf_counter()\n        publisher.publish(CHANNEL, msg)\n        publish_times.append(t0)\n        print(f\"  published: {msg}\")\n        time.sleep(0.05)\n\n    # Send stop signal\n    publisher.publish(CHANNEL, STOP_SIGNAL)\n\n    # Wait for subscriber to finish\n    thread.join(timeout=10)\n\nfinally:\n    publisher.close()\n    subscriber.close()\n\nmessages_published = len(messages)\nmessages_received  = len(received_messages)\ndelivery_ok        = messages_received == messages_published and received_messages == messages\n\nlatency_ms = None\nif publish_times and receive_times and len(receive_times) == len(publish_times):\n    latencies = [(receive_times[i] - publish_times[i]) * 1000 
for i in range(len(publish_times))]\n    latency_ms = round(sum(latencies) / len(latencies), 1)\n\nprint(f\"  published={messages_published} received={messages_received} delivery_ok={delivery_ok}\")\nif latency_ms:\n    print(f\"  avg latency: {latency_ms}ms\")\n\n# ─────────────────────────────────────────\n# POST_EXECUTION\n# ─────────────────────────────────────────\n\nassert delivery_ok, f\"FAIL: published {messages_published} but received {messages_received}: {received_messages}\"\n\nresult = {\n    \"messages_published\": messages_published,\n    \"messages_received\":  messages_received,\n    \"delivery_ok\":        delivery_ok,\n    \"latency_ms\":         latency_ms,\n}\nprint(json.dumps(result, indent=2))\nprint(\"PASS\")\n"}