

Implementation:Pola rs Polars Streaming Query Pipeline

From Leeroopedia


Knowledge Sources
Domains: Data Engineering, Streaming
Last Updated: 2026-02-09 10:00 GMT

Overview

Concrete LazyFrame methods for building streaming-compatible query pipelines using the standard Polars expression API, including filtering, selection, aggregation, joining, and sorting.

Description

The LazyFrame class exposes a fluent API for constructing query plans. Each method appends an operation node to the internal query plan (a DAG) and returns a new LazyFrame, leaving the original unchanged. No computation occurs until a terminal action (collect or one of the sink_* methods) is invoked. The same methods, with the same semantics, are used whether the query is later executed by the in-memory engine or by the streaming engine.

Key operations include:

  • filter -- Row-level predicate filtering using expressions.
  • select -- Column projection and transformation.
  • with_columns -- Add or overwrite columns while preserving existing ones.
  • group_by / agg -- Grouped aggregation with arbitrary expressions.
  • join -- Combine two LazyFrames on shared keys.
  • sort -- Order rows by one or more columns.

Usage

Use these methods whenever you need to:

  • Build a multi-step transformation pipeline on lazily-scanned data.
  • Chain filters, projections, and aggregations in a single fluent expression.
  • Construct queries that will be executed by the streaming engine.

Code Reference

Source Location

  • Repository: Polars
  • File: docs/source/src/python/user-guide/concepts/streaming.py (lines 8-13)

Signature

# Simplified signatures; the actual methods accept further arguments
# (for example left_on, right_on and suffix on join, or nulls_last on sort).
class LazyFrame:
    def filter(self, *predicates: Expr) -> LazyFrame: ...
    def select(self, *exprs: Expr | str) -> LazyFrame: ...
    def with_columns(self, *exprs: Expr | str) -> LazyFrame: ...
    def group_by(self, *by: str | Expr) -> LazyGroupBy: ...
    def join(
        self,
        other: LazyFrame,
        on: str | Expr | list[str | Expr] | None = None,
        how: str = "inner",  # "inner", "left", "right", "full", "semi", "anti", "cross"
    ) -> LazyFrame: ...
    def sort(
        self,
        by: str | Expr | list[str | Expr],
        descending: bool | list[bool] = False,
    ) -> LazyFrame: ...

class LazyGroupBy:
    def agg(self, *exprs: Expr | str) -> LazyFrame: ...

Import

import polars as pl

I/O Contract

Inputs

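| Name | Type | Required | Description |
|---|---|---|---|
| predicates (filter) | Expr | Yes | One or more boolean expressions for row filtering (e.g. pl.col("x") > 5); multiple predicates are combined with AND |
| exprs (select / with_columns / agg) | Expr or str | Yes | One or more column names or expressions for projection or aggregation |
| by (group_by) | str or Expr | Yes | One or more columns to group by |
| other (join) | LazyFrame | Yes | The right-side LazyFrame to join with |
| on (join) | str, Expr, or list | No | Join key column(s); required unless left_on/right_on are given or how="cross" |
| how (join) | str | No | Join type: "inner" (default), "left", "right", "full", "semi", "anti", "cross" |
| by (sort) | str, Expr, or list | Yes | Column(s) to sort by |
| descending (sort) | bool or list of bool | No | If True, sort in descending order (default False); a list sets the direction per column |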

Outputs

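| Name | Type | Description |
|---|---|---|
| result | LazyFrame | A new LazyFrame with the operation appended to the query plan. No data is computed. (group_by is the exception: it returns a LazyGroupBy, which agg turns back into a LazyFrame.) |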

Usage Examples

Basic Filter and Aggregation

import polars as pl

q = (
    pl.scan_csv("docs/assets/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)

# Same API as in-memory; only the collect call differs
df = q.collect(engine="streaming")

Multi-Step Pipeline with Select and With_Columns

import polars as pl

q = (
    pl.scan_parquet("sales/*.parquet")
    .filter(pl.col("amount") > 0)
    .with_columns(
        (pl.col("amount") * pl.col("quantity")).alias("total"),
        pl.col("date").dt.year().alias("year"),
    )
    .group_by("year", "region")
    .agg(
        pl.col("total").sum().alias("revenue"),
        pl.col("order_id").n_unique().alias("num_orders"),
    )
    .sort("revenue", descending=True)
)

Join Two Lazy Scans

import polars as pl

orders = pl.scan_parquet("orders/*.parquet")
customers = pl.scan_parquet("customers.parquet")

q = (
    orders
    .join(customers, on="customer_id", how="left")
    .group_by("country")
    .agg(pl.col("amount").sum())
)

Related Pages

Implements Principle
