Skip to main content

The Orchestrator Behind Every AI Reply

Every AI answer hides an orchestrator deciding what to call, when to respond, and how it all fits together. The Orchestrator Behind Every AI Reply digs into that layer.

Code Cracking
25m read
#AI#developers#LLM#architecture
The Orchestrator Behind Every AI Reply - Featured blog post image

CONSULTING

Got a specific AI pipeline question?

Orchestration flow, agent coordination, retry logic — bring your architecture, get a clear answer in one session.

When we build LLM features, we usually obsess over prompts and models. Yet the real magic often sits one layer above: the piece of code that decides when to call the model, how to stream, what to log, and which session to mutate. In the OpenClaw project—an automation system that wires LLM agents into messaging channels and queues—that role is played by a single orchestrator function.

We’ll dissect that orchestrator, runReplyAgent in src/auto-reply/reply/agent-runner.ts, and see how it coordinates a single reply turn: routing, session lifecycle, streaming, typing signals, diagnostics, and cost. I’m Mahmoud Zalt, an AI solutions architect; I help teams turn AI into reliable, observable product behavior, and this file is a concrete example of how to do that in practice.

Our goal is simple: understand how to design an application-level LLM orchestrator that keeps conversations sane, users confident, and operators informed. We’ll follow one turn through its lifecycle—session handling, steering, real‑time experience, and observability—and close with refactoring patterns that keep this critical function under control.

The orchestrator in context

The code we’re examining lives in src/auto-reply/reply/agent-runner.ts. OpenClaw’s auto‑reply system receives triggers from messaging channels, runs them through agents, and pushes replies back out. At the center of a single turn is runReplyAgent.

What runReplyAgent really is: an application-level orchestrator. It doesn’t implement model logic; it coordinates everything around it—sessions, queues, tools, streaming, and accounting. This is the layer most teams underestimate when shipping LLM features.

src/
  auto-reply/
    reply/
      agent-runner.ts        # Orchestrator for a single agent reply turn
      agent-runner-execution.ts
      agent-runner-helpers.ts
      agent-runner-memory.ts
      agent-runner-payloads.ts
      agent-runner-utils.ts
      block-reply-pipeline.ts
      block-streaming.ts
      followup-runner.ts
      queue.ts
      reply-threading.ts
      session-updates.ts
      session-usage.ts
      typing-mode.ts

[Message/Trigger]
      |
      v
[Higher-level auto-reply controller]
      |
      v
[runReplyAgent]
  |-- steering / followup decision
  |-- memory flush & session updates
  |-- agent turn & tools
  |-- streaming & typing
  |-- usage & diagnostics
      |
      v
[ReplyPayload | ReplyPayload[] | undefined]
      |
      v
[Channel adapter sends replies]
The orchestrator in its natural habitat: one turn in, one decision-rich flow out.

Conceptually, runReplyAgent takes everything known about a message and session and decides what happens next: reply now, steer to another agent, enqueue a followup, reset a broken session, or quietly do nothing. Along the way it keeps typing indicators, streaming blocks, and usage accounting in sync.

Owning the session lifecycle

Long‑lived conversations are where LLM apps either feel reliable or slowly fall apart. In OpenClaw, a session is a persisted record of an ongoing conversation: IDs, transcript file paths, flags like groupActivationNeedsSystemIntro, and usage info. runReplyAgent receives an optional sessionEntry, a sessionStore, and a sessionKey, and treats them as the source of truth for this turn.

Early in the function, it delegates history management to a dedicated helper:

activeSessionEntry = await runMemoryFlushIfNeeded({
  cfg,
  followupRun,
  sessionCtx,
  opts,
  defaultModel,
  agentCfgContextTokens,
  resolvedVerboseLevel,
  sessionEntry: activeSessionEntry,
  sessionStore: activeSessionStore,
  sessionKey,
  storePath,
  isHeartbeat,
});

