Skip to main content
المدونة

Zalt Blog

Deep Dives into Code & Architecture

AT SCALE

When Graph Orchestration Becomes a Power Tool

By محمود الزلط
Code Cracking
20m read
<

When does graph orchestration stop being glue code and start feeling like a real power tool? This explores that shift and why it matters for your systems.

/>
When Graph Orchestration Becomes a Power Tool - Featured blog post image

CONSULTING

Learning graph-based AI?

Knowledge graphs, Neo4j, entity extraction — 1:1 mentoring to go from graph concepts to production implementations.

We’re examining how Langflow turns a static flow diagram into a live execution engine through its Graph class. Langflow lets you design AI flows as connected components, and this class is the core runner that decides what runs, when, and in what order. I’m Mahmoud Zalt, an AI solutions architect, and we’ll use this file as a case study in how to separate “how work is done” from “when work is scheduled” so your orchestration layer stays powerful instead of chaotic.

Setting the scene: what this Graph actually runs

Langflow represents a flow as vertices (components) connected by edges that describe how data moves. The Graph class in base.py is the execution brain: it takes those vertices and edges and turns them into a runnable system.

langflow/
  src/
    lfx/
      graph/
        graph/
          base.py        # Graph execution engine (this file)
          constants.py
          runnable_vertices_manager.py
          utils.py
          state_model.py
        vertex/
          base.py        # Vertex definitions
          schema.py
          vertex_types.py
        edge/
          base.py        # Edge and CycleEdge
          schema.py
        schema.py        # Graph-level schemas (GraphData, RunOutputs, ...)
      services/
        chat/            # Cache/chat service
      tracing/           # Tracing service
Where the Graph orchestrator sits inside Langflow.

At a high level, Graph:

  • Builds a graph from payloads (nodes, edges) or component objects.
  • Computes adjacency maps and execution layers from that structure.
  • Runs vertices step-by-step or in parallel batches, including cycles and stateful components.
  • Integrates caching, tracing, and logging.
  • Exposes async and sync APIs: arun, process, async_start, astep, start, step.

The scheduler mental model: air-traffic control, not a pipe

It’s tempting to think of a flow runner as a pipeline: step A, then B, then C. The Graph behaves more like an air-traffic controller. Vertices are planes, edges are flight plans, and the orchestrator decides which planes are cleared for takeoff.

On the topology side, a few methods define the “radar” the scheduler uses:

  • build_graph_maps – builds predecessor_map, successor_map, in_degree_map, and parent_child_map.
  • sort_vertices – turns those maps into execution layers.
  • get_next_runnable_vertices – given a completed vertex, decides which successors are now runnable.
def build_graph_maps(self, edges=None, vertices=None) -> None:
    if edges is None:
        edges = self.edges
    if vertices is None:
        vertices = self.vertices

    self.predecessor_map, self.successor_map = self.build_adjacency_maps(edges)
    self.in_degree_map = self.build_in_degree(edges)
    self.parent_child_map = self.build_parent_child_map(vertices)
Adjacency and in-degree maps – the scheduler’s radar.

The critical architectural move is here: representation is separate from scheduling. Vertices and edges encode what can happen; the Graph plus RunnableVerticesManager decides what does happen now.

Execution modes on one scheduling model

Once the scheduler is in place, the next question is execution style. This file exposes two main modes — batch and step-wise — without duplicating scheduling logic. Both ride on the same primitives: adjacency maps, run manager, and “runnable” checks.

Batch execution with process()

process is the “run the whole flow” path. It computes the first execution layer, runs each layer in parallel with asyncio.create_task, then asks the scheduler which vertices can run next.

