Lazy Pipelines, Fast Backends

Lazy Pipelines, Fast Backends digs into how to keep data pipelines easy to write while still hitting serious performance in the backend.

Mahmoud Zalt

We’re examining how Polars turns friendly Python into a ruthless, multi-engine query planner. Polars is a fast DataFrame library that leans heavily on a Rust core, and at the center of its lazy story is LazyFrame: not a dataset, but a description of work to be done. I’m Mahmoud Zalt, an AI solutions architect, and we’ll use the file that defines it, py-polars/src/polars/lazyframe/frame.py, as a guide to designing lazy APIs that stay pleasant at the edges and brutal in the middle.

We’ll focus on one core lesson: design your lazy API as a blueprint, and treat engine selection and execution as pluggable strategies. We’ll see how LazyFrame keeps the blueprint pure, delegates execution to engines (CPU, streaming, GPU, cloud), and wraps sinks, schema evolution, and observability around that boundary without leaking complexity back into user code.

LazyFrame as a blueprint, not a dataset

The mental model is simple but strict: an eager DataFrame is data, a LazyFrame is a plan. Every method either extends that plan or triggers its execution; mixing the two is how you end up with surprise performance bugs.

polars/
  py-polars/
    src/polars/
      lazyframe/
        frame.py      <-- LazyFrame Python API over PyLazyFrame
      dataframe/
        __init__.py   (DataFrame eager API)
      _plr.so         (Rust-backed core: PyLazyFrame, PyExpr, ...)

User code
   |
   v
LazyFrame (frame.py)  -- build logical plan (select, join, group_by, ...)
   |
   v
PyLazyFrame (Rust)    -- optimization & physical planning
   |
   +--> in-memory engine   -- collect() -> DataFrame
   +--> streaming engine   -- collect_batches()/sink_*
   +--> GPU engine         -- collect(engine="gpu")
   +--> Polars Cloud       -- remote().execute()

LazyFrame sits between Python and the Rust engine, holding the logical plan.

The constructor makes this boundary explicit. It always goes through an eager DataFrame, then immediately switches to a lazy plan:

class LazyFrame:
    def __init__(
        self,
        data: FrameInitTypes | None = None,
        schema: SchemaDefinition | None = None,
        ...,
    ) -> None:
        from polars.dataframe import DataFrame

        self._ldf = (
            DataFrame(
                data=data,
                schema=schema,
                ...,
            )
            .lazy()
            ._ldf
        )

From that point on, self._ldf is a PyLazyFrame owned by Rust. The Python layer becomes a façade: it parses arguments, builds expressions, and hands them to _ldf as new plan nodes. As long as a method returns a LazyFrame, it’s expected to only modify this blueprint.
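
To make the blueprint idea concrete, here is a minimal pure-Python sketch. It is not how Polars stores plans (that lives in Rust); ToyLazyPlan and its methods are hypothetical names used only to show that plan-building returns a new plan and that a single method executes it.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

Row = dict[str, Any]
Step = Callable[[list[Row]], list[Row]]


@dataclass(frozen=True)
class ToyLazyPlan:
    """Hypothetical stand-in for a lazy frame: source rows plus pending steps."""

    source: tuple[Row, ...]
    steps: tuple[tuple[str, Step], ...] = ()

    def _extend(self, name: str, fn: Step) -> ToyLazyPlan:
        # Plan-building never touches the data; it returns a new, longer plan.
        return ToyLazyPlan(self.source, self.steps + ((name, fn),))

    def filter(self, predicate: Callable[[Row], bool]) -> ToyLazyPlan:
        return self._extend("filter", lambda rows: [r for r in rows if predicate(r)])

    def select(self, *columns: str) -> ToyLazyPlan:
        return self._extend(
            "select", lambda rows: [{c: r[c] for c in columns} for r in rows]
        )

    def explain(self) -> str:
        # Inspecting the plan is cheap because nothing has run yet.
        return " -> ".join(["source", *(name for name, _ in self.steps)])

    def collect(self) -> list[Row]:
        # The only method that executes the recorded steps.
        rows = [dict(r) for r in self.source]
        for _, fn in self.steps:
            rows = fn(rows)
        return rows


plan = ToyLazyPlan(({"a": 1, "b": "x"}, {"a": 5, "b": "y"}))
narrowed = plan.filter(lambda r: r["a"] > 2).select("b")
print(narrowed.explain())  # source -> filter -> select
print(narrowed.collect())  # [{'b': 'y'}]
```

The original plan is left untouched by filter and select, which is what lets callers branch one blueprint into several queries.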

Engine selection as a strategy pattern

Once the plan is separate, the question becomes: who decides how and where to run it? In Polars, engine choice is a small, explicit strategy wired in at the execution boundary, not something scattered across plan-building methods.

Every execution-style method converges on a helper that resolves the engine:

def _select_engine(engine: EngineType) -> EngineType:
    return get_engine_affinity() if engine == "auto" else engine

"auto" is interpreted once via global affinity (config/env); everything else ("in-memory", "streaming", "gpu", or a GPUEngine instance) passes through unchanged. That small helper is the top of the strategy funnel.

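As a rough illustration of that funnel, the sketch below resolves "auto" from an environment variable. The variable name TOY_ENGINE_AFFINITY and both function names are assumptions made for this example; Polars' real lookup lives in get_engine_affinity().

```python
import os


def toy_engine_affinity() -> str:
    # Hypothetical env var, used here only to stand in for a global default.
    return os.environ.get("TOY_ENGINE_AFFINITY", "auto")


def toy_select_engine(engine: str) -> str:
    # Only "auto" consults the global default; explicit choices pass through.
    return toy_engine_affinity() if engine == "auto" else engine


os.environ["TOY_ENGINE_AFFINITY"] = "streaming"
print(toy_select_engine("auto"))  # streaming
print(toy_select_engine("gpu"))   # gpu
```

Note that if no default is configured, "auto" can come back out as "auto", which is why a caller such as collect_batches still checks for it after resolution.
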
GPU support stays out of the core API by living behind a dedicated callback constructor:

def _gpu_engine_callback(
    engine: EngineType,
    *,
    background: bool,
    _eager: bool,
) -> Callable[[Any, int | None], None] | None:
    is_gpu = (is_config_obj := isinstance(engine, GPUEngine)) or engine == "gpu"
    if not (
        is_config_obj or engine in ("auto", "cpu", "in-memory", "streaming", "gpu")
    ):
        raise ValueError(f"Invalid engine argument {engine=}")

    if background and is_gpu:
        issue_warning(
            "GPU engine does not support background collection, disabling GPU engine.",
            category=UserWarning,
        )
        is_gpu = False
    if _eager:
        # don't run on GPU in _eager mode
        is_gpu = False

    if not is_gpu:
        return None

    cudf_polars = import_optional("cudf_polars", ...)
    if not is_config_obj:
        engine = GPUEngine()
    return partial(cudf_polars.execute_with_cudf, config=engine)

This function centralizes three concerns:

  • Validate engine names in one place.
  • Apply policy rules once (no GPU in background or eager mode).
  • Hide the optional cudf_polars dependency behind a generic callback.
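
The same three concerns can be sketched without any GPU library. Everything named toy_* below is hypothetical, including the optional package toy_accelerator and its execute entry point; the point is only the shape: validate, apply policy, then import lazily.

```python
from __future__ import annotations

import importlib
import warnings
from functools import partial
from typing import Any, Callable

KNOWN_ENGINES = ("auto", "in-memory", "streaming", "gpu")


def toy_accelerator_callback(
    engine: str,
    *,
    background: bool,
    module_name: str = "toy_accelerator",  # hypothetical optional package
) -> Callable[..., Any] | None:
    # 1. Validate engine names in one place.
    if engine not in KNOWN_ENGINES:
        raise ValueError(f"Invalid engine argument {engine=}")

    use_gpu = engine == "gpu"

    # 2. Apply policy once: downgrade with a warning instead of failing.
    if use_gpu and background:
        warnings.warn(
            "GPU engine does not support background collection, disabling it.",
            UserWarning,
            stacklevel=2,
        )
        use_gpu = False
    if not use_gpu:
        return None

    # 3. Import the optional dependency only when it is actually needed.
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(f"engine='gpu' requires the {module_name!r} package") from exc
    # `execute` is an assumed entry point of the hypothetical package.
    return partial(module.execute, config={})
```

Callers that never ask for the GPU never pay for the import, and they get None back as a signal to run the default path.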

collect then becomes the narrow execution gate:

@deprecate_streaming_parameter()
@forward_old_opt_flags()
def collect(
    self,
    *,
    engine: EngineType = "auto",
    background: bool = False,
    optimizations: QueryOptFlags = DEFAULT_QUERY_OPT_FLAGS,
    **_kwargs,
) -> DataFrame | InProcessQuery:
    engine = _select_engine(engine)

    callback = _gpu_engine_callback(
        engine,
        background=background,
        _eager=optimizations._pyoptflags.eager,
    )
    if isinstance(engine, GPUEngine):
        engine = "gpu"

    ldf = self._ldf.with_optimizations(optimizations._pyoptflags)

    if background:
        issue_unstable_warning("background mode is considered unstable.")
        return InProcessQuery(ldf.collect_concurrently())

    callback = _kwargs.get("post_opt_callback", callback)
    return wrap_df(ldf.collect(engine, callback))

The logical plan itself is oblivious to engines; all it sees is a configuration string and an optional function to run the query on GPU. Invalid engine names are rejected here, and GPU requests that cannot be honored (background or eager mode) are downgraded here, before Rust does any work.

API discipline: lazy methods vs eager escapes

With the blueprint/engine boundary clear, the next challenge is API hygiene: guaranteeing that “lazy” methods stay lazy, and that expensive helpers are very explicit about their cost.

Sharing one brain for filter/remove

filter and remove show how to offer a flexible surface without leaking execution: both are thin shells over a single _filter helper that only manipulates expressions and plan nodes.

def _filter(
    self,
    *,
    predicates: tuple[
        IntoExprColumn | Iterable[IntoExprColumn] | bool | list[bool] | np.ndarray[Any, Any],
        ...,
    ],
    constraints: dict[str, Any],
    invert: bool = False,
) -> LazyFrame:
    all_predicates: list[pl.Expr] = []
    boolean_masks = []

    for p in predicates:
        if (p is False and invert) or (p is True and not invert):
            continue
        if (p is True and invert) or (p is False and not invert):
            return self.clear()

        if _is_generator(p):
            p = tuple(p)

        if is_bool_sequence(p, include_series=True):
            boolean_masks.append(pl.Series(p, dtype=Boolean))
        elif (... type checks ...):
            raise TypeError(...)
        else:
            all_predicates.extend(
                wrap_expr(x) for x in parse_into_list_of_expressions(p)
            )

    all_predicates.extend(
        F.col(name).eq(value) for name, value in constraints.items()
    )
    if not (all_predicates or boolean_masks):
        raise TypeError("at least one predicate or constraint must be provided")

    combined_predicate = ...  # combine exprs with AND

    if boolean_masks:
        mask_expr = F.lit(reduce(and_, boolean_masks))
        combined_predicate = (
            mask_expr if combined_predicate is None else mask_expr & combined_predicate
        )

    if combined_predicate is None:
        return self._from_pyldf(self._ldf)

    filter_method = self._ldf.remove if invert else self._ldf.filter
    return self._from_pyldf(filter_method(combined_predicate._pyexpr))

This helper never calls collect or performs I/O. It normalizes the variety of predicate shapes (booleans, lists, numpy arrays, expressions, keyword constraints) into a single expression, then adds the appropriate node to the logical plan.

The user-facing filter and remove methods mostly decide what invert should be and delegate. This keeps “smart” behavior centralized and testable.
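
A stripped-down version of the same idea, using plain row dictionaries instead of expressions, might look like the sketch below. The toy_* names are hypothetical, and the toy ignores details such as boolean masks and null handling; it only shows one helper serving both wrappers through an invert flag.

```python
from __future__ import annotations

from typing import Any, Callable, Union

Row = dict[str, Any]
RowCheck = Callable[[Row], bool]


def _toy_predicate(
    *predicates: Union[bool, RowCheck], invert: bool, **constraints: Any
) -> RowCheck:
    """Normalize every predicate shape into one keep-this-row function."""
    checks: list[RowCheck] = []
    for p in predicates:
        if isinstance(p, bool):
            if p != invert:
                continue  # literal that keeps every row: nothing to add
            return lambda row: False  # literal that drops every row
        checks.append(p)

    # Keyword constraints become equality checks, like col(name) == value.
    for name, value in constraints.items():
        checks.append(lambda row, n=name, v=value: row.get(n) == v)

    if not checks:
        return lambda row: True

    def matches(row: Row) -> bool:
        return all(check(row) for check in checks)

    # filter keeps matching rows; remove keeps the rest.
    return (lambda row: not matches(row)) if invert else matches


def toy_filter(*predicates: Union[bool, RowCheck], **constraints: Any) -> RowCheck:
    return _toy_predicate(*predicates, invert=False, **constraints)


def toy_remove(*predicates: Union[bool, RowCheck], **constraints: Any) -> RowCheck:
    return _toy_predicate(*predicates, invert=True, **constraints)
```

Neither wrapper touches data: each returns a description of what to keep, which a later execution step can apply.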

describe as a deliberate eager escape hatch

At the other end of the spectrum is describe, which is intentionally eager. It collects the frame, computes statistics, and returns a DataFrame. This is useful, but it’s also expensive, so the implementation and docs are explicit about breaking laziness.

Internally, describe:

  • Uses collect_schema() first to understand column types.
  • Builds a large expression list to compute counts, distincts, min/max, and quantiles.
  • Performs an extra O(n log n) sort per temporal/numeric column when multiple quantiles are requested, trading CPU for fewer passes over data.
  • Runs a final .select(...).collect() and returns the materialized result.

The docstring calls this out directly:

This method does not maintain the laziness of the frame, and will collect the final result. This could potentially be an expensive operation.

That pattern—keep the main API lazy, but provide a few clearly-labeled, eager helpers for inspection—is essential when you want good ergonomics without hiding costs.
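
The multi-quantile trade-off mentioned in the list above can be shown in a few lines: sort once, then read every requested quantile from the sorted data. This is a toy using nearest-rank lookup, not the interpolation Polars applies, and toy_quantiles is a hypothetical name.

```python
import math


def toy_quantiles(values: list[float], quantiles: tuple[float, ...]) -> dict[float, float]:
    if not values:
        raise ValueError("need at least one value")
    ordered = sorted(values)  # single O(n log n) sort shared by every quantile
    n = len(ordered)
    result = {}
    for q in quantiles:
        # Nearest-rank position, clamped to the valid index range.
        index = min(n - 1, max(0, math.ceil(q * n) - 1))
        result[q] = ordered[index]
    return result


print(toy_quantiles([5, 1, 4, 2, 3], (0.25, 0.5, 0.75)))  # {0.25: 2, 0.5: 3, 0.75: 4}
```

Each extra quantile after the sort is a constant-time lookup, which is the reason one sort can beat several independent passes.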

Streaming and sinks as output strategies

Execution doesn’t always mean “return a single in-memory DataFrame”. The same plan can be executed as a stream of batches or written directly to storage. In this file, that shows up as a streaming iterator and a family of sink_* methods, all of which treat the logical plan as input and I/O configuration as strategy.

Streaming execution with collect_batches

collect_batches is the streaming counterpart to collect. It runs the same plan but exposes results incrementally as DataFrame chunks instead of one monolithic table.

@unstable()
def collect_batches(
    self,
    *,
    chunk_size: int | None = None,
    maintain_order: bool = True,
    lazy: bool = False,
    engine: EngineType = "auto",
    optimizations: QueryOptFlags = DEFAULT_QUERY_OPT_FLAGS,
) -> Iterator[DataFrame]:
    engine = _select_engine(engine)
    if engine == "auto":
        engine = "streaming"

    class CollectBatches:
        def __init__(self, inner: Any) -> None:
            self._inner = inner
        def __iter__(self) -> CollectBatches:
            return self
        def __next__(self) -> DataFrame:
            pydf = next(self._inner)
            return pl.DataFrame._from_pydf(pydf)
        def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
            return self._inner.__arrow_c_stream__(requested_schema)

    ldf = self._ldf.with_optimizations(optimizations._pyoptflags)
    inner = ldf.collect_batches(
        engine=engine,
        maintain_order=maintain_order,
        chunk_size=chunk_size,
        lazy=lazy,
    )
    return CollectBatches(inner)

The structure mirrors collect:

  1. Resolve engine, defaulting "auto" to "streaming".
  2. Apply optimization flags to the logical plan.
  3. Delegate to Rust for actual streaming execution.
  4. Wrap each low-level batch into a Python DataFrame iterator.

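Memory held for results is now roughly O(chunk_size) per batch instead of O(total_rows), which is the difference between “works on my laptop” and “works on a 10× larger dataset”. Operators that need to see every row, such as a full sort, can still buffer more than one chunk inside the engine.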
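
The memory argument is easy to see with a plain generator. The sketch below is a hypothetical stand-in (toy_collect_batches is not a Polars function) that pulls rows from a source only as each batch is requested.

```python
from __future__ import annotations

from itertools import islice
from typing import Any, Iterable, Iterator


def toy_collect_batches(
    rows: Iterable[dict[str, Any]], chunk_size: int
) -> Iterator[list[dict[str, Any]]]:
    iterator = iter(rows)
    # Pull at most chunk_size rows per step; stop when the source is exhausted.
    while batch := list(islice(iterator, chunk_size)):
        yield batch


pulled: list[int] = []


def toy_source() -> Iterator[dict[str, Any]]:
    for i in range(10):
        pulled.append(i)  # record how far the source has been read
        yield {"i": i}


batches = toy_collect_batches(toy_source(), chunk_size=4)
first = next(batches)
print(len(first), len(pulled))  # 4 4
```

After the first batch only four rows have been read from the source, so the consumer decides how fast the rest of the data is produced.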

Normalizing sink targets

Sinks like sink_parquet, sink_ipc, sink_csv, sink_ndjson, sink_delta, sink_iceberg, and sink_batches all follow the same pattern: normalize a Python-level target, prepare options (including cloud storage), then hand everything to PyLazyFrame.

A small but important piece is _to_sink_target:

def _to_sink_target(
    path: str | Path | IO[bytes] | IO[str] | PartitionBy,
) -> str | Path | IO[bytes] | IO[str] | PartitionBy:
    from polars.io.partition import PartitionBy

    if isinstance(path, (str, Path)):
        return normalize_filepath(path)
    elif isinstance(path, io.IOBase):
        return path
    elif isinstance(path, PartitionBy):
        return path
    elif callable(getattr(path, "write", None)):
        # allow custom writers
        return path
    else:
        msg = (
            f"`path` argument has invalid type {qualified_type_name(path)!r}, "
            "and cannot be turned into a sink target"
        )
        raise TypeError(msg)

This is an adapter: user code can pass strings, Paths, file objects, partitioning helpers, or anything with a write method, and the Rust side always receives a normalized, expected type.
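
To see the adapter from the caller's side, here is a simplified, self-contained variant with a duck-typed custom writer. toy_to_sink_target and CountingWriter are hypothetical, and Path.expanduser stands in for the real normalize_filepath.

```python
import io
from pathlib import Path


class CountingWriter:
    """Hypothetical custom sink: not an io.IOBase, but it has write()."""

    def __init__(self) -> None:
        self.bytes_written = 0

    def write(self, data: bytes) -> int:
        self.bytes_written += len(data)
        return len(data)


def toy_to_sink_target(target):
    if isinstance(target, (str, Path)):
        # Stand-in for normalize_filepath: expand "~" and return a string.
        return str(Path(target).expanduser())
    if isinstance(target, io.IOBase) or callable(getattr(target, "write", None)):
        # File objects and anything writable pass through untouched.
        return target
    raise TypeError(f"cannot turn {type(target).__name__!r} into a sink target")


buffer = io.BytesIO()
writer = CountingWriter()
print(toy_to_sink_target("out.parquet"))     # out.parquet
print(toy_to_sink_target(buffer) is buffer)  # True
print(toy_to_sink_target(writer) is writer)  # True
```

Anything else, such as an integer, fails fast with a TypeError before any execution starts.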

sink_parquet as the archetype

sink_parquet is the richest sink and representative of the pattern. It:

  • Transforms high-level options into an explicit statistics configuration (e.g. True, "full", or a dict).
  • Builds a _SinkOptions object containing storage_options, credential providers, retry behavior, and partitioning.
  • Respects lazy to either execute immediately or return a deferred plan node.
  • Uses _select_engine to honor engine choice.

Much of this _SinkOptions-building logic is duplicated across multiple sinks. A natural refactor is a shared _prepare_sink_options helper to centralize retry deprecation, credential wiring, and option validation. That keeps the blueprint/engine boundary clean even as you add formats and targets.
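
One possible shape for such a shared helper is sketched below. None of this is Polars code: ToySinkOptions, the retries/max_retries pair, and the default of 2 are assumptions chosen to illustrate a deprecation handled in exactly one place.

```python
from __future__ import annotations

import warnings
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class ToySinkOptions:
    storage_options: dict[str, Any] = field(default_factory=dict)
    max_retries: int = 2
    maintain_order: bool = True


def toy_prepare_sink_options(
    *,
    storage_options: dict[str, Any] | None = None,
    retries: int | None = None,  # hypothetical deprecated spelling
    max_retries: int | None = None,
    maintain_order: bool = True,
) -> ToySinkOptions:
    if retries is not None:
        if max_retries is not None:
            raise ValueError("pass only one of `retries` and `max_retries`")
        warnings.warn(
            "`retries` is deprecated; use `max_retries`",
            DeprecationWarning,
            stacklevel=2,
        )
        max_retries = retries
    if max_retries is None:
        max_retries = 2
    if max_retries < 0:
        raise ValueError("`max_retries` must be non-negative")
    # Copy the mapping so later caller mutations cannot leak into the sink.
    return ToySinkOptions(dict(storage_options or {}), max_retries, maintain_order)


def toy_sink_csv(path: str, **options: Any) -> tuple[str, str, ToySinkOptions]:
    return ("csv", path, toy_prepare_sink_options(**options))


def toy_sink_parquet(path: str, **options: Any) -> tuple[str, str, ToySinkOptions]:
    return ("parquet", path, toy_prepare_sink_options(**options))
```

With this layout, adding a new format means writing one thin wrapper, and changing retry policy means touching one function.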

Schema evolution as a plan operation

Real pipelines rarely enjoy a stable schema. Columns change, nested structs grow new fields, and type requirements tighten over time. Polars treats this problem as part of planning, not something you solve with ad hoc code around every load.

match_to_schema is the main tool here, and it operates entirely at the LazyFrame level:

@unstable()
def match_to_schema(
    self,
    schema: SchemaDict | Schema,
    *,
    missing_columns: (
        Literal["insert", "raise"]
        | Mapping[str, Literal["insert", "raise"] | Expr]
        | Expr
    ) = "raise",
    missing_struct_fields: (
        Literal["insert", "raise"]
        | Mapping[str, Literal["insert", "raise"]]
    ) = "raise",
    extra_columns: Literal["ignore", "raise"] = "raise",
    extra_struct_fields: (
        Literal["ignore", "raise"]
        | Mapping[str, Literal["ignore", "raise"]]
    ) = "raise",
    integer_cast: (
        Literal["upcast", "forbid"]
        | Mapping[str, Literal["upcast", "forbid"]]
    ) = "forbid",
    float_cast: (
        Literal["upcast", "forbid"]
        | Mapping[str, Literal["upcast", "forbid"]]
    ) = "forbid",
) -> LazyFrame:

The implementation normalizes both the target schema and the policy for how to get there:

if isinstance(schema, Mapping):
    schema_prep = Schema(schema)
else:
    schema_prep = schema

if isinstance(missing_columns, Mapping):
    missing_columns_pyexpr = {
        key: prepare_missing_columns(value)
        for key, value in missing_columns.items()
    }
elif isinstance(missing_columns, Expr):
    missing_columns_pyexpr = prepare_missing_columns(missing_columns)
else:
    missing_columns_pyexpr = missing_columns

return LazyFrame._from_pyldf(
    self._ldf.match_to_schema(
        schema=schema_prep,
        missing_columns=missing_columns_pyexpr,
        missing_struct_fields=missing_struct_fields,
        extra_columns=extra_columns,
        extra_struct_fields=extra_struct_fields,
        integer_cast=integer_cast,
        float_cast=float_cast,
    )
)

The effect is a declarative contract between caller and engine:

  • For missing columns: insert default values, compute them via expressions, or fail.
  • For extra columns: ignore them or treat their presence as an error.
  • For numeric casts: allow or forbid widening (e.g. int32 → int64, float32 → float64) globally or per-column.

All of this happens without executing the plan. The Rust core enforces and applies these rules when the query finally runs.
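
Because the contract is declarative, it can be reasoned about from schemas alone. The toy below works on name-to-type dictionaries and returns the adjustments it would plan. It is a hypothetical illustration (toy_match_schema and its action strings are invented here) and does not reproduce Polars' exact rules.

```python
from __future__ import annotations

INT_WIDTH = {"int8": 8, "int16": 16, "int32": 32, "int64": 64}


def toy_match_schema(
    actual: dict[str, str],
    target: dict[str, str],
    *,
    missing_columns: str = "raise",  # "insert" or "raise"
    extra_columns: str = "raise",    # "ignore" or "raise"
    integer_cast: str = "forbid",    # "upcast" or "forbid"
) -> list[str]:
    actions: list[str] = []
    for name, want in target.items():
        if name not in actual:
            if missing_columns != "insert":
                raise ValueError(f"missing column {name!r}")
            actions.append(f"insert null column {name}: {want}")
            continue
        have = actual[name]
        if have == want:
            continue
        # Widening means both are integers and the target is strictly wider.
        widening = (
            have in INT_WIDTH and want in INT_WIDTH and INT_WIDTH[have] < INT_WIDTH[want]
        )
        if widening and integer_cast == "upcast":
            actions.append(f"cast {name}: {have} -> {want}")
        else:
            raise TypeError(f"column {name!r} is {have}, expected {want}")
    extras = [name for name in actual if name not in target]
    if extras and extra_columns == "raise":
        raise ValueError(f"extra columns {extras}")
    actions.extend(f"ignore extra column {name}" for name in extras)
    return actions
```

No rows are involved anywhere: the function only compares two schemas and a policy, which mirrors how the real method can stay lazy.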

Operational surface: async, limits, and metrics

Even though this file is “just” Python bindings, it’s also the operational boundary. It decides which operations are expensive, how concurrency is handled, and where you’d naturally hang metrics and warnings.

Where the real work lives

The genuine hot paths are limited and easy to see:

  • Execution: collect, execute, collect_async, collect_batches.
  • Output: sink_parquet, sink_ipc, sink_csv, sink_ndjson, and other sinks.
  • Heavy transforms: group_by, join, group_by_dynamic, describe.

Most methods are thin wrappers whose cost is dominated by the Rust engine. describe is the notable exception because of its extra sort per temporal/numeric column for multi-quantile statistics.

Async collection and safety constraints

collect_async is where Python’s concurrency model meets the Rust executor. It uses dedicated thread pools and small result wrappers to integrate with asyncio or gevent, but still respects engine-level constraints.

_COLLECT_BATCHES_POOL = ThreadPoolExecutor(thread_name_prefix="pl_col_batch_")

@deprecate_streaming_parameter()
def collect_async(
    self,
    *,
    engine: EngineType = "auto",
    optimizations: QueryOptFlags = DEFAULT_QUERY_OPT_FLAGS,
):
    engine = _select_engine(engine)
    if engine == "streaming":
        issue_unstable_warning("streaming mode is considered unstable.")

    ldf = self._ldf.with_optimizations(optimizations._pyoptflags)

    result = _GeventDataFrameResult() if gevent else _AioDataFrameResult()
    ldf.collect_with_callback(engine, result._callback)
    return result

GPU-specific rules (like “no GPU in background mode”) live in _gpu_engine_callback on the collect path; the collect_async excerpt above passes the resolved engine straight to collect_with_callback. Async is treated as a scheduling concern only: it doesn’t change the logical plan, just how the event loop waits for results.
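
The callback-to-awaitable bridge can be sketched with the standard library alone. ToyAioResult and toy_collect_async are hypothetical names, and a plain thread stands in for the Rust executor; the sketch only shows how a result produced off the event loop is handed back to it safely.

```python
import asyncio
import threading


class ToyAioResult:
    """Awaitable that a worker thread completes through a callback."""

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()
        self._future = self._loop.create_future()

    def __await__(self):
        return self._future.__await__()

    def _callback(self, value) -> None:
        # Called from a worker thread, so hop back onto the event loop.
        if isinstance(value, Exception):
            self._loop.call_soon_threadsafe(self._future.set_exception, value)
        else:
            self._loop.call_soon_threadsafe(self._future.set_result, value)


def toy_collect_async(work) -> ToyAioResult:
    result = ToyAioResult()

    def run() -> None:
        try:
            result._callback(work())
        except Exception as exc:  # forward failures to the awaiting coroutine
            result._callback(exc)

    threading.Thread(target=run, daemon=True).start()
    return result


async def toy_main() -> int:
    return await toy_collect_async(lambda: sum(range(1000)))


print(asyncio.run(toy_main()))  # 499500
```

The work itself is unchanged by being awaited; only the way the caller waits for it differs, which matches the scheduling-only framing above.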

Natural metric points

This façade is also where observability hooks belong. The following are suggested metrics, not ones Polars ships, and they map cleanly onto the execution boundary:

Metric | Purpose
polars_lazyframe_query_duration_seconds | Latency of collect/execute/sinks, labeled by engine and query type (e.g. interactive vs batch).
polars_lazyframe_rows_processed_total | Total number of rows processed, to relate volume to latency and resource use.
polars_lazyframe_streaming_batches_in_flight | Gauge of concurrent streaming batches for collect_batches and sinks, capturing backpressure.
polars_lazyframe_gpu_fallback_count | Count of cases where GPU execution was requested but fell back to CPU, exposing misconfiguration or unsupported features.
polars_lazyframe_io_errors_total | Aggregate count of I/O errors across all sink_* calls and cloud operations.

Because the blueprint is separate from execution, these counters can be incremented solely at the execution boundary, with no pollution of core plan-building logic.
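
A decorator is one simple way to attach such counters at the boundary. The sketch below uses in-process dictionaries as stand-ins for a metrics client; observed, toy_sink, and the label choices are all hypothetical.

```python
import time
from collections import defaultdict
from functools import wraps

# In-process stand-ins for a metrics client.
QUERY_DURATION_SECONDS = defaultdict(list)
IO_ERRORS_TOTAL = defaultdict(int)


def observed(query_type: str):
    """Record duration and I/O errors for one execution entry point."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, engine: str = "auto", **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, engine=engine, **kwargs)
            except OSError:
                IO_ERRORS_TOTAL[query_type] += 1
                raise
            finally:
                # Runs on success and failure, so every call is timed.
                elapsed = time.perf_counter() - start
                QUERY_DURATION_SECONDS[(query_type, engine)].append(elapsed)

        return wrapper

    return decorator


@observed("batch")
def toy_sink(rows: list, path: str, *, engine: str = "auto") -> int:
    if path.startswith("/forbidden"):
        raise PermissionError(path)  # PermissionError is a subclass of OSError
    return len(rows)
```

Plan-building methods stay undecorated, so the instrumentation cost is paid only when a query actually runs.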

Design patterns to reuse

Seen as a whole, this LazyFrame implementation is an example of one main principle: keep the lazy API as a pure blueprint, and plug execution engines and outputs in at a narrow, explicit boundary. For intermediate and senior engineers building their own data or rules engines, there are several patterns worth copying.

1. Treat pipelines as blueprints

  • Make plan-building methods return new plan objects, never realized results.
  • Keep those methods free of heavy work: they should only build DAGs of operations and expressions.
  • Reserve a tiny set of well-named methods (collect, describe, sinks) that are allowed to execute, and document their cost.

2. Encapsulate engine selection

  • Introduce a helper like _select_engine to interpret "auto" and environment defaults.
  • Represent engine-specific behavior (GPU, streaming, cloud) as callbacks or small config objects passed into the executor.
  • Handle unsupported combinations (GPU + background, GPU + eager) in a single place before work starts.

3. Centralize complex argument handling

  • For rich APIs like filter/remove, invest in one robust internal helper that normalizes arguments and returns a new plan node.
  • Keep user-facing variants as thin wrappers so they’re easier to reason about and easier to deprecate or extend.

4. Model sinks and schema as first-class strategies

  • Treat sinks as adapters over the same logical plan, with shared utilities for paths, credentials, and retries.
  • Expose schema evolution (match_to_schema-style) as a plan operation with explicit policies instead of bespoke ETL code.

5. Put observability at the execution boundary

  • Identify execution hot spots (collect, streaming, sinks, remote execution) and hang metrics and warnings there.
  • Surface profiling and explain-style helpers to let users inspect how their blueprints map to work.

If you’re designing an analytical engine, a transformation layer, or even a complex business rules system, this pattern gives you a way to stay fast without sacrificing ergonomics: build blueprints first, choose engines and outputs later, and keep that seam narrow, explicit, and observable.

Full Source Code

The file discussed here comes from the upstream repository pola-rs/polars, at refs/heads/main: py-polars/src/polars/lazyframe/frame.py

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss anything.
