
Implementation:Lance format Lance CompactionTask Execute

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, Storage_Optimization
Last Updated 2026-02-08 19:00 GMT

Overview

A concrete tool, provided by the Lance library, for executing a single compaction task: it rewrites one or more fragments into new, optimally sized fragments.

Description

CompactionTask::execute is the entry point for running a compaction task. It first ensures the dataset is checked out to the correct read version (the version from which compaction was planned). It then delegates to the internal rewrite_files function, which:

  1. Checks whether the binary copy optimization is eligible for the input fragments.
  2. If binary copy is eligible, copies data pages verbatim from input to output files without re-encoding.
  3. If binary copy is not eligible, creates a scanner over the input fragments, captures row IDs, and writes the scanned data to new fragment files.
  4. Reserves new fragment IDs through a ReserveFragments commit to avoid collisions.
  5. Builds a row ID mapping (old row addresses to new row addresses) for subsequent index remapping, or serializes changed row addresses if index remap is deferred.
  6. Computes CompactionMetrics summarizing the operation.
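The branching in steps 1–3 can be sketched as a minimal decision flow. This is a toy illustration, not the Lance internals: `Fragment`, `is_binary_copy_eligible`, and the path names are hypothetical stand-ins, and the eligibility conditions shown (no deletions, matching schema) are simplified assumptions.

```rust
// Hypothetical stand-in for a fragment's state; illustration only.
#[derive(Debug, Clone)]
struct Fragment {
    id: u64,
    // Assumed simplification: binary copy requires no deleted rows and
    // data files whose layout already matches the dataset schema.
    has_deletions: bool,
    schema_matches: bool,
}

/// Mirrors step 1: every input fragment must qualify, otherwise the
/// slower scan-and-rewrite path is taken.
fn is_binary_copy_eligible(fragments: &[Fragment]) -> bool {
    fragments
        .iter()
        .all(|f| !f.has_deletions && f.schema_matches)
}

/// Sketch of the dispatch in steps 2-3: copy data pages verbatim when
/// eligible, otherwise scan the fragments and re-encode.
fn rewrite_path(fragments: &[Fragment]) -> &'static str {
    if is_binary_copy_eligible(fragments) {
        "binary-copy"
    } else {
        "scan-and-rewrite"
    }
}

fn main() {
    let clean = vec![Fragment { id: 0, has_deletions: false, schema_matches: true }];
    let dirty = vec![Fragment { id: 1, has_deletions: true, schema_matches: true }];
    assert_eq!(rewrite_path(&clean), "binary-copy");
    assert_eq!(rewrite_path(&dirty), "scan-and-rewrite");
    println!("dispatch ok: {} vs {}", clean[0].id, dirty[0].id);
}
```

The point of the eligibility gate is that the verbatim page copy skips decode/encode entirely, so it only applies when the bytes on disk are already exactly what the output file needs.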

Usage

Use CompactionTask::execute as the middle step of a distributed compaction workflow. After receiving a serialized task from the coordinator, each worker deserializes it, opens the dataset at the appropriate version, and calls execute(). The resulting RewriteResult is sent back to the coordinator for the commit phase.

Code Reference

Source Location

  • Repository: Lance
  • File: rust/lance/src/dataset/optimize.rs
  • Lines: L727-L731 (CompactionTask struct), L742-L749 (execute method), L917-L1131 (rewrite_files)

Signature

pub struct CompactionTask {
    pub task: TaskData,
    pub read_version: u64,
    pub options: CompactionOptions,
}

impl CompactionTask {
    pub async fn execute(&self, dataset: &Dataset) -> Result<RewriteResult>
}

Import

use lance::dataset::optimize::{CompactionTask, RewriteResult};

I/O Contract

Inputs

Name Type Required Description
self &CompactionTask Yes The compaction task containing the fragments to rewrite, the read version, and compaction options.
dataset &Dataset Yes Reference to the Lance dataset. If the dataset version does not match read_version, the task will check out the correct version automatically.

CompactionTask fields:

Field Type Description
task TaskData Contains fragments: Vec<Fragment>, the list of fragments to compact.
read_version u64 The dataset version from which compaction was planned.
options CompactionOptions The compaction configuration (target rows, binary copy settings, etc.).
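The interaction between the dataset argument and read_version (the automatic checkout noted above) can be modeled with a toy version guard. `Dataset`, `checkout_version`, and `ensure_read_version` here are simplified stand-ins for illustration, not the Lance API:

```rust
// Toy model of a versioned dataset handle; not the Lance Dataset type.
#[derive(Debug, Clone)]
struct Dataset {
    version: u64,
}

impl Dataset {
    // Stand-in for checking out a historical version of the dataset.
    fn checkout_version(&self, version: u64) -> Dataset {
        Dataset { version }
    }
}

/// Mirror of the guard at the top of execute(): reuse the dataset if it
/// is already at the planned read version, otherwise check it out.
fn ensure_read_version(dataset: &Dataset, read_version: u64) -> Dataset {
    if dataset.version == read_version {
        dataset.clone()
    } else {
        dataset.checkout_version(read_version)
    }
}

fn main() {
    let ds = Dataset { version: 12 };
    assert_eq!(ensure_read_version(&ds, 12).version, 12);
    assert_eq!(ensure_read_version(&ds, 10).version, 10);
}
```

Pinning every worker to the same read version is what keeps a distributed plan consistent: all tasks rewrite fragments as they existed when the plan was made, regardless of later commits.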

Outputs

Name Type Description
RewriteResult struct The result of the rewrite operation, containing the fields below.

RewriteResult fields:

Field Type Description
metrics CompactionMetrics Counts of fragments/files added and removed.
new_fragments Vec<Fragment> The newly written fragment metadata.
read_version u64 The dataset version that was read during execution.
original_fragments Vec<Fragment> The original input fragments that were rewritten.
row_id_map Option<HashMap<u64, Option<u64>>> Mapping from old row addresses to new row addresses (or None if the row was deleted). Set when index remap is not deferred.
changed_row_addrs Option<Vec<u8>> Serialized RoaringTreemap of changed row addresses. Set when index remap is deferred.
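The HashMap<u64, Option<u64>> shape of row_id_map determines how a consumer remaps index entries: Some(new) redirects, None drops a deleted row, and an absent key means the row was untouched. The `remap` helper below is a hypothetical consumer written for illustration, not the Lance remapping code:

```rust
use std::collections::HashMap;

/// Hypothetical consumer of a RewriteResult-style row map: entries
/// mapped to Some(new) are redirected, entries mapped to None (deleted
/// rows) are dropped, and addresses absent from the map are kept as-is.
fn remap(index_rows: &[u64], row_id_map: &HashMap<u64, Option<u64>>) -> Vec<u64> {
    index_rows
        .iter()
        .filter_map(|old| match row_id_map.get(old) {
            Some(Some(new)) => Some(*new), // rewritten row: follow the mapping
            Some(None) => None,            // deleted row: drop the entry
            None => Some(*old),            // untouched row: keep the old address
        })
        .collect()
}

fn main() {
    let mut map = HashMap::new();
    map.insert(1, Some(100)); // row 1 moved to address 100
    map.insert(2, None);      // row 2 was deleted during the rewrite
    let remapped = remap(&[1, 2, 3], &map);
    assert_eq!(remapped, vec![100, 3]);
}
```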

Usage Examples

use lance::Dataset;
use lance::dataset::optimize::{plan_compaction, CompactionOptions};

async fn distributed_compaction(dataset: &Dataset) -> lance::Result<()> {
    let options = CompactionOptions::default();
    let plan = plan_compaction(dataset, &options).await?;

    let mut results = Vec::new();
    for task in plan.compaction_tasks() {
        // In a real system, serialize `task` and send to a worker.
        // On the worker:
        let result = task.execute(dataset).await?;
        println!(
            "Task rewrote {} fragments into {} new fragments",
            result.metrics.fragments_removed,
            result.metrics.fragments_added
        );
        results.push(result);
    }

    // results are then passed to commit_compaction()
    Ok(())
}
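Before the commit phase, a coordinator will often summarize the per-task metrics it collected. The struct below is a simplified stand-in mirroring only the two counters printed in the example above; the real CompactionMetrics carries additional fields:

```rust
// Simplified stand-in for CompactionMetrics; field names follow the
// counters used in the example above.
#[derive(Debug, Default, Clone, Copy)]
struct Metrics {
    fragments_removed: u64,
    fragments_added: u64,
}

/// Sum per-task metrics into one plan-level summary, as a coordinator
/// might do before committing the compaction.
fn aggregate(task_metrics: &[Metrics]) -> Metrics {
    task_metrics.iter().fold(Metrics::default(), |acc, m| Metrics {
        fragments_removed: acc.fragments_removed + m.fragments_removed,
        fragments_added: acc.fragments_added + m.fragments_added,
    })
}

fn main() {
    let results = [
        Metrics { fragments_removed: 4, fragments_added: 1 },
        Metrics { fragments_removed: 6, fragments_added: 2 },
    ];
    let total = aggregate(&results);
    assert_eq!(total.fragments_removed, 10);
    assert_eq!(total.fragments_added, 3);
}
```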

