The Conversation Traffic Controller Pattern

Most LLM bugs aren’t in the model; they’re in the messy loops around it. The Conversation Traffic Controller Pattern gives you a cleaner way to orchestrate chats.

Code Cracking
25m read
#LLM #AIagents #softwaredesign #architecture

LLM apps rarely fail because a single model call goes wrong. They fail when the orchestration around the model becomes a tangle of ad‑hoc loops, flags, and callbacks. Here we’ll dissect a TypeScript module from the pi-mono toolkit that gets this orchestration right: a streaming agent loop that juggles user messages, LLM responses, tools, and live steering without losing control.

I’m Mahmoud Zalt, an AI solutions architect. We’ll use this file to build a reusable way of thinking about agent orchestration: treating your agent loop as a conversation traffic controller.

Setting the scene: where this loop lives

We’re examining agent-loop.ts in the agent package of pi-mono. pi-mono is a toolkit for building LLM agents; this file is its orchestration core. It doesn’t know about HTTP, UIs, or specific LLM vendors – only about conversations, tools, and streams.

pi-mono/
  packages/
    agent/
      src/
        types.ts         (AgentContext, AgentEvent, AgentTool, ...)
        agent-loop.ts    <-- this file: orchestrates agent conversation loop
        agent.ts         (higher-level agent interfaces)
        proxy.ts         (proxying to remote agents/LLMs)
        index.ts         (exports public API)

Agent client
  |
  | agentLoop / agentLoopContinue
  v
[agent-loop.ts]
  +---------------------+
  | createAgentStream   |
  | runLoop             |
  |  - streamAssistant  |--> streamFn/streamSimple --> LLM provider
  |  - executeToolCalls |--> AgentTool.execute      --> external systems
  +---------------------+
  |
  | EventStream
  v
UI / CLI / Web / Logs

agent-loop.ts lives in the orchestration layer: it owns the conversation and event stream, not transport or vendor details.

At the top level the file exposes two functions: agentLoop and agentLoopContinue. Everything else is an implementation detail behind a small, typed API.

export function agentLoop(
  prompts: AgentMessage[],
  context: AgentContext,
  config: AgentLoopConfig,
  signal?: AbortSignal,
  streamFn?: StreamFn,
): EventStream {
  const stream = createAgentStream();

  (async () => {
    const newMessages: AgentMessage[] = [...prompts];
    const currentContext: AgentContext = {
      ...context,
      messages: [...context.messages, ...prompts],
    };

    stream.push({ type: "agent_start" });
    stream.push({ type: "turn_start" });
    for (const prompt of prompts) {
      stream.push({ type: "message_start", message: prompt });
      stream.push({ type: "message_end", message: prompt });
    }

    await runLoop(currentContext, newMessages, config, signal, stream, streamFn);
  })();

  return stream;
}

The contract is: “Given your current AgentContext, some prompt messages, and a configuration, return an EventStream of AgentEvent plus the new messages that were produced.”
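In practice, callers treat the returned stream as an async iterable of events. Here is a minimal consumer sketch; the AgentEvent shape and the fake stream are simplified stand-ins for the toolkit's real types, not its actual API:

```typescript
// Simplified stand-ins for pi-mono's AgentEvent / EventStream (assumptions,
// not the real types): the stream is just an async iterable of events.
type AgentEvent =
  | { type: "agent_start" }
  | { type: "message_update"; text: string }
  | { type: "agent_end"; messages: string[] };

// A fake stream standing in for what agentLoop would return.
async function* fakeAgentStream(): AsyncGenerator<AgentEvent> {
  yield { type: "agent_start" };
  yield { type: "message_update", text: "Hel" };
  yield { type: "message_update", text: "Hello" };
  yield { type: "agent_end", messages: ["Hello"] };
}

// A consumer re-renders on every update and keeps the final text.
async function render(stream: AsyncIterable<AgentEvent>): Promise<string> {
  let latest = "";
  for await (const event of stream) {
    if (event.type === "message_update") latest = event.text;
  }
  return latest;
}
```

The key point: the consumer never touches the loop's internal state; it only folds over events.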

The agent as a conversation traffic controller

The core of this file is runLoop, which maintains the conversation over multiple turns. This is where the traffic‑controller mental model is useful. Think of each kind of message as a different aircraft type:

  • User and steering messages – incoming planes requesting landing.
  • Assistant responses – planes taking off.
  • Tool calls and results – cargo flights that route via external hubs.

The controller coordinates these in order, exposes what’s happening as events, and stops only when the “airspace” (conversation) is empty.

async function runLoop(
  currentContext: AgentContext,
  newMessages: AgentMessage[],
  config: AgentLoopConfig,
  signal: AbortSignal | undefined,
  stream: EventStream,
  streamFn?: StreamFn,
): Promise<void> {
  let firstTurn = true;
  let pendingMessages: AgentMessage[] = (await config.getSteeringMessages?.()) || [];

  // Outer loop: repeats if follow-up messages queue another turn
  while (true) {
    let hasMoreToolCalls = true;
    let steeringAfterTools: AgentMessage[] | null = null;

    // Inner loop: process tools and steering until the turn settles
    while (hasMoreToolCalls || pendingMessages.length > 0) {
      if (!firstTurn) {
        stream.push({ type: "turn_start" });
      } else {
        firstTurn = false;
      }

      // 1) Inject pending user/steering messages
      // 2) Stream assistant
      // 3) Execute tools (if any)
      // 4) Fetch steering for next pass
    }

    const followUpMessages = (await config.getFollowUpMessages?.()) || [];
    if (followUpMessages.length > 0) {
      pendingMessages = followUpMessages;
      continue;
    }

    break;
  }

  stream.push({ type: "agent_end", messages: newMessages });
  stream.end(newMessages);
}

Why this design works: the outer loop models “turns”; the inner loop models “what happens inside a turn” (streaming, tools, steering). That separation makes it clear how steering, tools, and follow‑ups interact instead of hiding everything inside a single while (true) with tangled flags.

Concretely, the inner loop keeps doing two things:

  1. Drain pendingMessages (user steering or follow‑ups) into the context.
  2. Stream an assistant response and, if it contains tool calls, execute them.

The outer loop asks one simple question: “Did this turn produce follow‑up messages that should start another turn?” That is exactly the traffic controller’s job: keep repeating the pattern until there’s nothing left to sequence.
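Stripped of streaming and tools, that two-loop shape can be sketched with scripted stand-ins. Everything here is illustrative: the scripted turn counts and follow-up batches replace real LLM responses and config hooks.

```typescript
// Minimal, self-contained sketch of the two-loop control shape.
// `turns` scripts how many tool passes each turn needs (standing in for
// LLM responses); `followUps` scripts queued follow-up batches.
type ScriptedTurn = { toolPasses: number };

function runController(turns: ScriptedTurn[], followUps: string[][]): string[] {
  const log: string[] = [];
  let turnIndex = 0;
  while (true) {
    // Inner loop: one turn; tool passes keep the turn alive.
    const turn = turns[turnIndex++] ?? { toolPasses: 0 };
    log.push("turn_start");
    for (let i = 0; i < turn.toolPasses; i++) log.push("tool_pass");
    log.push("turn_end");
    // Outer loop: follow-up messages queue another whole turn.
    const next = followUps.shift();
    if (!next || next.length === 0) break;
    log.push(`follow_up:${next.length}`);
  }
  log.push("agent_end");
  return log;
}
```

Reading the log of such a run makes the separation tangible: tool passes stay inside a turn, follow-ups start a new one.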

The streaming heartbeat of the agent

Inside a turn, the critical operation is asking the LLM for a response as a stream. This module treats that streaming call as the agent’s heartbeat: every partial token becomes an event, and the conversation state is updated in lockstep.

streamAssistantResponse is careful about boundaries:

  • It works in terms of AgentMessage[] (the toolkit’s own types).
  • It only converts to provider format at the edge via convertToLlm.
  • It hides the vendor behind streamFn / streamSimple.

async function streamAssistantResponse(
  context: AgentContext,
  config: AgentLoopConfig,
  signal: AbortSignal | undefined,
  stream: EventStream,
  streamFn?: StreamFn,
): Promise<AssistantMessage> {
  let messages = context.messages;
  if (config.transformContext) {
    messages = await config.transformContext(messages, signal);
  }

  const llmMessages = await config.convertToLlm(messages);

  const llmContext: Context = {
    systemPrompt: context.systemPrompt,
    messages: llmMessages,
    tools: context.tools,
  };

  const streamFunction = streamFn || streamSimple;
  const resolvedApiKey =
    (config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) ||
    config.apiKey;

  const response = await streamFunction(config.model, llmContext, {
    ...config,
    apiKey: resolvedApiKey,
    signal,
  });

  let partialMessage: AssistantMessage | null = null;
  let addedPartial = false;

  for await (const event of response) {
    switch (event.type) {
      case "start":
        partialMessage = event.partial;
        context.messages.push(partialMessage);
        addedPartial = true;
        stream.push({ type: "message_start", message: { ...partialMessage } });
        break;

      case "text_start":
      case "text_delta":
      case "text_end":
      case "thinking_start":
      case "thinking_delta":
      case "thinking_end":
      case "toolcall_start":
      case "toolcall_delta":
      case "toolcall_end":
        if (partialMessage) {
          partialMessage = event.partial;
          context.messages[context.messages.length - 1] = partialMessage;
          stream.push({
            type: "message_update",
            assistantMessageEvent: event,
            message: { ...partialMessage },
          });
        }
        break;

      case "done":
      case "error": {
        const finalMessage = await response.result();
        if (addedPartial) {
          context.messages[context.messages.length - 1] = finalMessage;
        } else {
          context.messages.push(finalMessage);
          stream.push({ type: "message_start", message: { ...finalMessage } });
        }
        stream.push({ type: "message_end", message: finalMessage });
        return finalMessage;
      }
    }
  }

  return await response.result();
}

The essential pattern:

  1. Transform then convert at the edge. transformContext lets callers summarise or prune history before it hits the model. Only after that does the loop call convertToLlm to adapt to provider formats.
  2. Treat streaming as state updates. A partialMessage is updated on every streaming event; each update is published as message_update. UIs can subscribe to the event stream instead of polling for completion.
  3. Normalise completion and errors. Both "done" and "error" resolve through response.result(), yielding a final AssistantMessage that the outer loop can interpret via its stopReason.
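Point 3 matters because the outer loop's next decision hinges on that final message. A hedged sketch of the kind of check involved; these stopReason values are assumptions, not the toolkit's exact enum:

```typescript
// Hypothetical stopReason values; the real enum may differ.
type StopReason = "stop" | "toolUse" | "error" | "aborted";

// Decide whether the inner loop should make another pass: errors and
// aborts always settle the turn; otherwise only pending tool calls
// keep it going.
function shouldContinueTurn(stopReason: StopReason, hasToolCalls: boolean): boolean {
  if (stopReason === "error" || stopReason === "aborted") return false;
  return hasToolCalls;
}
```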

Tools as backstage assistants

The other major responsibility of the controller is reacting to tool calls. In tool‑augmented agents, the LLM sometimes says “Call search_files with these args” and relies on the orchestrator to run that tool and feed the result back.

This module models tool calls as content chunks in the assistant message. Once streaming finishes for a turn, runLoop filters those chunks and, if any are present, calls executeToolCalls.

async function executeToolCalls(
  tools: AgentTool[] | undefined,
  assistantMessage: AssistantMessage,
  signal: AbortSignal | undefined,
  stream: EventStream,
  getSteeringMessages?: AgentLoopConfig["getSteeringMessages"],
): Promise<{ toolResults: ToolResultMessage[]; steeringMessages?: AgentMessage[] }> {
  const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
  const results: ToolResultMessage[] = [];
  let steeringMessages: AgentMessage[] | undefined;

  for (let index = 0; index < toolCalls.length; index++) {
    const toolCall = toolCalls[index];
    const tool = tools?.find((t) => t.name === toolCall.name);

    stream.push({
      type: "tool_execution_start",
      toolCallId: toolCall.id,
      toolName: toolCall.name,
      args: toolCall.arguments,
    });

    let result: AgentToolResult;
    let isError = false;

    try {
      if (!tool) throw new Error(`Tool ${toolCall.name} not found`);

      const validatedArgs = validateToolArguments(tool, toolCall);

      result = await tool.execute(toolCall.id, validatedArgs, signal, (partialResult) => {
        stream.push({
          type: "tool_execution_update",
          toolCallId: toolCall.id,
          toolName: toolCall.name,
          args: toolCall.arguments,
          partialResult,
        });
      });
    } catch (e) {
      result = {
        content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }],
        details: {},
      };
      isError = true;
    }

    stream.push({
      type: "tool_execution_end",
      toolCallId: toolCall.id,
      toolName: toolCall.name,
      result,
      isError,
    });

    const toolResultMessage: ToolResultMessage = {
      role: "toolResult",
      toolCallId: toolCall.id,
      toolName: toolCall.name,
      content: result.content,
      details: result.details,
      isError,
      timestamp: Date.now(),
    };

    results.push(toolResultMessage);
    stream.push({ type: "message_start", message: toolResultMessage });
    stream.push({ type: "message_end", message: toolResultMessage });

    // If user steering arrives, skip remaining tools explicitly
    if (getSteeringMessages) {
      const steering = await getSteeringMessages();
      if (steering.length > 0) {
        steeringMessages = steering;
        const remainingCalls = toolCalls.slice(index + 1);
        for (const skipped of remainingCalls) {
          results.push(skipToolCall(skipped, stream));
        }
        break;
      }
    }
  }

  return { toolResults: results, steeringMessages };
}

Through the traffic‑controller lens:

  • Each tool execution is bracketed by tool_execution_start/_update/_end events. That’s like logging when a plane starts taxiing, is in flight, and lands.
  • Tool outputs are normalised into ToolResultMessage instances and appended to the conversation history. To the rest of the system, tool results are just another turn.
  • If user steering arrives mid‑execution (for example, “stop calling tools, just answer”), the remaining tool calls are not dropped silently. They are explicitly marked as skipped via skipToolCall, which still emits tool_execution_* and toolResult events.

That last behaviour is easy to miss in agent systems: you don’t want ghost invocations that disappear because the user changed their mind. This implementation makes interruptions explicit and observable.

How skipped tools are represented

skipToolCall constructs a ToolResultMessage with isError: true and a human‑readable reason such as “Skipped due to queued user message”. It also fires tool_execution_start and tool_execution_end so your logs and metrics stay structurally consistent.
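A plausible implementation sketch follows. The field and event names mirror the article's examples, but the exact signature of skipToolCall is an assumption:

```typescript
interface ToolCall { id: string; name: string; arguments: unknown }
interface ToolResultMessage {
  role: "toolResult";
  toolCallId: string;
  toolName: string;
  content: { type: "text"; text: string }[];
  details: Record<string, unknown>;
  isError: boolean;
  timestamp: number;
}

// Emit the same start/end bracket a real execution would, then return a
// synthetic error result explaining why the tool never ran.
function skipToolCall(toolCall: ToolCall, push: (event: unknown) => void): ToolResultMessage {
  push({ type: "tool_execution_start", toolCallId: toolCall.id, toolName: toolCall.name });
  const message: ToolResultMessage = {
    role: "toolResult",
    toolCallId: toolCall.id,
    toolName: toolCall.name,
    content: [{ type: "text", text: "Skipped due to queued user message" }],
    details: {},
    isError: true,
    timestamp: Date.now(),
  };
  push({ type: "tool_execution_end", toolCallId: toolCall.id, toolName: toolCall.name, isError: true });
  return message;
}
```

Because skips flow through the same event types as real executions, dashboards and log parsers need no special case for them.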

Scaling the loop and adding guardrails

So far we’ve focused on behaviour and observability. The same traffic‑control structure also makes it straightforward to reason about performance and guardrails when you scale.

  • Time complexity. runLoop is essentially linear in the number of turns, tool calls, and streaming events. Latency is dominated by the LLM and tools, not the orchestrator logic.
  • Memory growth. currentContext.messages grows monotonically: every user prompt, assistant message, and tool result is appended. That’s great for traceability, dangerous for very long sessions.
  • Concurrency. Each agent loop instance is self‑contained and relies on Node’s single‑threaded async model; there is no shared mutable state across loops.

The file already provides a hook to control history: transformContext. You can turn that into a hard safety net by adding a maxHistoryMessages option to the config and slicing old messages before each LLM call.
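A minimal sketch of that guardrail, assuming a hypothetical maxHistoryMessages option wired through transformContext (the option name is mine, not pi-mono's):

```typescript
interface Msg { role: string; text: string }

// Returns a transformContext-style hook that keeps only the newest
// maxHistoryMessages entries before each LLM call.
function makeHistoryLimiter(maxHistoryMessages: number) {
  return async (messages: Msg[]): Promise<Msg[]> =>
    messages.length <= maxHistoryMessages
      ? messages
      : messages.slice(messages.length - maxHistoryMessages);
}
```

In a real setup you would likely pin system-critical messages (for example, the first user instruction) and summarise the dropped middle rather than slicing blindly.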

  • Unbounded message history. Impact: memory and token cost blow‑up; provider context limits. Guardrail: use transformContext plus an optional maxHistoryMessages slice.
  • Slow or stuck tools. Impact: turns taking tens of seconds; stuck agents. Guardrail: enforce timeouts in AgentTool.execute and track a tool_execution_duration_ms metric per tool.
  • Hidden LLM errors. Impact: agents ending unexpectedly with no clear signal upstream. Guardrail: observe stopReason on the final assistant message and count error or abort reasons.

A minimal operational set for production agents built on this pattern:

  • Turn duration. Measure time between turn_start and turn_end events. Watch high percentiles separately for “no tools” and “with tools” paths.
  • Tool execution duration. Track execution time per toolName using the tool_execution_* events to spot slow or flaky tools.
  • Messages per context. Count currentContext.messages length and trigger summarisation or pruning when it exceeds your safe bound.
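The first of those metrics can be derived purely from the event stream. A sketch, assuming events are timestamped by whoever records them (the at field is my addition, not part of AgentEvent):

```typescript
type TimedEvent = { type: string; at: number };

// Pair each turn_start with the next turn_end and report durations in ms.
function turnDurations(events: TimedEvent[]): number[] {
  const durations: number[] = [];
  let startedAt: number | null = null;
  for (const event of events) {
    if (event.type === "turn_start") startedAt = event.at;
    if (event.type === "turn_end" && startedAt !== null) {
      durations.push(event.at - startedAt);
      startedAt = null;
    }
  }
  return durations;
}
```

The same fold works for tool_execution_start/_end pairs, keyed by toolCallId instead of position.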

Lessons you can reuse today

Viewed as a whole, agent-loop.ts demonstrates one core idea: an agent loop should behave like a conversation traffic controller. One place coordinates turns, tools, and interruptions through a clean event model, while vendor‑specific details live at the edges.

Here are concrete patterns you can adopt in your own agent code:

  1. Separate orchestration from providers. Keep your loop working in your own message types and inject provider behaviour via conversions (convertToLlm) and pluggable stream functions (streamFn). Swapping models or SDKs becomes a config change instead of a refactor.
  2. Model everything as events. Expose a single EventStream with rich event types: agent_start, turn_start, message_start/update/end, tool_execution_start/update/end, agent_end. UIs, logs, and metrics can all subscribe without coupling to internal state.
  3. Make interruptions explicit. When user steering arrives mid‑tool‑execution, don’t silently drop remaining tools. Emit explicit “skipped” tool results so downstream consumers understand what happened.
  4. Plan for growth from day one. Hooks like transformContext, getSteeringMessages, and getFollowUpMessages let you add summarisation, routing, and cross‑turn behaviour later without rewriting the loop.
  5. Tame complexity with named state. Even in a dense function like runLoop, state such as pendingMessages, steeringAfterTools, and hasMoreToolCalls keeps the control flow understandable. If it grows further, extract helpers like a processTurn that owns a single TurnState.
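Pattern 1 is worth a concrete sketch. The StreamFn shape here is deliberately simplified (pi-mono's real signature takes richer model, context, and config arguments), but the point stands: because the loop only knows the interface, a fake provider for tests is a few lines.

```typescript
type LlmMessage = { role: "user" | "assistant"; content: string };

// Simplified pluggable stream function (an assumption, not the real type).
type StreamFn = (model: string, messages: LlmMessage[]) => AsyncIterable<string>;

// Fake provider: echoes the last user message word by word.
async function* echoStream(_model: string, messages: LlmMessage[]): AsyncIterable<string> {
  const last = messages[messages.length - 1]?.content ?? "";
  for (const word of last.split(" ")) yield word;
}

// The "loop" side depends only on the StreamFn interface.
async function complete(streamFn: StreamFn, messages: LlmMessage[]): Promise<string> {
  const parts: string[] = [];
  for await (const chunk of streamFn("test-model", messages)) parts.push(chunk);
  return parts.join(" ");
}
```

Swapping in a real provider means passing a different streamFn; nothing in the orchestration changes.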

If you design your own agent loop as a traffic controller – a single, observable place that sequences turns, tools, and interruptions – it becomes much easier to evolve as models, tools, and UIs change around it. agent-loop.ts is more than a working implementation; it’s a template for structuring non‑trivial AI orchestration logic so it stays understandable, observable, and scalable.

Full Source Code

Here's the full source code of the file that inspired this article.
View source on GitHub

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss anything.
