pinecone-ingest-and-query

tool_calling/api · verified · null · json · download .py

Ingest documents into Pinecone serverless and query by similarity

import os
import sys
import time
import subprocess
import requests as _requests

# ----------------------------------------
# ENV_CHECK
# all credentials verified before any execution
# agent must confirm these are present before proceeding
# ----------------------------------------

# Environment variables the script cannot run without, each mapped to a
# human-readable hint that is printed when the variable is absent.
REQUIRED_ENV = {
    "PINECONE_API_KEY": "Pinecone API key — https://app.pinecone.io (free tier, no credit card)",
}

# Gather every unset (or empty) variable up front so they are all reported
# in a single run instead of one per failed attempt.
missing = [name for name in REQUIRED_ENV if not os.environ.get(name)]
if missing:
    print("ENV_CHECK: missing required credentials:")
    for name in missing:
        hint = REQUIRED_ENV[name]
        print(f"  {name} — {hint}")
    sys.exit(1)

print("ENV_CHECK: credentials verified ✓")
for name in REQUIRED_ENV:
    # Only the trailing four characters are echoed, as a quick identity check.
    tail = os.environ[name][-4:]
    print(f"  {name}: ...{tail}")

# ----------------------------------------
# PRE_EXECUTION
# FM-2.2: fetch ground truth for all registry_refs
# ----------------------------------------

# Libraries whose registry entries must be fetched before anything executes.
REGISTRY_REFS = ["pinecone", "openai", "sentence-transformers"]
MAX_RETRIES = 2
registries = {}       # lib name -> parsed registry JSON
fetch_failures = {}   # lib name -> reason the most recent attempt failed

