Implementation:Polars Collect Streaming Engine

From Leeroopedia


Knowledge Sources
Domains Data Engineering, Streaming
Last Updated 2026-02-09 10:00 GMT

Overview

The concrete LazyFrame.collect() method, when called with the engine="streaming" parameter, triggers batch-based execution of a lazy query plan for out-of-core dataset processing.

Description

The collect() method is the terminal action that materializes a LazyFrame into a DataFrame. When called with engine="streaming", the Polars execution engine switches from the default in-memory strategy to a streaming strategy that processes data in fixed-size batches.

The streaming engine reads source data in chunks, applies the pipeline operations (filter, project, aggregate) to each chunk, and merges partial results into the final DataFrame. Operations that cannot be streamed automatically fall back to the in-memory engine for that pipeline stage.

Usage

Use collect(engine="streaming") whenever you need to:

  • Materialize a lazy query result from a dataset that may exceed available RAM.
  • Execute a query with bounded, predictable memory consumption.
  • Run scan-filter-aggregate pipelines on large files or partitioned datasets.

Code Reference

Source Location

  • Repository: Polars
  • File: docs/source/src/python/user-guide/concepts/streaming.py (line 14)

Signature

class LazyFrame:
    def collect(
        self,
        *,
        engine: str = "auto",  # "auto"/"in-memory" selects the default in-memory engine; "streaming" selects batch processing
    ) -> DataFrame: ...

Import

import polars as pl

I/O Contract

Inputs

Name Type Required Description
engine str No Execution engine: "streaming" for batch-based out-of-core processing; default is in-memory execution

Outputs

Name Type Description
result DataFrame The fully materialized query result as an in-memory DataFrame

Usage Examples

Basic Streaming Collection

import polars as pl

q = (
    pl.scan_csv("large_file.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)

# Streaming collection for out-of-core processing
df = q.collect(engine="streaming")
print(df)

Comparison: In-Memory vs Streaming

import polars as pl

q = (
    pl.scan_parquet("dataset/*.parquet")
    .filter(pl.col("value") > 100)
    .group_by("category")
    .agg(pl.col("value").sum())
)

# In-memory execution (default) -- loads all data at once
df_memory = q.collect()

# Streaming execution -- processes data in batches
df_streaming = q.collect(engine="streaming")

# Both produce identical results
assert df_memory.sort("category").equals(df_streaming.sort("category"))

Streaming with Multi-File Scan

import polars as pl

# Scan many files, filter, aggregate, collect via streaming
df = (
    pl.scan_parquet("s3://bucket/logs/**/*.parquet")
    .filter(pl.col("status_code") >= 400)
    .group_by("endpoint")
    .agg(
        pl.col("status_code").count().alias("error_count"),
        pl.col("response_time_ms").mean().alias("avg_response_time"),
    )
    .sort("error_count", descending=True)
    .collect(engine="streaming")
)