All pruning and compaction live in runMemoryFlushIfNeeded. The orchestrator stays responsible for which session entry is "current" and passes that on to the rest of the turn. Separation of concerns is clear: orchestration owns when to flush and how to propagate the result; the helper owns how to flush.

The more delicate part is handling broken sessions. If compaction fails or the transcript order is corrupted, the orchestrator can’t just crash. Instead it uses an internal resetSession helper that creates a fresh session and updates every reference:

const resetSession = async ({
  failureLabel,
  buildLogMessage,
  cleanupTranscripts,
}: SessionResetOptions): Promise => {
  if (!sessionKey || !activeSessionStore || !storePath) return false;

  const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
  if (!prevEntry) return false;

  const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
  const nextSessionId = crypto.randomUUID();

  const nextEntry: SessionEntry = {
    ...prevEntry,
    sessionId: nextSessionId,
    updatedAt: Date.now(),
    systemSent: false,
    abortedLastRun: false,
  };

  const agentId = resolveAgentIdFromSessionKey(sessionKey);
  const nextSessionFile = resolveSessionTranscriptPath(
    nextSessionId,
    agentId,
    sessionCtx.MessageThreadId,
  );

  nextEntry.sessionFile = nextSessionFile;
  activeSessionStore[sessionKey] = nextEntry;

  try {
    await updateSessionStore(storePath, (store) => {
      store[sessionKey] = nextEntry;
    });
  } catch (err) {
    defaultRuntime.error(
      `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
    );
  }

  followupRun.run.sessionId = nextSessionId;
  followupRun.run.sessionFile = nextSessionFile;
  activeSessionEntry = nextEntry;
  activeIsNewSession = true;

  defaultRuntime.error(buildLogMessage(nextSessionId));

  if (cleanupTranscripts && prevSessionId) {
    const transcriptCandidates = new Set();
    const resolved = resolveSessionFilePath(prevSessionId, prevEntry, { agentId });
    if (resolved) transcriptCandidates.add(resolved);
    transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));

    for (const candidate of transcriptCandidates) {
      try {
        fs.unlinkSync(candidate);
      } catch {
        // Best-effort cleanup.
      }
    }
  }

  return true;
};

A few principles here are worth copying:

  • Single place for state rewiring. The reset updates activeSessionStore, followupRun.run, and activeSessionEntry together. There’s no chance one subsystem keeps pointing at the old session.
  • Failures are logged, not fatal. If persisting the reset fails, the error is recorded but the turn tries to proceed. User experience wins over perfect bookkeeping.
  • Transcript cleanup is best‑effort. Synchronous deletions with swallowed errors keep broken files from taking down the run. (We’ll revisit performance implications later.)

Steering, streaming, and user trust

Once the session is stable, the orchestrator has to decide what to do with the current message and how the user should experience that decision in real time. This is where steering, followups, streaming, and typing signals intersect.

Early exits for steering and followups

Before doing any heavy work, runReplyAgent checks whether this message should be answered now, steered to another agent, or converted into a queued followup. That logic lives near the top of the function and uses early returns to keep the rest of the flow simple:

if (shouldSteer && isStreaming) {
  const steered = queueEmbeddedPiMessage(
    followupRun.run.sessionId,
    followupRun.prompt,
  );

  if (steered && !shouldFollowup) {
    if (activeSessionEntry && activeSessionStore && sessionKey) {
      const updatedAt = Date.now();
      activeSessionEntry.updatedAt = updatedAt;
      activeSessionStore[sessionKey] = activeSessionEntry;

      if (storePath) {
        await updateSessionStoreEntry({
          storePath,
          sessionKey,
          update: async () => ({ updatedAt }),
        });
      }
    }

    typing.cleanup();
    return undefined;
  }
}

if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
  enqueueFollowupRun(queueKey, followupRun, resolvedQueue);

  if (activeSessionEntry && activeSessionStore && sessionKey) {
    const updatedAt = Date.now();
    activeSessionEntry.updatedAt = updatedAt;
    activeSessionStore[sessionKey] = activeSessionEntry;

    if (storePath) {
      await updateSessionStoreEntry({
        storePath,
        sessionKey,
        update: async () => ({ updatedAt }),
      });
    }
  }

  typing.cleanup();
  return undefined;
}

The pattern is consistent:

  • Decide whether to steer to an embedded Pi agent or enqueue a followup based on flags and queue configuration.
  • If exiting early, always bump updatedAt on the session and clean up typing indicators.
  • Return undefined to signal "no direct reply payload"—the work continues elsewhere.

Importantly, the orchestrator never leaves background signals dangling. That discipline shows up again at the end of the function:

return finalizeWithFollowup(
  finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
  queueKey,
  runFollowupTurn,
);
} finally {
  blockReplyPipeline?.stop();
  typing.markRunComplete();
}

No matter how the function exits—steering, error, or normal completion—typing is marked complete and the streaming pipeline is stopped.

Typing signals and streaming as first-class citizens

After steering decisions, the orchestrator focuses on real‑time experience: whether the user sees typing indicators and how model output is streamed.

Typing behavior is split into two steps. First, createTypingSignaler wires the low-level runtime (like a channel‑specific typing API) into a generic interface:

const isHeartbeat = opts?.isHeartbeat === true;
const typingSignals = createTypingSignaler({
  typing,
  mode: typingMode,
  isHeartbeat,
});

Later, once reply payloads are known, signalTypingIfNeeded decides whether to actually send typing signals based on the payload shape:

await signalTypingIfNeeded(replyPayloads, typingSignals);

This keeps channel idiosyncrasies in one helper and the "should we type at all for this reply?" logic in another. The orchestrator just sequences them.

Streaming is handled via a block reply pipeline, which coalesces partial outputs into larger blocks and flushes them on a timeout:

const blockReplyCoalescing =
  blockStreamingEnabled && opts?.onBlockReply
    ? resolveBlockStreamingCoalescing(
        cfg,
        sessionCtx.Provider,
        sessionCtx.AccountId,
        blockReplyChunking,
      )
    : undefined;

const blockReplyPipeline =
  blockStreamingEnabled && opts?.onBlockReply
    ? createBlockReplyPipeline({
        onBlockReply: opts.onBlockReply,
        timeoutMs: blockReplyTimeoutMs,
        coalescing: blockReplyCoalescing,
        buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
      })
    : null;

The orchestrator chooses whether streaming is enabled, computes coalescing behavior from configuration, and instantiates the pipeline. Downstream, runAgentTurnWithFallback pushes content into this pipeline, and after the turn completes the orchestrator forces a final flush and teardown:

if (blockReplyPipeline) {
  await blockReplyPipeline.flush({ force: true });
  blockReplyPipeline.stop();
}

A timeout constant (BLOCK_REPLY_SEND_TIMEOUT_MS, 15 seconds by default) governs how long the pipeline can wait before sending whatever it has. That gives you a lever to balance smoother, coalesced blocks against fast first‑token feedback.

Usage, diagnostics, and cost

Beyond the in‑moment experience, an orchestrator must answer two questions: "What did this turn cost?" and "How is the system behaving at scale?" runReplyAgent bakes both into the main path instead of leaving them as afterthoughts.

Persisting session usage

After the agent completes, the orchestrator extracts usage and model metadata and persists an updated view for the session:

const usage = runResult.meta.agentMeta?.usage;
const modelUsed =
  runResult.meta.agentMeta?.model ??
  fallbackModel ??
  defaultModel;
const providerUsed =
  runResult.meta.agentMeta?.provider ??
  fallbackProvider ??
  followupRun.run.provider;

const cliSessionId = isCliProvider(providerUsed, cfg)
  ? runResult.meta.agentMeta?.sessionId?.trim()
  : undefined;

const contextTokensUsed =
  agentCfgContextTokens ??
  lookupContextTokens(modelUsed) ??
  activeSessionEntry?.contextTokens ??
  DEFAULT_CONTEXT_TOKENS;

await persistSessionUsageUpdate({
  storePath,
  sessionKey,
  usage,
  modelUsed,
  providerUsed,
  contextTokensUsed,
  systemPromptReport: runResult.meta.systemPromptReport,
  cliSessionId,
});

There are two notable patterns here:

  • Graceful fallbacks. The orchestrator tolerates partial metadata, resolving modelUsed and contextTokensUsed through several layers of defaults.
  • Centralized updates. All session‑level usage persistence goes through persistSessionUsageUpdate. Downstream components don’t need to know how or where this is stored.

Emitting diagnostic events

When diagnostics are enabled and usage is non‑zero, the orchestrator emits a structured event describing the turn:

if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
  const input = usage.input ?? 0;
  const output = usage.output ?? 0;
  const cacheRead = usage.cacheRead ?? 0;
  const cacheWrite = usage.cacheWrite ?? 0;

  const promptTokens = input + cacheRead + cacheWrite;
  const totalTokens = usage.total ?? promptTokens + output;

  const costConfig = resolveModelCostConfig({
    provider: providerUsed,
    model: modelUsed,
    config: cfg,
  });

  const costUsd = estimateUsageCost({ usage, cost: costConfig });

  emitDiagnosticEvent({
    type: "model.usage",
    sessionKey,
    sessionId: followupRun.run.sessionId,
    channel: replyToChannel,
    provider: providerUsed,
    model: modelUsed,
    usage: {
      input,
      output,
      cacheRead,
      cacheWrite,
      promptTokens,
      total: totalTokens,
    },
    context: {
      limit: contextTokensUsed,
      used: totalTokens,
    },
    costUsd,
    durationMs: Date.now() - runStartedAt,
  });
}

Token breakdown, context utilization, estimated cost, and run duration are all present in one payload. From here it’s straightforward to derive metrics such as:

  • agent_run_duration_ms: end‑to‑end latency per turn.
  • agent_run_failure_rate: frequency of failed runs or session resets.
  • model_tokens_total: total tokens by provider/model.
  • session_reset_count: stability of your session layer.

Surfacing usage back to users

Observability isn’t only for dashboards. OpenClaw can optionally expose usage to the end user as a line appended to the reply, controlled by a response usage mode stored in the session:

const responseUsageRaw =
  activeSessionEntry?.responseUsage ??
  (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);

const responseUsageMode = resolveResponseUsageMode(responseUsageRaw);

if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
  const authMode = resolveModelAuthMode(providerUsed, cfg);
  const showCost = authMode === "api-key";

  const costConfig = showCost
    ? resolveModelCostConfig({
        provider: providerUsed,
        model: modelUsed,
        config: cfg,
      })
    : undefined;

  let formatted = formatResponseUsageLine({
    usage,
    showCost,
    costConfig,
  });

  if (formatted && responseUsageMode === "full" && sessionKey) {
    formatted = `${formatted} · session ${sessionKey}`;
  }

  if (formatted) {
    responseUsageLine = formatted;
  }
}

Later, this responseUsageLine is appended to the reply payloads via appendUsageLine. You can turn this off, show tokens only, or show tokens plus cost and session key for power users and internal debugging.

Refactoring by story phase

By now it’s clear that runReplyAgent does a lot. The code report measuring it found a cyclomatic complexity of 20 and a cognitive complexity of 22—high but not surprising for an orchestration layer that has to juggle sessions, queues, tools, streaming, and diagnostics.

The key to keeping such a function maintainable is to refactor along story phases, not arbitrary chunks of code. The existing design already exposes several clean seams.

1. Extract steering and followup handling

The early‑exit logic for steering and followups duplicates session updatedAt handling and typing cleanup. A dedicated helper like handleSteeringAndFollowup can encapsulate that behavior and return both a possible early result and an updated session entry.

With that helper, the top of the function reads more like a narrative:

const earlyExit = await handleSteeringAndFollowup({
  shouldSteer,
  shouldFollowup,
  isStreaming,
  isActive,
  queueKey,
  resolvedQueue,
  followupRun,
  typing,
  sessionKey,
  storePath,
  activeSessionEntry,
  activeSessionStore,
});

activeSessionEntry = earlyExit.activeSessionEntry;

if (earlyExit.result !== undefined) {
  return earlyExit.result;
}

The main function can then proceed to "start typing", "run memory flush", "run agent turn", and "decorate replies" without being cluttered by steering details.

2. Make transcript cleanup non‑blocking

In resetSession, transcript files are deleted using fs.unlinkSync. That’s intentionally best‑effort, but it blocks the Node.js event loop and can become a problem under load or on slow disks.

A safer approach is to switch to fs.promises.unlink and dispatch deletions concurrently with Promise.allSettled. Behavior stays best‑effort, but the orchestrator no longer pauses the event loop while the filesystem catches up.

3. Extract reply decoration

Near the end of the function, reply payloads are decorated with auto‑compaction messages, new session hints, and optional usage lines. The logic is straightforward but dense:

let finalPayloads = replyPayloads;
const verboseEnabled = resolvedVerboseLevel !== "off";

if (autoCompactionCompleted) {
  const count = await incrementCompactionCount({
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
  });

  if (verboseEnabled) {
    const suffix = typeof count === "number" ? ` (count ${count})` : "";
    finalPayloads = [
      { text: `🧹 Auto-compaction complete${suffix}.` },
      ...finalPayloads,
    ];
  }
}

if (verboseEnabled && activeIsNewSession) {
  finalPayloads = [
    { text: `🧭 New session: ${followupRun.run.sessionId}` },
    ...finalPayloads,
  ];
}

if (responseUsageLine) {
  finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
}

A helper like decorateReplyPayloads can encapsulate this entire phase. That makes it easier to test decorations in isolation, to add new ones (like safety notices), and to reuse the same decoration rules from other orchestrators.

Conclusion and takeaways

Stepping back, runReplyAgent is more than a big function. It’s a concrete example of an LLM orchestrator that sits at the intersection of sessions, steering, streaming, typing, diagnostics, and cost. The primary lesson is that this orchestration layer—not prompts or models—is what makes an AI system feel reliable, transparent, and operable.

From this walkthrough, a few actionable patterns emerge:

  • Promote the orchestrator to a first‑class component. Give it clear responsibilities: session lifecycle, steering and followups, real‑time UX (typing + streaming), and observability. Don’t bury these concerns inside model wrappers.
  • Design explicit reset and early‑exit paths. When sessions break or messages are steered away, update all references in one place, bump timestamps, and close any user‑visible signals like typing indicators.
  • Build observability into the main path. Persist usage, emit structured diagnostics with tokens, context, cost, and duration, and optionally expose usage hints in replies. Track metrics like agent_run_duration_ms and session_reset_count from the start.
  • Refactor along narrative boundaries. As complexity grows, extract helpers that align with phases: steering, memory management, agent execution, decoration. Let the main function read as a coherent story of one turn.

If you’re designing your own AI feature, sketch this orchestrator layer explicitly. Decide what each turn should own, what it should emit, and how it can recover from failures without surprising users. Treat it like the air‑traffic controller behind every reply—because in practice, that’s exactly what it is.

To explore the full implementation, you can read the source on GitHub: agent-runner.ts. Then, design the equivalent orchestrator in your stack and let that guide how you wire models, tools, and channels together.

CONSULTING

AI orchestration layer feeling brittle?

Every AI reply hides an orchestrator. When that orchestrator is fragile, your users feel it. An architecture review finds the cracks.

Full Source Code

Here's the full source code of the file that inspired this article.
Read on GitHub

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss anything.

Support this content

Share this article