for lib in REGISTRY_REFS:
    for _ in range(MAX_RETRIES):
        try:
            response = _requests.get(
                f"https://checklist.day/api/registry/{lib}",
                timeout=10,
            )
            if response.status_code != 200:
                # Remember why this attempt failed instead of dropping it silently.
                fetch_failures[lib] = f"HTTP {response.status_code}"
                continue
            registries[lib] = response.json()
            break
        except (_requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers a non-JSON body on requests releases whose
            # .json() does not raise a RequestException subclass.
            fetch_failures[lib] = f"{type(exc).__name__}: {exc}"

# Raise explicitly rather than `assert`: assert statements are removed under
# `python -O`, which would let the script continue with missing registry data.
for lib in REGISTRY_REFS:
    if lib not in registries:
        raise RuntimeError(
            f"ABORT: registry fetch failed for {lib} after {MAX_RETRIES} attempts"
            f" (last error: {fetch_failures.get(lib, 'unknown')})"
        )

# FM-2.4: surface breaking warnings — do not withhold
# Every field is read defensively: a warning entry with a missing or null
# 'message'/'fix' must not raise and hide the remaining warnings.
for lib, registry in registries.items():
    breaking = [
        w for w in (registry.get("warnings") or [])
        if w.get("severity") == "breaking"
    ]
    if not breaking:
        continue
    print(f"PRE_EXECUTION: {lib} has {len(breaking)} breaking warning(s):")
    for w in breaking:
        message = str(w.get("message") or "(no message provided)")
        fix = str(w.get("fix") or "(no fix provided)")
        # Truncate so one long entry cannot flood the console.
        print(f"  [!] [{w.get('affected_versions', 'all')}] {message[:120]}")
        print(f"      fix: {fix[:100]}")

print()
print("PRE_EXECUTION: all registry refs verified ✓")
for lib, registry in registries.items():
    # A missing, null, or empty 'install' list falls back to "unknown"
    # instead of raising IndexError on [0].
    install_steps = registry.get("install") or [{}]
    install = install_steps[0].get("cmd", "unknown")
    print(f"  {lib:25s} : {install}")

PINECONE_API_KEY = os.environ["PINECONE_API_KEY"]  # guaranteed present by ENV_CHECK

# ----------------------------------------
# KNOWN FAILURE MODES
#
# 1. pinecone.init() — REMOVED in v3. The old pattern:
#       import pinecone
#       pinecone.init(api_key=..., environment=...)  # BREAKS in v3+
#    Correct pattern: Pinecone(api_key=...)
#
# 2. Serverless index creation requires cloud + region, not environment string:
#       ServerlessSpec(cloud="aws", region="us-east-1")  # correct
#       NOT: PodSpec(environment="us-east1-gcp")          # pod-based, different billing
#
# 3. Dimension mismatch — embedding model output dim must match index dim exactly.
#    all-MiniLM-L6-v2 = 384 dims. text-embedding-ada-002 = 1536 dims.
#    Mismatch causes silent failures or explicit errors on upsert.
#
# 4. Upsert without batching — Pinecone recommends max 100 vectors per upsert.
#    Large batches without chunking cause timeouts.
#
# 5. Query immediately after upsert — Pinecone is eventually consistent.
#    Always wait for index stats to confirm upsert before querying.
#
# 6. Missing namespace — a query without a namespace searches only the default ("") namespace,
#    so vectors upserted into a named namespace are not returned. Always be explicit.
# ----------------------------------------

# Sentence-transformers model used to embed both the documents and the query.
EMBEDDING_MODEL = "all-MiniLM-L6-v2"  # 384 dims, runs locally, no API key needed
# Vector dimension passed to create_index; must equal the model's output size.
EMBEDDING_DIM = 384
# Name of the serverless index that is created if absent and reused otherwise.
INDEX_NAME = "checklist-day-test"
# Namespace passed to every upsert and query in this script.
NAMESPACE = "checklist-test"
# Number of vectors sent per index.upsert call.
UPSERT_BATCH_SIZE = 100
# Upper bound on how long the script polls describe_index for readiness.
INDEX_WAIT_TIMEOUT = 60  # seconds


# ----------------------------------------
# EXECUTION
# FM-2.6: use Pinecone(api_key=...) — not pinecone.init()
# FM-1.1: create_index with get_or_create pattern — idempotent
# ----------------------------------------

def _install_from_registry(lib):
    """Pip-install *lib* using the install command stored in its registry entry.

    Reads ``registries[lib]["install"][0]["cmd"]``, removes the
    ``pip install `` prefix, and hands the remainder to pip (run through the
    current interpreter) as a single argument.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    # NOTE(review): the package spec comes from a remote registry response —
    # confirm that source is trusted before relying on auto-install.
    pkg = registries[lib]["install"][0]["cmd"].replace("pip install ", "").strip()
    print(f"\nEXECUTION: {lib} not found — installing {pkg}...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
    print(f"EXECUTION: {pkg} installed ✓")


# Import each dependency; when it is absent, install it and import again.
try:
    from pinecone import Pinecone, ServerlessSpec
except ImportError:
    _install_from_registry("pinecone")
    from pinecone import Pinecone, ServerlessSpec

try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    _install_from_registry("sentence-transformers")
    from sentence_transformers import SentenceTransformer

print()
print("EXECUTION: initializing Pinecone client...")

# FM-2.6: v3+ client construction — pinecone.init() is not used
pc = Pinecone(api_key=PINECONE_API_KEY)

# FM-1.1: create the index only when it is absent, so a rerun is safe
existing_indexes = [described.name for described in pc.list_indexes()]
if INDEX_NAME in existing_indexes:
    print(f"EXECUTION: index '{INDEX_NAME}' already exists ✓")
else:
    print(f"EXECUTION: creating serverless index '{INDEX_NAME}'...")
    # FM-2.6: serverless indexes take a ServerlessSpec (cloud + region), not a PodSpec
    serverless_spec = ServerlessSpec(cloud="aws", region="us-east-1")
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIM,  # FM-2.6: must match the embedding model exactly
        metric="cosine",
        spec=serverless_spec,
    )
    print("EXECUTION: index created ✓")

# Poll every 2 seconds until the index reports ready, giving up at the deadline.
print("EXECUTION: waiting for index to be ready...")
deadline = time.time() + INDEX_WAIT_TIMEOUT
while True:
    if time.time() >= deadline:
        raise TimeoutError(
            f"ABORT: index '{INDEX_NAME}' not ready after {INDEX_WAIT_TIMEOUT}s"
        )
    status = pc.describe_index(INDEX_NAME).status
    if status.get("ready"):
        break
    time.sleep(2)

print("EXECUTION: index ready ✓")

# Handle used for all subsequent data operations (upsert, stats, query).
index = pc.Index(INDEX_NAME)

# Bring up the local embedding model
print(f"EXECUTION: loading embedding model '{EMBEDDING_MODEL}'...")
model = SentenceTransformer(EMBEDDING_MODEL)
print("EXECUTION: model loaded ✓")

# Fixed corpus ingested by this script
documents = [
    {"id": "doc1", "text": "checklist.day is a machine-readable registry for AI agents"},
    {"id": "doc2", "text": "Pinecone is a managed vector database for production workloads"},
    {"id": "doc3", "text": "RAG retrieval augmented generation improves LLM accuracy"},
]

# Encode all document texts in a single call
print(f"EXECUTION: embedding {len(documents)} documents...")
texts = [record["text"] for record in documents]
embeddings = model.encode(texts)

# FM-1.1: records keep their document IDs, so re-running overwrites in place
vectors = [
    {
        "id": record["id"],
        "values": vec.tolist(),
        "metadata": {"text": record["text"]},
    }
    for record, vec in zip(documents, embeddings)
]

# Send the records in slices of at most UPSERT_BATCH_SIZE
for start in range(0, len(vectors), UPSERT_BATCH_SIZE):
    stop = start + UPSERT_BATCH_SIZE
    index.upsert(vectors=vectors[start:stop], namespace=NAMESPACE)

print(f"EXECUTION: {len(vectors)} vectors upserted ✓")

# FM-3.2: poll namespace stats until the upsert is visible — Pinecone is
# eventually consistent, so querying straight away may miss the new vectors
print("EXECUTION: waiting for upsert to be indexed...")
deadline = time.time() + 30
while True:
    if time.time() >= deadline:
        raise TimeoutError(
            f"ABORT: upserted {len(documents)} vectors but only {ns_count} visible after 30s"
        )
    stats = index.describe_index_stats()
    ns = stats.namespaces.get(NAMESPACE)
    ns_count = ns.vector_count if ns else 0
    if ns_count >= len(documents):
        break
    time.sleep(2)

print(f"EXECUTION: {ns_count} vectors confirmed in namespace '{NAMESPACE}' ✓")

# Query: embed the query text with the same model and fetch the single
# nearest neighbour from the script's namespace.
query_text = "machine readable registry for agents"
query_embedding = model.encode([query_text])[0].tolist()

print(f"\nEXECUTION: querying — '{query_text}'")
results = index.query(
    vector=query_embedding,
    top_k=1,
    namespace=NAMESPACE,           # FM-2.6: always specify namespace
    include_metadata=True,
)

matches = results.matches  # Pinecone v3 returns QueryResponse object, not plain dict

print("EXECUTION: query complete ✓")

# ----------------------------------------
# POST_EXECUTION
# FM-3.2: verify match count before asserting
# FM-3.3: exact match on expected top result
# ----------------------------------------

# Each check raises AssertionError explicitly instead of using `assert`:
# assert statements are removed under `python -O`, which would let the script
# print PASS without verifying anything.

# The count is checked before matches[0] is touched, so an empty result
# produces the FAIL message rather than an IndexError.
if len(matches) != 1:
    raise AssertionError(f"FAIL: expected 1 match, got {len(matches)}")

top_match = matches[0]

print(f"  top match: {top_match.metadata['text']}")
print(f"  score    : {top_match.score:.4f}")

if top_match.id != "doc1":
    raise AssertionError(
        f"FAIL: expected top match 'doc1', got '{top_match.id}'"
    )

if not (top_match.score > 0.7):
    raise AssertionError(
        f"FAIL: similarity score too low — expected >0.7, got {top_match.score:.4f}"
    )

if top_match.metadata["text"] != documents[0]["text"]:
    raise AssertionError("FAIL: metadata text mismatch")

# Verify index stats: the namespace must hold exactly the ingested documents.
stats = index.describe_index_stats()
final_ns = stats.namespaces.get(NAMESPACE)
final_count = final_ns.vector_count if final_ns else 0
if final_count != len(documents):
    raise AssertionError(
        f"FAIL: expected {len(documents)} vectors in index, got {final_count}"
    )

print()
print("POST_EXECUTION: match count verified ✓")
print(f"POST_EXECUTION: top match is doc1 ✓  (score={top_match.score:.4f})")
print("POST_EXECUTION: metadata verified ✓")
print(f"POST_EXECUTION: index count verified ✓  ({final_count}/{len(documents)} vectors)")

# Summary printed for whoever consumes the script's output.
result = {
    "status": "pass",
    "index_ready": True,
    "vectors_upserted": ns_count,  # count seen in the namespace during the post-upsert wait
    "top_match_verified": True,
    "similarity_score": float(top_match.score),
    "index_count_verified": True,
}
print(result)
print("PASS")