When we build LLM features, we usually obsess over prompts and models. Yet the real magic often sits one layer above: the piece of code that decides when to call the model, how to stream, what to log, and which session to mutate. In the OpenClaw project—an automation system that wires LLM agents into messaging channels and queues—that role is played by a single orchestrator function.
We’ll dissect that orchestrator, runReplyAgent in src/auto-reply/reply/agent-runner.ts, and see how it coordinates a single reply turn: routing, session lifecycle, streaming, typing signals, diagnostics, and cost. I’m Mahmoud Zalt, an AI solutions architect; I help teams turn AI into reliable, observable product behavior, and this file is a concrete example of how to do that in practice.
Our goal is simple: understand how to design an application-level LLM orchestrator that keeps conversations sane, users confident, and operators informed. We’ll follow one turn through its lifecycle—session handling, steering, real‑time experience, and observability—and close with refactoring patterns that keep this critical function under control.
The orchestrator in context
The code we’re examining lives in src/auto-reply/reply/agent-runner.ts. OpenClaw’s auto‑reply system receives triggers from messaging channels, runs them through agents, and pushes replies back out. At the center of a single turn is runReplyAgent.
What runReplyAgent really is: an application-level orchestrator. It doesn’t implement model logic; it coordinates everything around it—sessions, queues, tools, streaming, and accounting. This is the layer most teams underestimate when shipping LLM features.
src/
  auto-reply/
    reply/
      agent-runner.ts            # Orchestrator for a single agent reply turn
      agent-runner-execution.ts
      agent-runner-helpers.ts
      agent-runner-memory.ts
      agent-runner-payloads.ts
      agent-runner-utils.ts
      block-reply-pipeline.ts
      block-streaming.ts
      followup-runner.ts
      queue.ts
      reply-threading.ts
      session-updates.ts
      session-usage.ts
      typing-mode.ts
[Message/Trigger]
        |
        v
[Higher-level auto-reply controller]
        |
        v
[runReplyAgent]
  |-- steering / followup decision
  |-- memory flush & session updates
  |-- agent turn & tools
  |-- streaming & typing
  |-- usage & diagnostics
        |
        v
[ReplyPayload | ReplyPayload[] | undefined]
        |
        v
[Channel adapter sends replies]
Conceptually, runReplyAgent takes everything known about a message and session and decides what happens next: reply now, steer to another agent, enqueue a followup, reset a broken session, or quietly do nothing. Along the way it keeps typing indicators, streaming blocks, and usage accounting in sync.
Owning the session lifecycle
Long‑lived conversations are where LLM apps either feel reliable or slowly fall apart. In OpenClaw, a session is a persisted record of an ongoing conversation: IDs, transcript file paths, flags like groupActivationNeedsSystemIntro, and usage info. runReplyAgent receives an optional sessionEntry, a sessionStore, and a sessionKey, and treats them as the source of truth for this turn.
Early in the function, it delegates history management to a dedicated helper:
activeSessionEntry = await runMemoryFlushIfNeeded({
  cfg,
  followupRun,
  sessionCtx,
  opts,
  defaultModel,
  agentCfgContextTokens,
  resolvedVerboseLevel,
  sessionEntry: activeSessionEntry,
  sessionStore: activeSessionStore,
  sessionKey,
  storePath,
  isHeartbeat,
});
All pruning and compaction live in runMemoryFlushIfNeeded. The orchestrator stays responsible for which session entry is "current" and passes that on to the rest of the turn. Separation of concerns is clear: orchestration owns when to flush and how to propagate the result; the helper owns how to flush.
The more delicate part is handling broken sessions. If compaction fails or the transcript order is corrupted, the orchestrator can’t just crash. Instead it uses an internal resetSession helper that creates a fresh session and updates every reference:
const resetSession = async ({
  failureLabel,
  buildLogMessage,
  cleanupTranscripts,
}: SessionResetOptions): Promise<boolean> => {
  if (!sessionKey || !activeSessionStore || !storePath) return false;
  const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
  if (!prevEntry) return false;
  const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
  const nextSessionId = crypto.randomUUID();
  const nextEntry: SessionEntry = {
    ...prevEntry,
    sessionId: nextSessionId,
    updatedAt: Date.now(),
    systemSent: false,
    abortedLastRun: false,
  };
  const agentId = resolveAgentIdFromSessionKey(sessionKey);
  const nextSessionFile = resolveSessionTranscriptPath(
    nextSessionId,
    agentId,
    sessionCtx.MessageThreadId,
  );
  nextEntry.sessionFile = nextSessionFile;
  activeSessionStore[sessionKey] = nextEntry;
  try {
    await updateSessionStore(storePath, (store) => {
      store[sessionKey] = nextEntry;
    });
  } catch (err) {
    defaultRuntime.error(
      `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
    );
  }
  followupRun.run.sessionId = nextSessionId;
  followupRun.run.sessionFile = nextSessionFile;
  activeSessionEntry = nextEntry;
  activeIsNewSession = true;
  defaultRuntime.error(buildLogMessage(nextSessionId));
  if (cleanupTranscripts && prevSessionId) {
    const transcriptCandidates = new Set<string>();
    const resolved = resolveSessionFilePath(prevSessionId, prevEntry, { agentId });
    if (resolved) transcriptCandidates.add(resolved);
    transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
    for (const candidate of transcriptCandidates) {
      try {
        fs.unlinkSync(candidate);
      } catch {
        // Best-effort cleanup.
      }
    }
  }
  return true;
};
A few principles here are worth copying:
- Single place for state rewiring. The reset updates activeSessionStore, followupRun.run, and activeSessionEntry together, so no in-memory reference keeps pointing at the old session.
- Failures are logged, not fatal. If persisting the reset fails, the error is recorded but the turn tries to proceed. User experience wins over perfect bookkeeping.
- Transcript cleanup is best‑effort. Synchronous deletions with swallowed errors keep broken files from taking down the run. (We’ll revisit performance implications later.)
Steering, streaming, and user trust
Once the session is stable, the orchestrator has to decide what to do with the current message and how the user should experience that decision in real time. This is where steering, followups, streaming, and typing signals intersect.
Early exits for steering and followups
Before doing any heavy work, runReplyAgent checks whether this message should be answered now, steered to another agent, or converted into a queued followup. That logic lives near the top of the function and uses early returns to keep the rest of the flow simple:
if (shouldSteer && isStreaming) {
  const steered = queueEmbeddedPiMessage(
    followupRun.run.sessionId,
    followupRun.prompt,
  );
  if (steered && !shouldFollowup) {
    if (activeSessionEntry && activeSessionStore && sessionKey) {
      const updatedAt = Date.now();
      activeSessionEntry.updatedAt = updatedAt;
      activeSessionStore[sessionKey] = activeSessionEntry;
      if (storePath) {
        await updateSessionStoreEntry({
          storePath,
          sessionKey,
          update: async () => ({ updatedAt }),
        });
      }
    }
    typing.cleanup();
    return undefined;
  }
}
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
  enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
  if (activeSessionEntry && activeSessionStore && sessionKey) {
    const updatedAt = Date.now();
    activeSessionEntry.updatedAt = updatedAt;
    activeSessionStore[sessionKey] = activeSessionEntry;
    if (storePath) {
      await updateSessionStoreEntry({
        storePath,
        sessionKey,
        update: async () => ({ updatedAt }),
      });
    }
  }
  typing.cleanup();
  return undefined;
}
The pattern is consistent:
- Decide whether to steer to an embedded Pi agent or enqueue a followup based on flags and queue configuration.
- If exiting early, always bump updatedAt on the session and clean up typing indicators.
- Return undefined to signal "no direct reply payload"; the work continues elsewhere.
Importantly, the orchestrator never leaves background signals dangling. That discipline shows up again at the end of the function:
  return finalizeWithFollowup(
    finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
    queueKey,
    runFollowupTurn,
  );
} finally {
  blockReplyPipeline?.stop();
  typing.markRunComplete();
}
No matter how the function exits—steering, error, or normal completion—typing is marked complete and the streaming pipeline is stopped.
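The guarantee rests on ordinary try/finally semantics. As a minimal sketch, using hypothetical Typing and Pipeline stand-ins rather than OpenClaw's real types, the following shows the same cleanup running on a normal return, an early return, and a thrown error:

```typescript
// Hypothetical stand-ins for the typing controller and the block reply pipeline.
interface Typing {
  markRunComplete(): void;
}
interface Pipeline {
  stop(): void;
}

type Outcome = "reply" | "early-exit" | "error";

function runTurn(
  outcome: Outcome,
  typing: Typing,
  pipeline: Pipeline | null,
): string | undefined {
  try {
    if (outcome === "early-exit") return undefined; // finally still runs
    if (outcome === "error") throw new Error("agent turn failed");
    return "reply text";
  } finally {
    // Runs on every exit path out of the try block.
    pipeline?.stop();
    typing.markRunComplete();
  }
}

// Records cleanup calls so the behaviour can be observed.
function makeProbe() {
  const calls: string[] = [];
  const typing: Typing = {
    markRunComplete: () => {
      calls.push("typing.markRunComplete");
    },
  };
  const pipeline: Pipeline = {
    stop: () => {
      calls.push("pipeline.stop");
    },
  };
  return { calls, typing, pipeline };
}
```

Note that only exits from inside the try block are covered; a return that happens before the try has to perform its own cleanup, which is why the early exits shown earlier call typing.cleanup() explicitly.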
Typing signals and streaming as first-class citizens
After steering decisions, the orchestrator focuses on real‑time experience: whether the user sees typing indicators and how model output is streamed.
Typing behavior is split into two steps. First, createTypingSignaler wires the low-level runtime (like a channel‑specific typing API) into a generic interface:
const isHeartbeat = opts?.isHeartbeat === true;
const typingSignals = createTypingSignaler({
  typing,
  mode: typingMode,
  isHeartbeat,
});
Later, once reply payloads are known, signalTypingIfNeeded decides whether to actually send typing signals based on the payload shape:
await signalTypingIfNeeded(replyPayloads, typingSignals);
This keeps channel idiosyncrasies in one helper and the "should we type at all for this reply?" logic in another. The orchestrator just sequences them.
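To make the two-step split concrete, here is a rough sketch of how such a pair of helpers could be shaped. Everything in it is an assumption for illustration: the mode names, the TypingRuntime and TypingSignaler interfaces, and the "has visible content" rule are invented here and are not taken from OpenClaw's typing-mode.ts.

```typescript
// All names and shapes below are illustrative assumptions, not OpenClaw's actual API.
type TypingMode = "never" | "instant" | "message";

interface TypingRuntime {
  startTyping(): Promise<void>; // channel-specific typing call
}
interface ReplyPayload {
  text?: string;
  mediaUrl?: string;
}
interface TypingSignaler {
  signalReplyStart(): Promise<void>;
}

// Step 1: adapt the channel runtime into a mode-aware signaler.
function createTypingSignaler(opts: {
  typing: TypingRuntime;
  mode: TypingMode;
  isHeartbeat: boolean;
}): TypingSignaler {
  // Assumption: heartbeats and the "never" mode suppress typing entirely.
  const disabled = opts.mode === "never" || opts.isHeartbeat;
  return {
    async signalReplyStart() {
      if (!disabled) await opts.typing.startTyping();
    },
  };
}

// Step 2: decide from the payload shape whether typing is worth signalling.
async function signalTypingIfNeeded(
  payloads: ReplyPayload[],
  signaler: TypingSignaler,
): Promise<void> {
  const hasVisibleContent = payloads.some(
    (p) => (p.text?.trim().length ?? 0) > 0 || Boolean(p.mediaUrl),
  );
  if (hasVisibleContent) await signaler.signalReplyStart();
}
```

The point of the split is that the first function knows about modes and channels while the second knows only about payloads, so either can change without touching the other.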
Streaming is handled via a block reply pipeline, which coalesces partial outputs into larger blocks and flushes them on a timeout:
const blockReplyCoalescing =
  blockStreamingEnabled && opts?.onBlockReply
    ? resolveBlockStreamingCoalescing(
        cfg,
        sessionCtx.Provider,
        sessionCtx.AccountId,
        blockReplyChunking,
      )
    : undefined;
const blockReplyPipeline =
  blockStreamingEnabled && opts?.onBlockReply
    ? createBlockReplyPipeline({
        onBlockReply: opts.onBlockReply,
        timeoutMs: blockReplyTimeoutMs,
        coalescing: blockReplyCoalescing,
        buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
      })
    : null;
The orchestrator chooses whether streaming is enabled, computes coalescing behavior from configuration, and instantiates the pipeline. Downstream, runAgentTurnWithFallback pushes content into this pipeline, and after the turn completes the orchestrator forces a final flush and teardown:
if (blockReplyPipeline) {
  await blockReplyPipeline.flush({ force: true });
  blockReplyPipeline.stop();
}
A timeout constant (BLOCK_REPLY_SEND_TIMEOUT_MS, 15 seconds by default) governs how long the pipeline can wait before sending whatever it has. That gives you a lever to balance smoother, coalesced blocks against fast first‑token feedback.
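The coalescing trade-off is easier to reason about with a toy version in hand. The sketch below is not OpenClaw's block reply pipeline; createBlockCoalescer, minChars, and idleMs are hypothetical names, and flush() here always forces a send, which is a simplification of the flush({ force: true }) call shown above.

```typescript
// Illustrative sketch only: names and behaviour are assumptions, not the real pipeline.
interface CoalescerOptions {
  minChars: number; // send as soon as this much text is buffered
  idleMs: number; // send whatever is buffered after this much quiet time
  onBlock: (text: string) => void;
}

function createBlockCoalescer(opts: CoalescerOptions) {
  let buffer = "";
  let timer: ReturnType<typeof setTimeout> | undefined;
  let stopped = false;

  const clearTimer = () => {
    if (timer !== undefined) clearTimeout(timer);
    timer = undefined;
  };

  // Sends the buffered text as one block, if there is any.
  const emit = () => {
    clearTimer();
    if (buffer.length === 0) return;
    const block = buffer;
    buffer = "";
    opts.onBlock(block);
  };

  return {
    push(chunk: string) {
      if (stopped) return;
      buffer += chunk;
      if (buffer.length >= opts.minChars) {
        emit();
        return;
      }
      // Restart the idle timer so a slow trickle of text still reaches the user.
      clearTimer();
      timer = setTimeout(emit, opts.idleMs);
    },
    // Force out whatever is buffered, regardless of size.
    flush: emit,
    // Stop accepting input and cancel the pending timer. Call flush() first
    // if buffered text must not be dropped.
    stop() {
      stopped = true;
      clearTimer();
    },
  };
}
```

A larger minChars yields fewer, smoother blocks; a smaller idleMs gets the first text to the user sooner. Calling flush() and then stop() at the end of a turn mirrors the teardown order used by the orchestrator.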
Usage, diagnostics, and cost
Beyond the in‑moment experience, an orchestrator must answer two questions: "What did this turn cost?" and "How is the system behaving at scale?" runReplyAgent bakes both into the main path instead of leaving them as afterthoughts.
Persisting session usage
After the agent completes, the orchestrator extracts usage and model metadata and persists an updated view for the session:
const usage = runResult.meta.agentMeta?.usage;
const modelUsed =
  runResult.meta.agentMeta?.model ??
  fallbackModel ??
  defaultModel;
const providerUsed =
  runResult.meta.agentMeta?.provider ??
  fallbackProvider ??
  followupRun.run.provider;
const cliSessionId = isCliProvider(providerUsed, cfg)
  ? runResult.meta.agentMeta?.sessionId?.trim()
  : undefined;
const contextTokensUsed =
  agentCfgContextTokens ??
  lookupContextTokens(modelUsed) ??
  activeSessionEntry?.contextTokens ??
  DEFAULT_CONTEXT_TOKENS;
await persistSessionUsageUpdate({
  storePath,
  sessionKey,
  usage,
  modelUsed,
  providerUsed,
  contextTokensUsed,
  systemPromptReport: runResult.meta.systemPromptReport,
  cliSessionId,
});
There are two notable patterns here:
- Graceful fallbacks. The orchestrator tolerates partial metadata, resolving modelUsed and contextTokensUsed through several layers of defaults.
- Centralized updates. All session‑level usage persistence goes through persistSessionUsageUpdate. Downstream components don’t need to know how or where this is stored.
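The fallback chain is worth isolating, because the choice of the ?? operator matters. A small sketch, with a made-up lookup table and a made-up default (the real values live in OpenClaw's configuration and are not shown in the article), might look like this:

```typescript
// Hypothetical values for illustration; real limits come from configuration
// and provider metadata.
const DEFAULT_CONTEXT_TOKENS = 200_000;
const KNOWN_CONTEXT_TOKENS: Record<string, number> = {
  "example-small": 32_000,
};

function lookupContextTokens(model: string): number | undefined {
  return KNOWN_CONTEXT_TOKENS[model];
}

function resolveContextTokens(input: {
  configured?: number; // explicit agent configuration
  model: string; // model actually used for the turn
  sessionValue?: number; // value remembered on the session entry
}): number {
  // `??` falls through only on null or undefined, so an explicit 0 is kept.
  // Using `||` here would silently replace 0 with the next fallback.
  return (
    input.configured ??
    lookupContextTokens(input.model) ??
    input.sessionValue ??
    DEFAULT_CONTEXT_TOKENS
  );
}
```

Ordering the chain from most specific (explicit configuration) to least specific (a global default) keeps the result predictable when some layers are missing.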
Emitting diagnostic events
When diagnostics are enabled and usage is non‑zero, the orchestrator emits a structured event describing the turn:
if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
  const input = usage.input ?? 0;
  const output = usage.output ?? 0;
  const cacheRead = usage.cacheRead ?? 0;
  const cacheWrite = usage.cacheWrite ?? 0;
  const promptTokens = input + cacheRead + cacheWrite;
  const totalTokens = usage.total ?? promptTokens + output;
  const costConfig = resolveModelCostConfig({
    provider: providerUsed,
    model: modelUsed,
    config: cfg,
  });
  const costUsd = estimateUsageCost({ usage, cost: costConfig });
  emitDiagnosticEvent({
    type: "model.usage",
    sessionKey,
    sessionId: followupRun.run.sessionId,
    channel: replyToChannel,
    provider: providerUsed,
    model: modelUsed,
    usage: {
      input,
      output,
      cacheRead,
      cacheWrite,
      promptTokens,
      total: totalTokens,
    },
    context: {
      limit: contextTokensUsed,
      used: totalTokens,
    },
    costUsd,
    durationMs: Date.now() - runStartedAt,
  });
}
Token breakdown, context utilization, estimated cost, and run duration are all present in one payload. From here it’s straightforward to derive metrics such as:
- agent_run_duration_ms: end‑to‑end latency per turn.
- agent_run_failure_rate: frequency of failed runs or session resets.
- model_tokens_total: total tokens by provider/model.
- session_reset_count: stability of your session layer.
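As a rough illustration of that derivation, the sketch below folds a batch of model.usage events into per-model totals. The event type is trimmed to the fields used here, costUsd is assumed to be optional, and aggregateUsage and meanDurationMs are hypothetical helper names. Failure rates and reset counts would need additional event types that this payload does not carry.

```typescript
// Assumed, trimmed-down event shape based on the diagnostic payload shown earlier.
interface ModelUsageEvent {
  type: "model.usage";
  provider: string;
  model: string;
  usage: { total: number };
  costUsd?: number; // assumption: cost may be unavailable
  durationMs: number;
}

interface ModelStats {
  turns: number;
  tokensTotal: number;
  costUsd: number;
  durationMsSum: number;
}

// Groups events by "provider/model" and accumulates simple counters.
function aggregateUsage(events: ModelUsageEvent[]): Record<string, ModelStats> {
  const stats: Record<string, ModelStats> = {};
  for (const event of events) {
    const key = `${event.provider}/${event.model}`;
    const entry =
      stats[key] ?? { turns: 0, tokensTotal: 0, costUsd: 0, durationMsSum: 0 };
    entry.turns += 1;
    entry.tokensTotal += event.usage.total;
    entry.costUsd += event.costUsd ?? 0;
    entry.durationMsSum += event.durationMs;
    stats[key] = entry;
  }
  return stats;
}

// Mean end-to-end latency per turn for one provider/model bucket.
function meanDurationMs(s: ModelStats): number {
  return s.turns === 0 ? 0 : s.durationMsSum / s.turns;
}
```

In production these counters would more likely be pushed to a metrics backend as events arrive; batching them in memory simply keeps the example small.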
Surfacing usage back to users
Observability isn’t only for dashboards. OpenClaw can optionally expose usage to the end user as a line appended to the reply, controlled by a response usage mode stored in the session:
const responseUsageRaw =
  activeSessionEntry?.responseUsage ??
  (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);
const responseUsageMode = resolveResponseUsageMode(responseUsageRaw);
if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
  const authMode = resolveModelAuthMode(providerUsed, cfg);
  const showCost = authMode === "api-key";
  const costConfig = showCost
    ? resolveModelCostConfig({
        provider: providerUsed,
        model: modelUsed,
        config: cfg,
      })
    : undefined;
  let formatted = formatResponseUsageLine({
    usage,
    showCost,
    costConfig,
  });
  if (formatted && responseUsageMode === "full" && sessionKey) {
    formatted = `${formatted} · session ${sessionKey}`;
  }
  if (formatted) {
    responseUsageLine = formatted;
  }
}
Later, this responseUsageLine is appended to the reply payloads via appendUsageLine. You can turn this off, show tokens only, or show tokens plus cost and session key for power users and internal debugging.
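The article does not show formatResponseUsageLine or estimateUsageCost themselves, so the following is only a guess at the kind of arithmetic involved. It assumes rates are expressed in USD per million tokens and invents its own output format; estimateCostUsd and formatUsageLine are hypothetical names.

```typescript
// Assumed shapes: token counts per category and USD rates per million tokens.
interface Usage {
  input?: number;
  output?: number;
  cacheRead?: number;
  cacheWrite?: number;
}
interface CostConfig {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
}

// Multiplies each token category by its rate and scales to dollars.
function estimateCostUsd(usage: Usage, cost: CostConfig): number {
  const weighted =
    (usage.input ?? 0) * cost.input +
    (usage.output ?? 0) * cost.output +
    (usage.cacheRead ?? 0) * cost.cacheRead +
    (usage.cacheWrite ?? 0) * cost.cacheWrite;
  return weighted / 1_000_000;
}

// Returns undefined when there is nothing worth showing.
function formatUsageLine(
  usage: Usage,
  opts: { showCost: boolean; cost?: CostConfig },
): string | undefined {
  const input = usage.input ?? 0;
  const output = usage.output ?? 0;
  if (input + output === 0) return undefined;
  let line = `Usage: ${input} in / ${output} out`;
  if (opts.showCost && opts.cost) {
    line += ` · est. $${estimateCostUsd(usage, opts.cost).toFixed(4)}`;
  }
  return line;
}
```

Gating the cost on showCost mirrors the source's decision to show dollar figures only when the provider is billed through an API key, where a per-token price is meaningful.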
Refactoring by story phase
By now it’s clear that runReplyAgent does a lot. The code report measuring it found a cyclomatic complexity of 20 and a cognitive complexity of 22—high but not surprising for an orchestration layer that has to juggle sessions, queues, tools, streaming, and diagnostics.
The key to keeping such a function maintainable is to refactor along story phases, not arbitrary chunks of code. The existing design already exposes several clean seams.
1. Extract steering and followup handling
The early‑exit logic for steering and followups duplicates session updatedAt handling and typing cleanup. A dedicated helper like handleSteeringAndFollowup can encapsulate that behavior and return both a possible early result and an updated session entry.
With that helper, the top of the function reads more like a narrative:
const earlyExit = await handleSteeringAndFollowup({
  shouldSteer,
  shouldFollowup,
  isStreaming,
  isActive,
  queueKey,
  resolvedQueue,
  followupRun,
  typing,
  sessionKey,
  storePath,
  activeSessionEntry,
  activeSessionStore,
});
activeSessionEntry = earlyExit.activeSessionEntry;
// Both early exits return `undefined`, so the helper needs an explicit flag
// to signal that the turn was handled.
if (earlyExit.handled) {
  return undefined;
}
The main function can then proceed to "start typing", "run memory flush", "run agent turn", and "decorate replies" without being cluttered by steering details.
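One possible shape for such a helper is sketched below. It is an assumption about how the extraction could look, not code from the repository: the types are simplified, and the injected steer, enqueue, persistUpdatedAt, and cleanupTyping callbacks stand in for queueEmbeddedPiMessage, enqueueFollowupRun, updateSessionStoreEntry, and typing.cleanup. Because both early exits return undefined, this sketch reports them through an explicit handled flag rather than through the returned payload.

```typescript
// Simplified, hypothetical types for illustration.
interface SessionEntry {
  sessionId: string;
  updatedAt: number;
}
type SessionStore = Record<string, SessionEntry>;

interface EarlyExitInput {
  shouldSteer: boolean;
  shouldFollowup: boolean;
  isStreaming: boolean;
  isActive: boolean;
  queueMode: string;
  steer: () => boolean; // stands in for queueEmbeddedPiMessage
  enqueue: () => void; // stands in for enqueueFollowupRun
  persistUpdatedAt: (updatedAt: number) => Promise<void>; // stands in for updateSessionStoreEntry
  cleanupTyping: () => void; // stands in for typing.cleanup
  sessionKey?: string;
  activeSessionEntry?: SessionEntry;
  activeSessionStore?: SessionStore;
}

interface EarlyExit {
  handled: boolean;
  activeSessionEntry: SessionEntry | undefined;
}

async function handleSteeringAndFollowup(input: EarlyExitInput): Promise<EarlyExit> {
  // Shared tail of both early exits: bump the timestamp, then clean up typing.
  const exitEarly = async (): Promise<EarlyExit> => {
    const { activeSessionEntry, activeSessionStore, sessionKey } = input;
    if (activeSessionEntry && activeSessionStore && sessionKey) {
      const updatedAt = Date.now();
      activeSessionEntry.updatedAt = updatedAt;
      activeSessionStore[sessionKey] = activeSessionEntry;
      await input.persistUpdatedAt(updatedAt);
    }
    input.cleanupTyping();
    return { handled: true, activeSessionEntry };
  };

  if (input.shouldSteer && input.isStreaming) {
    const steered = input.steer();
    if (steered && !input.shouldFollowup) return exitEarly();
  }
  if (input.isActive && (input.shouldFollowup || input.queueMode === "steer")) {
    input.enqueue();
    return exitEarly();
  }
  return { handled: false, activeSessionEntry: input.activeSessionEntry };
}
```

Injecting the side effects as callbacks is a design choice for testability; a real extraction could just as well import the existing functions directly and keep the argument list shown earlier.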
2. Make transcript cleanup non‑blocking
In resetSession, transcript files are deleted using fs.unlinkSync. That’s intentionally best‑effort, but it blocks the Node.js event loop and can become a problem under load or on slow disks.
A safer approach is to switch to fs.promises.unlink and dispatch deletions concurrently with Promise.allSettled. Behavior stays best‑effort, but the orchestrator no longer pauses the event loop while the filesystem catches up.
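A minimal sketch of that change follows. deleteTranscriptsBestEffort is a hypothetical name, and the injectable unlinkFn parameter is an addition made here so the function can be exercised without touching the disk; by default it uses unlink from node:fs/promises.

```typescript
import { unlink } from "node:fs/promises";

type Unlink = (path: string) => Promise<void>;

// Best-effort, concurrent deletion of transcript candidates.
// Failures are collected rather than thrown, matching the original intent.
async function deleteTranscriptsBestEffort(
  candidates: Iterable<string>,
  unlinkFn: Unlink = unlink,
): Promise<{ deleted: string[]; failed: string[] }> {
  // De-duplicate paths, as the Set in resetSession does.
  const paths = Array.from(new Set(candidates));
  // allSettled waits for every deletion and never rejects on individual failures.
  const results = await Promise.allSettled(paths.map((p) => unlinkFn(p)));
  const deleted: string[] = [];
  const failed: string[] = [];
  results.forEach((result, i) => {
    (result.status === "fulfilled" ? deleted : failed).push(paths[i]);
  });
  return { deleted, failed };
}
```

Returning the failed paths is optional, but it gives the caller something to log at debug level without turning a missing file into an error.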
3. Extract reply decoration
Near the end of the function, reply payloads are decorated with auto‑compaction messages, new session hints, and optional usage lines. The logic is straightforward but dense:
let finalPayloads = replyPayloads;
const verboseEnabled = resolvedVerboseLevel !== "off";
if (autoCompactionCompleted) {
  const count = await incrementCompactionCount({
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
  });
  if (verboseEnabled) {
    const suffix = typeof count === "number" ? ` (count ${count})` : "";
    finalPayloads = [
      { text: `🧹 Auto-compaction complete${suffix}.` },
      ...finalPayloads,
    ];
  }
}
if (verboseEnabled && activeIsNewSession) {
  finalPayloads = [
    { text: `🧭 New session: ${followupRun.run.sessionId}` },
    ...finalPayloads,
  ];
}
if (responseUsageLine) {
  finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
}
A helper like decorateReplyPayloads can encapsulate this entire phase. That makes it easier to test decorations in isolation, to add new ones (like safety notices), and to reuse the same decoration rules from other orchestrators.
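Here is one way such a helper could look, written as a pure function so it can be tested without a session store. The signature is hypothetical: the compaction count is resolved by the caller (incrementCompactionCount stays in the orchestrator), and the appendUsageLine shown here is a stand-in whose behaviour, attaching the line to the last text payload, is an assumption rather than the real implementation.

```typescript
interface ReplyPayload {
  text?: string;
}

interface DecorationInput {
  payloads: ReplyPayload[];
  verboseEnabled: boolean;
  autoCompactionCompleted: boolean;
  compactionCount?: number; // resolved by the caller
  newSessionId?: string; // set only when this turn started a new session
  usageLine?: string;
}

// Stand-in for appendUsageLine. Assumption: attach the line to the last text
// payload, or add a new payload when none carries text.
function appendUsageLine(payloads: ReplyPayload[], line: string): ReplyPayload[] {
  let lastTextIndex = -1;
  for (let i = payloads.length - 1; i >= 0; i--) {
    if (payloads[i].text) {
      lastTextIndex = i;
      break;
    }
  }
  if (lastTextIndex === -1) return [...payloads, { text: line }];
  return payloads.map((p, i) =>
    i === lastTextIndex ? { ...p, text: `${p.text}\n${line}` } : p,
  );
}

// Applies the decorations in the same order as the inline code above.
function decorateReplyPayloads(input: DecorationInput): ReplyPayload[] {
  let result = input.payloads.slice(); // never mutate the caller's array
  if (input.autoCompactionCompleted && input.verboseEnabled) {
    const suffix =
      typeof input.compactionCount === "number"
        ? ` (count ${input.compactionCount})`
        : "";
    result = [{ text: `🧹 Auto-compaction complete${suffix}.` }, ...result];
  }
  if (input.verboseEnabled && input.newSessionId) {
    result = [{ text: `🧭 New session: ${input.newSessionId}` }, ...result];
  }
  if (input.usageLine) {
    result = appendUsageLine(result, input.usageLine);
  }
  return result;
}
```

Keeping the function pure means the one side effect in this phase, incrementing the compaction count, remains visible in the orchestrator, while the ordering rules for notices can be covered by plain unit tests.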
Conclusion and takeaways
Stepping back, runReplyAgent is more than a big function. It’s a concrete example of an LLM orchestrator that sits at the intersection of sessions, steering, streaming, typing, diagnostics, and cost. The primary lesson is that this orchestration layer—not prompts or models—is what makes an AI system feel reliable, transparent, and operable.
From this walkthrough, a few actionable patterns emerge:
- Promote the orchestrator to a first‑class component. Give it clear responsibilities: session lifecycle, steering and followups, real‑time UX (typing + streaming), and observability. Don’t bury these concerns inside model wrappers.
- Design explicit reset and early‑exit paths. When sessions break or messages are steered away, update all references in one place, bump timestamps, and close any user‑visible signals like typing indicators.
- Build observability into the main path. Persist usage, emit structured diagnostics with tokens, context, cost, and duration, and optionally expose usage hints in replies. Track metrics like agent_run_duration_ms and session_reset_count from the start.
- Refactor along narrative boundaries. As complexity grows, extract helpers that align with phases: steering, memory management, agent execution, decoration. Let the main function read as a coherent story of one turn.
If you’re designing your own AI feature, sketch this orchestrator layer explicitly. Decide what each turn should own, what it should emit, and how it can recover from failures without surprising users. Treat it like the air‑traffic controller behind every reply—because in practice, that’s exactly what it is.
To explore the full implementation, you can read the source on GitHub: agent-runner.ts. Then, design the equivalent orchestrator in your stack and let that guide how you wire models, tools, and channels together.



