

Implementation:Lance format Lance Chunker

From Leeroopedia
Revision as of 15:26, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Lance_format_Lance_Chunker.md)


Domains: DataFusion_Integration, Query_Execution
Last Updated: 2026-02-08 19:33 GMT

Overview

The Chunker module provides utilities for splitting and reshaping streams of Arrow RecordBatches into fixed-size chunks for batch processing.

Description

This module contains several strategies for re-batching a SendableRecordBatchStream into chunks of a desired row count. The main components are:

  • BatchReaderChunker -- An internal struct that wraps a stream and buffers incoming batches, yielding zero-copy sliced Vec<RecordBatch> chunks of a target size. It uses a VecDeque buffer and tracks the current offset within the front batch to avoid copying data.
  • break_stream -- A public function that inserts batch boundaries at regular row intervals. It does not combine batches; it only splits them, so that a boundary falls after every max_chunk_size rows of the overall stream. For example, input batch lengths [3, 5, 8, 3, 5] with max_chunk_size = 10 become [3, 5, 2, 6, 3, 1, 4].
  • chunk_stream -- A public function that groups stream output into Vec<RecordBatch> chunks whose total row count equals the target chunk_size (the final chunk may hold fewer rows if the stream ends first). This is zero-copy: batches are sliced but not concatenated.
  • chunk_concat_stream -- A public function that builds on chunk_stream but concatenates the batch slices into a single RecordBatch per chunk, which forces a data copy. This is useful when downstream consumers require exactly one batch per chunk.
  • StrictBatchSizeStream -- A public struct that wraps a stream and enforces an exact output batch size by splitting and merging incoming batches. It maintains a residual buffer to carry partial data across poll boundaries, so apart from a possibly shorter final batch, every output batch has batch_size rows.
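The boundary arithmetic behind break_stream can be sketched on batch lengths alone. This is a minimal std-only model, not the Lance implementation: break_lengths is a hypothetical helper that works on row counts instead of Arrow RecordBatches, and it simply skips zero-length batches.

```rust
/// Hypothetical model of `break_stream` boundary placement, using batch
/// lengths only (no Arrow data). Batches are split, never merged, so that a
/// boundary falls after every `max_chunk_size` rows of the overall stream.
/// Zero-length input batches produce no output in this model.
fn break_lengths(input: &[usize], max_chunk_size: usize) -> Vec<usize> {
    assert!(max_chunk_size > 0, "max_chunk_size must be positive");
    let mut output = Vec::new();
    // Rows emitted since the last boundary; always < max_chunk_size.
    let mut since_boundary = 0;
    for &len in input {
        let mut remaining = len;
        while remaining > 0 {
            // Rows that fit before the next boundary is reached.
            let room = max_chunk_size - since_boundary;
            let take = remaining.min(room);
            output.push(take);
            remaining -= take;
            since_boundary = (since_boundary + take) % max_chunk_size;
        }
    }
    output
}
```

With the lengths from the example above, break_lengths(&[3, 5, 8, 3, 5], 10) returns [3, 5, 2, 6, 3, 1, 4]; the total row count is unchanged, only the cut points move.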

Usage

Use these utilities when you need precise control over batch sizing in DataFusion execution pipelines, such as:

  • Writing data in fixed-size pages for index construction (e.g., BTree index)
  • Feeding ML inference pipelines that require exact batch dimensions
  • Controlling memory usage by capping batch row counts during scan operations
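For pipelines that need exact batch dimensions, the split-and-merge idea with a residual buffer described for StrictBatchSizeStream can be modeled without Arrow. In this sketch StrictBatcher, push and finish are hypothetical names, rows are plain Vec<i64> values rather than RecordBatch columns, and input is pushed synchronously instead of being polled from a stream; it illustrates the technique, not Lance's code.

```rust
/// Hypothetical, std-only model of strict batch sizing with a residual buffer.
/// Rows are `i64` values here; the real struct operates on Arrow RecordBatches.
struct StrictBatcher {
    batch_size: usize,
    // Rows carried over between calls because they did not fill a batch.
    residual: Vec<i64>,
}

impl StrictBatcher {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { batch_size, residual: Vec::new() }
    }

    /// Accepts one incoming batch and returns every full batch it completes.
    fn push(&mut self, batch: Vec<i64>) -> Vec<Vec<i64>> {
        // Merge: append the new rows after the leftover rows.
        self.residual.extend(batch);
        let mut full = Vec::new();
        // Split: peel off exact-size batches while enough rows are buffered.
        while self.residual.len() >= self.batch_size {
            let rest = self.residual.split_off(self.batch_size);
            full.push(std::mem::replace(&mut self.residual, rest));
        }
        full
    }

    /// Called when the input ends; returns the final, possibly short, batch.
    fn finish(&mut self) -> Option<Vec<i64>> {
        if self.residual.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.residual))
        }
    }
}
```

Carrying leftover rows in residual is what lets a small incoming batch be merged with the next one instead of being emitted at the wrong size: pushing batches of 3, 6 and 1 rows with batch_size 4 yields batches of 4, 4 and finally 2 rows.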

Code Reference

Source Location

rust/lance-datafusion/src/chunker.rs

Signature

pub fn break_stream(
    stream: SendableRecordBatchStream,
    max_chunk_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>

pub fn chunk_stream(
    stream: SendableRecordBatchStream,
    chunk_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<Vec<RecordBatch>>> + Send>>

pub fn chunk_concat_stream(
    stream: SendableRecordBatchStream,
    chunk_size: usize,
) -> SendableRecordBatchStream

pub struct StrictBatchSizeStream<S> {
    inner: S,
    batch_size: usize,
    residual: Option<RecordBatch>,
}
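The VecDeque-plus-offset bookkeeping that BatchReaderChunker uses to produce the Vec<RecordBatch> chunks returned by chunk_stream can be sketched as a slice planner. Assumptions: plan_chunks and Slice are hypothetical, all batch lengths are known up front (the real chunker pulls batches from the stream as needed), and each Slice only stands in for a zero-copy RecordBatch slice.

```rust
use std::collections::VecDeque;

/// Hypothetical descriptor of one zero-copy slice: which input batch, the
/// starting row offset within it, and the number of rows taken.
#[derive(Debug, PartialEq, Clone, Copy)]
struct Slice {
    batch: usize,
    offset: usize,
    len: usize,
}

/// Plans the chunks a chunk_stream-style chunker would yield for the given
/// input batch lengths. The final chunk holds whatever rows remain.
fn plan_chunks(lengths: &[usize], chunk_size: usize) -> Vec<Vec<Slice>> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    // Buffer of (batch index, batch length); `offset` indexes the front batch.
    let mut buffered: VecDeque<(usize, usize)> =
        lengths.iter().copied().enumerate().collect();
    let mut offset = 0;
    let mut chunks = Vec::new();
    while !buffered.is_empty() {
        let mut chunk = Vec::new();
        let mut needed = chunk_size;
        while needed > 0 {
            let Some(&(batch, len)) = buffered.front() else { break };
            let take = needed.min(len - offset);
            if take > 0 {
                chunk.push(Slice { batch, offset, len: take });
            }
            needed -= take;
            offset += take;
            if offset == len {
                // Front batch fully consumed: drop it and reset the offset.
                buffered.pop_front();
                offset = 0;
            }
        }
        if !chunk.is_empty() {
            chunks.push(chunk);
        }
    }
    chunks
}
```

For input lengths [3, 5, 8] and a chunk_size of 6, the planned chunks hold 6, 6 and 4 rows, and the second chunk starts at row offset 3 of the second batch; no rows are copied, only offsets and lengths are recorded.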

Import

use lance_datafusion::chunker::{break_stream, chunk_stream, chunk_concat_stream, StrictBatchSizeStream};

I/O Contract

Input | Type | Description
stream | SendableRecordBatchStream | The input stream of Arrow RecordBatches to be re-chunked
chunk_size / max_chunk_size | usize | The target number of rows per output chunk

Output | Type | Description
break_stream | Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>> | A stream of RecordBatches with a boundary after every max_chunk_size rows
chunk_stream | Pin<Box<dyn Stream<Item = Result<Vec<RecordBatch>>> + Send>> | A stream of zero-copy batch vectors, each totalling chunk_size rows (the final chunk may hold fewer)
chunk_concat_stream | SendableRecordBatchStream | A stream of single concatenated RecordBatches of chunk_size rows (the final batch may hold fewer)
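The practical difference between the chunk_stream and chunk_concat_stream outputs is where the copy happens. This hypothetical sketch models batches as Vec<i64> rows: a chunk is a list of borrowed slices (zero-copy), and concat_chunk turns it into one owned buffer, which is the kind of copy that concatenating into a single RecordBatch implies. It is an illustration under those assumptions, not how Lance performs the concatenation.

```rust
/// Hypothetical model of concatenating one chunk of borrowed slices into a
/// single owned "batch". Allocates once for the whole chunk, then copies
/// every row of every slice into it.
fn concat_chunk(slices: &[&[i64]]) -> Vec<i64> {
    let total: usize = slices.iter().map(|s| s.len()).sum();
    let mut out = Vec::with_capacity(total);
    for s in slices {
        // Copy step: rows are duplicated into the new contiguous buffer.
        out.extend_from_slice(s);
    }
    out
}
```

A chunk of 5 rows spanning two input batches, for example the 3 rows of one batch plus the first 2 rows of the next, becomes one contiguous 5-row buffer, while the source batches are left untouched.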

Usage Examples

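use lance_datafusion::chunker::{break_stream, chunk_concat_stream};

// Each function takes ownership of its input stream, so the two calls below
// use two separate SendableRecordBatchStreams (input_stream, other_stream).

// Re-chunk a stream into single batches of 1024 rows (the final batch may be smaller)
let resized_stream = chunk_concat_stream(input_stream, 1024);

// Insert batch boundaries every 4096 rows without combining batches
let broken = break_stream(other_stream, 4096);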