async def process(self, *, fallback_to_env_vars: bool,
                  start_component_id: str | None = None,
                  event_manager: EventManager | None = None) -> Graph:
    has_webhook_component = "webhook" in start_component_id.lower() if start_component_id else False
    first_layer = self.sort_vertices(start_component_id=start_component_id)
    vertex_task_run_count: dict[str, int] = {}
    to_process = deque(first_layer)
    layer_index = 0
    chat_service = get_chat_service()

    if chat_service is not None:
        get_cache_func = chat_service.get_cache
        set_cache_func = chat_service.set_cache
    else:
        async def get_cache_func(*_, **__):
            return None
        async def set_cache_func(*_, **__):
            pass

    await self.initialize_run()
    lock = asyncio.Lock()
    while to_process:
        current_batch = list(to_process)
        to_process.clear()
        tasks = []
        for vertex_id in current_batch:
            vertex = self.get_vertex(vertex_id)
            task = asyncio.create_task(
                self.build_vertex(
                    vertex_id=vertex_id,
                    user_id=self.user_id,
                    inputs_dict={},
                    fallback_to_env_vars=fallback_to_env_vars,
                    get_cache=get_cache_func,
                    set_cache=set_cache_func,
                    event_manager=event_manager,
                ),
                name=f"{vertex.id} Run {vertex_task_run_count.get(vertex_id, 0)}",
            )
            tasks.append(task)
            vertex_task_run_count[vertex_id] = vertex_task_run_count.get(vertex_id, 0) + 1

        await logger.adebug(f"Running layer {layer_index} with {len(tasks)} tasks, {current_batch}")
        next_runnable_vertices = await self._execute_tasks(
            tasks, lock=lock, has_webhook_component=has_webhook_component
        )
        if not next_runnable_vertices:
            break
        to_process.extend(next_runnable_vertices)
        layer_index += 1

    await logger.adebug("Graph processing complete")
    return self
Layered parallel execution driven by the scheduler.

The orchestration loop itself is simple: maintain a frontier, run it, ask the scheduler for the next frontier. The complexity lives in the scheduler and in build_vertex, not in the outer loop.

Interactive execution with astep()

astep is the interactive cousin used for step-wise or streaming scenarios. Instead of layers, it keeps a run queue and pops one vertex at a time, but it still delegates “what’s next” to the same scheduler.

async def astep(self, inputs: InputValueRequest | None = None,
                files: list[str] | None = None,
                user_id: str | None = None,
                event_manager: EventManager | None = None):
    if not self._prepared:
        raise ValueError("Graph not prepared. Call prepare() first.")
    if not self._run_queue:
        self._end_all_traces_async()
        return Finish()

    vertex_id = self.get_next_in_queue()
    if not vertex_id:
        raise ValueError("No vertex to run")

    chat_service = get_chat_service()
    if chat_service is not None:
        get_cache_func = chat_service.get_cache
        set_cache_func = chat_service.set_cache
    else:
        async def get_cache_func(*_, **__):
            return None
        async def set_cache_func(*_, **__) -> bool:
            return True

    vertex_build_result = await self.build_vertex(
        vertex_id=vertex_id,
        user_id=user_id,
        inputs_dict=inputs.model_dump() if inputs and hasattr(inputs, "model_dump") else {},
        files=files,
        get_cache=get_cache_func,
        set_cache=set_cache_func,
        event_manager=event_manager,
    )

    next_runnable_vertices = await self.get_next_runnable_vertices(
        self.lock, vertex=vertex_build_result.vertex, cache=False
    )
    if self.stop_vertex and self.stop_vertex in next_runnable_vertices:
        next_runnable_vertices = [self.stop_vertex]

    self.extend_run_queue(next_runnable_vertices)
    self.reset_inactivated_vertices()
    self.reset_activated_vertices()

    if chat_service is not None:
        await chat_service.set_cache(str(self.flow_id or self._run_id), self)
    self._record_snapshot(vertex_id)
    return vertex_build_result
Single-step execution, still using the same scheduling rules.

Both process and astep lean on the same core pieces:

  • build_vertex – how a vertex does work.
  • get_next_runnable_vertices – which vertices become runnable after that work.
  • RunnableVerticesManager – tracking what is runnable and what already ran.

Caching and conditional routing as orchestration policies

With the scheduling model in place, Langflow layers in cross-cutting policies: caching and conditional routing. Both are implemented centrally in the Graph instead of inside vertices. This keeps vertices focused on “how to work” and lets the orchestrator enforce flow-wide behavior.

