Agents Cookbook

When to use this page

The other docs cover individual mechanics — Sessions, Tools, Attachments, the SDK shape. This page combines them into the patterns that show up in production orchestrators driving multi-step inference loops.

Each recipe is a complete, copy-paste-runnable script. None depend on each other. Skim, pick the one that matches your shape, adapt.

Recipe 1 — Single-shot batch processing

A list of items where each gets one model call and the Stateless Contract is what you want across the batch — no conversation memory leaking between items. The canonical use of the daemon's stateless default mode, where each /v1/messages call binds to a fresh Pre-warmed PTY from the Session Pool.

from anthropic import Anthropic

client = Anthropic(api_key="unused", base_url="http://localhost:7421")

queries = ["Summarize: ...", "Classify: ...", "Extract: ..."]

results = []
for q in queries:
    msg = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": q}],
    )
    results.append(msg.content[0].text)

Throughput: serial batches reuse the same 2 warm PTYs at ~10 ms overhead per call. Concurrent batches spawn additional PTYs on demand — no hard cap (v1.1.10+); your RAM is the ceiling (~150–250 MB per claude subprocess).

Pool warmup pattern. A cold daemon's Session Pool starts empty; the first call pays ~2 s for synchronous PTY spawn. For batch jobs that need consistent latency from call 1, issue two cheap warmup calls after daemon start. That fills the idle pool to its TARGET_IDLE of 2, and subsequent calls grab Pre-warmed PTYs at ~10 ms overhead.

# Two cheap warmups before the real batch
for _ in range(2):
    client.messages.create(model="claude-sonnet-4-6", max_tokens=1,
                           messages=[{"role":"user","content":"ok"}])

For parallel batches, set the semaphore to whatever your RAM can afford. 20 concurrent is comfortable on a 16 GB MacBook; 50+ is the right ballpark on a 64 GB workstation:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic(api_key="unused", base_url="http://localhost:7421")
sem = asyncio.Semaphore(20)

async def run_one(q):
    async with sem:
        msg = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": q}],
        )
        return msg.content[0].text

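queries = ["Summarize: ...", "Classify: ...", "Extract: ..."]

async def main():
    # Top-level await only works in a REPL or notebook; a script needs asyncio.run
    return await asyncio.gather(*(run_one(q) for q in queries))

results = asyncio.run(main())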
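The semaphore size can be derived from a RAM budget instead of guessed. The following is a minimal sketch, assuming the upper figure of ~250 MB per claude subprocess quoted above. The function name max_concurrency and the budget_fraction parameter are hypothetical illustrations, not daemon settings.

```python
def max_concurrency(total_ram_gb: float, per_proc_mb: int = 250,
                    budget_fraction: float = 0.3) -> int:
    """Estimate how many concurrent claude subprocesses fit in a RAM budget.

    budget_fraction is the share of total RAM you are willing to hand to
    the pool (an assumption; tune it for your machine and workload).
    """
    budget_mb = total_ram_gb * 1024 * budget_fraction
    # Always allow at least one call, even on a tiny budget
    return max(1, int(budget_mb // per_proc_mb))

print(max_concurrency(16))  # 19
print(max_concurrency(64))  # 78
```

With a 30% budget this lands close to the ballparks given above; pass the result to asyncio.Semaphore in place of the hard-coded 20.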

Recipe 2 — Multi-turn planner loop

A long-running agent that builds up conversation state across many turns within one logical session. This is the Sticky Session pattern.

import httpx
import uuid

session_id = f"planner-{uuid.uuid4()}"
client = httpx.Client(base_url="http://localhost:7421",
                     headers={"X-IR-Session-ID": session_id})

# Turn 1: establish context
r = client.post("/v1/messages", json={
    "model": "claude-sonnet-4-6",
    "max_tokens": 1024,
    "messages": [{"role": "user",
                  "content": "You are planning a 3-step task. Step 1: ..."}],
})
print(r.json()["content"][0]["text"])

# Turn 2: claude remembers Turn 1 (sticky session)
r = client.post("/v1/messages", json={
    "model": "claude-sonnet-4-6",
    "max_tokens": 1024,
    "messages": [{"role": "user",
                  "content": "Now execute Step 2 with the result from Step 1."}],
})
print(r.json()["content"][0]["text"])

# Cleanup on shutdown
client.delete(f"/v1/sessions/{session_id}")

The X-IR-Session-ID header pins a stable PTY for the orchestrator's lifetime. The messages[] array on each call should contain just the current turn, because claude keeps its own memory of the conversation through the sticky Session.

If your orchestrator expects to outlive the daemon, persist session_id to disk and reuse it after a daemon restart. The PTY won't survive the restart, but the next call with that id will spawn a fresh Session under the same identifier (no memory of prior turns).
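Persisting the id can be as small as one file. This is a sketch under stated assumptions: the helper name load_or_create_session_id and the file location are hypothetical, and only the standard library is used.

```python
import uuid
from pathlib import Path

def load_or_create_session_id(path: Path, prefix: str = "planner") -> str:
    """Return the session id stored at `path`, creating one if absent."""
    if path.exists():
        stored = path.read_text(encoding="utf-8").strip()
        if stored:
            return stored
    # No usable id on disk: mint a new one and save it for the next run
    session_id = f"{prefix}-{uuid.uuid4()}"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(session_id, encoding="utf-8")
    return session_id
```

Call it once at startup, for example with Path.home() / ".cache" / "my-orchestrator" / "session_id" (a hypothetical location), and pass the result as the X-IR-Session-ID header. Since a restarted daemon has no memory of prior turns, the first call after a restart should carry whatever context the next step needs.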

Recipe 3 — Tool-using agent

The model returns tool_use content blocks; your code executes the tool and sends a tool_result on the next turn. The standard Anthropic tool-calling shape works unchanged.

from anthropic import Anthropic

client = Anthropic(api_key="unused", base_url="http://localhost:7421")

tools = [{
    "name": "get_weather",
    "description": "Look up the current weather in a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}]

# Turn 1: ask for weather
msg = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=512,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
)

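# Find the tool_use block (the model may also answer directly without calling a tool)
tool_use = next((b for b in msg.content if b.type == "tool_use"), None)
if tool_use is None:
    raise SystemExit("Model answered without requesting a tool")
print(f"Model wants: {tool_use.name}({tool_use.input})")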

# Execute the tool yourself
def get_weather(city):
    return f"{city}: 18°C, light rain"

result = get_weather(tool_use.input["city"])

# Turn 2: feed result back
follow = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=512,
    tools=tools,
    messages=[
        {"role": "user", "content": "What's the weather in Paris?"},
        {"role": "assistant", "content": msg.content},
        {"role": "user", "content": [{
            "type": "tool_result",
            "tool_use_id": tool_use.id,
            "content": result,
        }]},
    ],
)
print(follow.content[0].text)

The daemon routes the tool definition into claude through its bundled MCP server — claude sees the tools as live, calls them, the daemon captures the call and returns it as a tool_use block. Your code is the tool executor; the daemon never runs the tool itself.

See Tools for the round-trip protocol details.
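With more than one tool, the "your code is the tool executor" step is usually a small dispatcher. The sketch below is one possible shape, not the daemon's own mechanism: TOOL_REGISTRY and run_tool_calls are hypothetical names, and the is_error flag follows the standard Anthropic tool_result shape (whether the daemon forwards it is an assumption).

```python
from types import SimpleNamespace

def get_weather(city: str) -> str:
    # Stand-in tool, same as in the recipe above
    return f"{city}: 18°C, light rain"

# Hypothetical registry: tool name -> local callable
TOOL_REGISTRY = {"get_weather": get_weather}

def run_tool_calls(content_blocks, registry=TOOL_REGISTRY):
    """Execute each tool_use block and return one tool_result block per call."""
    results = []
    for block in content_blocks:
        if getattr(block, "type", None) != "tool_use":
            continue  # skip text blocks
        result = {"type": "tool_result", "tool_use_id": block.id}
        handler = registry.get(block.name)
        try:
            if handler is None:
                raise LookupError(f"unknown tool: {block.name}")
            result["content"] = str(handler(**block.input))
        except Exception as exc:
            # Report the failure to the model instead of crashing the loop
            result["content"] = f"tool failed: {exc}"
            result["is_error"] = True
        results.append(result)
    return results

# Demo with stand-in blocks shaped like msg.content
demo = [
    SimpleNamespace(type="text", text="Checking."),
    SimpleNamespace(type="tool_use", id="toolu_1", name="get_weather",
                    input={"city": "Paris"}),
]
print(run_tool_calls(demo))
```

In the recipe above, the returned list would replace the hand-built tool_result entry in the final user message: {"role": "user", "content": run_tool_calls(msg.content)}.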

Recipe 4 — Vision-aware extraction

A single inference call where the user-content array carries both text and an inline image content block. The daemon writes the base64-decoded image to a per-call tempfile and mentions it to the Claude Code subprocess via @path. Useful for OCR-shaped tasks, diagram reading, screenshot analysis — anything where the user prompt is grounded in an image alongside instructions.

import base64
from anthropic import Anthropic

client = Anthropic(api_key="unused", base_url="http://localhost:7421")

with open("screenshot.png", "rb") as f:
    img_b64 = base64.standard_b64encode(f.read()).decode()

msg = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "What error message is shown?"},
            {"type": "image",
             "source": {"type": "base64", "media_type": "image/png", "data": img_b64}},
        ],
    }],
)
print(msg.content[0].text)

The daemon writes the base64 payload to a tempfile and mentions it to claude as an @path attachment. There is no size cap at the daemon layer; the limit is whatever your subscription's context window allows.
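When a script sends many images, building the content block by hand gets repetitive. A minimal sketch of a helper follows; image_block is a hypothetical name, and the allowed media types are those of the standard Anthropic image block (that the daemon accepts all four is an assumption).

```python
import base64
import mimetypes
from pathlib import Path

# Media types of the standard Anthropic image block (assumed accepted by the daemon)
ALLOWED_MEDIA_TYPES = {"image/png", "image/jpeg", "image/gif", "image/webp"}

def image_block(path: str) -> dict:
    """Build a base64 image content block from a file on disk."""
    media_type, _ = mimetypes.guess_type(path)
    if media_type not in ALLOWED_MEDIA_TYPES:
        raise ValueError(f"unsupported image type for {path}: {media_type}")
    data = base64.standard_b64encode(Path(path).read_bytes()).decode("ascii")
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": media_type, "data": data},
    }
```

The result drops straight into the content list of the call above, next to the text block: [{"type": "text", "text": "What error message is shown?"}, image_block("screenshot.png")].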

Recipe 5 — Reset-between-conversations

A long-running Orchestrator Process that drives several distinct logical conversations through a single Sticky Session id. Each conversation needs its own clean Stateless Contract, but the orchestrator wants to avoid the ~2 s spawn cost of binding a new session id. Pool-swap reset is the answer — drop the existing PTY, bind a Pre-warmed replacement under the same id:

import httpx

session_id = "agent-main"
client = httpx.Client(base_url="http://localhost:7421",
                     headers={"X-IR-Session-ID": session_id})

# Conversation A
client.post("/v1/messages", json={...})
client.post("/v1/messages", json={...})

# Wipe memory, keep id
client.post(f"/v1/sessions/{session_id}/reset")
# {"ok":true,"latency_ms":11}

# Conversation B (fresh memory, same id)
client.post("/v1/messages", json={...})

The reset endpoint drops the PTY and binds a Pre-warmed replacement from the Pool. ~10 ms in the happy path. Conversation memory: gone. Session id: preserved.
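An orchestrator will usually want to confirm the reset before starting the next conversation. The following sketch wraps the call; reset_session is a hypothetical helper, it relies only on the {"ok":true,"latency_ms":11} body shown above, and treating a missing or false ok as a failure is an assumption, since the failure body is not described here.

```python
def reset_session(client, session_id: str) -> float:
    """POST the reset endpoint and return the reported latency in ms.

    `client` is any object with an httpx-style .post() method, such as
    the httpx.Client from the recipe above.
    """
    resp = client.post(f"/v1/sessions/{session_id}/reset")
    resp.raise_for_status()
    body = resp.json()
    if not body.get("ok"):
        # Assumption: anything other than ok=true means the swap did not happen
        raise RuntimeError(f"reset failed for {session_id}: {body}")
    return body.get("latency_ms", 0)
```

Logging the returned latency is a cheap way to notice when resets fall off the ~10 ms happy path.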

Recipe 6 — Live activity stream consumer (v1.1.14+)

The daemon emits an SSE event on GET /v1/activity/stream for every successful or errored /v1/messages call. The dashboard uses this for real-time Recent Activity; you can also tap the same stream from your own monitoring code, which is useful for building latency dashboards, anomaly detectors, or local audit logs without polling.

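# pip install sseclient-py requests
import json
import requests
from sseclient import SSEClient

# sseclient-py wraps a streaming requests response and yields events via .events()
response = requests.get("http://localhost:7421/v1/activity/stream",
                        stream=True, headers={"Accept": "text/event-stream"})
for event in SSEClient(response).events():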
    if event.event != "call":
        if event.event == "lagged":
            print("WARN: dropped", event.data, "— refetch /v1/recent-calls")
        continue
    call = json.loads(event.data)
    print(f"{call['sessionId'][:8]}  {call['status']:7}  "
          f"{call.get('inputTokens', 0)}+{call.get('outputTokens', 0)} tok  "
          f"${call.get('costAvoidedUsd', 0):.4f} saved  "
          f"{call.get('promptPreview', '')[:60]}")

Each event payload is the full CallRecord shape from /v1/recent-calls — including the rawTranscript field if you want to mirror Agent Inspector behavior in your own UI. Slow consumers (~256 events behind) get a lagged event instead of a call — refetch /v1/recent-calls to reconcile.
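For dashboards or audit logs, the per-call records are typically rolled up per session. The sketch below aggregates CallRecord-shaped dicts using only the fields that appear in the consumer above (sessionId, status, inputTokens, outputTokens, costAvoidedUsd). The function name summarize_calls and the status values in the sample data are hypothetical.

```python
from collections import Counter, defaultdict

def summarize_calls(calls):
    """Aggregate CallRecord-like dicts into per-session totals."""
    summary = defaultdict(
        lambda: {"statuses": Counter(), "tokens": 0, "saved_usd": 0.0}
    )
    for call in calls:
        row = summary[call["sessionId"]]
        row["statuses"][call["status"]] += 1
        row["tokens"] += call.get("inputTokens", 0) + call.get("outputTokens", 0)
        row["saved_usd"] += call.get("costAvoidedUsd", 0.0)
    return dict(summary)

# Sample records; the status strings here are placeholders, not documented values
sample = [
    {"sessionId": "planner-1", "status": "ok", "inputTokens": 100, "outputTokens": 20},
    {"sessionId": "planner-1", "status": "error"},
    {"sessionId": "batch-7", "status": "ok", "inputTokens": 5, "outputTokens": 5,
     "costAvoidedUsd": 0.25},
]
print(summarize_calls(sample))
```

The same function works on the list returned by /v1/recent-calls, which makes it usable for the reconcile step after a lagged event.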

Recipe 7 — Self-service diagnostic (v1.1.14+)

When something breaks in your CI environment or on a customer machine, the inference-relay doctor command runs five diagnostic checks and prints a structured report. Bake it into your CI failure-debug step or your support intake:

# Human-readable
inference-relay doctor

# Machine-readable — pipe into your incident tool
inference-relay doctor --json | jq '.checks[] | select(.status != "pass")'

Checks: claude binary resolvable, settings.json readable, license configured, daemon /v1/health reachable, update server reachable. Exit 0 on PASS / WARN, exit 1 on FAIL.
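The same filter as the jq one-liner can run inside a Python CI step. This is a sketch, assuming the report has the .checks[] array with a status field that the jq example implies; the name field and the "warn"/"fail" strings in the usage note are guesses, and the helper names are hypothetical.

```python
import json
import subprocess

def failing_checks(report: dict) -> list:
    """Return every check whose status is not "pass"."""
    return [c for c in report.get("checks", []) if c.get("status") != "pass"]

def run_doctor() -> dict:
    """Run `inference-relay doctor --json` and parse its stdout."""
    # No check=True: the command exits 1 on FAIL but still prints the report
    proc = subprocess.run(
        ["inference-relay", "doctor", "--json"],
        capture_output=True, text=True,
    )
    return json.loads(proc.stdout)
```

A CI step could then do: for c in failing_checks(run_doctor()): print(c.get("name"), c.get("status")), and attach the output to the failure report.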

Recipe 8 — OpenAI SDK consumer (v1.1.15+)

If your app is already coded against the OpenAI SDK, point it at http://localhost:7421/v1 and skip the Anthropic SDK migration entirely. The daemon's /v1/chat/completions endpoint translates OpenAI request/response shapes to/from the same internal Anthropic pipeline. Tool calling, streaming, sticky sessions: all work.

# pip install openai
from openai import OpenAI

client = OpenAI(
    api_key="unused",                       # required by SDK; daemon ignores
    base_url="http://localhost:7421/v1",
)

# Plain text — note the OpenAI tier-naming convention; daemon's claude
# uses its own model regardless of what you pass here.
resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are terse."},
        {"role": "user", "content": "Reply with: OK"},
    ],
    max_tokens=30,
)
print(resp.choices[0].message.content)

# Function calling — OpenAI tools shape round-trips cleanly.
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]
resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "weather in Tokyo?"}],
    tools=tools,
    max_tokens=200,
)
# resp.choices[0].message.tool_calls[0].function.name == "get_weather"
# resp.choices[0].message.tool_calls[0].function.arguments == '{"city": "Tokyo"}'
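To finish the round trip, the caller runs the function and sends the result back as a role "tool" message, which is the standard OpenAI shape. The sketch below builds those replies from an assistant message given as a plain dict; tool_messages and the registry are hypothetical names, and that the daemon handles the follow-up turn the same way as the OpenAI API is an assumption based on the statement that tool calling works.

```python
import json

def tool_messages(assistant_message: dict, registry: dict) -> list:
    """Build one role="tool" reply per tool call in an assistant message."""
    replies = []
    for call in assistant_message.get("tool_calls") or []:
        fn = call["function"]
        args = json.loads(fn["arguments"])  # arguments arrive as a JSON string
        output = registry[fn["name"]](**args)
        replies.append({
            "role": "tool",
            "tool_call_id": call["id"],
            "content": str(output),
        })
    return replies

# Stand-in for resp.choices[0].message.model_dump()
assistant = {
    "role": "assistant",
    "content": None,
    "tool_calls": [{
        "id": "call_1",
        "type": "function",
        "function": {"name": "get_weather", "arguments": '{"city": "Tokyo"}'},
    }],
}
registry = {"get_weather": lambda city: f"{city}: 18°C, light rain"}
print(tool_messages(assistant, registry))
```

The next request would then send the original user message, the assistant message, and these tool replies together in messages.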

Sticky sessions work the same way; set X-IR-Session-ID via the SDK's default_headers:

client = OpenAI(
    api_key="unused",
    base_url="http://localhost:7421/v1",
    default_headers={"X-IR-Session-ID": "planner-loop-1"},
)

Where to go next