Patterns & edge cases
Production workarounds for the hard cases.
Long conversations that blow the context window
Trim history before each model call, non-destructively (the full history stays in durable state and is re-trimmed each step):
from spine_middleware import Compaction
agent = Agent("openai:gpt-4o-mini", middleware=[Compaction(max_messages=40, keep_last=20)])
A leading orphan tool result is dropped so the trimmed window stays valid for the provider.
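The trimming rule can be sketched in plain Python. This is an illustration under assumptions, not the library's implementation: messages are modeled as dicts with a `role` key, `trim_window` is a hypothetical helper, and trimming is assumed to start once the history exceeds `max_messages` and to keep the newest `keep_last` entries.

```python
from typing import Any

Message = dict[str, Any]  # hypothetical message shape: {"role": ..., "content": ...}


def trim_window(history: list[Message], max_messages: int, keep_last: int) -> list[Message]:
    """Return a trimmed copy of history; the input list is never mutated."""
    if len(history) <= max_messages:
        return list(history)
    window = history[-keep_last:]  # assumes keep_last > 0
    # A tool result whose originating assistant call was cut off is an orphan;
    # drop it so the window starts on a message the provider will accept.
    while window and window[0]["role"] == "tool":
        window = window[1:]
    return window


history: list[Message] = [{"role": "user", "content": "start"}]
for i in range(30):
    history.append({"role": "assistant", "content": f"call tool {i}"})
    history.append({"role": "tool", "content": f"result {i}"})

trimmed = trim_window(history, max_messages=40, keep_last=21)
```

Because the function returns a copy, the caller's full history is untouched and can be re-trimmed on the next step.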
Strict cost control
Cost guards are checked before the next step starts, so a run can overshoot
the limit by one in-flight call. To keep the cap effectively hard, combine a
low max_cost_usd with max_tokens and a tight max_steps, and make sure cost is
actually measured:
from spine_core import Guards
from spine_middleware import CostTracking
agent = Agent(
    "openai:gpt-4o-mini",
    guards=Guards(max_steps=4, max_cost_usd=0.05, max_tokens=8000),
    middleware=[CostTracking(0.15, 0.60)],  # needed for non-native price tables
)
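A small worked example of the overshoot. It assumes (the text above does not say) that the two CostTracking arguments are USD prices per million input and output tokens; `call_cost_usd` and `run_with_cost_guard` are hypothetical helpers, not library functions.

```python
def call_cost_usd(input_tokens: int, output_tokens: int,
                  usd_per_m_input: float, usd_per_m_output: float) -> float:
    """Cost of one model call given per-million-token prices (assumed unit)."""
    return (input_tokens * usd_per_m_input + output_tokens * usd_per_m_output) / 1_000_000


def run_with_cost_guard(call_costs: list[float], max_cost_usd: float) -> float:
    """Check the budget before each call, the way a pre-step guard would."""
    spent = 0.0
    for cost in call_costs:
        if spent >= max_cost_usd:
            break  # the guard fires here, before the next step
        spent += cost  # a call that has started always completes and is billed
    return spent


one_call = call_cost_usd(6000, 2000, 0.15, 0.60)
spent = run_with_cost_guard([0.02, 0.02, 0.02, 0.02], max_cost_usd=0.05)
```

With calls of 0.02 USD each, the guard only trips after the third call, so the run ends at roughly 0.06 USD: the limit plus at most one call. Bounding the size of a single call is what keeps that overshoot small.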
Resilient model calls: retry → fallback → breaker
Order matters — these are independent middlewares that compose:
from spine_middleware import Retry, ModelFallback, CircuitBreaker
agent = Agent(
    "openai:gpt-4o-mini",
    middleware=[
        CircuitBreaker(threshold=5, cooldown_s=30),   # stop hammering a dead provider
        Retry(max_attempts=3, base=0.2),              # transient errors
        ModelFallback("anthropic:claude-sonnet-4-6"), # last resort: switch provider
    ],
)
The kernel also caps provider attempts (100/step) and re-checks the wall-clock timeout inside the retry loop, so a misbehaving middleware can't loop forever.
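To make the composition concrete, here is a self-contained sketch of the three ideas as plain async wrappers. Everything in it is a stand-in (`Breaker`, `with_retry`, `with_fallback` are hypothetical names), the backoff is assumed to double from `base`, and the nesting shown (fallback around retry around breaker) is one plausible reading rather than the library's documented order.

```python
import asyncio
import time
from typing import Awaitable, Callable

Call = Callable[[], Awaitable[str]]


class BreakerOpen(Exception):
    """Raised when the circuit is open and calls are short-circuited."""


class Breaker:
    def __init__(self, threshold: int, cooldown_s: float) -> None:
        self.threshold, self.cooldown_s = threshold, cooldown_s
        self.failures, self.opened_at = 0, None

    async def call(self, fn: Call) -> str:
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown_s:
                raise BreakerOpen("provider is cooling down")
            self.opened_at, self.failures = None, 0  # cooldown over: try again
        try:
            result = await fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result


async def with_retry(fn: Call, max_attempts: int, base: float) -> str:
    for attempt in range(max_attempts):
        try:
            return await fn()
        except BreakerOpen:
            raise  # retrying an open circuit would only add latency
        except Exception:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(base * 2 ** attempt)  # base, 2*base, 4*base, ...


async def with_fallback(primary: Call, fallback: Call) -> str:
    try:
        return await primary()
    except Exception:
        return await fallback()


calls = {"primary": 0}


async def flaky_primary() -> str:
    calls["primary"] += 1
    raise ConnectionError("provider down")


async def backup() -> str:
    return "answer from fallback"


async def main() -> str:
    breaker = Breaker(threshold=5, cooldown_s=30)
    guarded = lambda: breaker.call(flaky_primary)
    retried = lambda: with_retry(guarded, max_attempts=3, base=0.01)
    return await with_fallback(retried, backup)


answer = asyncio.run(main())
```

In this nesting every retry attempt counts toward the breaker threshold, and the fallback only runs once retries are exhausted or the circuit is open.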
Don't double-charge on retries
Make side-effecting tools idempotent so a retry replays the cached result instead of running the effect twice:
from spine_middleware import Idempotency
agent = Agent("openai:gpt-4o-mini", tools=[charge_card],
              middleware=[Idempotency(tools=["charge_card"])])
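The underlying idea, result caching keyed by the call, can be sketched without the library. `IdempotentTool` is a hypothetical in-memory wrapper; how the real middleware derives its key and where it stores results is not stated here, so treat both as assumptions.

```python
import hashlib
import json
from typing import Any, Callable


class IdempotentTool:
    """Wrap a side-effecting function so identical calls replay the first result."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn
        self._results: dict[str, Any] = {}

    def __call__(self, **kwargs: Any) -> Any:
        # Canonical JSON so that argument order does not change the key.
        key = hashlib.sha256(json.dumps(kwargs, sort_keys=True).encode()).hexdigest()
        if key not in self._results:
            self._results[key] = self.fn(**kwargs)  # the effect runs once
        return self._results[key]


charges: list[tuple[str, int]] = []


def charge_card(customer: str, cents: int) -> str:
    charges.append((customer, cents))
    return f"charged {customer} {cents}"


safe_charge = IdempotentTool(charge_card)
first = safe_charge(customer="c_1", cents=500)
retry = safe_charge(cents=500, customer="c_1")  # a retry with the same arguments
```

Keying on arguments alone would also suppress a second, legitimate charge of the same amount, so a production key usually includes something that identifies the specific call, such as a session or tool-call id.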
Graceful shutdown (SIGTERM)
Pass a cancel predicate; the kernel finishes and checkpoints the current step,
then returns cancelled — resume later from exactly there:
import signal

stopping = False

def _on_sigterm(signum, frame):
    # only flip a flag; the run stops after the current step
    global stopping
    stopping = True

signal.signal(signal.SIGTERM, _on_sigterm)
result = await agent.run(task, should_cancel=lambda: stopping)
if result.stopped_reason.value == "cancelled":
    ...  # state is checkpointed; another worker can resume result.state.session_id
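The "finish the step, then stop" behaviour is cooperative: the predicate is only consulted between steps. A toy loop shows the shape; `run_steps` is a hypothetical stand-in for the kernel, and the polling point is an assumption based on the description above.

```python
import asyncio
from typing import Callable


async def run_steps(steps: list[str], should_cancel: Callable[[], bool]) -> dict:
    """Toy step loop: cancellation is only observed between steps."""
    checkpoint: list[str] = []
    for step in steps:
        if should_cancel():
            return {"stopped_reason": "cancelled", "checkpoint": checkpoint}
        await asyncio.sleep(0)    # stand-in for the model or tool call
        checkpoint.append(step)   # persist progress after every step
    return {"stopped_reason": "done", "checkpoint": checkpoint}


polls = {"n": 0}


def should_cancel() -> bool:
    polls["n"] += 1
    return polls["n"] > 2  # pretend SIGTERM arrived during the second step


result = asyncio.run(run_steps(["a", "b", "c"], should_cancel))
```

The second step still completes and is checkpointed; only the third is skipped, which is why a later resume can pick up from exactly that point.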
HITL that outlives the process
The resume token is the session id, backed by the checkpoint store. A brand-new process resumes a days-old pause:
from spine_backends import SQLiteCheckpoint
store = SQLiteCheckpoint("runs.db")
# process A pauses for approval → record res.resume_token (== session_id)
# process B (next day): same store
res = await Agent("openai:gpt-4o-mini", tools=[refund], checkpoint=store).resume(token, "approve")
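Why this works across processes: the paused state lives in the database file, not in memory, so the session id is all the second process needs. A minimal sketch with the standard library's sqlite3, where `TinyCheckpoint` and its table layout are hypothetical and not the schema SQLiteCheckpoint uses:

```python
import json
import os
import sqlite3
import tempfile
import uuid


class TinyCheckpoint:
    """Hypothetical stand-in for a durable checkpoint store."""

    def __init__(self, path: str) -> None:
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS runs (session_id TEXT PRIMARY KEY, state TEXT)"
        )

    def save(self, session_id: str, state: dict) -> None:
        with self.db:  # commits on success, rolls back on error
            self.db.execute(
                "INSERT OR REPLACE INTO runs VALUES (?, ?)",
                (session_id, json.dumps(state)),
            )

    def load(self, session_id: str) -> dict:
        row = self.db.execute(
            "SELECT state FROM runs WHERE session_id = ?", (session_id,)
        ).fetchone()
        if row is None:
            raise KeyError(session_id)
        return json.loads(row[0])

    def close(self) -> None:
        self.db.close()


path = os.path.join(tempfile.mkdtemp(), "runs.db")

# "Process A": pause and persist; the session id doubles as the resume token.
store_a = TinyCheckpoint(path)
token = str(uuid.uuid4())
store_a.save(token, {"pending_tool": "refund", "status": "awaiting_approval"})
store_a.close()

# "Process B": a fresh connection to the same file finds the paused run.
store_b = TinyCheckpoint(path)
state = store_b.load(token)
state["status"] = "approved"
store_b.save(token, state)
```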
Huge tool outputs
from spine_middleware import ToolOutputTruncation
agent = Agent("openai:gpt-4o-mini", tools=[scrape],
              middleware=[ToolOutputTruncation(max_chars=4000)])
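For intuition, truncation of this kind amounts to a few lines. The sketch below is a guess at the general shape, not the middleware's actual behaviour: `truncate_output` is a hypothetical helper, and keeping the head plus a marker is an assumed policy.

```python
def truncate_output(text: str, max_chars: int) -> str:
    """Keep the head of an oversized tool output and say how much was cut."""
    if len(text) <= max_chars:
        return text
    omitted = len(text) - max_chars
    return f"{text[:max_chars]}\n[truncated {omitted} characters]"


page = "x" * 10_000
short = truncate_output(page, max_chars=4000)
```

The marker tells the model that content is missing, so it can ask for a narrower query instead of reasoning over a silently clipped result.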
Runaway / untrusted tool code
from spine_middleware import Sandbox
# sync tools only; resource limits (CPU/mem) + wall timeout; POSIX
agent = Agent("openai:gpt-4o-mini", tools=[run_user_code],
              middleware=[Sandbox(tools=["run_user_code"], timeout_s=5, max_memory_mb=256)])
This stops runaway CPU/memory/hangs. For security isolation of untrusted code, use a container/VM.
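The same class of protection can be sketched with the standard library on POSIX: run the code in a child process, apply rlimits in the child, and enforce a wall-clock timeout from the parent. `run_limited` is a hypothetical helper and makes no claim about how Sandbox is implemented; like the middleware, it limits resources and does not provide security isolation.

```python
import resource
import subprocess
import sys
from typing import Optional


def run_limited(code: str, timeout_s: float = 5.0, cpu_s: int = 5,
                max_memory_mb: Optional[int] = None) -> dict:
    """Run Python source in a child process with a wall timeout and rlimits (POSIX)."""

    def apply_limits() -> None:
        # Runs in the child just before exec, so limits apply only to the child.
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_s, cpu_s))
        if max_memory_mb is not None:
            limit = max_memory_mb * 1024 * 1024
            resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

    try:
        proc = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True, text=True,
            timeout=timeout_s, preexec_fn=apply_limits,
        )
    except subprocess.TimeoutExpired:
        return {"ok": False, "error": "timeout", "stdout": ""}
    return {"ok": proc.returncode == 0, "error": proc.stderr, "stdout": proc.stdout}


fine = run_limited("print(1 + 1)")
hung = run_limited("import time; time.sleep(60)", timeout_s=0.5)
```

The CPU limit catches busy loops, the wall timeout catches sleeps and blocked I/O, and the optional address-space limit catches runaway allocation.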
Stop on repeated actions
from spine_middleware import LoopGuard
agent = Agent("openai:gpt-4o-mini", tools=tools, middleware=[LoopGuard(window=4, max_repeats=3)])
# stops with StopReason.LOOP if the same (tool, args) repeats
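One way to read the two parameters, shown as a standalone sketch: keep the last `window` calls and stop once the same (tool, args) pair appears `max_repeats` times among them. This interpretation is an assumption, and `RepeatDetector` is a hypothetical class, not LoopGuard itself.

```python
import json
from collections import deque
from typing import Any


class RepeatDetector:
    def __init__(self, window: int, max_repeats: int) -> None:
        self.recent: deque[str] = deque(maxlen=window)  # oldest calls fall off
        self.max_repeats = max_repeats

    def observe(self, tool: str, args: dict[str, Any]) -> bool:
        """Record a call; return True when the run should stop."""
        key = json.dumps([tool, args], sort_keys=True)  # order-insensitive args
        self.recent.append(key)
        return self.recent.count(key) >= self.max_repeats


guard = RepeatDetector(window=4, max_repeats=3)
verdicts = [
    guard.observe("search", {"q": "spine docs"}),
    guard.observe("fetch", {"url": "https://example.com"}),
    guard.observe("search", {"q": "spine docs"}),
    guard.observe("search", {"q": "spine docs"}),
]
```

The fourth call is the third identical search inside a window of four, so only the last verdict asks for a stop.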
Golden-trace regression tests
Record a run once; replay it forever with no network and no side effects — assert the answer is byte-stable:
from spine_middleware import Recorder, Replayer
rec = Recorder()
gold = await Agent(provider, tools=tools, middleware=[rec]).run("task")
recording = rec.recording() # save to a fixture file
# in CI — deterministic, offline
out = await Agent(provider, tools=tools, middleware=[Replayer(recording)]).run("task")
assert out.answer == gold.answer
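The record/replay mechanism itself is simple enough to sketch without the library. `TapeRecorder` and `TapePlayer` below are hypothetical stand-ins that wrap a plain function instead of a provider; the real Recorder and Replayer may capture more than prompts and replies.

```python
import json
from typing import Callable


class TapeRecorder:
    """Call the real function and remember every (prompt, reply) pair."""

    def __init__(self, model: Callable[[str], str]) -> None:
        self.model = model
        self.tape: list[dict[str, str]] = []

    def __call__(self, prompt: str) -> str:
        reply = self.model(prompt)
        self.tape.append({"prompt": prompt, "reply": reply})
        return reply


class TapePlayer:
    """Serve recorded replies in order; never touches the real model."""

    def __init__(self, tape: list[dict[str, str]]) -> None:
        self.tape = list(tape)
        self.position = 0

    def __call__(self, prompt: str) -> str:
        entry = self.tape[self.position]
        if entry["prompt"] != prompt:
            raise AssertionError(f"prompt drifted at step {self.position}")
        self.position += 1
        return entry["reply"]


def live_model(prompt: str) -> str:  # stand-in for a network call
    return prompt.upper()


recorder = TapeRecorder(live_model)
gold = [recorder("plan"), recorder("act")]
fixture = json.dumps(recorder.tape)       # what would be written to a fixture file

player = TapePlayer(json.loads(fixture))  # in CI: no network needed
replayed = [player("plan"), player("act")]
```

Failing loudly when the prompt no longer matches the tape is what turns the replay into a regression test: a code change that alters what is sent to the model shows up as a drift error rather than a silently stale answer.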
Concurrent sessions on one agent
An Agent is safe to reuse across concurrent runs — each run() with no
session_id gets a fresh State. Stateful middleware (LoopGuard, OTel, …) keys by
session id, so sessions don't interfere. For a shared ScriptedProvider in tests,
keep concurrency=1 (its index is mutable).
Per-tenant isolation & budgets
from spine_middleware import TenantBudget
budget = TenantBudget(max_cost_usd=10.0) # cumulative, per tenant, across runs
agent = Agent("openai:gpt-4o-mini", middleware=[budget], tenant_id="acme")
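"Cumulative, per tenant, across runs" means the budget object outlives any single run and keeps one running total per tenant. A minimal in-memory sketch, with `TenantLedger` and `BudgetExceeded` as hypothetical names and the reject-before-charging policy as an assumption:

```python
class BudgetExceeded(Exception):
    """Raised when a charge would push a tenant past its limit."""


class TenantLedger:
    """Cumulative spend per tenant, shared by every run that uses this object."""

    def __init__(self, max_cost_usd: float) -> None:
        self.max_cost_usd = max_cost_usd
        self.spent: dict[str, float] = {}

    def charge(self, tenant_id: str, cost_usd: float) -> float:
        total = self.spent.get(tenant_id, 0.0) + cost_usd
        if total > self.max_cost_usd:
            raise BudgetExceeded(f"{tenant_id} would reach ${total:.2f}")
        self.spent[tenant_id] = total
        return total


ledger = TenantLedger(max_cost_usd=10.0)
ledger.charge("acme", 6.0)      # run 1
ledger.charge("globex", 9.0)    # a different tenant has its own total
try:
    ledger.charge("acme", 5.0)  # run 2 would push acme to 11.0
    blocked = False
except BudgetExceeded:
    blocked = True
```

An in-memory ledger like this only holds within one process; sharing a budget across workers would need the totals in a shared store.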
Custom stop conditions
Any middleware can end a run cleanly:
from spine_core import Middleware, StepContext, StopRun, StopReason
class MaxToolCalls(Middleware):
    def __init__(self, limit: int) -> None:
        self.limit, self.seen = limit, 0

    async def after_model(self, ctx: StepContext) -> None:
        self.seen += len(ctx.response.message.tool_calls)
        if self.seen > self.limit:
            raise StopRun(StopReason.GUARDRAIL, f"too many tool calls (>{self.limit})")
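Note that a counter stored on the middleware instance is shared by every run that reuses the agent. If the limit should apply per run, key the count by session id. The sketch below uses stand-in types so it runs on its own: `FakeContext`, its fields, and this `StopRun` are hypothetical, and how the real StepContext exposes the session id is not shown in this page.

```python
from collections import defaultdict
from dataclasses import dataclass, field


class StopRun(Exception):
    """Stand-in for the library's stop signal."""


@dataclass
class FakeContext:
    """Hypothetical step context: only the two fields this sketch needs."""
    session_id: str
    tool_calls: list[str] = field(default_factory=list)


class MaxToolCallsPerSession:
    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.seen: defaultdict[str, int] = defaultdict(int)  # one counter per session

    def after_model(self, ctx: FakeContext) -> None:
        self.seen[ctx.session_id] += len(ctx.tool_calls)
        if self.seen[ctx.session_id] > self.limit:
            raise StopRun(f"too many tool calls (>{self.limit})")


guard = MaxToolCallsPerSession(limit=3)
guard.after_model(FakeContext("run-a", ["search", "fetch"]))
guard.after_model(FakeContext("run-b", ["search", "fetch"]))  # separate counter
try:
    guard.after_model(FakeContext("run-a", ["search", "fetch"]))  # run-a reaches 4
    stopped = False
except StopRun:
    stopped = True
```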
Force another turn
Set ctx.force_continue = True in after_model to loop again even when the model
produced a plain answer — the basis of repair loops (see StructuredOutput).
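A repair loop in miniature, written without the library: validate the reply, and if it fails, feed the error back and take another turn. `run_with_repair` and `scripted_model` are hypothetical; the sketch only mirrors the idea behind force_continue and is not how StructuredOutput is implemented.

```python
import json
from typing import Callable


def run_with_repair(model: Callable[[list[str]], str], prompt: str,
                    max_turns: int = 3) -> dict:
    """Loop until the reply parses as JSON, feeding each error back to the model."""
    transcript = [prompt]
    for _ in range(max_turns):
        reply = model(transcript)
        try:
            return json.loads(reply)
        except json.JSONDecodeError as err:
            # Same spirit as forcing another turn after a plain answer.
            transcript += [reply, f"Invalid JSON ({err.msg}); reply with JSON only."]
    raise ValueError(f"no valid JSON after {max_turns} turns")


replies = iter(["Sure! The total is 42.", '{"total": 42}'])


def scripted_model(transcript: list[str]) -> str:
    return next(replies)  # first a chatty answer, then a valid one


parsed = run_with_repair(scripted_model, "Return the total as JSON.")
```

Capping the number of turns matters: without `max_turns`, a model that never produces valid output would loop until some other guard stopped the run.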