Caching and frozen vertices

The build_vertex method wraps the vertex’s own build logic with caching and “frozen” behavior, plus a special case for looping components.

async def build_vertex(self, vertex_id: str, *,
                       get_cache: GetCache | None = None,
                       set_cache: SetCache | None = None,
                       inputs_dict: dict[str, str] | None = None,
                       files: list[str] | None = None,
                       user_id: str | None = None,
                       fallback_to_env_vars: bool = False,
                       event_manager: EventManager | None = None) -> VertexBuildResult:
    vertex = self.get_vertex(vertex_id)
    self.run_manager.add_to_vertices_being_run(vertex_id)
    try:
        should_build = False
        is_loop_component = vertex.display_name == "Loop" or vertex.is_loop
        if not vertex.frozen or is_loop_component:
            should_build = True
        else:
            cached_result = await get_cache(key=vertex.id) if get_cache is not None else CacheMiss()
            if isinstance(cached_result, CacheMiss):
                should_build = True
            else:
                try:
                    cached_vertex_dict = cached_result["result"]
                    vertex.built = cached_vertex_dict["built"]
                    vertex.artifacts = cached_vertex_dict["artifacts"]
                    vertex.built_object = cached_vertex_dict["built_object"]
                    vertex.built_result = cached_vertex_dict["built_result"]
                    vertex.full_data = cached_vertex_dict["full_data"]
                    vertex.results = cached_vertex_dict["results"]
                    try:
                        vertex.finalize_build()
                        if vertex.result is not None:
                            vertex.result.used_frozen_result = True
                    except Exception:
                        logger.debug("Error finalizing build", exc_info=True)
                        vertex.built = False
                        should_build = True
                except KeyError:
                    vertex.built = False
                    should_build = True

        if should_build:
            await vertex.build(
                user_id=user_id,
                inputs=inputs_dict,
                fallback_to_env_vars=fallback_to_env_vars,
                files=files,
                event_manager=event_manager,
            )
            if set_cache is not None:
                vertex_dict = {
                    "built": vertex.built,
                    "results": vertex.results,
                    "artifacts": vertex.artifacts,
                    "built_object": vertex.built_object,
                    "built_result": vertex.built_result,
                    "full_data": vertex.full_data,
                }
                await set_cache(key=vertex.id, data=vertex_dict)

    except Exception as exc:
        if not isinstance(exc, ComponentBuildError):
            await logger.aexception("Error building Component")
        raise

    if vertex.result is None:
        raise ValueError(f"Error building Component: no result found for vertex {vertex_id}")

    params = vertex.built_object_repr()
    return VertexBuildResult(
        result_dict=vertex.result,
        params=params,
        valid=True,
        artifacts=vertex.artifacts,
        vertex=vertex,
    )
Caching and frozen-result handling wrapped around a vertex build.

Simplified, the behavior is:

  1. Non-frozen or loop vertices always build fresh.
  2. Frozen vertices attempt a cache restore.
  3. Any cache miss or invalid data falls back to building, then caching.
  4. Restored results are marked via used_frozen_result.
Scenario Build? Cache behavior
Non-frozen vertex Yes May write after build
Frozen vertex, cache miss Yes Write after build
Frozen vertex, cache hit, finalize ok No Restore only, mark frozen result
Frozen vertex, cache hit, finalize fails Yes Rebuild and recache

The key is that caching is an orchestration policy, not something each component re-implements. The Graph decides when a vertex is allowed to skip work, and does so consistently across the flow.

Conditional routing without mutating the graph

Conditional routing adds another policy: sometimes we want to keep the topology but temporarily close a branch. Langflow models this as conditional exclusion data, not structural changes.

