Implementation:Pola rs Polars Streaming Query Pipeline
| Knowledge Sources | |
|---|---|
| Domains | Data Engineering, Streaming |
| Last Updated | 2026-02-09 10:00 GMT |
Overview
Concrete LazyFrame methods for building streaming-compatible query pipelines using the standard Polars expression API, including filtering, selection, aggregation, joining, and sorting.
Description
The LazyFrame class exposes a fluent API for constructing query plans. Each method appends an operation node to the internal plan DAG and returns a new LazyFrame. No computation occurs until a terminal action (collect or sink_*) is invoked. The same methods work identically whether the query is later executed in-memory or via the streaming engine.
Key operations include:
- filter -- Row-level predicate filtering using expressions.
- select -- Column projection and transformation.
- with_columns -- Add or overwrite columns while preserving existing ones.
- group_by / agg -- Grouped aggregation with arbitrary expressions.
- join -- Combine two LazyFrames on shared keys.
- sort -- Order rows by one or more columns.
Usage
Use these methods whenever you need to:
- Build a multi-step transformation pipeline on lazily-scanned data.
- Chain filters, projections, and aggregations in a single fluent expression.
- Construct queries that will be executed by the streaming engine.
Code Reference
Source Location
- Repository: Polars
- File: docs/source/src/python/user-guide/concepts/streaming.py (lines 8-13)
Signature
```python
# Simplified signatures. The real methods accept further parameters
# (e.g. several predicates for filter, left_on/right_on and suffix for join,
# nulls_last for sort); see the Polars API reference for the full lists.
class LazyFrame:
    def filter(self, predicate: Expr) -> LazyFrame: ...
    def select(self, *exprs: Expr | str) -> LazyFrame: ...
    def with_columns(self, *exprs: Expr | str) -> LazyFrame: ...
    def group_by(self, *by: str | Expr) -> LazyGroupBy: ...
    def join(
        self,
        other: LazyFrame,
        on: str | Expr | list[str | Expr] | None = None,
        how: str = "inner",
    ) -> LazyFrame: ...
    def sort(
        self,
        by: str | Expr | list[str | Expr],
        descending: bool | list[bool] = False,
    ) -> LazyFrame: ...

class LazyGroupBy:
    def agg(self, *exprs: Expr | str) -> LazyFrame: ...
```
Import
```python
import polars as pl
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| predicate (filter) | Expr | Yes | Boolean expression for row filtering (e.g., pl.col("x") > 5) |
| exprs (select/with_columns/agg) | Expr or str | Yes | One or more column expressions for projection or aggregation |
| by (group_by) | str or Expr | Yes | One or more columns to group by |
| other (join) | LazyFrame | Yes | The right-side LazyFrame for joining |
| on (join) | str, Expr, or list | No | Join key column(s); must be given unless left_on/right_on are used or how="cross" |
| how (join) | str | No | Join type: "inner" (default), "left", "right", "full", "semi", "anti", "cross" |
| by (sort) | str, Expr, or list | Yes | Column(s) to sort by |
| descending (sort) | bool or list of bool | No | If True, sort in descending order (default False); a list sets the order per column |
Outputs
| Name | Type | Description |
|---|---|---|
| result | LazyFrame | A new LazyFrame with the operation appended to the query plan. No data is computed. (group_by returns a LazyGroupBy; calling agg on it yields the LazyFrame.) |
Usage Examples
Basic Filter and Aggregation
```python
import polars as pl

q = (
    pl.scan_csv("docs/assets/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)

# Same API as in-memory -- only collect differs
df = q.collect(engine="streaming")
```
Multi-Step Pipeline with With_Columns, Aggregation, and Sort
```python
import polars as pl

q = (
    pl.scan_parquet("sales/*.parquet")
    .filter(pl.col("amount") > 0)
    .with_columns(
        (pl.col("amount") * pl.col("quantity")).alias("total"),
        pl.col("date").dt.year().alias("year"),
    )
    .group_by("year", "region")
    .agg(
        pl.col("total").sum().alias("revenue"),
        pl.col("order_id").n_unique().alias("num_orders"),
    )
    .sort("revenue", descending=True)
)
```
Join Two Lazy Scans
```python
import polars as pl

orders = pl.scan_parquet("orders/*.parquet")
customers = pl.scan_parquet("customers.parquet")
q = (
    orders
    .join(customers, on="customer_id", how="left")
    .group_by("country")
    .agg(pl.col("amount").sum())
)
```