Implementation:Lance format Lance AccumulationQueue
| Knowledge Sources | |
|---|---|
| Domains | Encoding, Infrastructure |
| Last Updated | 2026-02-08 19:33 GMT |
Overview
Description
The AccumulationQueue is a buffering utility in the lance-encoding crate. It accumulates Arrow arrays until their combined memory size exceeds a byte threshold, then flushes them as a batch for encoding. This lets the encoding pipeline collect enough data to produce efficiently sized encoded pages without requiring the caller to manage batching manually.
Key behaviors:
- Arrays are buffered until their cumulative `get_array_memory_size()` exceeds `cache_bytes`
- When the threshold is exceeded, all buffered arrays (including the one just inserted) are flushed together as a batch, along with the row number of the first buffered row and the total row count of the batch
- Arrays that are buffered without triggering a flush are deep-copied (via `deep_copy_array`) so the queue does not keep references to potentially large upstream buffers, unless `keep_original_array` is set
- The array that triggers a flush is not deep-copied, since it is returned immediately
- A `flush()` method drains any remaining buffered data at end-of-stream
- Debug/trace logging reports accumulation progress and flush events per column
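The accumulate-then-flush behavior listed above can be sketched with only the standard library. This is a simplified model, not the lance-encoding source: `ToyArray` is a hypothetical stand-in for `ArrayRef`, its `memory_size` counts only 4 bytes per value (Arrow's `get_array_memory_size` also includes buffer overhead), and `deep_copy` plays the role of `deep_copy_array`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow `ArrayRef`: cloning shares the buffer.
#[derive(Debug, Clone)]
pub struct ToyArray(Arc<Vec<i32>>);

impl ToyArray {
    /// Assumed size model: 4 bytes per i32 value, no buffer overhead.
    pub fn memory_size(&self) -> u64 {
        (self.0.len() * std::mem::size_of::<i32>()) as u64
    }

    /// Allocates a fresh buffer so the original one can be released.
    pub fn deep_copy(&self) -> ToyArray {
        ToyArray(Arc::new(self.0.as_ref().clone()))
    }
}

/// Sketch of the accumulate-then-flush logic (not the real AccumulationQueue).
pub struct ToyQueue {
    cache_bytes: u64,
    keep_original_array: bool,
    buffered: Vec<ToyArray>,
    current_bytes: u64,
    row_number: Option<u64>, // first row of the pending batch; None when empty
    num_rows: u64,
}

impl ToyQueue {
    pub fn new(cache_bytes: u64, keep_original_array: bool) -> Self {
        Self {
            cache_bytes,
            keep_original_array,
            buffered: Vec::new(),
            current_bytes: 0,
            row_number: None,
            num_rows: 0,
        }
    }

    pub fn insert(
        &mut self,
        array: ToyArray,
        row_number: u64,
        num_rows: u64,
    ) -> Option<(Vec<ToyArray>, u64, u64)> {
        // Only the first insert after a flush sets the batch's starting row.
        if self.row_number.is_none() {
            self.row_number = Some(row_number);
        }
        self.num_rows += num_rows;
        self.current_bytes += array.memory_size();
        if self.current_bytes > self.cache_bytes {
            // The triggering array is returned right away, so it is not copied.
            self.buffered.push(array);
            self.flush()
        } else {
            // Copy buffered arrays so upstream buffers can be freed,
            // unless the caller asked to keep the originals.
            let stored = if self.keep_original_array { array } else { array.deep_copy() };
            self.buffered.push(stored);
            None
        }
    }

    /// Drains the pending batch; returns None if nothing is buffered.
    pub fn flush(&mut self) -> Option<(Vec<ToyArray>, u64, u64)> {
        let start = self.row_number.take()?;
        self.current_bytes = 0;
        let rows = std::mem::take(&mut self.num_rows);
        Some((std::mem::take(&mut self.buffered), start, rows))
    }
}
```

The strict `>` comparison follows the word "exceeds": in this model, buffering exactly `cache_bytes` does not yet trigger a flush.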
Usage
The AccumulationQueue is used internally by the Lance encoding pipeline to batch column data before encoding. Each column in the encoding process typically has its own queue instance, identified by column_index.
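Because each column usually gets its own queue, columns flush independently of one another. The sketch below models that routing with a hypothetical `ColumnBatcher` that wraps one minimal byte-counting queue per column. It tracks only sizes and row ranges, not array data, and none of these names are lance-encoding APIs.

```rust
/// Minimal per-column queue that tracks only byte counts and row ranges.
#[derive(Debug, Default)]
struct ByteQueue {
    cache_bytes: u64,
    current_bytes: u64,
    start_row: Option<u64>, // first row of the pending batch; None when empty
    num_rows: u64,
}

impl ByteQueue {
    /// Returns `(start_row, num_rows)` once the byte threshold is exceeded.
    fn insert(&mut self, bytes: u64, row_number: u64, num_rows: u64) -> Option<(u64, u64)> {
        if self.start_row.is_none() {
            self.start_row = Some(row_number);
        }
        self.num_rows += num_rows;
        self.current_bytes += bytes;
        if self.current_bytes > self.cache_bytes {
            self.flush()
        } else {
            None
        }
    }

    /// Drains the pending batch, if any.
    fn flush(&mut self) -> Option<(u64, u64)> {
        let start = self.start_row.take()?;
        self.current_bytes = 0;
        Some((start, std::mem::take(&mut self.num_rows)))
    }
}

/// Hypothetical encoder front-end holding one queue per column.
pub struct ColumnBatcher {
    queues: Vec<ByteQueue>,
}

impl ColumnBatcher {
    pub fn new(num_columns: usize, cache_bytes: u64) -> Self {
        let queues = (0..num_columns)
            .map(|_| ByteQueue { cache_bytes, ..Default::default() })
            .collect();
        Self { queues }
    }

    /// Routes a chunk to its column's queue; returns `(column, start_row, num_rows)` on flush.
    pub fn push(
        &mut self,
        column_index: usize,
        bytes: u64,
        row_number: u64,
        num_rows: u64,
    ) -> Option<(usize, u64, u64)> {
        self.queues[column_index]
            .insert(bytes, row_number, num_rows)
            .map(|(start, rows)| (column_index, start, rows))
    }

    /// End of stream: drains every column that still has buffered data.
    pub fn finish(&mut self) -> Vec<(usize, u64, u64)> {
        self.queues
            .iter_mut()
            .enumerate()
            .filter_map(|(col, q)| q.flush().map(|(start, rows)| (col, start, rows)))
            .collect()
    }
}
```

In this model a column with large values emits pages more often than a narrow one, since each queue compares only its own byte count against the threshold.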
Code Reference
Source Location
rust/lance-encoding/src/utils/accumulation.rs
Signature
#[derive(Debug)]
pub struct AccumulationQueue {
cache_bytes: u64,
keep_original_array: bool,
buffered_arrays: Vec<ArrayRef>,
current_bytes: u64,
row_number: u64,
num_rows: u64,
column_index: u32,
}
impl AccumulationQueue {
pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self;
pub fn insert(
&mut self,
array: ArrayRef,
row_number: u64,
num_rows: u64,
) -> Option<(Vec<ArrayRef>, u64, u64)>;
pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)>;
}
Import
use lance_encoding::utils::accumulation::AccumulationQueue;
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| cache_bytes | u64 | Byte threshold that triggers a flush when exceeded |
| column_index | u32 | Column index for logging/debugging purposes |
| keep_original_array | bool | If true, does not deep-copy arrays during accumulation (saves CPU at the cost of memory) |
| array | ArrayRef | An Arrow array to buffer |
| row_number | u64 | The global row number of the first element in the array |
| num_rows | u64 | The number of top-level rows represented by this array |
Outputs
| Type | Description |
|---|---|
| Option<(Vec<ArrayRef>, u64, u64)> | On flush: the buffered arrays, the starting row number, and the total row count. insert() returns None while still accumulating; flush() returns None if the buffer is empty. |
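The starting row number and row count in each flush result describe a half-open row range. Assuming one column's arrays are inserted in row order without gaps, the results from `insert` and the final `flush` should tile that column's rows. The helpers below (`page_row_ranges`, `tiles_rows`) are hypothetical and only illustrate this interpretation of the output tuple.

```rust
use std::ops::Range;

/// Converts flush results `(row_number, num_rows)` into half-open row ranges.
pub fn page_row_ranges(flushes: &[(u64, u64)]) -> Vec<Range<u64>> {
    flushes.iter().map(|&(start, n)| start..start + n).collect()
}

/// True if the ranges cover `0..total_rows` in order, with no gaps or overlaps.
pub fn tiles_rows(ranges: &[Range<u64>], total_rows: u64) -> bool {
    let mut next = 0;
    for r in ranges {
        if r.start != next {
            return false;
        }
        next = r.end;
    }
    next == total_rows
}
```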
Usage Examples
use lance_encoding::utils::accumulation::AccumulationQueue;
use arrow_array::{ArrayRef, Int32Array};
use std::sync::Arc;

// Hypothetical stand-in for the real page encoder; not a lance-encoding API.
fn encode_page(arrays: Vec<ArrayRef>, row_num: u64, num_rows: u64) {
    println!("encoding {} arrays: rows {}..{}", arrays.len(), row_num, row_num + num_rows);
}

fn main() {
    let mut queue = AccumulationQueue::new(
        1024 * 1024, // 1 MiB threshold
        0,           // column index (used for logging)
        false,       // deep-copy arrays while accumulating
    );
    // Insert arrays; a batch is returned once the buffered size exceeds the threshold
    let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    if let Some((arrays, row_num, num_rows)) = queue.insert(arr, 0, 3) {
        // Threshold exceeded: encode the flushed arrays
        encode_page(arrays, row_num, num_rows);
    }
    // At end of stream, flush whatever is still buffered
    if let Some((arrays, row_num, num_rows)) = queue.flush() {
        encode_page(arrays, row_num, num_rows);
    }
}
Related Pages
- Lance_format_Lance_BytePack -- Another encoding utility in the same module