def exclude_branch_conditionally(self, vertex_id: str, output_name: str | None = None) -> None:
    """Marks a branch as conditionally excluded (for conditional routing).

    This system is separate from the ACTIVE/INACTIVE state used for cycle management:
    - ACTIVE/INACTIVE: Reset after each cycle iteration
    - Conditional exclusion: Persists until explicitly cleared by the same source vertex
    """
    if vertex_id in self.conditional_exclusion_sources:
        previous_exclusions = self.conditional_exclusion_sources[vertex_id]
        self.conditionally_excluded_vertices -= previous_exclusions
        del self.conditional_exclusion_sources[vertex_id]

    visited: set[str] = set()
    excluded: set[str] = set()
    self._exclude_branch_conditionally(vertex_id, visited, excluded, output_name, skip_first=True)

    if excluded:
        self.conditional_exclusion_sources[vertex_id] = excluded
Conditional branch exclusion: closing tracks without changing the rails.

Routing decisions then integrate with scheduling through is_vertex_runnable:

def is_vertex_runnable(self, vertex_id: str) -> bool:
    if vertex_id in self.conditionally_excluded_vertices:
        return False
    vertex = self.get_vertex(vertex_id)
    is_active = vertex.is_active()
    is_loop = vertex.is_loop
    return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
Routing is expressed as data the scheduler consults, not as topology edits.

This gives three nice properties:

  • Static topology: edges and maps don’t change; you just consult extra sets.
  • Source-scoped decisions: exclusions are tracked per router vertex, so conditions can be updated cleanly.
  • Unified “can this run?” check: exclusion, activation, loop rules, and run history all converge in is_vertex_runnable.

Performance, structure, and what to steal

Once you see the Graph as a scheduler plus policies, the performance story and the structural trade-offs become clearer.

Where the cost really is

The orchestration algorithms (building maps, sorting vertices, traversing edges) are essentially O(V + E). The expensive part is Vertex.build: LLM calls, databases, external APIs.

That’s why the recommended metrics focus on vertices, not just whole flows:

  • graph_run_duration_seconds – total flow runtime.
  • vertex_build_duration_seconds{vertex_id,flow_id} – per-vertex hotspots.
  • graph_active_vertices_count – how big flows really are.
  • graph_errors_total{flow_id,vertex_id} – which components are unstable.
  • cache_hit_ratio{vertex_id} – whether frozen vertices are paying off.

In the air-traffic model: measure how long planes sit on the runway, which planes always cause delays, and how crowded the sky is.

Concurrency: one async core, thin sync wrappers

The class follows a clear concurrency pattern:

  • Async methods (process, arun, astep) implement the real behavior.
  • Sync methods (start, step) are thin wrappers that drive those async operations via event loops or generators.
  • An asyncio.Lock is used around critical sections in get_next_runnable_vertices and task completion to avoid races in the run manager and cache.

This avoids the “dual implementation” problem: there is one place to reason about scheduling and one place to reason about vertex work.

Snapshots vs memory footprint

For step-wise runs, _record_snapshot stores the run manager state, run queue, layers, and call order after each astep. That’s fantastic for debugging and replaying, but dangerous for long or cyclic flows because snapshots grow with the number of steps.

A small but important improvement is to cap snapshots (for example via _max_snapshots) and keep a sliding window of recent history. The general rule: your orchestrator’s memory should scale with graph size (V+E), not with number of steps.

Structural lessons: keeping the power tool sharp

All this power comes at a cost: Graph is close to being a god object. It owns topology, execution, caching, tracing, activation state, conditional routing, snapshotting, and serialization.

The design direction suggested by this file is to keep the core separation and extract focused collaborators over time. For example:

  • A small GraphTopology helper for adjacency, in-degree, and layering.
  • A cache manager for the frozen-vertex logic.
  • A routing policy object for conditional and cycle-based activation.

Then the main Graph can act as a façade: it coordinates these subsystems but continues to own the central rule that vertices describe how to work, while it decides when they run.

If you’re designing your own flow runner, start from three questions: where do components learn how to work, where does the system decide when they work, and which cross-cutting policies need to sit in the middle? Langflow’s Graph shows that answering those cleanly is what turns orchestration from glue code into a power tool.

Full Source Code

Direct source from the upstream repository. Preview it inline or open it on GitHub.

heads/main/src/lfx/src/lfx/graph/graph/base.py

langflow-ai/langflow • refs

Choose one action below.

Open on GitHub

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss anything.

Support this content

Share this article