Reliable AI Agent Pipelines: Orchestration, Retries, Circuit Breakers, and Human-in-the-Loop

Error taxonomy, retry strategies, circuit breakers, idempotent tool design, human-in-the-loop escalation gates, observability, and testing patterns.


The demo works. The production deployment fails six times in the first week.

This is the standard AI agent reliability story. The agent produces correct outputs in testing: the task is clear, the tools respond as expected, the context is clean. In production, the task is ambiguous, the external API is rate-limited, the context has noise from previous failed attempts, and the agent confidently takes an irreversible action based on a hallucinated assumption.

Reliability in AI agent pipelines is not primarily an LLM problem. The underlying models are often working correctly. The problem is the engineering scaffolding around them: no error taxonomy, retry logic that amplifies failures instead of recovering from them, missing idempotency guarantees that cause double-writes on retry, human escalation paths that trigger too late or not at all, and zero visibility into what the agent did when something goes wrong.

This article covers error taxonomy, retry strategies, circuit breakers, idempotent tool design, human-in-the-loop escalation gates, observability, and testing patterns. Every section includes Python code that implements production-grade patterns.

The mental model: treat your AI agent pipeline the same way you would treat a distributed microservice system. The LLM is an unreliable network call that sometimes returns wrong answers instead of errors. Design accordingly.

Error Taxonomy: Classifying What Can Go Wrong

Before you can design appropriate error handling, you need a complete taxonomy of the failure modes in an agent pipeline. Collapsing all errors into a single catch-all except block or a single "retry everything" policy is the fastest path to cascading failures.

Type 1: Transient infrastructure errors (safe to retry immediately or with backoff)

  • Rate limits (HTTP 429) - model API or tool API is over capacity
  • Timeout (HTTP 408, connection reset) - network latency, slow tool response
  • Service unavailability (HTTP 503) - brief outage, restart

Handling: Exponential backoff with jitter, maximum 3-5 retries, log each attempt.

Type 2: Input validation errors (never retry with same input)

  • Tool schema violation - agent called a tool with invalid parameter types
  • Input constraint violation - input exceeds length limit, illegal characters
  • Missing required parameter - agent hallucinated a tool call without required fields

Handling: Feed error back to agent with explicit correction prompt. Allow the agent one attempt to correct its tool call. If second attempt also fails validation, escalate to human.

Type 3: Tool execution errors (retry depends on tool semantics)

  • Database constraint violation: unique constraint, foreign key error
  • File not found, permission denied
  • Downstream API business logic error (e.g., "insufficient balance")

Handling: Classify per-tool. Retrying a database write that failed on unique constraint will always fail. Retrying a file read that timed out might succeed.
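The per-tool classification can be expressed as a small whitelist of retryable exception types per tool. A minimal sketch; the tool names here are hypothetical:

```python
# Per-tool retry policy: map each tool to the exception types that are
# safe to retry given that tool's semantics. Tool names are illustrative.
RETRY_POLICIES: dict[str, tuple[type, ...]] = {
    "read_file": (TimeoutError, ConnectionError),  # transient reads: retry
    "write_order": (),                             # unique-constraint writes: never retry
    "call_pricing_api": (TimeoutError,),           # slow third-party API: retry timeouts only
}

def is_retryable(tool_name: str, exc: Exception) -> bool:
    """A tool error is retryable only if its type is whitelisted for that tool."""
    return isinstance(exc, RETRY_POLICIES.get(tool_name, ()))
```

Unknown tools default to an empty whitelist, so a new tool is never retried until someone deliberately classifies its failure modes.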

Type 4: LLM output quality errors (retry with modified prompt)

  • Hallucination - agent states a fact that contradicts retrieved context
  • Instruction non-compliance - agent ignored a constraint in the system prompt
  • Reasoning error - agent reached a logically inconsistent conclusion
  • Incomplete output - agent produced truncated output (token limit hit)

Handling: These require a different strategy than retry. Either regenerate with an explicit correction prompt, escalate to a more capable model, or request human review.

Type 5: Irreversible action errors (cannot be undone; human required)

  • Agent sent an email/message
  • Agent made a database mutation that succeeded but was semantically wrong
  • Agent executed a deployment, payment, or deletion

Handling: Post-mortem and process improvement. Prevent by requiring confirmation gates before any irreversible action.

Type 6: Cascading context errors (the session is corrupted)

  • Agent is stuck in a loop (same action repeated N times)
  • Agent's context contains contradictory information that causes inconsistent behavior
  • Agent has accumulated too many failed attempts and is reasoning based on a confused error state

Handling: Hard reset the session context. Start a new session with a clean summary of what was attempted and why it failed.
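The reset itself can be sketched as a small helper that carries a compact failure summary into the fresh session; the class and field names here are illustrative:

```python
from dataclasses import dataclass, field

@dataclass
class SessionResetter:
    """Tracks failed attempts so a hard reset can start a clean session
    with a summary instead of the corrupted context."""
    attempts: list[str] = field(default_factory=list)

    def record_attempt(self, description: str) -> None:
        self.attempts.append(description)

    def build_restart_prompt(self, goal: str) -> str:
        """Fresh prompt: the goal plus a short summary of what already failed,
        so the new session avoids repeating the same dead ends."""
        summary = "\n".join(f"- {a}" for a in self.attempts[-5:])  # last 5 attempts only
        return (
            f"Task: {goal}\n\n"
            f"Previous session was reset after repeated failures. "
            f"Attempts so far (do not repeat them):\n{summary}"
        )
```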

from enum import Enum
from dataclasses import dataclass
from typing import Optional
 
class ErrorType(Enum):
    TRANSIENT_INFRASTRUCTURE = "transient_infrastructure"
    INPUT_VALIDATION = "input_validation"
    TOOL_EXECUTION = "tool_execution"
    LLM_OUTPUT_QUALITY = "llm_output_quality"
    IRREVERSIBLE_ACTION = "irreversible_action"
    CASCADING_CONTEXT = "cascading_context"
 
class ErrorSeverity(Enum):
    LOW = "low"        # Retry transparently
    MEDIUM = "medium"  # Retry with notification
    HIGH = "high"      # Escalate to human
    CRITICAL = "critical"  # Stop pipeline immediately
 
@dataclass
class AgentError:
    error_type: ErrorType
    severity: ErrorSeverity
    message: str
    tool_name: Optional[str] = None
    retry_count: int = 0
    is_retryable: bool = True
    context: Optional[dict] = None
 
def classify_error(exception: Exception,
                    tool_name: Optional[str] = None,
                    retry_count: int = 0) -> AgentError:
    """Classify an exception into the agent error taxonomy."""
    error_msg = str(exception)
 
    # Infrastructure errors
    if isinstance(exception, TimeoutError):
        return AgentError(ErrorType.TRANSIENT_INFRASTRUCTURE, ErrorSeverity.LOW,
                         error_msg, tool_name, retry_count, is_retryable=True)
 
    if hasattr(exception, 'status_code'):
        status = exception.status_code
        if status == 429:
            return AgentError(ErrorType.TRANSIENT_INFRASTRUCTURE, ErrorSeverity.LOW,
                             "Rate limit exceeded", tool_name, retry_count, is_retryable=True)
        if status in (500, 502, 503, 504):
            return AgentError(ErrorType.TRANSIENT_INFRASTRUCTURE, ErrorSeverity.MEDIUM,
                             f"Service unavailable: {status}", tool_name, retry_count,
                             is_retryable=retry_count < 3)
 
    # Validation errors
    if isinstance(exception, (ValueError, TypeError)) and tool_name:
        return AgentError(ErrorType.INPUT_VALIDATION, ErrorSeverity.MEDIUM,
                         error_msg, tool_name, retry_count,
                         is_retryable=retry_count < 1)  # One correction attempt only
 
    # Default: medium severity, limited retries
    return AgentError(ErrorType.TOOL_EXECUTION, ErrorSeverity.MEDIUM,
                     error_msg, tool_name, retry_count,
                     is_retryable=retry_count < 2)

Retry Strategies: What to Retry, When, and How

Three retry policies matter for agent pipelines; they differ from each other and from standard HTTP retry logic:

Policy 1: Tool call retry with exponential backoff (for transient infrastructure errors)

import asyncio
import random
from functools import wraps
from typing import TypeVar, Callable, Awaitable

import aiohttp  # used by the example tool below
 
T = TypeVar("T")
 
def with_exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable_types: tuple = (TimeoutError,),
):
    """
    Decorator for async tool functions that adds exponential backoff retry.
    Only retries on specified exception types.
    """
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except retryable_types as e:
                    last_exception = e
                    if attempt == max_retries:
                        break
 
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    if jitter:
                        delay *= (0.5 + random.random() * 0.5)  # ±50% jitter
 
                    import structlog
                    log = structlog.get_logger()
                    log.warning("tool_retry",
                               func=func.__name__,
                               attempt=attempt + 1,
                               delay_seconds=round(delay, 2),
                               error=str(e))
 
                    await asyncio.sleep(delay)
 
            raise last_exception
 
        return wrapper
    return decorator
 
# Usage:
@with_exponential_backoff(max_retries=3, retryable_types=(TimeoutError, ConnectionError))
async def call_external_api(url: str, params: dict) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            resp.raise_for_status()
            return await resp.json()

Policy 2: LLM regeneration with correction prompt (for output quality errors)

When the LLM produces invalid output (wrong format, hallucinated values, violated constraints), do not retry the same call. Construct a correction prompt that includes the original output and an explicit statement of what was wrong:

import json
from typing import Optional

async def regenerate_with_correction(
    original_prompt: str,
    bad_output: str,
    error_description: str,
    llm_client,
    max_correction_attempts: int = 2,
) -> str:
    """
    Regenerate LLM output with an explicit correction prompt.
    Used when the first generation is invalid (wrong format, constraint violation, etc.)
    """
    correction_prompt = f"""{original_prompt}
 
---
CORRECTION REQUIRED:
 
Your previous response was:
{bad_output}
 
Problem: {error_description}
 
Please try again, addressing the specific problem described above.
"""
 
    for attempt in range(max_correction_attempts):
        response = await llm_client.complete(correction_prompt)
 
        # Validate the new response
        validation_error = validate_output(response)
        if validation_error is None:
            return response
 
        # Update correction prompt with new error
        correction_prompt = f"""{correction_prompt}
 
---
SECOND CORRECTION ATTEMPT:
 
Your second response was:
{response}
 
Still has problem: {validation_error}
 
This is your final attempt. Be very explicit about following the required format.
"""
 
    # Both attempts failed: raise so the caller can route the case to human review
    raise ValueError(f"Failed to generate valid output after {max_correction_attempts} attempts")
 
def validate_output(output: str) -> Optional[str]:
    """
    Validate LLM output. Return error description string if invalid, None if valid.
    Override this with your specific validation logic.
    """
    # Example: validate JSON tool call format
    try:
        if output.strip().startswith("{"):
            json.loads(output)
    except json.JSONDecodeError as e:
        return f"Output is not valid JSON: {e}"
 
    return None  # Valid

Policy 3: Stuck detection and session reset (for cascading context errors)

class AgentLoopDetector:
    """
    Detects when an agent is stuck in a repetitive loop
    and forces a context reset.
    """
 
    def __init__(self, window_size: int = 5, similarity_threshold: float = 0.9):
        self.window_size = window_size
        self.similarity_threshold = similarity_threshold
        self.recent_actions: list[str] = []
 
    def record_action(self, action_signature: str) -> bool:
        """
        Record an action and return True if a loop is detected.
        action_signature: a hash or summary of the tool call + parameters
        """
        self.recent_actions.append(action_signature)
        if len(self.recent_actions) > self.window_size:
            self.recent_actions.pop(0)
 
        if len(self.recent_actions) < 3:
            return False
 
        # Check if the last N actions are all the same
        unique_actions = set(self.recent_actions[-3:])
        if len(unique_actions) == 1:
            return True  # Exact loop: same action 3 times in a row
 
        # Check for alternating pattern (A, B, A, B, A)
        if len(self.recent_actions) >= 5:
            pattern = self.recent_actions[-5:]
            if pattern[0] == pattern[2] == pattern[4] and pattern[1] == pattern[3]:
                return True
 
        return False
 
    def reset(self):
        self.recent_actions = []
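record_action expects a stable signature for the tool call. One minimal way to build it, assuming JSON-serializable arguments:

```python
import hashlib
import json

def action_signature(tool_name: str, args: dict) -> str:
    """Stable signature for loop detection: hash of tool name + canonical args.
    Two calls with the same tool and parameters produce the same signature,
    regardless of dict key order."""
    canonical = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]
```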

Circuit Breaker Pattern for LLM Tool Calls

The circuit breaker pattern prevents an agent from repeatedly calling a tool that is consistently failing. Without it, the agent wastes LLM tokens, generates noise in the error context, and spirals into confused reasoning.

The three states are: Closed (normal operation, calls pass through), Open (calls blocked, failing fast without hitting the service), and Half-open (a single test call is allowed through to check whether the service has recovered).

import time
from enum import Enum
from dataclasses import dataclass, field
from threading import Lock
from typing import Optional
 
class CircuitState(Enum):
    CLOSED = "closed"         # Normal operation
    OPEN = "open"             # Blocking calls
    HALF_OPEN = "half_open"   # Testing if service recovered
 
@dataclass
class CircuitBreaker:
    """
    Circuit breaker for a named tool or external service.
    Thread-safe implementation.
    """
    name: str
    failure_threshold: int = 5        # Failures before opening
    recovery_timeout: float = 60.0    # Seconds before attempting recovery
    success_threshold: int = 2        # Successes needed to close from half-open
 
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    success_count: int = field(default=0, init=False)
    last_failure_time: Optional[float] = field(default=None, init=False)
    _lock: Lock = field(default_factory=Lock, init=False, repr=False)
 
    def call_allowed(self) -> bool:
        """Check if a call should be allowed through."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
 
            if self.state == CircuitState.OPEN:
                # Check if recovery timeout has passed
                if (self.last_failure_time and
                        time.monotonic() - self.last_failure_time > self.recovery_timeout):
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                    return True
                return False
 
            if self.state == CircuitState.HALF_OPEN:
                return True  # Allow test calls
 
        return False
 
    def record_success(self):
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = max(0, self.failure_count - 1)  # Decay failures
 
    def record_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
 
            if (self.state == CircuitState.CLOSED and
                    self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
            elif self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN  # Test failed, back to open
 
    def status(self) -> dict:
        return {
            "name": self.name,
            "state": self.state.value,
            "failure_count": self.failure_count,
            "time_until_retry": max(
                0,
                self.recovery_timeout - (time.monotonic() - (self.last_failure_time or 0))
            ) if self.state == CircuitState.OPEN else 0,
        }
 
class CircuitBreakerRegistry:
    """Central registry for all circuit breakers in the agent pipeline."""
 
    def __init__(self):
        self._breakers: dict[str, CircuitBreaker] = {}
 
    def get(self, tool_name: str,
             failure_threshold: int = 5,
             recovery_timeout: float = 60.0) -> CircuitBreaker:
        if tool_name not in self._breakers:
            self._breakers[tool_name] = CircuitBreaker(
                name=tool_name,
                failure_threshold=failure_threshold,
                recovery_timeout=recovery_timeout,
            )
        return self._breakers[tool_name]
 
    def all_statuses(self) -> list[dict]:
        return [b.status() for b in self._breakers.values()]
 
    def any_open(self) -> bool:
        return any(b.state == CircuitState.OPEN for b in self._breakers.values())
 
# Integration with tool execution:
async def execute_tool_with_circuit_breaker(
    tool_name: str,
    tool_fn: callable,
    args: dict,
    registry: CircuitBreakerRegistry,
) -> dict:
    breaker = registry.get(tool_name)
 
    if not breaker.call_allowed():
        status = breaker.status()
        raise RuntimeError(
            f"Circuit breaker OPEN for tool '{tool_name}'. "
            f"Retry in {status['time_until_retry']:.0f}s. "
            f"Do not call this tool again until the circuit recovers."
        )
 
    try:
        result = await tool_fn(**args)
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise

Circuit breaker strategy per tool type:

Tool type                    Failure threshold   Recovery timeout   Notes
External API (third party)   3                   30s                External dependencies fail unpredictably
Database queries             5                   60s                DB issues usually require manual intervention
File system operations      10                   15s                FS errors often transient
LLM model calls              2                  120s                Rate limits need generous recovery time
Memory store reads          10                   10s                Local; fast recovery expected

When the agent receives a circuit-open error, the error message should be explicit enough for the agent to decide not to retry. Include the estimated recovery time in the error message. The agent needs this information to reason correctly about whether to wait, use an alternative tool, or escalate.
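The per-type defaults from the table can be captured as plain configuration and fed into the registry's get() parameters. A sketch; the tool-type keys are illustrative:

```python
# Breaker settings per tool type, taken from the table above.
BREAKER_CONFIG: dict[str, dict[str, float]] = {
    "external_api": {"failure_threshold": 3, "recovery_timeout": 30.0},
    "database":     {"failure_threshold": 5, "recovery_timeout": 60.0},
    "filesystem":   {"failure_threshold": 10, "recovery_timeout": 15.0},
    "llm":          {"failure_threshold": 2, "recovery_timeout": 120.0},
    "memory_store": {"failure_threshold": 10, "recovery_timeout": 10.0},
}

def breaker_settings(tool_type: str) -> dict[str, float]:
    """Look up breaker settings for a tool type, falling back to
    conservative defaults for unclassified tools."""
    return BREAKER_CONFIG.get(
        tool_type, {"failure_threshold": 5, "recovery_timeout": 60.0}
    )
```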

Idempotent Tools: Preventing Double-Write on Retry

Retry logic without idempotency guarantees causes double-writes. An agent calls a "send email" tool, the call times out before the response arrives, the agent retries, and two emails are sent. This is the standard behavior of retry-on-timeout without idempotency.

Every tool with side effects must be idempotent. The pattern is that each tool call includes a client-generated idempotency key. The tool implementation checks for duplicate keys and returns the previous result instead of executing again.

import hashlib
import json
import sqlite3
from datetime import datetime, timezone, timedelta
from functools import wraps
from typing import Optional
 
class IdempotencyStore:
    """
    Stores idempotency keys and previous tool results.
    Prevents duplicate execution of the same tool call on retry.
    """
 
    def __init__(self, db_path: str, ttl_hours: int = 24):
        self.ttl = timedelta(hours=ttl_hours)
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS idempotency_keys (
                key         TEXT PRIMARY KEY,
                tool_name   TEXT NOT NULL,
                result      TEXT NOT NULL,  -- JSON-encoded result
                created_at  TEXT NOT NULL,
                expires_at  TEXT NOT NULL
            )
        """)
        self.conn.commit()
 
    def generate_key(self, tool_name: str, args: dict) -> str:
        """Generate a stable idempotency key from tool name + args."""
        # Sort args for consistent hashing regardless of dict order
        canonical = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()
 
    def check(self, key: str) -> Optional[dict]:
        """Return previous result if key exists and is not expired. Else None."""
        now = datetime.now(timezone.utc).isoformat()
        row = self.conn.execute(
            "SELECT result, expires_at FROM idempotency_keys WHERE key=? AND expires_at>?",
            (key, now)
        ).fetchone()
 
        if row:
            return json.loads(row[0])
        return None
 
    def store(self, key: str, tool_name: str, result: dict):
        """Store a tool result with the given idempotency key."""
        now = datetime.now(timezone.utc)
        expires_at = (now + self.ttl).isoformat()
 
        self.conn.execute("""
            INSERT OR REPLACE INTO idempotency_keys VALUES (?,?,?,?,?)
        """, (key, tool_name, json.dumps(result), now.isoformat(), expires_at))
        self.conn.commit()
 
    def cleanup_expired(self):
        """Remove expired keys (run periodically)."""
        now = datetime.now(timezone.utc).isoformat()
        self.conn.execute("DELETE FROM idempotency_keys WHERE expires_at < ?", (now,))
        self.conn.commit()
 
def idempotent_tool(idempotency_store: IdempotencyStore):
    """
    Decorator that makes a tool function idempotent.
    Checks for a previous result before executing, stores result after.
    """
    def decorator(func: callable) -> callable:
        @wraps(func)
        async def wrapper(**kwargs) -> dict:
            tool_name = func.__name__
            key = idempotency_store.generate_key(tool_name, kwargs)
 
            # Check for previous result
            previous_result = idempotency_store.check(key)
            if previous_result is not None:
                import structlog
                log = structlog.get_logger()
                log.info("idempotency_hit",
                         tool=tool_name,
                         key=key[:8],
                         result_source="cache")
                return {**previous_result, "_idempotency_hit": True}
 
            # Execute the tool
            result = await func(**kwargs)
 
            # Store the result
            idempotency_store.store(key, tool_name, result)
            return result
 
        return wrapper
    return decorator
 
# Usage:
idempotency = IdempotencyStore("agent_idempotency.db", ttl_hours=24)
 
@idempotent_tool(idempotency)
async def send_notification(user_id: str, message: str, channel: str) -> dict:
    # This function will only execute once per unique (user_id, message, channel) combination
    # within the TTL window, regardless of how many times the agent calls it.
    result = await notification_service.send(user_id, message, channel)
    return {"sent": True, "message_id": result.id, "timestamp": result.timestamp}

Which tools must be idempotent: Any tool that writes data, sends messages, or has observable external effects. Tools that only read data (query, search, retrieve) do not need idempotency. Reading twice is fine.

The idempotency key design matters. The key must capture the semantic identity of the operation. For "send email to user@example.com about topic X", the key should include the recipient, subject, and content hash (not a timestamp). If the agent retries the same logical operation, it should get the same key and thus the same result.
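For the email example, a semantic key might look like this sketch (the function name is hypothetical):

```python
import hashlib

def email_idempotency_key(recipient: str, subject: str, body: str) -> str:
    """Semantic key for a 'send email' tool: recipient + subject + content hash.
    Deliberately excludes timestamps and request IDs, so a retry of the same
    logical send maps to the same key and hits the idempotency cache."""
    body_hash = hashlib.sha256(body.encode()).hexdigest()
    return hashlib.sha256(f"{recipient}|{subject}|{body_hash}".encode()).hexdigest()
```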

Human-in-the-Loop: When to Escalate and How

Human-in-the-loop is not a fallback for when everything else fails. It is a designed part of the pipeline with explicit escalation conditions, timeout behavior, and integration points. Treating it as an emergency measure means it activates too late and with too little context.

Escalation triggers (conditions that should require human confirmation or review):

  1. High-stakes irreversible actions: Any action that cannot be undone, including deletion, payment, external publication, or deployment to production.

  2. Confidence below threshold: When the agent's reasoning involves "I'm not certain" signals (expressing doubt, hedging claims, requesting clarification that was not provided) and the next step is irreversible.

  3. Multiple consecutive failures: If a tool has failed 3+ times and the agent is still attempting it, human review of the approach is warranted.

  4. Context contradiction: The agent detects that information in its context contradicts earlier established facts. This often indicates a data quality issue or previous error that needs human resolution.

  5. Policy boundary: The requested action is outside the explicitly defined scope for autonomous operation.

import asyncio
import json
from dataclasses import dataclass
from enum import Enum
from functools import wraps
from typing import Optional, Callable, Awaitable
 
class EscalationReason(Enum):
    IRREVERSIBLE_ACTION = "irreversible_action"
    LOW_CONFIDENCE = "low_confidence"
    REPEATED_FAILURE = "repeated_failure"
    CONTEXT_CONTRADICTION = "context_contradiction"
    POLICY_BOUNDARY = "policy_boundary"
 
class EscalationOutcome(Enum):
    APPROVED = "approved"
    REJECTED = "rejected"
    MODIFIED = "modified"    # Human approves with modifications
    TIMEOUT = "timeout"      # Human did not respond in time
 
@dataclass
class EscalationRequest:
    reason: EscalationReason
    action_description: str    # What the agent wants to do
    context_summary: str       # Why it wants to do it
    options: list[str]         # Suggested options for the human reviewer
    timeout_seconds: float = 300.0  # 5 minute default
    default_on_timeout: EscalationOutcome = EscalationOutcome.REJECTED
 
@dataclass
class EscalationResponse:
    outcome: EscalationOutcome
    instructions: Optional[str] = None    # For MODIFIED outcome
    reviewer_id: Optional[str] = None
    reviewed_at: Optional[str] = None
 
class HumanEscalationGate:
    """
    Gate that pauses agent execution pending human review.
    Integrates with whatever notification/review system you use.
    """
 
    def __init__(self,
                 notify_fn: Callable[[EscalationRequest], Awaitable[str]],
                 poll_fn: Callable[[str], Awaitable[Optional[EscalationResponse]]]):
        """
        notify_fn: send escalation to human, returns a request_id
        poll_fn: check if human has responded, returns response or None
        """
        self.notify = notify_fn
        self.poll = poll_fn
        self.pending_requests: dict[str, EscalationRequest] = {}
 
    async def request_approval(self, request: EscalationRequest) -> EscalationResponse:
        """
        Send an escalation request and wait for human response.
        Returns response or default on timeout.
        """
        # Send notification
        request_id = await self.notify(request)
        self.pending_requests[request_id] = request
 
        # Poll for response
        deadline = asyncio.get_running_loop().time() + request.timeout_seconds
        poll_interval = 5.0  # Check every 5 seconds

        while asyncio.get_running_loop().time() < deadline:
            response = await self.poll(request_id)
            if response is not None:
                del self.pending_requests[request_id]
                return response
            await asyncio.sleep(poll_interval)
 
        # Timeout: drop the pending request and apply the default outcome
        self.pending_requests.pop(request_id, None)
        return EscalationResponse(
            outcome=request.default_on_timeout,
            instructions=f"Auto-{request.default_on_timeout.value} due to timeout after {request.timeout_seconds}s"
        )
 
def requires_approval(action_category: str,
                       risk_level: str = "high",
                       gate: Optional[HumanEscalationGate] = None):
    """
    Decorator for tool functions that require human approval before execution.
    """
    def decorator(func: callable) -> callable:
        @wraps(func)
        async def wrapper(**kwargs) -> dict:
            if gate is None:
                # No gate configured: allow through (development mode)
                return await func(**kwargs)
 
            # Build escalation request
            args_summary = json.dumps(kwargs, indent=2)[:500]
            request = EscalationRequest(
                reason=EscalationReason.IRREVERSIBLE_ACTION,
                action_description=f"Execute {func.__name__}({action_category})",
                context_summary=f"Tool: {func.__name__}\nRisk: {risk_level}\nParameters:\n{args_summary}",
                options=["Approve", "Reject", "Modify parameters"],
                default_on_timeout=EscalationOutcome.REJECTED,
            )
 
            response = await gate.request_approval(request)
 
            if response.outcome == EscalationOutcome.APPROVED:
                return await func(**kwargs)
            elif response.outcome == EscalationOutcome.MODIFIED:
                # Parse modified instructions and re-execute with new params
                # This requires structured modification format from the reviewer
                modified_kwargs = parse_modification_instructions(
                    response.instructions, kwargs
                )
                return await func(**modified_kwargs)
            else:  # REJECTED or TIMEOUT
                return {
                    "status": "rejected",
                    "reason": response.instructions or "Rejected by human reviewer",
                    "escalation_outcome": response.outcome.value,
                }
 
        return wrapper
    return decorator
 
# Usage:
@requires_approval(action_category="email_send", risk_level="high")
async def send_customer_email(customer_id: str, subject: str, body: str) -> dict:
    return await email_service.send(customer_id, subject, body)

Escalation UX requirements: The escalation notification must give the human reviewer enough context to make a good decision in 30 seconds. Include what action is proposed, why the agent wants to take it, what alternatives the agent considered, and what will happen if rejected. A one-line "Agent wants to send email, approve?" notification is not sufficient.

Graceful degradation on rejection: When a human rejects an action, the agent needs clear instructions on what to do instead. The rejection response includes a mandatory instructions field that tells the agent the next step ("Rejected: explain to the user that account deletion requires phone verification"). Bare rejections with no instructions leave the agent without a recovery path.
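One way to turn the rejection into the agent's next instruction is a small prompt builder. A sketch, with hypothetical wording:

```python
from typing import Optional

def rejection_followup_prompt(action: str, instructions: Optional[str]) -> str:
    """Convert a human rejection into the agent's next instruction.
    A bare rejection gets a safe default: stop, do not retry, report back."""
    if instructions:
        return (
            f"Your proposed action ({action}) was rejected by a human reviewer. "
            f"Follow these instructions instead: {instructions}"
        )
    return (
        f"Your proposed action ({action}) was rejected. Do not retry it. "
        f"Explain to the user that this action could not be completed."
    )
```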

Orchestration Patterns: Sequential, Parallel, and Hierarchical

Sequential pipeline (most common): Steps execute in order. Each step's output is the next step's input. Failure in any step halts the pipeline.

Use when steps have hard dependencies, ordering is semantically required, or parallelism creates consistency risks.

class SequentialPipeline:
    def __init__(self, steps: list[callable], circuit_registry: CircuitBreakerRegistry):
        self.steps = steps
        self.circuits = circuit_registry
 
    async def run(self, initial_input: dict) -> dict:
        context = initial_input
        for step_fn in self.steps:
            if self.circuits.any_open():
                open_circuits = [s["name"] for s in self.circuits.all_statuses()
                                if s["state"] == "open"]
                raise RuntimeError(f"Pipeline halted: open circuits: {open_circuits}")
            context = await step_fn(context)
        return context

Parallel pipeline with fan-out/fan-in: Independent steps run concurrently. Results are merged.

Use when steps are independent, latency reduction matters, and individual step failures should not block other steps.

async def parallel_with_partial_failure(
    tasks: dict[str, Callable[[], Awaitable[dict]]],
    required_tasks: set[str],
) -> dict[str, dict]:
    """
    Run tasks in parallel. Required tasks must succeed; optional tasks
    may fail without halting the pipeline.
    """
    results = await asyncio.gather(
        *[task() for task in tasks.values()],
        return_exceptions=True
    )
 
    output = {}
    failed_required = []
 
    for (task_name, _), result in zip(tasks.items(), results):
        if isinstance(result, Exception):
            if task_name in required_tasks:
                failed_required.append((task_name, str(result)))
            else:
                output[task_name] = {"error": str(result), "skipped": True}
        else:
            output[task_name] = result
 
    if failed_required:
        raise RuntimeError(
            f"Required pipeline tasks failed: "
            + "; ".join(f"{name}: {err}" for name, err in failed_required)
        )
 
    return output

Hierarchical (planner + executor) pattern: A planning agent breaks a goal into subtasks. Executor agents handle individual subtasks. A critic agent reviews outputs.

The critical reliability consideration in hierarchical patterns is that the planner's task decomposition can itself be wrong. Build explicit validation between plan generation and execution.

async def hierarchical_agent_run(goal: str,
                                  planner_agent,
                                  executor_pool: dict[str, Callable],
                                  critic_agent,
                                  human_gate: HumanEscalationGate,
                                  max_plan_revisions: int = 2) -> dict:
    """Hierarchical pipeline: plan → validate → execute → critique."""

    # 1. Generate plan
    plan = await planner_agent.plan(goal)

    # 2. Validate plan before execution; revise at most max_plan_revisions times
    for revision in range(max_plan_revisions + 1):
        validation_errors = validate_plan(plan, executor_pool)

        if not validation_errors:
            break

        if revision == max_plan_revisions:
            # Plan never validated: escalate to human
            response = await human_gate.request_approval(EscalationRequest(
                reason=EscalationReason.REPEATED_FAILURE,
                action_description=f"Execute plan for: {goal}",
                context_summary=f"Plan failed validation after {max_plan_revisions} revisions.\nErrors: {validation_errors}",
                options=["Provide corrected plan", "Cancel goal"],
            ))
            if response.outcome != EscalationOutcome.APPROVED:
                return {"status": "cancelled", "reason": "Plan validation failed"}
            break  # Human approved: proceed with the corrected plan, do not auto-revise again

        plan = await planner_agent.revise_plan(plan, validation_errors)
 
    # 3. Execute plan steps
    results = {}
    for step in plan.steps:
        executor = executor_pool.get(step.executor_type)
        if not executor:
            raise ValueError(f"No executor for type: {step.executor_type}")
        results[step.id] = await executor(step, results)
 
    # 4. Critique output
    critique = await critic_agent.evaluate(goal, plan, results)
    if critique.has_issues:
        results["critique"] = critique.issues
 
    return {"status": "complete", "results": results, "critique": critique}

Observability: Tracing Agent Decisions at Runtime

An agent pipeline without observability is a black box that produces outputs you cannot explain. When something goes wrong (and it will), you need to reconstruct exactly what the agent decided, what tools it called, what context it had, and where the reasoning diverged from what you expected.

The minimum observability stack for production agents:

1. Structured logging with span IDs

Every agent action gets a trace ID (per-session) and span ID (per-tool-call). All downstream tool calls inherit the trace ID. This allows reconstructing the full execution graph from logs.

import structlog
import uuid
from contextvars import ContextVar
 
current_trace_id: ContextVar[str] = ContextVar("trace_id", default="")
current_span_id: ContextVar[str] = ContextVar("span_id", default="")
 
def get_logger():
    return structlog.get_logger().bind(
        trace_id=current_trace_id.get(),
        span_id=current_span_id.get(),
    )
 
class AgentTracer:
    """Structured tracing for agent pipeline execution."""
 
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.trace_id = str(uuid.uuid4())[:8]
        current_trace_id.set(self.trace_id)
        self.log = get_logger().bind(session_id=session_id)
 
    def log_turn_start(self, user_message: str, context_tokens: int):
        self.log.info("turn_start",
                      message_preview=user_message[:100],
                      context_tokens=context_tokens)
 
    def log_tool_call(self, tool_name: str, args: dict) -> str:
        span_id = str(uuid.uuid4())[:8]
        current_span_id.set(span_id)
        self.log.info("tool_call",
                      tool=tool_name,
                      args_summary={k: str(v)[:50] for k, v in args.items()},
                      span_id=span_id)
        return span_id
 
    def log_tool_result(self, span_id: str, tool_name: str,
                        result: dict, duration_ms: float):
        self.log.info("tool_result",
                      tool=tool_name,
                      span_id=span_id,
                      duration_ms=round(duration_ms, 1),
                      result_preview=str(result)[:100],
                      success=True)
 
    def log_tool_error(self, span_id: str, tool_name: str,
                       error: AgentError, duration_ms: float):
        self.log.warning("tool_error",
                         tool=tool_name,
                         span_id=span_id,
                         duration_ms=round(duration_ms, 1),
                         error_type=error.error_type.value,
                         error_severity=error.severity.value,
                         message=error.message,
                         retry_count=error.retry_count)
 
    def log_escalation(self, reason: EscalationReason, outcome: EscalationOutcome):
        self.log.warning("human_escalation",
                         reason=reason.value,
                         outcome=outcome.value)
 
    def log_turn_end(self, response_preview: str, total_tool_calls: int,
                      total_duration_ms: float):
        self.log.info("turn_end",
                      response_preview=response_preview[:100],
                      total_tool_calls=total_tool_calls,
                      total_duration_ms=round(total_duration_ms, 1))

2. Agent decision logging

Beyond tool calls, log the agent's reasoning. For debugging, the most valuable log entry is often "what was in the context window when the agent made the wrong decision". Capture context size, retrieved memories, and the agent's stated reasoning for each tool call.
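A decision log entry can be as simple as a structured record built at the moment the agent commits to a tool call. This sketch (the field names are illustrative, not a fixed schema) captures the three things postmortems need most: the stated reasoning, the context size, and which memories were in scope.

```python
def decision_record(tool_name: str, stated_reasoning: str,
                    context_tokens: int, memory_ids: list[str]) -> dict:
    """Build a structured 'agent_decision' log record. Pass the result to
    your structured logger (e.g. the AgentTracer's bound logger)."""
    return {
        "event": "agent_decision",
        "tool": tool_name,
        # Truncate reasoning so a verbose agent cannot blow up log volume
        "reasoning_preview": stated_reasoning[:200],
        "context_tokens": context_tokens,
        "memory_ids": memory_ids,
    }
```

Emitting this immediately before each tool call means every tool_result or tool_error entry can be paired with the reasoning that produced it.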

3. Key metrics to alert on

Metric                      | Alert threshold    | What it signals
Tool error rate             | >5% over 5 minutes | Infrastructure or agent quality problem
Escalation rate             | >20% over 1 hour   | Task scope too broad for automation
Circuit breaker opens       | Any in production  | Tool dependency failure
Context token utilization   | >90%               | Context management failure
Loop detection triggers     | >2/hour            | Agent stuck pattern
Average tool calls per turn | >50% increase      | Agent struggling (calling more tools to compensate)

Testing Reliability: Fault Injection and Golden Path Tests

Testing agent reliability requires deliberately injecting failures, not just testing the happy path.

class FaultInjector:
    """
    Test utility: inject configured failures into tool calls.
    Use to validate that retry logic, circuit breakers, and escalation work.
    """

    def __init__(self):
        self.fault_configs: dict[str, list[dict]] = {}
        self._call_counts: dict[str, int] = {}

    def configure(self, tool_name: str, faults: list[dict]):
        """
        faults: list of {
            "on_call": int,          # Which call number triggers this fault (1-indexed)
            "exception": Exception,  # Exception to raise
            "delay": float,          # Optional extra delay in seconds
        }
        """
        self.fault_configs[tool_name] = faults
        self._call_counts[tool_name] = 0  # Reset only this tool's counter

    def wrap_tool(self, tool_name: str, tool_fn: Callable) -> Callable:
        """Wrap a tool function with fault injection."""
        @wraps(tool_fn)
        async def wrapper(**kwargs):
            call_count = self._call_counts.get(tool_name, 0) + 1
            self._call_counts[tool_name] = call_count

            faults = self.fault_configs.get(tool_name, [])
            for fault in faults:
                if fault.get("on_call") == call_count:
                    if fault.get("delay"):
                        await asyncio.sleep(fault["delay"])
                    if fault.get("exception"):
                        raise fault["exception"]

            return await tool_fn(**kwargs)
        return wrapper
 
# Test: verify retry on transient failure
async def test_retry_on_rate_limit():
    injector = FaultInjector()
    injector.configure("search_web", [
        {"on_call": 1, "exception": RateLimitError("429 Too Many Requests")},
        {"on_call": 2, "exception": RateLimitError("429 Too Many Requests")},
        # Call 3 succeeds
    ])
 
    wrapped_search = injector.wrap_tool("search_web", real_search_web)
    result = await retry_with_backoff(wrapped_search, query="test query")
    assert result is not None, "Should succeed on third attempt"
    assert injector._call_counts["search_web"] == 3, "Should have made exactly 3 calls"
 
# Test: verify circuit breaker opens after threshold
async def test_circuit_breaker_opens():
    registry = CircuitBreakerRegistry()
    breaker = registry.get("flaky_api", failure_threshold=3)
 
    # Record failures
    for _ in range(3):
        breaker.record_failure()
 
    assert breaker.state == CircuitState.OPEN, "Circuit should be open after threshold"
 
    # Verify call is blocked
    assert not breaker.call_allowed(), "Calls should be blocked when circuit is open"
 
# Test: verify idempotency prevents double-write
async def test_idempotent_tool_deduplicates():
    store = IdempotencyStore(":memory:", ttl_hours=1)  # In-memory SQLite for testing
    call_count = 0
 
    @idempotent_tool(store)
    async def write_record(record_id: str, data: dict) -> dict:
        nonlocal call_count
        call_count += 1
        return {"written": True, "record_id": record_id}
 
    # Call twice with same parameters
    result1 = await write_record(record_id="test-123", data={"value": 42})
    result2 = await write_record(record_id="test-123", data={"value": 42})
 
    assert call_count == 1, "Tool should only execute once despite two calls"
    assert result2.get("_idempotency_hit"), "Second call should be a cache hit"

Failure Modes Reference: Symptoms, Causes, and Fixes

Symptom | Root cause | Fix
Agent repeats same tool call indefinitely | No loop detection, no max-attempts enforcement | Implement AgentLoopDetector, add per-tool max retry count
Emails sent twice on retry | No idempotency on send_email tool | Add idempotency key to all write tools
Agent stops responding after external API goes down | No circuit breaker; agent keeps retrying and exhausting context with error messages | Add circuit breaker; include recovery time in error message
Agent takes irreversible action based on hallucination | No confirmation gate for high-stakes actions | Add @requires_approval for all irreversible tool calls
Error context accumulates and confuses agent | Failed attempts left in context without summarization | Clear failed-attempt context after N retries, add session-level reset
No visibility into what went wrong | Missing structured logging | Add AgentTracer to all tool calls and turn boundaries
Agent quality degrades in long sessions | Context filling with low-signal content | Implement WorkingMemoryManager with priority slots
Test suite passes but production fails | Testing only happy path | Add FaultInjector tests for all failure modes

Key Takeaways

  • Agent pipeline reliability requires a complete error taxonomy. The six types (transient infrastructure, input validation, tool execution, LLM output quality, irreversible action, cascading context) need different handling strategies. A single catch-all retry policy amplifies failures rather than recovering from them.

  • Retry logic without idempotency causes double-writes. Every tool with side effects (writes, sends, deployments) must implement idempotency using client-generated keys. The idempotency key must capture the semantic identity of the operation: not a timestamp, but a hash of the tool name and parameters.

  • Circuit breakers prevent an open tool dependency from corrupting the agent's reasoning context with accumulated error messages. When a circuit opens, the error message to the agent must include the estimated recovery time. Otherwise the agent will keep trying and reasoning incorrectly about why the tool keeps failing.

  • Human-in-the-loop is a designed pipeline component, not an emergency fallback. Escalation triggers must be explicit: irreversible actions, confidence below threshold, repeated failures, context contradictions, policy boundaries. Escalation notifications must include enough context for a human to make a good decision in 30 seconds.

  • Observability is non-negotiable for production agents. Every tool call needs a trace ID and span ID. Every turn needs start/end logs with context token counts. Metrics to alert on: tool error rate >5%, escalation rate >20%, any circuit breaker opening, loop detection triggers. Without these, failure postmortems are impossible.

  • Test agent reliability with deliberate fault injection. The FaultInjector pattern allows configuring which tool calls raise which exceptions, verifying that retry logic, circuit breakers, and escalation gates behave correctly under failure conditions. Happy path tests alone do not validate reliability.

FAQ

How do you handle errors in AI agent pipelines?

AI agent pipeline errors require handling based on error type, not a uniform retry policy. Transient infrastructure errors (rate limits, timeouts) should be retried with exponential backoff and jitter, up to 3-5 attempts. Input validation errors (wrong parameters, schema violations) should feed the error back to the agent as an explicit correction prompt, allowing one correction attempt before escalating. Tool execution errors should be classified per-tool: some are retriable (timeout), others are not (unique constraint violation). LLM output quality errors require regeneration with a correction prompt, not retry of the same call. Irreversible action errors (wrong email sent) cannot be recovered programmatically and require human review. The circuit breaker pattern prevents repeated calls to consistently failing tools from corrupting the agent's reasoning context.
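The backoff policy described above can be sketched in a few lines with full jitter. The exception class here is a stand-in for whatever your client library raises on 429s and timeouts; the function names and defaults are illustrative.

```python
import asyncio
import random

class TransientError(Exception):
    """Stand-in for rate-limit/timeout errors (illustrative)."""

async def retry_transient(fn, *, max_attempts: int = 4,
                          base_delay: float = 0.5, max_delay: float = 8.0):
    """Retry an async callable on transient errors with exponential backoff
    plus full jitter, re-raising once attempts are exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == max_attempts:
                raise
            # Full jitter: sleep a random amount up to the capped exponential delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))
```

The jitter matters: without it, many agents retrying the same rate-limited API synchronize their retries and re-trigger the limit in lockstep.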

What is human-in-the-loop for AI agents?

Human-in-the-loop (HITL) for AI agents is a designed pipeline mechanism that pauses agent execution and routes specific decisions or actions to a human reviewer before proceeding. Unlike ad-hoc interruption, production HITL has explicit escalation triggers (irreversible actions, low confidence, repeated failures, policy boundaries), structured escalation requests that include the proposed action, context, and options, timeout behavior with defined defaults (typically reject on timeout), and clear instructions to the agent for what to do after rejection. HITL should be positioned before irreversible actions are taken, not as a post-hoc review after something goes wrong. The notification to the human reviewer must include enough context to make a good decision in under 30 seconds.
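The reject-on-timeout default reduces to a single guarded wait. A sketch, assuming the human's decision arrives as any awaitable resolving to "approved" or "rejected" (names illustrative):

```python
import asyncio

async def wait_for_approval(approval, timeout_s: float = 300.0) -> str:
    """Wait for a human decision; default to rejection on timeout so no
    action is ever taken without an explicit yes."""
    try:
        return await asyncio.wait_for(approval, timeout=timeout_s)
    except asyncio.TimeoutError:
        return "rejected"
```

The asymmetry is deliberate: a missed approval costs a delay, while a missed rejection can cost an irreversible action.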

How do you prevent AI agents from sending duplicate messages on retry?

Preventing duplicate messages on retry requires making the "send message" tool idempotent using a client-generated idempotency key. The key is generated from a stable hash of the tool name and parameters (recipient, subject, message content), not from timestamps or random values. Before executing the send, the tool checks if this key has been used within the TTL window. If it has, it returns the previous result without executing again. If not, it executes and stores the result with the key. This guarantees that retrying the same logical send operation produces exactly one message regardless of how many times the tool is called. The idempotency key must be computed deterministically from the operation's semantic content. If the agent changes the message content on retry, a new key is generated and the new message is sent.
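The key derivation described above can be sketched as a stable hash over the tool name plus canonicalized parameters (function name illustrative):

```python
import hashlib
import json

def idempotency_key(tool_name: str, params: dict) -> str:
    """Deterministic idempotency key: same logical operation -> same key.
    sort_keys canonicalizes dict ordering; no timestamps or random salt."""
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{tool_name}:{canonical}".encode()).hexdigest()
```

Because the key hashes semantic content, a retry with identical parameters dedupes, while any edit to the message content produces a new key and a fresh send.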

What is a circuit breaker in AI agent systems?

A circuit breaker in AI agent systems is a mechanism that tracks the failure rate of a specific tool or external service and temporarily blocks calls to it when the failure rate exceeds a threshold. It operates in three states: Closed (normal, all calls pass through), Open (blocking, all calls fail immediately with a clear error message), and Half-open (allowing one test call to check if the service has recovered). The primary value in agent contexts is preventing failed tool calls from filling the agent's context window with error messages, which causes the agent to reason incorrectly about why its actions are failing and often leads to stuck loops or escalating misdiagnosis. When a circuit is open, the error message to the agent should explicitly state that the tool is unavailable and include the estimated recovery time, giving the agent enough information to route around the failure or escalate appropriately.

How do you test AI agent reliability in production?

AI agent reliability testing requires both happy path tests and deliberate fault injection. Happy path tests verify that the agent produces correct outputs when all tools succeed. Fault injection tests verify that the reliability mechanisms work correctly under failure: the FaultInjector pattern configures specific tool calls to raise specific exceptions, allowing you to verify that retry logic makes the correct number of attempts, circuit breakers open after the configured failure threshold, idempotency prevents double-writes on retry, and escalation gates activate for the right conditions. Critical test cases: retry succeeds on third attempt after two transient failures; circuit breaker opens and blocks subsequent calls; idempotent tool executes exactly once despite three calls; escalation gate activates before an irreversible action and correctly handles both approval and rejection outcomes.

The difference between a demo agent and a production agent is not the quality of the LLM. It is the quality of the engineering around it.

Production reliability in agent pipelines is a solved problem in distributed systems. Retry logic, circuit breakers, idempotency, observability, and human escalation are all patterns with decades of implementation experience. The challenge specific to agent systems is that the LLM introduces a new failure mode: the system component that decides what to do next is itself unreliable. It can make wrong decisions, accept wrong inputs, and take wrong actions, and it does so confidently.

These reliability patterns address this directly. Error taxonomy gives you the vocabulary to handle different failure types correctly. Circuit breakers prevent tool failures from corrupting agent reasoning. Idempotency prevents retry from causing double-writes. Human-in-the-loop gates block irreversible actions pending verification. Observability lets you reconstruct what happened when something goes wrong.

None of these patterns are exotic. They are the same patterns used to build reliable distributed services, applied to a component that happens to use natural language as its interface. Once you recognize the agent pipeline as a distributed system with an unreliable decision-making component, the engineering approach is straightforward.

Build the scaffolding. The LLM will surprise you, but it will surprise you in ways you can recover from.

Written & published by Chaitanya Prabuddha