LLM apps rarely fail because a single model call goes wrong. They fail when the orchestration around the model becomes a tangle of ad‑hoc loops, flags, and callbacks. Here we’ll dissect a TypeScript module from the pi-mono toolkit that gets this orchestration right: a streaming agent loop that juggles user messages, LLM responses, tools, and live steering without losing control.
I’m Mahmoud Zalt, an AI solutions architect. We’ll use this file to build a reusable way of thinking about agent orchestration: treating your agent loop as a conversation traffic controller.
Setting the scene: where this loop lives
We’re examining agent-loop.ts in the agent package of
pi-mono. pi-mono is a toolkit for building LLM agents;
this file is its orchestration core. It doesn’t know about HTTP, UIs, or
specific LLM vendors – only about conversations, tools, and streams.
pi-mono/
packages/
agent/
src/
types.ts (AgentContext, AgentEvent, AgentTool, ...)
agent-loop.ts <-- this file: orchestrates agent conversation loop
agent.ts (higher-level agent interfaces)
proxy.ts (proxying to remote agents/LLMs)
index.ts (exports public API)
Agent client
|
| agentLoop / agentLoopContinue
v
[agent-loop.ts]
+---------------------+
| createAgentStream |
| runLoop |
| - streamAssistant |--> streamFn/streamSimple --> LLM provider
| - executeToolCalls |--> AgentTool.execute --> external systems
+---------------------+
|
| EventStream
v
UI / CLI / Web / Logs
agent-loop.ts lives in the orchestration layer: it owns the
conversation and event stream, not transport or vendor details.
At the top level the file exposes two functions:
agentLoop and agentLoopContinue. Everything else is an
implementation detail behind a small, typed API.
export function agentLoop(
prompts: AgentMessage[],
context: AgentContext,
config: AgentLoopConfig,
signal?: AbortSignal,
streamFn?: StreamFn,
): EventStream {
const stream = createAgentStream();
(async () => {
const newMessages: AgentMessage[] = [...prompts];
const currentContext: AgentContext = {
...context,
messages: [...context.messages, ...prompts],
};
stream.push({ type: "agent_start" });
stream.push({ type: "turn_start" });
for (const prompt of prompts) {
stream.push({ type: "message_start", message: prompt });
stream.push({ type: "message_end", message: prompt });
}
await runLoop(currentContext, newMessages, config, signal, stream, streamFn);
})();
return stream;
}
The contract is: “Given your current AgentContext, some prompt
messages, and a configuration, return an EventStream of
AgentEvent plus the new messages that were produced.”
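To make that contract concrete, here is a hedged sketch of consuming such a stream. The AgentEvent union and fakeAgentLoop generator below are illustrative stand-ins, not the toolkit's exports; a real consumer would iterate the EventStream returned by agentLoop itself.

```typescript
// Stand-in event union mirroring the event names pushed in agentLoop.
type AgentEvent =
  | { type: "agent_start" }
  | { type: "turn_start" }
  | { type: "message_start"; message: { role: string } }
  | { type: "message_end"; message: { role: string } }
  | { type: "agent_end"; messages: { role: string }[] };

// Fake loop emitting the sequence agentLoop pushes for one user prompt.
async function* fakeAgentLoop(): AsyncGenerator<AgentEvent> {
  yield { type: "agent_start" };
  yield { type: "turn_start" };
  yield { type: "message_start", message: { role: "user" } };
  yield { type: "message_end", message: { role: "user" } };
  yield { type: "agent_end", messages: [{ role: "user" }] };
}

// Consumers subscribe to events instead of polling internal state.
export async function collectEventTypes(): Promise<string[]> {
  const seen: string[] = [];
  for await (const event of fakeAgentLoop()) {
    seen.push(event.type);
  }
  return seen;
}
```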
The agent as a conversation traffic controller
The core of this file is runLoop, which maintains the conversation
over multiple turns. This is where the traffic‑controller mental model
is useful. Think of each kind of message as a different aircraft type:
- User and steering messages – incoming planes requesting landing.
- Assistant responses – planes taking off.
- Tool calls and results – cargo flights that route via external hubs.
The controller coordinates these in order, exposes what’s happening as events, and stops only when the “airspace” (conversation) is empty.
async function runLoop(
currentContext: AgentContext,
newMessages: AgentMessage[],
config: AgentLoopConfig,
signal: AbortSignal | undefined,
stream: EventStream,
streamFn?: StreamFn,
): Promise<void> {
let firstTurn = true;
let pendingMessages: AgentMessage[] = (await config.getSteeringMessages?.()) || [];
// Outer loop: repeats if follow-up messages queue another turn
while (true) {
let hasMoreToolCalls = true;
let steeringAfterTools: AgentMessage[] | null = null;
// Inner loop: process tools and steering until the turn settles
while (hasMoreToolCalls || pendingMessages.length > 0) {
if (!firstTurn) {
stream.push({ type: "turn_start" });
} else {
firstTurn = false;
}
// 1) Inject pending user/steering messages
// 2) Stream assistant
// 3) Execute tools (if any)
// 4) Fetch steering for next pass
}
const followUpMessages = (await config.getFollowUpMessages?.()) || [];
if (followUpMessages.length > 0) {
pendingMessages = followUpMessages;
continue;
}
break;
}
stream.push({ type: "agent_end", messages: newMessages });
stream.end(newMessages);
}
Why this design works: the outer loop models “turns”; the
inner loop models “what happens inside a turn” (streaming, tools, steering).
That separation makes it clear how steering, tools, and follow‑ups interact
instead of hiding everything inside a single while (true) with
tangled flags.
Concretely, the inner loop keeps doing two things:
- Drain pendingMessages (user steering or follow-ups) into the context.
- Stream an assistant response and, if it contains tool calls, execute them.
The outer loop asks one simple question: “Did this turn produce follow‑up messages that should start another turn?” That is exactly the traffic controller’s job: keep repeating the pattern until there’s nothing left to sequence.
The streaming heartbeat of the agent
Inside a turn, the critical operation is asking the LLM for a response as a stream. This module treats that streaming call as the agent’s heartbeat: every partial token becomes an event, and the conversation state is updated in lockstep.
streamAssistantResponse is careful about boundaries:
- It works in terms of AgentMessage[] (the toolkit's own types).
- It only converts to provider format at the edge via convertToLlm.
- It hides the vendor behind streamFn/streamSimple.
async function streamAssistantResponse(
context: AgentContext,
config: AgentLoopConfig,
signal: AbortSignal | undefined,
stream: EventStream,
streamFn?: StreamFn,
): Promise<AssistantMessage> {
let messages = context.messages;
if (config.transformContext) {
messages = await config.transformContext(messages, signal);
}
const llmMessages = await config.convertToLlm(messages);
const llmContext: Context = {
systemPrompt: context.systemPrompt,
messages: llmMessages,
tools: context.tools,
};
const streamFunction = streamFn || streamSimple;
const resolvedApiKey =
(config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) ||
config.apiKey;
const response = await streamFunction(config.model, llmContext, {
...config,
apiKey: resolvedApiKey,
signal,
});
let partialMessage: AssistantMessage | null = null;
let addedPartial = false;
for await (const event of response) {
switch (event.type) {
case "start":
partialMessage = event.partial;
context.messages.push(partialMessage);
addedPartial = true;
stream.push({ type: "message_start", message: { ...partialMessage } });
break;
case "text_start":
case "text_delta":
case "text_end":
case "thinking_start":
case "thinking_delta":
case "thinking_end":
case "toolcall_start":
case "toolcall_delta":
case "toolcall_end":
if (partialMessage) {
partialMessage = event.partial;
context.messages[context.messages.length - 1] = partialMessage;
stream.push({
type: "message_update",
assistantMessageEvent: event,
message: { ...partialMessage },
});
}
break;
case "done":
case "error": {
const finalMessage = await response.result();
if (addedPartial) {
context.messages[context.messages.length - 1] = finalMessage;
} else {
context.messages.push(finalMessage);
}
if (!addedPartial) {
stream.push({ type: "message_start", message: { ...finalMessage } });
}
stream.push({ type: "message_end", message: finalMessage });
return finalMessage;
}
}
}
return await response.result();
}
The essential pattern:
- Transform then convert at the edge. transformContext lets callers summarise or prune history before it hits the model. Only after that does the loop call convertToLlm to adapt to provider formats.
- Treat streaming as state updates. A partialMessage is updated on every streaming event; each update is published as message_update. UIs can subscribe to the event stream instead of polling for completion.
- Normalise completion and errors. Both "done" and "error" resolve through response.result(), yielding a final AssistantMessage that the outer loop can interpret via its stopReason.
Tools as backstage assistants
The other major responsibility of the controller is reacting to tool calls.
In tool‑augmented agents, the LLM sometimes says “Call
search_files with these args” and relies on the orchestrator to run
that tool and feed the result back.
This module models tool calls as content chunks in the assistant message. Once
streaming finishes for a turn, runLoop filters those chunks and, if
any are present, calls executeToolCalls.
async function executeToolCalls(
tools: AgentTool[] | undefined,
assistantMessage: AssistantMessage,
signal: AbortSignal | undefined,
stream: EventStream,
getSteeringMessages?: AgentLoopConfig["getSteeringMessages"],
): Promise<{ toolResults: ToolResultMessage[]; steeringMessages?: AgentMessage[] }> {
const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
const results: ToolResultMessage[] = [];
let steeringMessages: AgentMessage[] | undefined;
for (let index = 0; index < toolCalls.length; index++) {
const toolCall = toolCalls[index];
const tool = tools?.find((t) => t.name === toolCall.name);
stream.push({
type: "tool_execution_start",
toolCallId: toolCall.id,
toolName: toolCall.name,
args: toolCall.arguments,
});
let result: AgentToolResult;
let isError = false;
try {
if (!tool) throw new Error(`Tool ${toolCall.name} not found`);
const validatedArgs = validateToolArguments(tool, toolCall);
result = await tool.execute(toolCall.id, validatedArgs, signal, (partialResult) => {
stream.push({
type: "tool_execution_update",
toolCallId: toolCall.id,
toolName: toolCall.name,
args: toolCall.arguments,
partialResult,
});
});
} catch (e) {
result = {
content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }],
details: {},
};
isError = true;
}
stream.push({
type: "tool_execution_end",
toolCallId: toolCall.id,
toolName: toolCall.name,
result,
isError,
});
const toolResultMessage: ToolResultMessage = {
role: "toolResult",
toolCallId: toolCall.id,
toolName: toolCall.name,
content: result.content,
details: result.details,
isError,
timestamp: Date.now(),
};
results.push(toolResultMessage);
stream.push({ type: "message_start", message: toolResultMessage });
stream.push({ type: "message_end", message: toolResultMessage });
// If user steering arrives, skip remaining tools explicitly
if (getSteeringMessages) {
const steering = await getSteeringMessages();
if (steering.length > 0) {
steeringMessages = steering;
const remainingCalls = toolCalls.slice(index + 1);
for (const skipped of remainingCalls) {
results.push(skipToolCall(skipped, stream));
}
break;
}
}
}
return { toolResults: results, steeringMessages };
}
Through the traffic‑controller lens:
- Each tool execution is bracketed by tool_execution_start/_update/_end events. That's like logging when a plane starts taxiing, is in flight, and lands.
- Tool outputs are normalised into ToolResultMessage instances and appended to the conversation history. To the rest of the system, tool results are just another turn.
- If user steering arrives mid-execution (for example, "stop calling tools, just answer"), the remaining tool calls are not dropped silently. They are explicitly marked as skipped via skipToolCall, which still emits tool_execution_* and toolResult events.
That last behaviour is easy to miss in agent systems: you don’t want ghost invocations that disappear because the user changed their mind. This implementation makes interruptions explicit and observable.
How skipped tools are represented
skipToolCall constructs a ToolResultMessage with isError: true and a human-readable reason such as "Skipped due to queued user message". It also fires tool_execution_start and tool_execution_end so your logs and metrics stay structurally consistent.
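Based on that described behaviour, a simplified reconstruction of skipToolCall might look like the sketch below. The types are stand-ins; the real ToolResultMessage also carries details and a timestamp, as seen in executeToolCalls above.

```typescript
// Simplified stand-in types for illustration only.
interface ToolCall { id: string; name: string; arguments: unknown }
interface ToolResultMessage {
  role: "toolResult";
  toolCallId: string;
  toolName: string;
  content: { type: "text"; text: string }[];
  isError: boolean;
}
interface EventSink { push(event: Record<string, unknown>): void }

export function skipToolCallSketch(call: ToolCall, stream: EventSink): ToolResultMessage {
  const result = {
    content: [{ type: "text" as const, text: "Skipped due to queued user message" }],
  };
  // Bracket the skip with the same events as a real execution,
  // so logs and metrics stay structurally consistent.
  stream.push({ type: "tool_execution_start", toolCallId: call.id, toolName: call.name, args: call.arguments });
  stream.push({ type: "tool_execution_end", toolCallId: call.id, toolName: call.name, result, isError: true });
  return {
    role: "toolResult",
    toolCallId: call.id,
    toolName: call.name,
    content: result.content,
    isError: true,
  };
}
```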
Scaling the loop and adding guardrails
So far we’ve focused on behaviour and observability. The same traffic‑control structure also makes it straightforward to reason about performance and guardrails when you scale.
- Time complexity. runLoop is essentially linear in the number of turns, tool calls, and streaming events. Latency is dominated by the LLM and tools, not the orchestrator logic.
- Memory growth. currentContext.messages grows monotonically: every user prompt, assistant message, and tool result is appended. That's great for traceability, dangerous for very long sessions.
- Concurrency. Each agent loop instance is self-contained and relies on Node's single-threaded async model; there is no shared mutable state across loops.
The file already provides a hook to control history:
transformContext. You can turn that into a hard safety net by adding
a maxHistoryMessages option to the config and slicing old messages
before each LLM call.
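A minimal sketch of that guardrail, assuming a hypothetical maxHistoryMessages option wired through the existing transformContext hook (AgentMessage is simplified here):

```typescript
// Simplified message shape for illustration.
interface AgentMessage { role: string; text: string }

// Returns a function suitable as config.transformContext: it keeps
// only the most recent maxHistoryMessages messages. A production
// version might summarise the dropped prefix instead of discarding it.
export function makeHistoryLimiter(maxHistoryMessages: number) {
  return async (messages: AgentMessage[]): Promise<AgentMessage[]> => {
    if (messages.length <= maxHistoryMessages) return messages;
    return messages.slice(-maxHistoryMessages);
  };
}
```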
| Risk | Impact | Suggested guardrail |
|---|---|---|
| Unbounded message history | Memory and token cost blow-up; provider context limits | Use transformContext plus an optional maxHistoryMessages slice |
| Slow or stuck tools | Turns taking tens of seconds; stuck agents | Enforce timeouts in AgentTool.execute and track a tool_execution_duration_ms metric per tool |
| Hidden LLM errors | Agents ending unexpectedly with no clear signal upstream | Observe stopReason on the final assistant message and count error or abort reasons |
A minimal operational set for production agents built on this pattern:
- Turn duration. Measure time between turn_start and turn_end events. Watch high percentiles separately for "no tools" and "with tools" paths.
- Tool execution duration. Track execution time per toolName using the tool_execution_* events to spot slow or flaky tools.
- Messages per context. Count currentContext.messages length and trigger summarisation or pruning when it exceeds your safe bound.
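The first metric can be derived directly from the stream. Here is a sketch, assuming each event is tagged with a millisecond timestamp at ingestion (the events themselves are not shown carrying one):

```typescript
// Event wrapper assumed for illustration: `at` is a ms timestamp
// recorded when the event was received from the stream.
interface TimedEvent { type: string; at: number }

// Pairs each turn_start with the next turn_end and returns the
// duration of every completed turn in milliseconds.
export function turnDurationsMs(events: TimedEvent[]): number[] {
  const durations: number[] = [];
  let turnStartedAt: number | null = null;
  for (const event of events) {
    if (event.type === "turn_start") turnStartedAt = event.at;
    if (event.type === "turn_end" && turnStartedAt !== null) {
      durations.push(event.at - turnStartedAt);
      turnStartedAt = null;
    }
  }
  return durations;
}
```

Feeding these durations into a histogram per path ("no tools" vs "with tools") gives you the high-percentile view suggested above.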
Lessons you can reuse today
Viewed as a whole, agent-loop.ts demonstrates one core idea: an
agent loop should behave like a conversation traffic controller.
One place coordinates turns, tools, and interruptions through a clean event
model, while vendor‑specific details live at the edges.
Here are concrete patterns you can adopt in your own agent code:
- Separate orchestration from providers. Keep your loop working in your own message types and inject provider behaviour via conversions (convertToLlm) and pluggable stream functions (streamFn). Swapping models or SDKs becomes a config change instead of a refactor.
- Model everything as events. Expose a single EventStream with rich event types: agent_start, turn_start, message_start/update/end, tool_execution_start/update/end, agent_end. UIs, logs, and metrics can all subscribe without coupling to internal state.
- Make interruptions explicit. When user steering arrives mid-tool-execution, don't silently drop remaining tools. Emit explicit "skipped" tool results so downstream consumers understand what happened.
- Plan for growth from day one. Hooks like transformContext, getSteeringMessages, and getFollowUpMessages let you add summarisation, routing, and cross-turn behaviour later without rewriting the loop.
- Tame complexity with named state. Even in a dense function like runLoop, state such as pendingMessages, steeringAfterTools, and hasMoreToolCalls keeps the control flow understandable. If it grows further, extract helpers like a processTurn that owns a single TurnState.
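To make the last suggestion concrete, here is one hypothetical shape such an extraction could take. TurnState and drainTurn are invented names for illustration, not part of the file:

```typescript
// Hypothetical state record owned by a processTurn-style helper.
interface TurnState {
  pendingMessages: string[]; // queued user/steering input (simplified)
  hasMoreToolCalls: boolean;
}

// Drains queued messages and reports whether the turn has settled,
// mirroring the inner-loop condition in runLoop
// (hasMoreToolCalls || pendingMessages.length > 0).
export function drainTurn(state: TurnState): { drained: string[]; settled: boolean } {
  const drained = state.pendingMessages;
  state.pendingMessages = [];
  return { drained, settled: !state.hasMoreToolCalls && drained.length === 0 };
}
```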
If you design your own agent loop as a traffic controller – a single,
observable place that sequences turns, tools, and interruptions – it becomes
much easier to evolve as models, tools, and UIs change around it.
agent-loop.ts is more than a working implementation; it’s a
template for structuring non‑trivial AI orchestration logic so it stays
understandable, observable, and scalable.