Agents Cookbook
When to use this page
The other docs cover individual mechanics — Sessions, Tools, Attachments, the SDK shape. This page combines them into the patterns that show up in production orchestrators driving multi-step inference loops.
Each recipe is a complete, copy-paste-runnable script. None depend on each other. Skim, pick the one that matches your shape, adapt.
Recipe 1 — Single-shot batch processing
Use this when you have a list of items, each needing one model call, and you want the Stateless Contract across the batch: no conversation memory leaking between items. This is the canonical use of the daemon's stateless default mode, where each /v1/messages call binds to a fresh Pre-warmed PTY from the Session Pool.
from anthropic import Anthropic

client = Anthropic(api_key="unused", base_url="http://localhost:7421")
queries = ["Summarize: ...", "Classify: ...", "Extract: ..."]
results = []
# One independent call per item; nothing carries over between iterations
for q in queries:
    msg = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": q}],
    )
    results.append(msg.content[0].text)

Throughput: serial batches reuse the same 2 warm PTYs at ~10 ms overhead per call. Concurrent batches spawn additional PTYs on demand with no hard cap (v1.1.10+); your RAM is the ceiling (~150–250 MB per claude subprocess).
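The RAM ceiling above can be turned into a starting concurrency limit. The following is a minimal sketch, not part of the daemon or the SDK: `max_concurrency` is a hypothetical helper, the RAM budget is a number you choose for your own machine, and the default per-process cost assumes the upper end of the ~150–250 MB range.

```python
def max_concurrency(ram_budget_mb: int, per_process_mb: int = 250) -> int:
    """Estimate how many concurrent claude subprocesses fit in a RAM budget.

    per_process_mb defaults to the upper end of the ~150-250 MB range quoted
    above; ram_budget_mb is an assumption you supply for your own machine.
    """
    if ram_budget_mb <= 0 or per_process_mb <= 0:
        raise ValueError("budgets must be positive")
    # Always allow at least one call so a tiny budget cannot stall the batch
    return max(1, ram_budget_mb // per_process_mb)


if __name__ == "__main__":
    # Example: reserve roughly 5 GB of RAM for the batch
    print(max_concurrency(5000))
```

Treat the result as a first guess for a semaphore size and adjust it after watching real memory use.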
Pool warmup pattern. A cold daemon's Session Pool starts empty; the first call pays ~2 s for synchronous PTY spawn. For batch jobs that need consistent latency from call 1, issue two cheap warmup calls after daemon start. That fills the idle pool to its TARGET_IDLE of 2, and subsequent calls grab Pre-warmed PTYs at ~10 ms overhead.
# Two cheap warmups before the real batch
for _ in range(2):
    client.messages.create(model="claude-sonnet-4-6", max_tokens=1,
                           messages=[{"role": "user", "content": "ok"}])

For parallel batches, set the semaphore to whatever your RAM can afford. 20 concurrent is comfortable on a 16 GB MacBook; 50+ is the right ballpark on a 64 GB workstation:
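import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic(api_key="unused", base_url="http://localhost:7421")
queries = ["Summarize: ...", "Classify: ...", "Extract: ..."]

async def run_one(q, sem):
    # The semaphore caps how many calls are in flight at once
    async with sem:
        msg = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": q}],
        )
        return msg.content[0].text

async def main():
    sem = asyncio.Semaphore(20)
    return await asyncio.gather(*(run_one(q, sem) for q in queries))

# A top-level await is not valid in a plain script, so run the loop explicitly
results = asyncio.run(main())

Recipe 2 — Multi-turn planner loop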
A long-running agent that builds up conversation state across many turns within one logical session. This is the Sticky Session pattern.
import uuid

import httpx

session_id = f"planner-{uuid.uuid4()}"
# httpx times out after 5 s by default; model calls can run longer
# (120 s is an arbitrary choice)
client = httpx.Client(base_url="http://localhost:7421",
                      headers={"X-IR-Session-ID": session_id},
                      timeout=120.0)
# Turn 1: establish context
r = client.post("/v1/messages", json={
    "model": "claude-sonnet-4-6",
    "max_tokens": 1024,
    "messages": [{"role": "user",
                  "content": "You are planning a 3-step task. Step 1: ..."}],
})
print(r.json()["content"][0]["text"])
# Turn 2: claude remembers Turn 1 (sticky session)
r = client.post("/v1/messages", json={
    "model": "claude-sonnet-4-6",
    "max_tokens": 1024,
    "messages": [{"role": "user",
                  "content": "Now execute Step 2 with the result from Step 1."}],
})
print(r.json()["content"][0]["text"])
# Cleanup on shutdown
client.delete(f"/v1/sessions/{session_id}")

The X-IR-Session-ID header pins a stable PTY for the orchestrator's lifetime. The messages[] array on each call should contain just the current turn, because claude keeps its own memory of the conversation through the sticky Session.
If your orchestrator expects to outlive the daemon, persist session_id to disk and reuse it after a daemon restart. The PTY won't survive the restart, but the next call with that id will spawn a fresh Session under the same identifier (no memory of prior turns).
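Persisting the id can be as small as one text file. The following is a minimal sketch under stated assumptions: `load_or_create_session_id` is a hypothetical helper, and the file location is whatever stable, writable path your orchestrator already uses.

```python
import tempfile
import uuid
from pathlib import Path


def load_or_create_session_id(path: Path, prefix: str = "planner") -> str:
    """Return the session id stored at `path`, creating one if it is missing."""
    if path.exists():
        stored = path.read_text(encoding="utf-8").strip()
        if stored:
            return stored
    # No usable id on disk yet: mint one and persist it for the next start
    session_id = f"{prefix}-{uuid.uuid4()}"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(session_id, encoding="utf-8")
    return session_id


if __name__ == "__main__":
    # Demo in a throwaway directory; a real orchestrator would use a stable path
    with tempfile.TemporaryDirectory() as tmp:
        id_file = Path(tmp) / "session_id.txt"
        first = load_or_create_session_id(id_file)
        second = load_or_create_session_id(id_file)  # simulates a restart
        print(first == second)
```

Because the Session behind a reused id starts with no memory after a daemon restart, the orchestrator still has to resend whatever context the next turn depends on.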
Recipe 3 — Tool-using agent
The model returns tool_use content blocks; your code executes the tool and sends a tool_result on the next turn. The standard Anthropic tool-calling shape works unchanged.
from anthropic import Anthropic

client = Anthropic(api_key="unused", base_url="http://localhost:7421")
tools = [{
    "name": "get_weather",
    "description": "Look up the current weather in a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}]
# Turn 1: ask for weather
msg = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=512,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
)
# Find the tool_use block
tool_use = next(b for b in msg.content if b.type == "tool_use")
print(f"Model wants: {tool_use.name}({tool_use.input})")
# Execute the tool yourself
def get_weather(city):
    return f"{city}: 18°C, light rain"
result = get_weather(tool_use.input["city"])
# Turn 2: feed result back
follow = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=512,
    tools=tools,
    messages=[
        {"role": "user", "content": "What's the weather in Paris?"},
        {"role": "assistant", "content": msg.content},
        {"role": "user", "content": [{
            "type": "tool_result",
            "tool_use_id": tool_use.id,
            "content": result,
        }]},
    ],
)
print(follow.content[0].text)

The daemon routes the tool definition into claude through its bundled MCP server. Claude sees the tools as live and calls them; the daemon captures each call and returns it as a tool_use block. Your code is the tool executor; the daemon never runs the tool itself.
See Tools for the round-trip protocol details.
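With more than one tool, the "your code is the executor" step is usually a lookup table. The following is a sketch, not the daemon's own mechanism: `TOOL_REGISTRY` and `run_tool_calls` are hypothetical names, content blocks are handled as plain dicts (the raw JSON shape, for example `r.json()["content"]` when calling over httpx), and the `is_error` flag follows the standard Anthropic tool_result shape on the assumption that the daemon passes it through.

```python
from typing import Any, Callable, Dict, List


def get_weather(city: str) -> str:
    # Stand-in implementation, same canned answer as the recipe above
    return f"{city}: 18°C, light rain"


# Hypothetical registry: tool name -> local Python callable
TOOL_REGISTRY: Dict[str, Callable[..., str]] = {"get_weather": get_weather}


def run_tool_calls(content_blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Execute every tool_use block and return the matching tool_result blocks."""
    results: List[Dict[str, Any]] = []
    for block in content_blocks:
        if block.get("type") != "tool_use":
            continue  # text blocks need no action
        handler = TOOL_REGISTRY.get(block["name"])
        if handler is None:
            # Report unknown tools back to the model instead of crashing
            results.append({
                "type": "tool_result",
                "tool_use_id": block["id"],
                "content": f"unknown tool: {block['name']}",
                "is_error": True,
            })
            continue
        results.append({
            "type": "tool_result",
            "tool_use_id": block["id"],
            "content": handler(**block["input"]),
        })
    return results
```

The returned list can be sent as the `content` of the next user message, in the same position as the single tool_result in the recipe.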
Recipe 4 — Vision-aware extraction
A single inference call where the user-content array carries both text and an inline image content block. The daemon writes the base64-decoded image to a per-call tempfile and mentions it to the Claude Code subprocess via @path. Useful for OCR-shaped tasks, diagram reading, screenshot analysis — anything where the user prompt is grounded in an image alongside instructions.
import base64
from anthropic import Anthropic

client = Anthropic(api_key="unused", base_url="http://localhost:7421")
# Inline images travel as base64 text inside the request body
with open("screenshot.png", "rb") as f:
    img_b64 = base64.standard_b64encode(f.read()).decode()
msg = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "What error message is shown?"},
            {"type": "image",
             "source": {"type": "base64", "media_type": "image/png", "data": img_b64}},
        ],
    }],
)
print(msg.content[0].text)

The daemon writes the base64 payload to a tempfile and mentions it to claude as an @path attachment. There is no size cap at the daemon layer; the limit is whatever your subscription's context window allows.
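When images come in mixed formats, the media type can be inferred from the file name rather than hard-coded. The following is a small sketch: `image_block` is a hypothetical helper, and which image types the daemon and claude actually accept is not stated here, so treat anything beyond PNG as an assumption to verify.

```python
import base64
import mimetypes
from pathlib import Path


def image_block(path: str) -> dict:
    """Build an inline base64 image content block for the file at `path`."""
    # Infer the media type from the file extension (e.g. .png -> image/png)
    media_type, _ = mimetypes.guess_type(path)
    if media_type is None or not media_type.startswith("image/"):
        raise ValueError(f"cannot infer an image media type for {path!r}")
    data = base64.standard_b64encode(Path(path).read_bytes()).decode("ascii")
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": media_type, "data": data},
    }
```

The returned dict drops into the user-content array next to the text block, exactly where the hand-built image block sits in the recipe.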
Recipe 5 — Reset-between-conversations
A long-running Orchestrator Process that drives several distinct logical conversations through a single Sticky Session id. Each conversation needs its own clean Stateless Contract, but the orchestrator wants to avoid the ~2 s spawn cost of binding a new session id. Pool-swap reset is the answer — drop the existing PTY, bind a Pre-warmed replacement under the same id:
import httpx

session_id = "agent-main"
# httpx times out after 5 s by default; model calls can run longer
# (120 s is an arbitrary choice)
client = httpx.Client(base_url="http://localhost:7421",
                      headers={"X-IR-Session-ID": session_id},
                      timeout=120.0)
# Conversation A
client.post("/v1/messages", json={...})
client.post("/v1/messages", json={...})
# Wipe memory, keep id
client.post(f"/v1/sessions/{session_id}/reset")
# {"ok":true,"latency_ms":11}
# Conversation B (fresh memory, same id)
client.post("/v1/messages", json={...})

The reset endpoint drops the PTY and binds a Pre-warmed replacement from the Pool. ~10 ms in the happy path. Conversation memory: gone. Session id: preserved.
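It can be worth checking the reset response before starting the next conversation, since a reset that silently failed would leak memory from the previous one. The following is a sketch under assumptions: `reset_session` is a hypothetical wrapper, `client` is any httpx-style object whose `.post()` returns a response with `.raise_for_status()` and `.json()`, and a failed reset is assumed to report a falsy `ok` in the same JSON body shown above.

```python
from typing import Any, Optional


def reset_session(client: Any, session_id: str) -> Optional[int]:
    """Reset a sticky session and return the reported latency in ms.

    Raises RuntimeError if the response body does not report ok=true.
    """
    resp = client.post(f"/v1/sessions/{session_id}/reset")
    resp.raise_for_status()  # surface HTTP-level failures first
    body = resp.json()
    if not body.get("ok"):
        # Assumption: an unsuccessful reset is signalled by a falsy "ok"
        raise RuntimeError(f"reset failed for {session_id}: {body}")
    return body.get("latency_ms")
```

A latency far above the ~10 ms happy path suggests the Pool had no Pre-warmed PTY ready and the reset paid for a fresh spawn.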
Recipe 6 — Live activity stream consumer (v1.1.14+)
The daemon emits an SSE event on GET /v1/activity/stream for every successful or errored /v1/messages call. The dashboard uses this for real-time Recent Activity; you can also tap the same stream from your own monitoring code, which is useful for building latency dashboards, anomaly detectors, or local audit logs without polling.
# pip install sseclient-py requests
import json

import requests
from sseclient import SSEClient

# sseclient-py wraps an open streaming response rather than taking a URL
resp = requests.get("http://localhost:7421/v1/activity/stream", stream=True)
for event in SSEClient(resp).events():
    if event.event != "call":
        if event.event == "lagged":
            print("WARN: dropped", event.data, "- refetch /v1/recent-calls")
        continue
    call = json.loads(event.data)
    print(f"{call['sessionId'][:8]} {call['status']:7} "
          f"{call.get('inputTokens', 0)}+{call.get('outputTokens', 0)} tok "
          f"${call.get('costAvoidedUsd', 0):.4f} saved "
          f"{call.get('promptPreview', '')[:60]}")

Each event payload is the full CallRecord shape from /v1/recent-calls, including the rawTranscript field if you want to mirror Agent Inspector behavior in your own UI. Slow consumers (~256 events behind) get a lagged event instead of a call; refetch /v1/recent-calls to reconcile.
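For a dashboard or audit log, the per-event print is usually replaced by a running aggregate. The following is a minimal sketch: `ActivityStats` is a hypothetical class, it reads only the CallRecord fields already used in the recipe, and it makes no assumption about which status strings the daemon emits.

```python
import json
from collections import Counter


class ActivityStats:
    """Running totals over `call` event payloads from the activity stream."""

    def __init__(self) -> None:
        self.by_status: Counter = Counter()
        self.input_tokens = 0
        self.output_tokens = 0
        self.cost_avoided_usd = 0.0

    def add(self, event_data: str) -> None:
        """Fold one event's JSON payload (the SSE data field) into the totals."""
        call = json.loads(event_data)
        # Count by whatever status string the payload carries
        self.by_status[call.get("status", "unknown")] += 1
        # Missing or null numeric fields count as zero
        self.input_tokens += call.get("inputTokens") or 0
        self.output_tokens += call.get("outputTokens") or 0
        self.cost_avoided_usd += call.get("costAvoidedUsd") or 0.0

    def summary(self) -> str:
        statuses = ", ".join(f"{k}={v}" for k, v in sorted(self.by_status.items()))
        return (f"{statuses} | {self.input_tokens}+{self.output_tokens} tok "
                f"| ${self.cost_avoided_usd:.4f} saved")
```

Inside the consumer loop, call `stats.add(event.data)` for each `call` event. After a `lagged` event the totals are incomplete until you reconcile against /v1/recent-calls.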
Recipe 7 — Self-service diagnostic (v1.1.14+)
When something breaks in your CI environment or on a customer machine, the inference-relay doctor command runs five diagnostic checks and prints a structured report. Bake it into your CI failure-debug step or your support intake:
# Human-readable
inference-relay doctor
# Machine-readable: pipe into your incident tool
inference-relay doctor --json | jq '.checks[] | select(.status != "pass")'
Checks: claude binary resolvable, settings.json readable, license configured, daemon /v1/health reachable, update server reachable. Exit 0 on PASS / WARN, exit 1 on FAIL.
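The same filter can run from a Python-based CI step instead of jq. The following is a sketch under assumptions: `failing_checks` and `run_doctor` are hypothetical helpers, the report shape (a top-level `checks` list whose items carry a `status` field) is inferred from the jq expression above, and the JSON report is assumed to be written to stdout even when the command exits 1.

```python
import json
import subprocess
from typing import Any, Dict, List, Tuple


def failing_checks(report: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return every check whose status is not "pass" (same filter as the jq line)."""
    return [c for c in report.get("checks", []) if c.get("status") != "pass"]


def run_doctor() -> Tuple[int, List[Dict[str, Any]]]:
    """Run `inference-relay doctor --json`; return (exit code, non-passing checks)."""
    proc = subprocess.run(
        ["inference-relay", "doctor", "--json"],
        capture_output=True,
        text=True,
    )
    # Exit 1 means FAIL, so check=True is deliberately not used:
    # the report is still wanted in that case.
    return proc.returncode, failing_checks(json.loads(proc.stdout))
```

Since the exit code is 0 for both PASS and WARN, the filtered list is what separates a clean run from one with warnings.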
Recipe 8 — OpenAI SDK consumer (v1.1.15+)
If your app is already coded against the OpenAI SDK, point it at http://localhost:7421/v1 and skip the Anthropic SDK migration entirely. The daemon's /v1/chat/completions endpoint translates OpenAI request/response shapes to/from the same internal Anthropic pipeline. Tool calling, streaming, and sticky sessions all work.
# pip install openai
from openai import OpenAI

client = OpenAI(
    api_key="unused",  # required by SDK; daemon ignores
    base_url="http://localhost:7421/v1",
)
# Plain text: note the OpenAI tier-naming convention; daemon's claude
# uses its own model regardless of what you pass here.
resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are terse."},
        {"role": "user", "content": "Reply with: OK"},
    ],
    max_tokens=30,
)
print(resp.choices[0].message.content)
# Function calling: OpenAI tools shape round-trips cleanly.
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]
resp = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "weather in Tokyo?"}],
    tools=tools,
    max_tokens=200,
)
# resp.choices[0].message.tool_calls[0].function.name == "get_weather"
# resp.choices[0].message.tool_calls[0].function.arguments == '{"city": "Tokyo"}'

Sticky sessions work the same way: set X-IR-Session-ID via the SDK's default_headers:
client = OpenAI(
    api_key="unused",
    base_url="http://localhost:7421/v1",
    default_headers={"X-IR-Session-ID": "planner-loop-1"},
)

Where to go next
- Sessions deeper dive → Sessions
- Tool round-trip protocol → Tools
- Daemon failure modes for agents → Troubleshooting