Implementation:Lance format Lance Chunker
| Knowledge Sources | |
|---|---|
| Domains | DataFusion_Integration, Query_Execution |
| Last Updated | 2026-02-08 19:33 GMT |
Overview
The Chunker module provides utilities for splitting and reshaping streams of Arrow RecordBatches into fixed-size chunks for batch processing.
Description
This module contains several strategies for re-batching a SendableRecordBatchStream into chunks of a desired row count. The main components are:
- `BatchReaderChunker` -- An internal struct that wraps a stream and buffers incoming batches, yielding zero-copy sliced `Vec<RecordBatch>` chunks of a target size. It uses a `VecDeque` buffer and tracks the current offset within the front batch to avoid copying data.
- `break_stream` -- A public function that inserts batch boundaries at regular row intervals. It does not combine batches; it only splits them so that a boundary falls every `max_chunk_size` rows. For example, input lengths `[3, 5, 8, 3, 5]` with a break point of 10 become `[3, 5, 2, 6, 3, 1, 4]`.
- `chunk_stream` -- A public function that groups stream output into `Vec<RecordBatch>` chunks whose total row count equals the target `chunk_size` (the final chunk may hold fewer rows if the stream ends first). This is zero-copy: batches are sliced but not concatenated.
- `chunk_concat_stream` -- A public function that builds on `chunk_stream` but concatenates the batch slices into a single `RecordBatch` per chunk, which forces a data copy. Useful when downstream consumers require exactly one batch per chunk.
- `StrictBatchSizeStream` -- A public struct that wraps a stream and enforces exact output batch sizes by splitting and merging incoming batches. It maintains a residual buffer to carry partial data across poll boundaries.
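The boundary arithmetic described for `break_stream` can be checked on row counts alone. The sketch below is a hypothetical, synchronous helper (`break_lengths` is not part of the Lance API) that works on batch lengths rather than real `RecordBatch` values; it reproduces the `[3, 5, 8, 3, 5]` example above.

```rust
/// Hypothetical helper (not a Lance API): given the row counts of incoming
/// batches, return the row counts after splitting so that a boundary falls
/// every `max_chunk_size` rows. Batches are only split, never merged.
/// Zero-length input batches produce no output in this sketch.
pub fn break_lengths(lengths: &[usize], max_chunk_size: usize) -> Vec<usize> {
    assert!(max_chunk_size > 0, "max_chunk_size must be positive");
    let mut out = Vec::new();
    // Rows emitted since the last boundary; always < max_chunk_size.
    let mut since_boundary = 0;
    for &len in lengths {
        let mut remaining = len;
        while remaining > 0 {
            // Rows still allowed before the next boundary is reached.
            let room = max_chunk_size - since_boundary;
            let take = remaining.min(room);
            out.push(take);
            remaining -= take;
            // Wrap back to 0 exactly when a boundary is hit.
            since_boundary = (since_boundary + take) % max_chunk_size;
        }
    }
    out
}
```

For example, `break_lengths(&[25], 10)` splits a single 25-row batch into `[10, 10, 5]`, while batches that already fit between boundaries pass through unchanged.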
Usage
Use these utilities when you need precise control over batch sizing in DataFusion execution pipelines, such as:
- Writing data in fixed-size pages for index construction (e.g., BTree index)
- Feeding ML inference pipelines that require exact batch dimensions
- Controlling memory usage by capping batch row counts during scan operations
Code Reference
Source Location
rust/lance-datafusion/src/chunker.rs
Signature
```rust
pub fn break_stream(
    stream: SendableRecordBatchStream,
    max_chunk_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>

pub fn chunk_stream(
    stream: SendableRecordBatchStream,
    chunk_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<Vec<RecordBatch>>> + Send>>

pub fn chunk_concat_stream(
    stream: SendableRecordBatchStream,
    chunk_size: usize,
) -> SendableRecordBatchStream

pub struct StrictBatchSizeStream<S> {
    inner: S,
    batch_size: usize,
    residual: Option<RecordBatch>,
}
```
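The `residual` field is what lets `StrictBatchSizeStream` emit exact sizes across poll boundaries. A minimal synchronous model of that idea follows, assuming rows can be represented as a plain `Vec<T>`. `StrictBatcher`, `push`, and `finish` are hypothetical names, and flushing leftover rows as a shorter final batch is an assumption about end-of-stream behavior, not something this page states.

```rust
/// Hypothetical, synchronous model of a strict re-batcher (not a Lance API).
/// Every batch returned by `push` has exactly `batch_size` rows; leftover rows
/// are carried in `residual` between calls, mirroring the struct field above.
pub struct StrictBatcher<T> {
    batch_size: usize,
    residual: Vec<T>,
}

impl<T> StrictBatcher<T> {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { batch_size, residual: Vec::new() }
    }

    /// Accept one incoming batch and return every full batch now available.
    pub fn push(&mut self, batch: Vec<T>) -> Vec<Vec<T>> {
        // Merge: append the new rows after the carried-over residual.
        self.residual.extend(batch);
        let mut out = Vec::new();
        // Split: peel full batches off the front while enough rows remain.
        while self.residual.len() >= self.batch_size {
            // `split_off` leaves the first `batch_size` rows in place
            // and returns the tail, which becomes the new residual.
            let rest = self.residual.split_off(self.batch_size);
            out.push(std::mem::replace(&mut self.residual, rest));
        }
        out
    }

    /// Assumed end-of-stream behavior: flush any remaining rows as a short batch.
    pub fn finish(self) -> Option<Vec<T>> {
        if self.residual.is_empty() {
            None
        } else {
            Some(self.residual)
        }
    }
}
```

Unlike the real stream, this model moves elements between vectors instead of slicing and concatenating Arrow arrays, so it illustrates only the bookkeeping, not the copy costs.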
Import
```rust
use lance_datafusion::chunker::{break_stream, chunk_stream, chunk_concat_stream, StrictBatchSizeStream};
```
I/O Contract
| Input | Type | Description |
|---|---|---|
| `stream` | `SendableRecordBatchStream` | The input stream of Arrow RecordBatches to be re-chunked |
| `chunk_size` / `max_chunk_size` | `usize` | The target number of rows per output chunk (for `break_stream`, the interval between boundaries) |

| Output | Type | Description |
|---|---|---|
| `break_stream` | `Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>` | A stream of RecordBatches with a boundary every `max_chunk_size` rows |
| `chunk_stream` | `Pin<Box<dyn Stream<Item = Result<Vec<RecordBatch>>> + Send>>` | A stream of zero-copy batch vectors, each totaling `chunk_size` rows |
| `chunk_concat_stream` | `SendableRecordBatchStream` | A stream of single concatenated RecordBatches of `chunk_size` rows |
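To make the zero-copy grouping of `chunk_stream` (and the `VecDeque`-plus-offset bookkeeping of `BatchReaderChunker`) concrete, the sketch below plans chunks over batch row counts and returns slice descriptors instead of data. `SliceRef` and `plan_chunks` are hypothetical stand-ins: a `SliceRef` plays the role of an Arrow `RecordBatch::slice(offset, length)` call, and emitting a shorter final chunk when the input runs out is assumed.

```rust
use std::collections::VecDeque;

/// Hypothetical zero-copy view into one input batch: rows
/// `offset..offset + len` of batch number `batch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SliceRef {
    pub batch: usize,
    pub offset: usize,
    pub len: usize,
}

/// Hypothetical model of chunk_stream (not a Lance API): group batches, given
/// by row count, into chunks of `chunk_size` rows using only slice descriptors.
pub fn plan_chunks(lengths: &[usize], chunk_size: usize) -> Vec<Vec<SliceRef>> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    // Buffer of (batch index, row count), standing in for the VecDeque of batches.
    let mut buffer: VecDeque<(usize, usize)> = lengths.iter().copied().enumerate().collect();
    // Rows of the front batch that earlier chunks already took.
    let mut front_offset = 0;
    let mut chunks = Vec::new();
    while !buffer.is_empty() {
        let mut chunk = Vec::new();
        let mut collected = 0;
        while collected < chunk_size {
            let Some(&(batch, len)) = buffer.front() else { break };
            let available = len - front_offset;
            let take = available.min(chunk_size - collected);
            if take > 0 {
                chunk.push(SliceRef { batch, offset: front_offset, len: take });
            }
            collected += take;
            if take == available {
                // Front batch fully consumed: drop it and reset the offset.
                buffer.pop_front();
                front_offset = 0;
            } else {
                // Chunk is full; remember where the next chunk resumes.
                front_offset += take;
            }
        }
        if !chunk.is_empty() {
            chunks.push(chunk);
        }
    }
    chunks
}
```

For the `[3, 5, 8, 3, 5]` input with `chunk_size` 10, the planned chunks hold 10, 10, and 4 rows, and batch 2 is split across the first two chunks as rows `0..2` and `2..8`. `chunk_concat_stream` would then concatenate each chunk's slices into one batch, which is where the copy happens.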
Usage Examples
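```rust
use lance_datafusion::chunker::{break_stream, chunk_concat_stream};

// Each function takes ownership of its input stream, so each call needs its own
// stream; `other_input_stream` here is a second, independent stream.
// Re-chunk a stream into single batches of 1024 rows (the last may be shorter).
let resized_stream = chunk_concat_stream(input_stream, 1024);
// Insert batch boundaries every 4096 rows without combining batches.
let broken = break_stream(other_input_stream, 4096);
```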
Related Pages
- Lance_format_Lance_ExecPlans -- Execution plan infrastructure that consumes chunked streams
- Lance_format_Lance_StreamingWriteSource -- Write source abstraction that may feed into chunkers