When we sketch LLM workflows, we draw tidy boxes and arrows. In production, we get retries, partial failures, streaming UIs, and users who expect to resume in the middle of everything. Somewhere between those diagrams and reality, we need a ledger that keeps the story straight. In LangGraph, that ledger lives in the Pregel runtime. In this article, I (Mahmoud Zalt) want us to see how Pregel turns a messy, concurrent LLM workflow into a sequence of consistent checkpoints — and what that design teaches us about building our own stateful systems.
Pregel as a state ledger
To understand Pregel, it helps to stop thinking about it as “just an executor” and start seeing it as a state ledger for your graph. Every step, every task, every write is recorded, versioned, and replayable.
langgraph/
  pregel/
    _algo.py     # scheduling & applying writes
    _loop.py     # SyncPregelLoop, AsyncPregelLoop (execution engine)
    _runner.py   # PregelRunner (per-task execution)
    main.py      # <== this file: Pregel runtime & NodeBuilder
Once you see Pregel as a ledger rather than a loop, its choices around checkpoints, streaming, and bulk updates become much easier to reason about.
Conceptually, Pregel follows the Bulk Synchronous Parallel model: work happens in steps. In each step, a set of workers run in parallel; then everyone stops, applies their writes, and only then moves on. That barrier gives us a natural boundary where we can write a consistent snapshot through a checkpointer.
The Pregel class in main.py orchestrates this:
- Drives SyncPregelLoop/AsyncPregelLoop to step the graph.
- Uses PregelRunner to run node logic (often LLM calls).
- Reads and writes checkpoints through a BaseCheckpointSaver.
- Exposes a state API: get_state, get_state_history, bulk_update_state, and their async variants.
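The step-and-barrier model described above can be sketched in a few lines of plain Python. This is a toy under stated assumptions, not LangGraph's implementation: run_superstep, double_a, and copy_b are hypothetical names, nodes run sequentially rather than in parallel, and the "checkpointer" is just a list of snapshots.

```python
from typing import Callable

# Toy model, not LangGraph's code: a "node" is a function that reads a frozen
# snapshot of the channels and returns the writes it wants to make.
Node = Callable[[dict[str, int]], dict[str, int]]


def run_superstep(channels: dict[str, int], nodes: list[Node]) -> dict[str, int]:
    """Run one bulk-synchronous step and return the next channel state."""
    snapshot = dict(channels)  # every node sees the same pre-step state
    pending = [node(snapshot) for node in nodes]  # writes are buffered, not applied
    next_state = dict(channels)
    for writes in pending:  # barrier: apply all buffered writes together
        next_state.update(writes)
    return next_state


def double_a(state: dict[str, int]) -> dict[str, int]:
    return {"b": state["a"] * 2}


def copy_b(state: dict[str, int]) -> dict[str, int]:
    # Reads "b" as it was before this step, even though double_a ran first.
    return {"c": state["b"]}


history = [{"a": 1, "b": 0, "c": 0}]  # one consistent snapshot per step
for _ in range(2):
    history.append(run_superstep(history[-1], [double_a, copy_b]))
```

Because copy_b only sees the pre-step snapshot, the doubled value reaches "c" one step after it reaches "b". Each entry in history is a point where a consistent snapshot could be persisted.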
Nodes, channels, and the builder
Once we treat Pregel as a ledger, we need a concrete mental model for what moves through it. A useful analogy from the code review is: think of channels as stations and nodes as trains. Each step is a scheduled departure.
Pregel exposes a NodeBuilder, a fluent API for defining these trains: what stations they read from, what they write to, and how they behave.
node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)
Subscribe to "a", process, and write to "b".
The core of this builder is small and focused. Here is the essence of subscribe_only and build:
def subscribe_only(self, channel: str) -> Self:
    """Subscribe to a single channel."""
    if not self._channels:
        self._channels = channel
    else:
        raise ValueError(
            "Cannot subscribe to single channels when other channels are already subscribed to"
        )
    self._triggers.append(channel)
    return self

def build(self) -> PregelNode:
    """Builds the node."""
    return PregelNode(
        channels=self._channels,
        triggers=self._triggers,
        tags=self._tags,
        metadata=self._metadata,
        writers=[ChannelWrite(self._writes)],
        bound=self._bound,
        retry_policy=self._retry_policy,
        cache_policy=self._cache_policy,
    )
NodeBuilder turns a fluent description into a concrete PregelNode with channels, triggers, and writers.
In the trains-and-stations analogy:
- channels describe which stations the train can load from.
- triggers describe which station arrivals should schedule that train in the next step.
- writers describe which stations receive cargo when the train finishes.
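To make the three roles concrete, here is a minimal stand-alone sketch of a fluent builder and a scheduling check. Everything in it (ToyNode, ToyNodeBuilder, run_if_triggered) is hypothetical and far simpler than the real PregelNode; it only illustrates how channels, triggers, and writes cooperate.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyNode:
    """Hypothetical stand-in for PregelNode with only the fields we discuss."""
    channels: str                 # station the train loads from
    triggers: list[str]           # arrivals that schedule the train
    writes: list[str]             # stations that receive the cargo
    bound: Callable[[Any], Any]   # the work done on board


class ToyNodeBuilder:
    """Hypothetical miniature of a fluent builder; each method returns self."""

    def __init__(self) -> None:
        self._channels: Optional[str] = None
        self._triggers: list[str] = []
        self._writes: list[str] = []
        self._bound: Callable[[Any], Any] = lambda value: value

    def subscribe_only(self, channel: str) -> "ToyNodeBuilder":
        if self._channels is not None:
            raise ValueError("already subscribed to a channel")
        self._channels = channel
        self._triggers.append(channel)
        return self

    def do(self, fn: Callable[[Any], Any]) -> "ToyNodeBuilder":
        self._bound = fn
        return self

    def write_to(self, *channels: str) -> "ToyNodeBuilder":
        self._writes.extend(channels)
        return self

    def build(self) -> ToyNode:
        if self._channels is None:
            raise ValueError("subscribe to a channel before building")
        return ToyNode(self._channels, list(self._triggers), list(self._writes), self._bound)


def run_if_triggered(node: ToyNode, state: dict[str, Any], updated: set[str]) -> dict[str, Any]:
    """Return the node's writes if one of its trigger channels was just updated."""
    if not updated.intersection(node.triggers):
        return {}  # no arrival at a trigger station: the train stays parked
    result = node.bound(state[node.channels])
    return {channel: result for channel in node.writes}


node1 = ToyNodeBuilder().subscribe_only("a").do(lambda x: x + x).write_to("b").build()
writes = run_if_triggered(node1, {"a": "foo"}, updated={"a"})
skipped = run_if_triggered(node1, {"a": "foo"}, updated={"c"})
```

The node produces a write to "b" only when "a" was updated in the previous step; an update to an unrelated channel leaves it unscheduled.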
Editing the ledger safely
Pregel becomes most interesting when we need to edit the ledger after the fact: fixing bad state, replaying parts of a run, or seeding new branches. That is what bulk_update_state and abulk_update_state are for.
bulk_update_state takes a series of supersteps, each a list of StateUpdate objects. A StateUpdate is essentially: “pretend node X, task Y wrote these values.” Internally, Pregel:
- Loads (and possibly migrates) the latest checkpoint.
- Resolves which tasks and writers correspond to each update.
- Reuses or creates task IDs so history stays coherent.
- Applies writes through the same machinery as normal execution.
- Persists a new checkpoint with updated channel versions.
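The steps above can be sketched with a toy ledger. This is an assumption-heavy simplification: ToyCheckpoint and apply_update are hypothetical names, task resolution and migration are left out, and versions are plain integers. The point it illustrates is that an edit appends a new page with bumped channel versions instead of mutating an old one.

```python
import copy
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToyCheckpoint:
    """Hypothetical, simplified checkpoint: values plus one version per channel."""
    step: int
    channel_values: dict[str, Any]
    channel_versions: dict[str, int]
    metadata: dict[str, Any]


def apply_update(
    history: list[ToyCheckpoint], as_node: str, values: dict[str, Any]
) -> ToyCheckpoint:
    """Record 'as_node wrote these values' as a new checkpoint."""
    latest = history[-1]  # load the latest checkpoint
    new_values = copy.deepcopy(latest.channel_values)
    new_versions = dict(latest.channel_versions)
    for channel, value in values.items():  # apply the writes
        new_values[channel] = value
        new_versions[channel] = new_versions.get(channel, 0) + 1
    checkpoint = ToyCheckpoint(
        step=latest.step + 1,
        channel_values=new_values,
        channel_versions=new_versions,
        metadata={"source": "update", "as_node": as_node},  # illustrative labels
    )
    history.append(checkpoint)  # persist a new page; older pages stay untouched
    return checkpoint


history = [ToyCheckpoint(0, {"draft": "helo"}, {"draft": 1}, {"source": "input"})]
fixed = apply_update(history, as_node="writer", values={"draft": "hello"})
```

The original checkpoint remains in history, so the run can still be replayed from before the fix.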
One subtle part is disambiguating which node a single update belongs to when the caller omits as_node. Here is the core resolution logic:
valid_updates: list[tuple[str, dict[str, Any] | None, str | None]] = []
if len(updates) == 1:
    values, as_node, task_id = updates[0]
    # find last node that updated the state, if not provided
    if as_node is None and len(self.nodes) == 1:
        as_node = tuple(self.nodes)[0]
    elif as_node is None and not any(
        v
        for vv in checkpoint["versions_seen"].values()
        for v in vv.values()
    ):
        if (
            isinstance(self.input_channels, str)
            and self.input_channels in self.nodes
        ):
            as_node = self.input_channels
    elif as_node is None:
        last_seen_by_node = sorted(
            (v, n)
            for n, seen in checkpoint["versions_seen"].items()
            if n in self.nodes
            for v in seen.values()
        )
        # if two nodes updated the state at the same time, it's ambiguous
        if last_seen_by_node:
            if len(last_seen_by_node) == 1:
                as_node = last_seen_by_node[0][1]
            elif last_seen_by_node[-1][0] != last_seen_by_node[-2][0]:
                as_node = last_seen_by_node[-1][1]
    if as_node is None:
        raise InvalidUpdateError("Ambiguous update, specify as_node")
When the caller omits as_node, Pregel tries to infer it from history; if it cannot do so safely, it fails loudly.
The important principle: never guess silently. Pregel only infers as_node when there is a single clear candidate. As soon as two nodes might have written at the same logical time, it raises InvalidUpdateError. That discipline keeps the ledger trustworthy.
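The same resolution rules can be isolated as a pure function, which makes the ambiguous case easy to exercise. This is a sketch modeled on the excerpt, not the library's API: infer_as_node and AmbiguousUpdate are hypothetical names, and versions are assumed to be comparable integers.

```python
from typing import Optional


class AmbiguousUpdate(Exception):
    """Hypothetical stand-in for InvalidUpdateError."""


def infer_as_node(
    nodes: set[str],
    versions_seen: dict[str, dict[str, int]],
    input_channel: Optional[str] = None,
) -> str:
    """Pick the node an update belongs to, or refuse if it is not clear-cut."""
    if len(nodes) == 1:
        return next(iter(nodes))  # only one possible author
    has_history = any(v for seen in versions_seen.values() for v in seen.values())
    if not has_history:
        if input_channel in nodes:
            return input_channel  # fresh graph: attribute the write to the input node
        raise AmbiguousUpdate("Ambiguous update, specify as_node")
    last_seen = sorted(
        (version, node)
        for node, seen in versions_seen.items()
        if node in nodes
        for version in seen.values()
    )
    # Accept the latest entry only if nothing else ties with it.
    if len(last_seen) == 1 or (
        len(last_seen) > 1 and last_seen[-1][0] != last_seen[-2][0]
    ):
        return last_seen[-1][1]
    raise AmbiguousUpdate("Ambiguous update, specify as_node")


clear = infer_as_node({"a", "b"}, {"a": {"x": 1}, "b": {"y": 2}})
try:
    infer_as_node({"a", "b"}, {"a": {"x": 2}, "b": {"y": 2}})
    tie_error = None
except AmbiguousUpdate as exc:
    tie_error = str(exc)
```

In the first call node "b" saw the highest version, so it is the clear candidate. In the second call both nodes saw version 2, and the function raises rather than picking one.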
Special supersteps: END, INPUT, and "__copy__"
Besides ordinary “as this node” updates, bulk_update_state supports three special patterns:
- Clear everything: values=None and as_node == END wipe tasks by calculating all writes that would flush the graph, then applying them.
- Act as input: as_node == INPUT feeds values through map_input and persists it as if it were a real graph input.
- Fork a checkpoint: as_node == "__copy__" creates a new checkpoint (a fork in the ledger) and can chain subsequent updates on top of it in the same call.
All three reuse the same apply_writes and create_checkpoint machinery as normal execution, so manual edits and standard runs share the same semantics.
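As a rough illustration of that shared path, the toy below routes all three special cases through one function that always ends by appending a new page. It is a sketch under loud assumptions: apply_special, the dictionary layout, the sentinel strings, and the "source" labels are placeholders chosen for this example, and the real logic for flushing tasks and mapping input is much richer.

```python
import copy
from typing import Any, Optional

# Placeholder sentinels for this sketch; the real constants live in LangGraph.
END, INPUT, COPY = "__end__", "__input__", "__copy__"


def apply_special(
    history: list[dict[str, Any]], as_node: str, values: Optional[dict[str, Any]]
) -> dict[str, Any]:
    """Handle a special superstep by writing a new page based on the latest one."""
    page = copy.deepcopy(history[-1])
    page["id"] = history[-1]["id"] + 1
    if as_node == END and values is None:
        page["pending_tasks"] = []        # clear everything that was scheduled
        page["source"] = "update"
    elif as_node == INPUT and values is not None:
        page["channels"].update(values)   # treat the values as fresh graph input
        page["source"] = "input"
    elif as_node == COPY:
        page["source"] = "fork"           # same content, new branch of the ledger
    else:
        raise ValueError(f"not a special superstep: {as_node!r}")
    history.append(page)                  # one shared persistence path
    return page


history = [{"id": 0, "channels": {"q": "hi"}, "pending_tasks": ["node1"], "source": "loop"}]
forked = apply_special(history, COPY, None)
seeded = apply_special(history, INPUT, {"q": "hello again"})
cleared = apply_special(history, END, None)
```

Each call leaves earlier pages intact, so the fork, the re-seeded input, and the cleared task list are all visible as separate entries in history.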
Streaming on top of checkpoints
So far, we have treated Pregel as a batch ledger. Most real workflows need streaming: partial outputs, token streams, and debug traces. The key design choice is that Pregel builds streaming on top of the same step-and-checkpoint model instead of inventing a separate pipeline.
The synchronous stream method wires the pieces together. The inner loop looks like this:
while loop.tick():
    for task in loop.match_cached_writes():
        loop.output_writes(task.id, task.writes, cached=True)
    for _ in runner.tick(
        [t for t in loop.tasks.values() if not t.writes],
        timeout=self.step_timeout,
        get_waiter=get_waiter,
        schedule_task=loop.accept_push,
    ):
        # emit output
        yield from _output(
            stream_mode, print_mode, subgraphs, stream.get, queue.Empty
        )
    loop.after_tick()
    # wait for checkpoint
    if durability_ == "sync":
        loop._put_checkpoint_fut.result()
This enforces two invariants:
- Step boundaries are explicit. Channel updates from step N become visible only when we move to step N+1. That is why we can take consistent snapshots at each step.
- Streaming is event-based. The loop pushes StreamChunks into a queue; _output drains that queue, optionally prints for debug, and yields to the caller.
A dedicated StreamMessagesHandler is attached when you enable stream_mode="messages", streaming LLM tokens and metadata. Custom streaming goes through a Runtime.stream_writer callback that pushes (namespace, "custom", payload) tuples into the same queue.
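The queue-and-drain pattern behind this can be sketched with the standard library alone. The names here (drain, run) are hypothetical, the "work" is a counter, and the tuples only mimic the (namespace, mode, payload) shape described above; treat it as an illustration of the idea rather than the runtime's code.

```python
import queue
from typing import Any, Iterator


def drain(stream: "queue.SimpleQueue[tuple]", modes: set[str]) -> Iterator[tuple[str, Any]]:
    """Yield every queued chunk whose mode the caller asked for."""
    while True:
        try:
            _namespace, mode, payload = stream.get(block=False)
        except queue.Empty:
            break  # nothing left for now; hand control back to the loop
        if mode in modes:
            yield mode, payload


def run(steps: int, modes: set[str]) -> Iterator[tuple[str, Any]]:
    """Toy step loop: producers push chunks, the generator drains after each step."""
    stream: "queue.SimpleQueue[tuple]" = queue.SimpleQueue()
    state = 0
    for step in range(steps):
        stream.put(((), "custom", f"working on step {step}"))  # custom writer
        state += 1
        stream.put(((), "values", state))                      # end-of-step state
        yield from drain(stream, modes)


everything = list(run(2, {"values", "custom"}))
only_values = list(run(2, {"values"}))
```

Producers never need to know which stream modes are active; filtering happens in one place when the queue is drained.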
Operational guardrails and lessons
Once we see Pregel as a ledger with streaming layered on top, operational concerns become easy to frame: how many pages we add, how big they are, and how long each write and stream takes.
| Metric | What it tells us | Suggested target |
|---|---|---|
| pregel_steps_per_run | How many steps each execution needs; high values hint at inefficient or looping graphs. | Most workflows < 50 steps; investigate if > 200 regularly. |
| checkpoint_write_latency_ms | How long checkpointer.put/aput takes; directly affects latency in durability="sync" mode. | P50 < 50ms, P95 < 200ms. |
| stream_queue_depth | Current size of the SyncQueue/AsyncQueue; a proxy for backpressure. | Keep under ~100 items under normal load. |
| bulk_update_superstep_duration_ms | How long a single bulk update superstep takes; useful when external tools edit graph state. | < 200ms per superstep in interactive scenarios. |
On the safety side, Pregel encodes several guardrails:
- Recursion limit: if the graph burns through too many steps without stopping, it raises GraphRecursionError with a specific error code.
- Durability contract: durability has no effect unless a checkpointer is configured, and deprecated options like checkpoint_during are guarded explicitly.
- Namespace hygiene: subgraph checkpoint namespaces are normalized via recast_checkpoint_ns so parent and child graphs do not overwrite each other’s history.
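The recursion-limit guardrail is the easiest of these to reproduce in our own runtimes. Below is a minimal sketch, assuming a step function that returns the new state and a done flag; StepLimitExceeded, run_with_limit, and the default of 25 are choices made for this illustration, not LangGraph's definitions.

```python
from typing import Callable, TypeVar

S = TypeVar("S")


class StepLimitExceeded(RuntimeError):
    """Hypothetical analogue of a recursion-limit error."""


def run_with_limit(
    step_fn: Callable[[S], tuple[S, bool]], state: S, recursion_limit: int = 25
) -> tuple[S, int]:
    """Run step_fn until it reports done, failing loudly after too many steps."""
    for step in range(recursion_limit):
        state, done = step_fn(state)
        if done:
            return state, step + 1  # final state and number of steps used
    raise StepLimitExceeded(
        f"no stop condition reached after {recursion_limit} steps"
    )


def count_to_three(n: int) -> tuple[int, bool]:
    return n + 1, n + 1 >= 3


final_state, steps_used = run_with_limit(count_to_three, 0)
```

A graph that never reports done raises instead of spinning forever, which turns a silent runaway loop into an actionable error.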
Design lessons you can reuse
- Make step boundaries explicit. Just like Pregel’s “channel updates from step N become visible in N+1”, define clear phases in your workflows (plan → execute → commit). It simplifies reasoning about concurrency and makes checkpointing natural.
- Expose a safe manual-edit path. bulk_update_state is a controlled edit interface into the ledger. Consider offering a similar API for your state: it lets operators fix issues and build admin tools without poking your database directly, as long as you enforce strong validation and unambiguous semantics.
- Design sync and async together. Pregel keeps tight parity between stream/astream, invoke/ainvoke, and bulk_update_state/abulk_update_state. When you add features, design the sync/async story together so you do not end up with two subtly different runtimes.
- Treat observability as part of the API. Stream modes ("values", "updates", "messages", "tasks", "checkpoints", "debug") are part of the public surface, not bolted on later. Think of logs, metrics, and streams as first-class outputs of your system, not just side effects.
Viewed as a ledger, Pregel turns a complex LLM workflow into a sequence of carefully written pages you can always come back to: clear step boundaries, explicit state transitions, and safe ways to read and edit history. If we design our own runtimes with the same mindset, they become much easier to scale, debug, and evolve.



