agent-loop-prevention

code_execution · unverified · null · json · download .py

Detect and break infinite agent loops using step counters, state hashing, and hard ceilings

import sys, hashlib, json, time

# ── PRE_EXECUTION ─────────────────────────────────────────────
# No external registry needed — stdlib only
# Validate config before execution
MAX_STEPS = 10
MAX_REPEATED_STATES = 3
TASK = "find the value 42 in a list"

assert MAX_STEPS > 0, "ABORT: MAX_STEPS must be > 0"
assert MAX_REPEATED_STATES > 0, "ABORT: MAX_REPEATED_STATES must be > 0"
assert isinstance(TASK, str) and TASK.strip(), "ABORT: TASK must be a non-empty string"

print(f"[CONFIG] MAX_STEPS={MAX_STEPS}, MAX_REPEATED_STATES={MAX_REPEATED_STATES}")
print(f"[TASK] {TASK}")

# ── EXECUTION ──────────────────────────────────────────────────

def hash_state(state: dict) -> str:
    """Deterministic hash of agent state — FM-1.3 loop detection."""
    serialized = json.dumps(state, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]

def is_task_complete(state: dict) -> bool:
    """Binary completion check — FM-1.5 termination condition."""
    return state.get("found") is True

def agent_step(state: dict, step: int) -> dict:
    """
    Simulated agent step. In production replace with real LLM tool call.
    Deliberately loops for first 4 steps to demonstrate detection,
    then makes progress.
    """
    new_state = dict(state)

    if step < 4:
        # Simulate agent stuck in loop — same action repeated
        new_state["action"] = "search_list"
        new_state["position"] = 0  # never advances — loop detected
    else:
        # Simulate agent making progress
        new_state["action"] = "search_list"
        new_state["position"] = step - 3
        if new_state["position"] >= 3:
            new_state["found"] = True
            new_state["value"] = 42

    return new_state

# Agent execution loop with loop prevention
state = {"action": None, "position": 0, "found": False, "value": None}
state_history = []
repeat_counter = 0
steps_executed = 0
termination_reason = None
last_hash = None

t0 = time.time()

for step in range(1, MAX_STEPS + 1):
    steps_executed = step

    # Execute step
    state = agent_step(state, step)
    current_hash = hash_state(state)
    state_history.append(current_hash)

    print(f"[STEP {step:02d}] hash={current_hash} action={state['action']} position={state['position']} found={state['found']}")

    # FM-1.3 — detect repeated state
    if current_hash == last_hash:
        repeat_counter += 1
        print(f"[LOOP-DETECT] Repeated state {repeat_counter}/{MAX_REPEATED_STATES}")
        if repeat_counter >= MAX_REPEATED_STATES:
            termination_reason = "repeated_state"
            print(f"[ABORT] Repeated state ceiling hit at step {step} — breaking loop")
            break
    else:
        repeat_counter = 0

    last_hash = current_hash

    # FM-1.5 — check task completion every step
    if is_task_complete(state):
        termination_reason = "task_complete"
        print(f"[COMPLETE] Task finished at step {step}")
        break

else:
    # FM-1.5 — hard ceiling reached
    termination_reason = "max_steps"
    print(f"[ABORT] MAX_STEPS ceiling ({MAX_STEPS}) reached — hard stop")

elapsed_ms = int((time.time() - t0) * 1000)

# ── POST_EXECUTION ─────────────────────────────────────────────
# FM-3.2 — verify we have a recorded termination reason
assert termination_reason is not None, "FAIL: no termination reason recorded"

# FM-3.3 — exact termination condition check
assert termination_reason in ("task_complete", "max_steps", "repeated_state"), (
    f"FAIL: unknown termination_reason '{termination_reason}'"
)

# Verify step count is within bounds
assert 1 <= steps_executed <= MAX_STEPS, (
    f"FAIL: steps_executed={steps_executed} out of bounds [1, {MAX_STEPS}]"
)

# Verify state history was recorded
assert len(state_history) == steps_executed, (
    f"FAIL: state_history length {len(state_history)} != steps_executed {steps_executed}"
)

result = {
    "steps_executed": steps_executed,
    "termination_reason": termination_reason,
    "max_steps": MAX_STEPS,
    "state_history": state_history,
    "elapsed_ms": elapsed_ms,
    "task": TASK,
}

print(json.dumps(result, indent=2))
print("PASS")