

Implementation:Lance format Lance AccumulationQueue

From Leeroopedia


Knowledge Sources
Domains: Encoding, Infrastructure
Last Updated: 2026-02-08 19:33 GMT

Overview

Description

The AccumulationQueue is a buffering utility in the lance-encoding crate that accumulates Arrow arrays until their combined in-memory size exceeds a byte threshold, then flushes them as a single batch for encoding. This lets the encoding pipeline gather enough data to produce efficiently sized encoded pages without requiring the caller to manage batching manually.

Key behaviors:

  • Arrays are buffered until their cumulative get_array_memory_size() exceeds cache_bytes
  • When the threshold is exceeded, all buffered arrays are flushed together as a batch along with the starting row number and total row count
  • Arrays inserted while the running total is still at or below the threshold are deep-copied (via deep_copy_array) so the queue does not keep potentially large upstream buffers alive; setting keep_original_array skips this copy
  • The array that pushes the total over the threshold is not deep-copied, since it is returned immediately as part of the flushed batch
  • A flush() method drains any remaining buffered data at end of stream and returns None if nothing is buffered
  • Debug/trace logging reports accumulation progress and flush events per column
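The accumulate-then-flush behavior listed above can be modeled with a small std-only sketch. It is not the Lance implementation: ToyAccumulationQueue and Batch are hypothetical names, Vec<i32> stands in for an Arrow ArrayRef, the byte size is approximated as element count times 4 rather than get_array_memory_size(), and the deep-copy step is omitted because each Vec is already owned.

```rust
/// Batch returned on a flush: (buffered arrays, starting row number, total rows).
pub type Batch = (Vec<Vec<i32>>, u64, u64);

/// Hypothetical, simplified model of an accumulation queue (not Lance's type).
/// `Vec<i32>` stands in for an Arrow array.
#[derive(Debug)]
pub struct ToyAccumulationQueue {
    cache_bytes: u64,
    buffered: Vec<Vec<i32>>,
    current_bytes: u64,
    row_number: Option<u64>, // first row of the pending batch, if any
    num_rows: u64,           // top-level rows in the pending batch
}

impl ToyAccumulationQueue {
    pub fn new(cache_bytes: u64) -> Self {
        Self {
            cache_bytes,
            buffered: Vec::new(),
            current_bytes: 0,
            row_number: None,
            num_rows: 0,
        }
    }

    /// Assumed size model: 4 bytes per i32 (Lance uses get_array_memory_size()).
    fn approx_bytes(array: &[i32]) -> u64 {
        (array.len() * std::mem::size_of::<i32>()) as u64
    }

    /// Buffers `array` and returns the whole pending batch once the running
    /// byte total strictly exceeds `cache_bytes`; otherwise returns None.
    pub fn insert(&mut self, array: Vec<i32>, row_number: u64, num_rows: u64) -> Option<Batch> {
        // Only the first insert after a flush sets the batch's starting row.
        self.row_number.get_or_insert(row_number);
        self.num_rows += num_rows;
        self.current_bytes += Self::approx_bytes(&array);
        self.buffered.push(array);
        if self.current_bytes > self.cache_bytes {
            self.take_batch()
        } else {
            None
        }
    }

    /// End-of-stream drain: returns the leftovers, or None if nothing is buffered.
    pub fn flush(&mut self) -> Option<Batch> {
        if self.buffered.is_empty() {
            None
        } else {
            self.take_batch()
        }
    }

    /// Hands back the pending batch and resets all counters.
    fn take_batch(&mut self) -> Option<Batch> {
        self.current_bytes = 0;
        let start = self.row_number.take()?;
        let rows = std::mem::take(&mut self.num_rows);
        Some((std::mem::take(&mut self.buffered), start, rows))
    }
}
```

With a 16-byte threshold, two 2-element inserts bring the total to exactly 16 bytes and do not flush; a third 1-element insert pushes it to 20 bytes and returns all three arrays with a starting row of 0 and 5 rows.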

Usage

The AccumulationQueue is used internally by the Lance encoding pipeline to batch column data before encoding. Each column in the encoding process typically has its own queue instance; its column_index identifies that column in log output.

Code Reference

Source Location

rust/lance-encoding/src/utils/accumulation.rs

Signature

#[derive(Debug)]
pub struct AccumulationQueue {
    cache_bytes: u64,
    keep_original_array: bool,
    buffered_arrays: Vec<ArrayRef>,
    current_bytes: u64,
    row_number: u64,
    num_rows: u64,
    column_index: u32,
}

impl AccumulationQueue {
    pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self;

    pub fn insert(
        &mut self,
        array: ArrayRef,
        row_number: u64,
        num_rows: u64,
    ) -> Option<(Vec<ArrayRef>, u64, u64)>;

    pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)>;
}

Import

use lance_encoding::utils::accumulation::AccumulationQueue;

I/O Contract

Inputs

Parameter            Method    Type      Description
cache_bytes          new()     u64       Byte threshold; insert() flushes once the running total of get_array_memory_size() exceeds it
column_index         new()     u32       Column index, used only to label log messages
keep_original_array  new()     bool      If true, buffered arrays are kept as-is instead of deep-copied (saves CPU, but may keep large upstream buffers alive)
array                insert()  ArrayRef  The Arrow array to buffer
row_number           insert()  u64       Global row number of the array's first row; a flushed batch reports the value from its first insert
num_rows             insert()  u64       Number of top-level rows represented by this array
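The trade-off behind keep_original_array can be illustrated with a toy model of a sliced array: a small view into a large shared buffer keeps the whole buffer alive for as long as the view is buffered. SliceView, deep_copy, and buffer_for_accumulation are hypothetical names for this sketch; Arrow arrays share buffers through reference counting in a broadly similar way, but this is an analogy, not Lance's deep_copy_array.

```rust
use std::sync::Arc;

/// Toy stand-in for an array that is a slice of a larger shared buffer.
#[derive(Clone, Debug)]
pub struct SliceView {
    buffer: Arc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl SliceView {
    pub fn new(buffer: Arc<Vec<u8>>, offset: usize, len: usize) -> Self {
        assert!(offset + len <= buffer.len(), "slice out of bounds");
        Self { buffer, offset, len }
    }

    /// The bytes this view actually exposes.
    pub fn values(&self) -> &[u8] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Bytes kept alive by this view: the whole backing buffer, not just `len`.
    pub fn retained_bytes(&self) -> usize {
        self.buffer.len()
    }

    /// Copies only the visible bytes into a fresh buffer, so the view no
    /// longer holds a reference to the original allocation.
    pub fn deep_copy(&self) -> Self {
        let owned = self.values().to_vec();
        let len = owned.len();
        Self { buffer: Arc::new(owned), offset: 0, len }
    }
}

/// What gets stored while the queue is still below its threshold
/// (hypothetical helper mirroring the keep_original_array choice).
pub fn buffer_for_accumulation(view: SliceView, keep_original_array: bool) -> SliceView {
    if keep_original_array {
        view // cheap: no copy, but the full upstream buffer stays alive
    } else {
        view.deep_copy() // costs a copy, but releases the upstream buffer
    }
}
```

A kept view reports the full size of its backing buffer as retained, while a deep-copied view retains only its visible bytes and drops its reference to the original allocation.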

Outputs

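Type Description
Option<(Vec<ArrayRef>, u64, u64)> Returned by both insert() and flush(). On a flush: Some((buffered arrays, starting row number, total row count)). insert() returns None while the buffered size is still at or below cache_bytes; flush() returns None when nothing is buffered.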

Usage Examples

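use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use lance_encoding::utils::accumulation::AccumulationQueue;

// Hypothetical placeholder for whatever consumes a flushed batch (not a Lance API).
fn encode_page(arrays: Vec<ArrayRef>, row_num: u64, num_rows: u64) {
    println!("encoding {} arrays covering {} rows from row {}", arrays.len(), num_rows, row_num);
}

fn main() {
    let mut queue = AccumulationQueue::new(
        1024 * 1024, // flush once buffered arrays exceed 1 MiB
        0,           // column index (used only for logging)
        false,       // deep-copy arrays while they sit in the buffer
    );

    // A 3-row array is far below the threshold, so insert() buffers it and returns None.
    let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    if let Some((arrays, row_num, num_rows)) = queue.insert(arr, 0, 3) {
        // Threshold exceeded: encode everything accumulated so far
        encode_page(arrays, row_num, num_rows);
    }

    // At end of stream, flush whatever is still buffered
    if let Some((arrays, row_num, num_rows)) = queue.flush() {
        encode_page(arrays, row_num, num_rows);
    }
}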
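The caller-side pattern (one queue per column, a running global row offset passed to each insert, and a final flush at end of stream) can be sketched without Arrow. ColumnQueue and encode_stream are hypothetical names, each column is assumed to have a fixed byte width per row, and the queue tracks only byte totals and row ranges rather than arrays.

```rust
/// Hypothetical per-column queue that tracks only byte totals and row ranges.
struct ColumnQueue {
    cache_bytes: u64,
    current_bytes: u64,
    start_row: Option<u64>,
    num_rows: u64,
}

impl ColumnQueue {
    fn new(cache_bytes: u64) -> Self {
        Self { cache_bytes, current_bytes: 0, start_row: None, num_rows: 0 }
    }

    /// Returns (start_row, num_rows) once the byte total exceeds the threshold.
    fn insert(&mut self, bytes: u64, row_number: u64, num_rows: u64) -> Option<(u64, u64)> {
        self.start_row.get_or_insert(row_number);
        self.num_rows += num_rows;
        self.current_bytes += bytes;
        if self.current_bytes > self.cache_bytes { self.take() } else { None }
    }

    /// Drains leftovers; None when nothing was inserted since the last flush.
    fn flush(&mut self) -> Option<(u64, u64)> {
        self.take()
    }

    fn take(&mut self) -> Option<(u64, u64)> {
        self.current_bytes = 0;
        let start = self.start_row.take()?;
        Some((start, std::mem::take(&mut self.num_rows)))
    }
}

/// Feeds batches of `batch_rows` rows through one queue per column, where
/// column `c` is assumed to cost `bytes_per_row[c]` bytes per row.
/// Returns the row ranges emitted for each column, in order.
pub fn encode_stream(batch_rows: &[u64], bytes_per_row: &[u64], cache_bytes: u64) -> Vec<Vec<(u64, u64)>> {
    let mut queues: Vec<ColumnQueue> =
        bytes_per_row.iter().map(|_| ColumnQueue::new(cache_bytes)).collect();
    let mut pages: Vec<Vec<(u64, u64)>> = vec![Vec::new(); bytes_per_row.len()];
    let mut row_number: u64 = 0; // running global row offset supplied by the caller

    for &rows in batch_rows {
        for (col, queue) in queues.iter_mut().enumerate() {
            if let Some(range) = queue.insert(rows * bytes_per_row[col], row_number, rows) {
                pages[col].push(range);
            }
        }
        row_number += rows;
    }

    // End of stream: each column flushes its own leftovers independently.
    for (col, queue) in queues.iter_mut().enumerate() {
        if let Some(range) = queue.flush() {
            pages[col].push(range);
        }
    }
    pages
}
```

With a 100-byte threshold and 10-row batches, an 8-byte-per-row column flushes after every second batch, while a 1-byte-per-row column never reaches the threshold and is emitted only by the final flush; in both cases the emitted ranges cover every row exactly once.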
