Implementation:Polars Sink Operations

From Leeroopedia


Knowledge Sources
Domains: Data Engineering, Streaming
Last Updated: 2026-02-09 10:00 GMT

Overview

Concrete sink methods on LazyFrame that write streaming query results directly to Parquet, IPC, or CSV files without full in-memory materialization, supporting partitioned output and multiplexed writes.

Description

Polars provides a family of sink_* methods on LazyFrame for writing streaming query results directly to persistent storage. Each method processes the query plan in streaming mode and writes output batches to disk as they are produced, avoiding full materialization of the result in memory.

The sink methods support two modes, illustrated in the sketch after this list:

  • Immediate execution (default, lazy=False) -- Executes the query and writes the output, returning a DataFrame with metadata.
  • Deferred execution (lazy=True) -- Returns a LazyFrame that can be combined with other lazy sinks via pl.collect_all() for multiplexed writes.
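
A minimal sketch of the two modes, assuming the signatures documented below; the file names are placeholders:

import polars as pl

lf = pl.scan_parquet("events.parquet")

# Immediate mode (default): executes the plan and writes the file now,
# returning a DataFrame of write metadata
meta = lf.sink_parquet("out.parquet")

# Deferred mode: builds a sink node but executes nothing yet;
# pl.collect_all() runs the deferred sink and performs the write
pending = lf.sink_parquet("out.parquet", lazy=True)
pl.collect_all([pending])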

Partitioned output is supported through the pl.PartitionBy helper, which enables Hive-style directory partitioning and row-count-based file splitting (see the partitioned output examples below).

Usage

Use sink operations whenever you need to:

  • Write streaming query results directly to Parquet, IPC, or CSV files.
  • Perform end-to-end data transformations without materializing results in memory.
  • Produce partitioned datasets for downstream consumption.
  • Write a single query result to multiple output formats simultaneously.

Code Reference

Source Location

  • Repository: Polars
  • File: docs/source/user-guide/lazy/sources_sinks.md (lines 11-54)

Signature

class LazyFrame:
    def sink_parquet(
        self,
        path: str | Path | PartitionBy,
        *,
        lazy: bool = False,
    ) -> DataFrame | LazyFrame: ...

    def sink_ipc(
        self,
        path: str | Path | PartitionBy,
        *,
        lazy: bool = False,
    ) -> DataFrame | LazyFrame: ...

    def sink_csv(
        self,
        path: str | Path,
    ) -> DataFrame: ...


class PartitionBy:
    def __init__(
        self,
        path: str | Path,
        *,
        by: str | list[str] | None = None,
        max_rows_per_file: int | None = None,
    ): ...


def collect_all(
    queries: list[LazyFrame],
) -> list[DataFrame]: ...

Import

import polars as pl

I/O Contract

Inputs

  • path (str | Path | PartitionBy, required) -- Output file path, or a pl.PartitionBy object for partitioned output.
  • lazy (bool, optional, default False) -- If True, returns a LazyFrame for multiplexed writes via pl.collect_all().
  • max_rows_per_file (PartitionBy; int, optional) -- Maximum number of rows per output file when using partitioned output.
  • by (PartitionBy; str | list[str], optional) -- Column(s) for Hive-style partitioning.

Outputs

  • result (lazy=False): DataFrame -- Metadata about the written output (e.g., row counts).
  • result (lazy=True): LazyFrame -- A deferred sink that can be collected with pl.collect_all().

Usage Examples

Sink Directly to Parquet

import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Sink directly to parquet -- no full materialization
lf.sink_parquet("output.parquet")

Sink to CSV

import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Sink to CSV format
lf.sink_csv("output.csv")

Partitioned Output

import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Partitioned output with max rows per file
lf.sink_parquet(pl.PartitionBy("output/", max_rows_per_file=1_000_000))
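
Hive-Style Partitioned Output

A minimal sketch, assuming the PartitionBy signature documented above; the partition column "region" is a placeholder:

import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Hive-style directories (e.g., output/region=EU/) keyed on column values,
# with a cap on rows per file
lf.sink_parquet(pl.PartitionBy("output/", by="region", max_rows_per_file=1_000_000))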

Multiplexed Sinks (Write to Multiple Outputs)

import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Create lazy sinks -- no execution yet
q1 = lf.sink_parquet("output.parquet", lazy=True)
q2 = lf.sink_ipc("output.ipc", lazy=True)
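# Note: per the signature documented above, sink_csv takes only a path
# (no lazy parameter), so a CSV target cannot join a multiplexed write here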

# Execute both sinks from a single read of the source data
pl.collect_all([q1, q2])

End-to-End ETL Pipeline

import polars as pl

# Full out-of-core ETL: scan -> transform -> sink
(
    pl.scan_csv("raw_data/*.csv")
    .filter(pl.col("status") == "active")
    .with_columns(
        pl.col("created_at").str.to_datetime().alias("created_at"),
        (pl.col("price") * pl.col("quantity")).alias("total"),
    )
    .group_by("region", "product_category")
    .agg(
        pl.col("total").sum().alias("revenue"),
        pl.col("order_id").count().alias("order_count"),
    )
    .sink_parquet("aggregated_output.parquet")
)

Related Pages

Implements Principle
