s3-download-file
AWS region
import sys
import os
import subprocess
import time
import tempfile
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
# Look up the boto3 entry in the checklist.day registry before doing any work.
# The request is tried at most twice, pausing two seconds after the first
# failure; a second failure prints an ABORT line and exits with status 1.
for attempt in range(2):
    try:
        request = urllib.request.Request(
            "https://checklist.day/api/registry/boto3",
            headers={"User-Agent": "checklist-agent/1.0"},
        )
        with urllib.request.urlopen(request, timeout=10) as reply:
            payload = reply.read()
        registry = json.loads(payload)
    except Exception as err:
        if attempt != 1:
            # First failure: wait briefly, then retry once.
            time.sleep(2)
            continue
        print(f"ABORT: registry unreachable — {err}")
        sys.exit(1)
    else:
        break

# Surface any warnings the registry entry carries. A non-list value is
# treated as a single warning.
warnings = registry.get("warnings", [])
if warnings:
    print("[boto3] WARNINGS:")
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for item in warning_items:
        print(f" ⚠ {item}")
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
# Install boto3 with the interpreter running this script; the import has to
# come after the install so a freshly installed package can be found.
pip_command = [sys.executable, "-m", "pip", "install", "-q", "boto3>=1.26.0"]
subprocess.check_call(pip_command)
import boto3

# Connection settings come from the environment; only the region has a default.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET = os.getenv("S3_BUCKET")

# Checked in order: the first missing (or empty) setting aborts the run.
required_settings = (
    (AWS_ACCESS_KEY_ID, "ABORT: AWS_ACCESS_KEY_ID not set"),
    (AWS_SECRET_ACCESS_KEY, "ABORT: AWS_SECRET_ACCESS_KEY not set"),
    (S3_BUCKET, "ABORT: S3_BUCKET not set"),
)
for setting_value, abort_message in required_settings:
    if not setting_value:
        print(abort_message)
        sys.exit(1)
# Object key and payload used for the download round trip.
KEY = "checklist-test/download-test.txt"
EXPECTED = b"checklist.day s3-download-file test\n"

# Build the S3 client from the explicit credentials and region read above.
s3_client_options = {
    "aws_access_key_id": AWS_ACCESS_KEY_ID,
    "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
    "region_name": AWS_REGION,
}
client = boto3.client("s3", **s3_client_options)

# Setup: upload the test object that both download methods will fetch
client.put_object(Body=EXPECTED, Bucket=S3_BUCKET, Key=KEY)
print(f" uploaded test object: {KEY}")
# Download the uploaded test object with both client methods and record
# whether each result matches EXPECTED. The outer `finally` always deletes
# the test object from the bucket, whether or not the downloads succeed.
try:
    # Method 1: download_file — writes directly to disk, handles multipart
    # NamedTemporaryFile is used only to reserve a path; the handle is closed
    # before download_file writes to that path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
        tmp_path = f.name
    try:
        client.download_file(S3_BUCKET, KEY, tmp_path)
        with open(tmp_path, "rb") as f:
            downloaded_content = f.read()
    finally:
        # delete=False means nothing else removes the scratch file, so clean
        # it up here even when the download or the read-back raises.
        os.unlink(tmp_path)
    download_file_ok = downloaded_content == EXPECTED
    print(f" download_file: {len(downloaded_content)} bytes (match={download_file_ok})")

    # Method 2: get_object — returns StreamingBody
    # FOOTGUN: response["Body"] is a StreamingBody, NOT bytes — must call .read()
    # FOOTGUN: StreamingBody can only be read once — store result immediately
    response = client.get_object(Bucket=S3_BUCKET, Key=KEY)
    body_bytes = response["Body"].read()  # must call .read() explicitly
    get_object_ok = body_bytes == EXPECTED
    # Cross-check: both methods must return the same bytes.
    content_match = downloaded_content == body_bytes
    print(f" get_object: {len(body_bytes)} bytes (match={get_object_ok})")
finally:
    client.delete_object(Bucket=S3_BUCKET, Key=KEY)
    print(f" cleaned up: {KEY}")
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
# Verify the outcome of both download methods, then report.
# These checks raise explicitly instead of using `assert`: assert statements
# are removed when Python runs with -O, which would let a content mismatch
# fall through to "PASS". The exception type and messages are unchanged.
verification_checks = (
    (download_file_ok, "FAIL: download_file content mismatch"),
    (get_object_ok, "FAIL: get_object content mismatch"),
    (content_match, "FAIL: download_file and get_object returned different content"),
)
for check_passed, failure_message in verification_checks:
    if not check_passed:
        raise AssertionError(failure_message)

# Only reached when every check passed.
result = {
    "download_file_ok": download_file_ok,
    "get_object_ok": get_object_ok,
    "content_match": content_match,
}
print(json.dumps(result, indent=2))
print("PASS")