Implementation:Lance format Lance BatchDecodeStream
| Knowledge Sources | |
|---|---|
| Domains | Encoding, Columnar_Data |
| Last Updated | 2026-02-08 19:33 GMT |
Overview
BatchDecodeStream and its companion DecodeBatchScheduler implement Lance's two-phase decoding architecture where I/O scheduling and CPU decoding run as separate concurrent tasks to maximize throughput.
Description
Reading Lance files involves two distinct phases:
1. Scheduling Phase (DecodeBatchScheduler):
The scheduler traverses the file metadata and issues I/O requests for page data. It works through the file from start to end, scheduling pages in row-major order. For a file with columns of varying page sizes, it schedules all pages needed for each row range before moving to the next range. The scheduler creates a decoder for each page and sends it through a channel to the decode stream.
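To make the row-major ordering concrete, here is a minimal std-only sketch. It is not Lance's scheduler. The function row_major_schedule and its input (a list of page row counts per column) are hypothetical. The sketch assumes a page is issued no later than the first row it covers. Under that assumption, sorting pages by starting row (ties broken by column) reproduces the front-to-back order described above.

```rust
/// Hypothetical sketch (not the Lance API): given the row count of every page
/// in every column, return the order in which pages would be issued if the
/// file is walked front to back in row-major order.
///
/// Each entry is (start_row, column_index, page_index).
pub fn row_major_schedule(columns: &[Vec<u64>]) -> Vec<(u64, usize, usize)> {
    let mut order = Vec::new();
    for (col, pages) in columns.iter().enumerate() {
        // Compute the first row covered by each page in this column.
        let mut start = 0u64;
        for (page, &rows) in pages.iter().enumerate() {
            order.push((start, col, page));
            start += rows;
        }
    }
    // Pages that start earlier in the file are issued first. Ties are broken
    // by column index, so every column's page starting at a given row is
    // issued before the walk moves further into the file.
    order.sort();
    order
}
```

Take two columns with page sizes [100, 100] and [50, 50, 50, 50]. The order starts with both columns' first pages at row 0, then column 1's page at row 50, then both pages that start at row 100.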
Key types in the scheduling phase:
- PageInfo -- Metadata for a single page (number of rows, encoding descriptor, buffer offsets)
- ColumnInfo -- Metadata for a column (page infos, column-level buffer offsets, column encoding)
- PageEncoding -- Either Legacy (2.0 format) or Structural (2.1+ format)
- ColumnInfoIter -- Iterator over column infos that tracks position across top-level fields
- CoreFieldDecoderStrategy -- Maps field types to appropriate field schedulers and decompressors
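A simplified model helps picture how these key types relate. Everything below is a hypothetical mirror with illustrative field names, not the real definitions in decoder.rs. In particular, the real PageEncoding variants carry encoding descriptors rather than being unit variants. The iterator sketch shows the idea behind ColumnInfoIter: each top-level field takes the columns it needs, and the iterator remembers where the next field should start.

```rust
/// Hypothetical stand-in for PageEncoding (the real variants carry descriptors).
#[derive(Debug, Clone, PartialEq)]
pub enum PageEncodingSketch {
    Legacy,     // stands in for the 2.0-format encoding
    Structural, // stands in for the 2.1+-format encoding
}

/// Hypothetical stand-in for PageInfo: per-page metadata.
#[derive(Debug, Clone)]
pub struct PageInfoSketch {
    pub num_rows: u64,
    pub encoding: PageEncodingSketch,
    /// (offset, size) of each buffer backing this page
    pub buffer_offsets_and_sizes: Vec<(u64, u64)>,
}

/// Hypothetical stand-in for ColumnInfo: a column is a sequence of pages.
#[derive(Debug, Clone)]
pub struct ColumnInfoSketch {
    pub pages: Vec<PageInfoSketch>,
}

impl ColumnInfoSketch {
    /// Total rows in the column is the sum of its page row counts.
    pub fn num_rows(&self) -> u64 {
        self.pages.iter().map(|p| p.num_rows).sum()
    }
}

/// Hypothetical stand-in for ColumnInfoIter: hands out columns in file order
/// and remembers its position across top-level fields.
pub struct ColumnInfoIterSketch<'a> {
    infos: &'a [ColumnInfoSketch],
    pos: usize,
}

impl<'a> ColumnInfoIterSketch<'a> {
    pub fn new(infos: &'a [ColumnInfoSketch]) -> Self {
        Self { infos, pos: 0 }
    }

    /// Take the next `n` columns (for example, all columns of one field).
    pub fn take_columns(&mut self, n: usize) -> &'a [ColumnInfoSketch] {
        let infos = self.infos; // copy of the shared slice reference
        let end = (self.pos + n).min(infos.len());
        let taken = &infos[self.pos..end];
        self.pos = end;
        taken
    }

    /// Index of the next column that will be handed out.
    pub fn position(&self) -> usize {
        self.pos
    }
}
```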
2. Decoding Phase (BatchDecodeStream):
The decode stream reads decoders from the channel. Once enough loaded decoders are available to fill a batch, it spawns a "decode batch task" that performs the actual CPU work (decompression, arithmetic, type restoration). The batch size is configurable and independent of page sizes, keeping compute work decoupled from I/O.
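Because the batch size is independent of page sizes, a batch can span several pages, and one page can feed several batches. The sketch below shows the slicing arithmetic. plan_batches is a hypothetical function, not Lance code. It assumes pages arrive in order and are described only by their row counts.

```rust
/// One slice of a page that contributes rows to a batch.
#[derive(Debug, PartialEq)]
pub struct PageSlice {
    pub page: usize, // index of the page in arrival order
    pub offset: u64, // first row taken within the page
    pub len: u64,    // number of rows taken from the page
}

/// Hypothetical sketch: split loaded pages (given by row count) into batches
/// of `batch_size` rows. Batches may cross page boundaries; only the final
/// batch may be shorter than `batch_size`.
pub fn plan_batches(page_rows: &[u64], batch_size: u64) -> Vec<Vec<PageSlice>> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut batches = Vec::new();
    let mut current: Vec<PageSlice> = Vec::new();
    let mut current_rows = 0u64;
    for (page, &rows) in page_rows.iter().enumerate() {
        let mut offset = 0u64;
        while offset < rows {
            // Take as many rows as this page has left, up to what the batch still needs.
            let take = (rows - offset).min(batch_size - current_rows);
            current.push(PageSlice { page, offset, len: take });
            offset += take;
            current_rows += take;
            if current_rows == batch_size {
                // Batch is full: emit it and start a new one.
                batches.push(std::mem::take(&mut current));
                current_rows = 0;
            }
        }
    }
    if !current.is_empty() {
        batches.push(current); // trailing partial batch
    }
    batches
}
```

Take pages of 100 and 30 rows with a batch size of 64. This yields three batches:

1. Rows 0..64 of page 0.
2. Rows 64..100 of page 0 plus rows 0..28 of page 1.
3. The remaining 2 rows of page 1.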
Architecture (summarized from the diagram in the source, labeled "I/O PARALLELISM" on the scheduling side):
- The caller of schedule_range becomes the scheduler thread and schedules one decode task per logical page.
- The Batch Decode Scheduler issues requests to the I/O service.
- Decode tasks are sent through a channel from the Batch Decode Scheduler to the Batch Decode Stream.
- When the Batch Decode Stream is polled, it waits until enough I/O has completed for a batch.
- The stream then hands the batch to a Decode Batch Task. Buffer polling achieves CPU parallelism (a thread per batch).
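The flow above can be approximated with standard-library threads and a channel. This sketch simplifies heavily and is not Lance's implementation, which uses async tasks. It works as follows:

- The scheduler thread stands in for the I/O side and sends already-loaded pages.
- The consumer accumulates rows until a batch is full.
- Each batch is decoded on its own thread, mirroring the "thread per batch" note.

run_pipeline, LoadedPage, and decode_batch are hypothetical names. decode_batch simply sums values, standing in for real decompression.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a page whose I/O has completed.
struct LoadedPage {
    values: Vec<i64>,
}

/// Hypothetical "decode batch task": summing stands in for real CPU work
/// such as decompression and type restoration.
fn decode_batch(batch: Vec<i64>) -> i64 {
    batch.iter().sum()
}

/// Runs a scheduler thread and a batch-decoding consumer concurrently and
/// returns one decoded result per batch, in batch order.
pub fn run_pipeline(pages: Vec<Vec<i64>>, batch_size: usize) -> Vec<i64> {
    assert!(batch_size > 0, "batch_size must be positive");
    let (tx, rx) = mpsc::channel::<LoadedPage>();

    // Scheduling side: send "loaded" pages through the channel. The sender is
    // dropped when the thread finishes, which ends the consumer's loop.
    let scheduler = thread::spawn(move || {
        for values in pages {
            if tx.send(LoadedPage { values }).is_err() {
                break; // consumer hung up
            }
        }
    });

    // Decoding side: buffer rows until a full batch is available, then
    // spawn one thread per batch for the CPU work.
    let mut pending: Vec<i64> = Vec::new();
    let mut tasks = Vec::new();
    for page in rx {
        pending.extend(page.values);
        while pending.len() >= batch_size {
            let batch: Vec<i64> = pending.drain(..batch_size).collect();
            tasks.push(thread::spawn(move || decode_batch(batch)));
        }
    }
    // The final batch may be shorter than batch_size.
    if !pending.is_empty() {
        tasks.push(thread::spawn(move || decode_batch(pending)));
    }

    scheduler.join().expect("scheduler thread panicked");
    // Joining in spawn order keeps results in batch order.
    tasks
        .into_iter()
        .map(|t| t.join().expect("decode task panicked"))
        .collect()
}
```

The design keeps the producer unaware of batch boundaries. Only the consumer knows the batch size, which is what lets batch size vary independently of page size.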
Batch Size Guidance:
- Batches should be small enough to fit in the CPU cache (at minimum, the L3 cache)
- More batches means more parallelism opportunity
- A warning is logged if batch data exceeds 10 MiB
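The 10 MiB figure lends itself to a quick back-of-the-envelope calculation. The helpers below are hypothetical and assume you can estimate the average decoded size of a row. They only encode the arithmetic, not how Lance actually measures batch size.

```rust
/// Threshold from the guidance above: batches larger than this trigger a warning.
pub const BATCH_WARN_BYTES: u64 = 10 * 1024 * 1024; // 10 MiB

/// Hypothetical helper: the largest row count whose estimated size stays
/// within `limit_bytes`, given an estimated decoded size per row.
pub fn max_rows_within(bytes_per_row: u64, limit_bytes: u64) -> u64 {
    assert!(bytes_per_row > 0, "bytes_per_row must be positive");
    limit_bytes / bytes_per_row
}

/// Hypothetical check mirroring the warning rule: true when the estimated
/// batch size exceeds the threshold.
pub fn would_warn(rows: u64, bytes_per_row: u64) -> bool {
    rows.saturating_mul(bytes_per_row) > BATCH_WARN_BYTES
}
```

At 1 KiB per row, 10,240 rows fill exactly 10 MiB, so one more row would cross the warning threshold. A smaller cache-sized budget, for example a hypothetical 8 MiB at 256 bytes per row, gives 32,768 rows.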
Usage
Use the decoder module when:
- Reading Lance files and converting encoded pages back into Arrow RecordBatch objects
- Implementing custom field schedulers for new data types
- Integrating Lance decoding into a query engine (e.g., DataFusion)
Code Reference
| Source Location | rust/lance-encoding/src/decoder.rs |
|---|---|
| Key Structs | DecodeBatchScheduler, PageInfo, ColumnInfo, PageEncoding, CoreFieldDecoderStrategy |
| Key Functions | create_decode_stream() |
| Import | use lance_encoding::decoder::{DecodeBatchScheduler, ColumnInfo, PageInfo, FilterExpression}; |
I/O Contract
DecodeBatchScheduler::try_new:
| Parameter | Type | Description |
|---|---|---|
| schema | &Schema | The Lance schema describing the fields to decode |
| column_indices | &[u32] | Indices of columns to read |
| column_infos | &[Arc<ColumnInfo>] | Metadata for each column |
| file_buffer_positions_and_sizes | &[(u64, u64)] | Global file-level buffer positions |
| num_rows | u64 | Total rows in the file |
| decoder_plugins | Arc<DecoderPlugins> | Extension decoders |
| io | Arc<dyn EncodingsIo> | The I/O service to submit requests to |
| cache | Arc<LanceCache> | Page cache |
| filter | &FilterExpression | Row filter expression |
| config | &DecoderConfig | Decoder configuration |
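The file_buffer_positions_and_sizes parameter is a list of (u64, u64) pairs. Assume, from the parameter name, that each pair is a byte position followed by a size. The hypothetical helper below converts such pairs into byte ranges suitable for I/O requests. It rejects buffers that would extend past the end of the file. It does not call any Lance API.

```rust
use std::ops::Range;

/// Hypothetical sketch: convert (position, size) pairs into byte ranges,
/// failing if a buffer overflows u64 or runs past the end of the file.
pub fn buffer_ranges(
    positions_and_sizes: &[(u64, u64)],
    file_len: u64,
) -> Result<Vec<Range<u64>>, String> {
    positions_and_sizes
        .iter()
        .map(|&(pos, size)| {
            // End offset is exclusive: the buffer covers pos..pos + size.
            let end = pos
                .checked_add(size)
                .ok_or_else(|| format!("buffer at {pos} overflows u64"))?;
            if end > file_len {
                return Err(format!("buffer {pos}..{end} exceeds file length {file_len}"));
            }
            Ok(pos..end)
        })
        // Stops at the first error, otherwise collects all ranges in order.
        .collect()
}
```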
Output:
The BatchDecodeStream yields items containing a RecordBatch future that resolves to a decoded batch of Arrow data.
Usage Examples
```rust
use futures::StreamExt; // provides stream.next()
use lance_encoding::decoder::{
    DecodeBatchScheduler, ColumnInfo, PageInfo, FilterExpression, DecoderConfig,
    create_decode_stream,
};
use std::sync::Arc;

// Typical decode flow (simplified; argument lists may differ between Lance
// versions, so check decoder.rs before relying on them):

// 1. Create the scheduler from the file metadata
// let mut scheduler = DecodeBatchScheduler::try_new(
//     &schema, &column_indices, &column_infos,
//     &file_buffers, num_rows, plugins, io, cache,
//     &FilterExpression::no_filter(), &DecoderConfig::default(),
// ).await?;

// 2. Schedule a range of rows; decoders are sent through the channel
// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// scheduler.schedule_range(0..num_rows, &tx).await?;

// 3. Create the decode stream from the receiving end of the channel
// let mut stream = create_decode_stream(
//     &lance_schema, num_rows, batch_size,
//     is_structural, should_validate, rx,
// )?;

// 4. Consume decoded batches; each item's task resolves to a RecordBatch
// while let Some(batch) = stream.next().await {
//     let record_batch = batch.task.await?;
//     // process record_batch
// }
```
Related Pages
- Lance_format_Lance_Data_Scanning_And_Reading - Higher-level scanning that uses DecodeBatchScheduler and BatchDecodeStream
- Lance_format_Lance_EncodingsIo - The EncodingsIo trait that the scheduler uses for I/O
- Lance_format_Lance_CoreEncoder - The write-side counterpart that produces the pages this module decodes
- Lance_format_Lance_EncodingFormat - Protobuf definitions that describe page and column encodings
- Lance_format_Lance_Compression_Traits - DecompressionStrategy used during the decode phase
- Lance_format_Lance_RepDef - CompositeRepDefUnraveler used to reconstruct validity and offsets during decoding