Implementation: Polars Sink Operations
| Knowledge Sources | |
|---|---|
| Domains | Data Engineering, Streaming |
| Last Updated | 2026-02-09 10:00 GMT |
Overview
Concrete sink methods on LazyFrame that write streaming query results directly to Parquet, IPC, or CSV files without full in-memory materialization, supporting partitioned output and multiplexed writes.
Description
Polars provides a family of sink_* methods on LazyFrame for writing streaming query results directly to persistent storage. Each method processes the query plan in streaming mode and writes output batches to disk as they are produced, avoiding full materialization of the result in memory.
The sink methods support two modes:
- Immediate execution (default, `lazy=False`) -- Executes the query and writes the output, returning a `DataFrame` with metadata.
- Deferred execution (`lazy=True`) -- Returns a `LazyFrame` that can be combined with other lazy sinks via `pl.collect_all()` for multiplexed writes.
Partitioned output is supported through the `pl.PartitionBy` helper, which enables Hive-style directory partitioning and row-count-based file splitting.
Usage
Use sink operations whenever you need to:
- Write streaming query results directly to Parquet, IPC, or CSV files.
- Perform end-to-end data transformations without materializing results in memory.
- Produce partitioned datasets for downstream consumption.
- Write a single query result to multiple output formats simultaneously.
Code Reference
Source Location
- Repository: Polars
- File: `docs/source/user-guide/lazy/sources_sinks.md` (lines 11-54)
Signature
```python
class LazyFrame:
    def sink_parquet(
        self,
        path: str | Path | PartitionBy,
        *,
        lazy: bool = False,
    ) -> DataFrame | LazyFrame: ...

    def sink_ipc(
        self,
        path: str | Path | PartitionBy,
        *,
        lazy: bool = False,
    ) -> DataFrame | LazyFrame: ...

    def sink_csv(
        self,
        path: str | Path,
    ) -> DataFrame: ...

class PartitionBy:
    def __init__(
        self,
        path: str | Path,
        *,
        by: str | list[str] | None = None,
        max_rows_per_file: int | None = None,
    ): ...

def collect_all(
    queries: list[LazyFrame],
) -> list[DataFrame]: ...
```
Import
```python
import polars as pl
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| path | str \| Path \| PartitionBy | Yes | Output file path, or a pl.PartitionBy object for partitioned output |
| lazy | bool | No | If True, returns a LazyFrame for multiplexed writes via pl.collect_all(). Default is False. |
| max_rows_per_file (PartitionBy) | int | No | Maximum number of rows per output file when using partitioned output |
| by (PartitionBy) | str \| list[str] | No | Column(s) for Hive-style partitioning |
Outputs
| Name | Type | Description |
|---|---|---|
| result (lazy=False) | DataFrame | Metadata about the written output (e.g., row counts) |
| result (lazy=True) | LazyFrame | A deferred sink that can be collected with pl.collect_all() |
Usage Examples
Sink Directly to Parquet
```python
import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Sink directly to Parquet -- no full materialization
lf.sink_parquet("output.parquet")
```
Sink to CSV
```python
import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Sink to CSV format
lf.sink_csv("output.csv")
```
Partitioned Output
```python
import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Partitioned output with a cap on rows per file
lf.sink_parquet(pl.PartitionBy("output/", max_rows_per_file=1_000_000))
```
Multiplexed Sinks (Write to Multiple Outputs)
```python
import polars as pl

lf = pl.scan_parquet("large_data/*.parquet").filter(pl.col("value") > 100)

# Create lazy sinks -- no execution yet
q1 = lf.sink_parquet("output.parquet", lazy=True)
q2 = lf.sink_ipc("output.ipc", lazy=True)

# Execute both sinks from a single read of the source data
pl.collect_all([q1, q2])
```
End-to-End ETL Pipeline
```python
import polars as pl

# Full out-of-core ETL: scan -> transform -> sink
(
    pl.scan_csv("raw_data/*.csv")
    .filter(pl.col("status") == "active")
    .with_columns(
        pl.col("created_at").str.to_datetime().alias("created_at"),
        (pl.col("price") * pl.col("quantity")).alias("total"),
    )
    .group_by("region", "product_category")
    .agg(
        pl.col("total").sum().alias("revenue"),
        pl.col("order_id").count().alias("order_count"),
    )
    .sink_parquet("aggregated_output.parquet")
)
```