Implementation: Polars Collect Streaming Engine
| Knowledge Sources | |
|---|---|
| Domains | Data Engineering, Streaming |
| Last Updated | 2026-02-09 10:00 GMT |
Overview
The LazyFrame.collect() method with the engine="streaming" parameter triggers batch-based execution of a lazy query plan for out-of-core dataset processing.
Description
The collect() method is the terminal action that materializes a LazyFrame into a DataFrame. When called with engine="streaming", the Polars execution engine switches from the default in-memory strategy to a streaming strategy that processes data in fixed-size batches.
The streaming engine reads source data in chunks, applies the pipeline operations (filter, project, aggregate) to each chunk, and merges partial results into the final DataFrame. Operations that cannot be streamed automatically fall back to the in-memory engine for that pipeline stage.
Usage
Use collect(engine="streaming") whenever you need to:
- Materialize a lazy query result from a dataset that may exceed available RAM.
- Execute a query with bounded, predictable memory consumption.
- Run scan-filter-aggregate pipelines on large files or partitioned datasets.
Code Reference
Source Location
- Repository: Polars
- File:
docs/source/src/python/user-guide/concepts/streaming.py (line 14)
Signature
class LazyFrame:
    def collect(
        self,
        *,
        engine: str = "auto",  # "auto" (default), "in-memory", or "streaming"
    ) -> DataFrame: ...
Import
import polars as pl
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| engine | str | No | Execution engine: "streaming" for batch-based out-of-core processing; default is in-memory execution |
Outputs
| Name | Type | Description |
|---|---|---|
| result | DataFrame | The fully materialized query result as an in-memory DataFrame |
Usage Examples
Basic Streaming Collection
import polars as pl
q = (
    pl.scan_csv("large_file.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)
# Streaming collection for out-of-core processing
df = q.collect(engine="streaming")
print(df)
Comparison: In-Memory vs Streaming
import polars as pl
q = (
    pl.scan_parquet("dataset/*.parquet")
    .filter(pl.col("value") > 100)
    .group_by("category")
    .agg(pl.col("value").sum())
)
# In-memory execution (default) -- loads all data at once
df_memory = q.collect()
# Streaming execution -- processes data in batches
df_streaming = q.collect(engine="streaming")
# Both produce identical results
assert df_memory.sort("category").equals(df_streaming.sort("category"))
Streaming with Multi-File Scan
import polars as pl
# Scan many files, filter, aggregate, collect via streaming
df = (
    pl.scan_parquet("s3://bucket/logs/**/*.parquet")
    .filter(pl.col("status_code") >= 400)
    .group_by("endpoint")
    .agg(
        pl.col("status_code").count().alias("error_count"),
        pl.col("response_time_ms").mean().alias("avg_response_time"),
    )
    .sort("error_count", descending=True)
    .collect(engine="streaming")
)