# s3-download-file

# code_execution · unverified · null · json · download .py

# AWS region

import sys
import os
import subprocess
import time
import tempfile
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

for attempt in range(2):
    try:
        req = urllib.request.Request(
            "https://checklist.day/api/registry/boto3",
            headers={"User-Agent": "checklist-agent/1.0"}
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            registry = json.loads(resp.read())
            break
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)

warnings = registry.get("warnings", [])
if warnings:
    print("[boto3] WARNINGS:")
    for w in warnings if isinstance(warnings, list) else [warnings]:
        print(f"  ⚠ {w}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "boto3>=1.26.0"])

import boto3

AWS_ACCESS_KEY_ID     = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION            = os.environ.get("AWS_REGION", "us-east-1")
S3_BUCKET             = os.environ.get("S3_BUCKET")

if not AWS_ACCESS_KEY_ID:
    print("ABORT: AWS_ACCESS_KEY_ID not set"); sys.exit(1)
if not AWS_SECRET_ACCESS_KEY:
    print("ABORT: AWS_SECRET_ACCESS_KEY not set"); sys.exit(1)
if not S3_BUCKET:
    print("ABORT: S3_BUCKET not set"); sys.exit(1)

KEY           = "checklist-test/download-test.txt"
EXPECTED      = b"checklist.day s3-download-file test\n"

client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION,
)

# Setup: upload test object
client.put_object(Bucket=S3_BUCKET, Key=KEY, Body=EXPECTED)
print(f"  uploaded test object: {KEY}")

try:
    # Method 1: download_file — writes directly to disk, handles multipart
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
        tmp_path = f.name

    client.download_file(S3_BUCKET, KEY, tmp_path)
    with open(tmp_path, "rb") as f:
        downloaded_content = f.read()
    os.unlink(tmp_path)
    download_file_ok = downloaded_content == EXPECTED
    print(f"  download_file: {len(downloaded_content)} bytes (match={download_file_ok})")

    # Method 2: get_object — returns StreamingBody
    # FOOTGUN: response["Body"] is a StreamingBody, NOT bytes — must call .read()
    # FOOTGUN: StreamingBody can only be read once — store result immediately
    response   = client.get_object(Bucket=S3_BUCKET, Key=KEY)
    body_bytes = response["Body"].read()  # must call .read() explicitly
    get_object_ok  = body_bytes == EXPECTED
    content_match  = downloaded_content == body_bytes
    print(f"  get_object:    {len(body_bytes)} bytes (match={get_object_ok})")

finally:
    client.delete_object(Bucket=S3_BUCKET, Key=KEY)
    print(f"  cleaned up: {KEY}")

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

assert download_file_ok, "FAIL: download_file content mismatch"
assert get_object_ok, "FAIL: get_object content mismatch"
assert content_match, "FAIL: download_file and get_object returned different content"

result = {
    "download_file_ok": download_file_ok,
    "get_object_ok":    get_object_ok,
    "content_match":    content_match,
}
print(json.dumps(result, indent=2))
print("PASS")