Implementation: Lance CompactionTask::execute
| Knowledge Sources | Details |
|---|---|
| Domains | Data_Engineering, Storage_Optimization |
| Last Updated | 2026-02-08 19:00 GMT |
Overview
A concrete tool, provided by the Lance library, for executing a single compaction task that rewrites one or more fragments into new, optimally sized fragments.
Description
CompactionTask::execute is the entry point for running a compaction task. It first ensures the dataset is checked out to the correct read version (the version from which compaction was planned). It then delegates to the internal rewrite_files function, which:
- Checks whether the input fragments are eligible for the binary copy optimization.
- If binary copy is eligible, copies data pages verbatim from input to output files without re-encoding.
- If binary copy is not eligible, creates a scanner over the input fragments, captures row IDs, and writes the scanned data to new fragment files.
- Reserves new fragment IDs through a ReserveFragments commit to avoid collisions.
- Builds a row ID mapping (old row addresses to new row addresses) for subsequent index remapping, or serializes the changed row addresses if index remap is deferred.
- Computes CompactionMetrics summarizing the operation.
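The branch between the two rewrite paths can be sketched as follows. This is a simplified illustration only, not Lance's actual eligibility check; the criteria shown (no deletion files, compatible data file layout) are assumptions for the sketch:

```rust
/// Simplified illustration of the two rewrite paths in `rewrite_files`.
/// The eligibility criteria here are assumptions; the real check lives
/// inside the Lance library.
#[derive(Debug, PartialEq)]
enum RewritePath {
    /// Copy encoded data pages verbatim, skipping decode/re-encode.
    BinaryCopy,
    /// Scan the input fragments and write the rows out again.
    Rescan,
}

fn choose_rewrite_path(has_deletions: bool, compatible_layout: bool) -> RewritePath {
    if !has_deletions && compatible_layout {
        RewritePath::BinaryCopy
    } else {
        RewritePath::Rescan
    }
}

fn main() {
    // Fragments with deletion files must be rescanned so deleted rows are dropped.
    assert_eq!(choose_rewrite_path(true, true), RewritePath::Rescan);
    // Clean, layout-compatible fragments can take the fast binary-copy path.
    assert_eq!(choose_rewrite_path(false, true), RewritePath::BinaryCopy);
    println!("ok");
}
```

The point of the fast path is that already-encoded pages are moved byte-for-byte, so the expensive decode/re-encode cycle is skipped entirely.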
Usage
Use CompactionTask::execute in the middle step of a distributed compaction workflow. After receiving a serialized task from the coordinator, each worker deserializes it, opens the dataset at the appropriate version, and calls execute(). The resulting RewriteResult is sent back to the coordinator for the commit phase.
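The fan-out/fan-in shape of this workflow can be sketched with plain std channels standing in for the network. The TaskMsg and ResultMsg structs below are hypothetical simplified stand-ins; a real deployment would serialize Lance's own CompactionTask and RewriteResult instead:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical, simplified stand-ins for the serialized task and its result.
#[derive(Clone)]
struct TaskMsg {
    fragment_ids: Vec<u64>,
    read_version: u64,
}

struct ResultMsg {
    fragments_removed: usize,
    fragments_added: usize,
}

fn main() {
    let (task_tx, task_rx) = mpsc::channel::<TaskMsg>();
    let (result_tx, result_rx) = mpsc::channel::<ResultMsg>();

    // Worker: receives tasks, "executes" them, and sends results back.
    let worker = thread::spawn(move || {
        for task in task_rx {
            // Stand-in for `task.execute(dataset).await`: pretend each input
            // fragment set is rewritten into a single new fragment.
            result_tx
                .send(ResultMsg {
                    fragments_removed: task.fragment_ids.len(),
                    fragments_added: 1,
                })
                .unwrap();
        }
    });

    // Coordinator: fans out planned tasks, then gathers results for the commit phase.
    for ids in [vec![0, 1], vec![2, 3, 4]] {
        task_tx
            .send(TaskMsg { fragment_ids: ids, read_version: 42 })
            .unwrap();
    }
    drop(task_tx); // close the channel so the worker loop ends

    let results: Vec<ResultMsg> = result_rx.iter().collect();
    worker.join().unwrap();
    assert_eq!(results.len(), 2);
    println!("collected {} results for the commit phase", results.len());
}
```

The key property mirrored here is that workers never commit anything themselves; they only return results, and the coordinator alone performs the final commit.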
Code Reference
Source Location
- Repository: Lance
- File: rust/lance/src/dataset/optimize.rs
- Lines: L727-L731 (CompactionTask struct), L742-L749 (execute method), L917-L1131 (rewrite_files)
Signature
pub struct CompactionTask {
pub task: TaskData,
pub read_version: u64,
pub options: CompactionOptions,
}
impl CompactionTask {
pub async fn execute(&self, dataset: &Dataset) -> Result<RewriteResult>
}
Import
use lance::dataset::optimize::{CompactionTask, RewriteResult};
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| self | &CompactionTask | Yes | The compaction task containing the fragments to rewrite, the read version, and compaction options. |
| dataset | &Dataset | Yes | Reference to the Lance dataset. If the dataset version does not match read_version, the task checks out the correct version automatically. |
CompactionTask fields:
| Field | Type | Description |
|---|---|---|
| task | TaskData | Contains fragments: Vec<Fragment>, the list of fragments to compact. |
| read_version | u64 | The dataset version from which compaction was planned. |
| options | CompactionOptions | The compaction configuration (target rows, binary copy settings, etc.). |
Outputs
| Name | Type | Description |
|---|---|---|
| RewriteResult | struct | The result of the rewrite operation, containing the fields below. |
RewriteResult fields:
| Field | Type | Description |
|---|---|---|
| metrics | CompactionMetrics | Counts of fragments/files added and removed. |
| new_fragments | Vec<Fragment> | The newly written fragment metadata. |
| read_version | u64 | The dataset version that was read during execution. |
| original_fragments | Vec<Fragment> | The original input fragments that were rewritten. |
| row_id_map | Option<HashMap<u64, Option<u64>>> | Mapping from old row IDs to new row IDs (or None if the row was deleted). Set when index remap is not deferred. |
| changed_row_addrs | Option<Vec<u8>> | Serialized RoaringTreemap of changed row addresses. Set when index remap is deferred. |
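A row address packs a fragment ID and a local row offset into a single u64 (upper and lower 32 bits, respectively). A minimal sketch of how an old-to-new mapping of this shape behaves, using plain std types and a hypothetical fragment renumbering:

```rust
use std::collections::HashMap;

/// Pack a fragment ID and a local row offset into a Lance-style row address:
/// fragment ID in the upper 32 bits, row offset in the lower 32 bits.
fn row_addr(fragment_id: u32, row_offset: u32) -> u64 {
    ((fragment_id as u64) << 32) | row_offset as u64
}

fn main() {
    // Hypothetical mapping produced by compacting fragment 3 into new fragment 7:
    // row 0 survives in place, row 1 was deleted (maps to None), row 2 shifts down.
    let mut row_id_map: HashMap<u64, Option<u64>> = HashMap::new();
    row_id_map.insert(row_addr(3, 0), Some(row_addr(7, 0)));
    row_id_map.insert(row_addr(3, 1), None); // deleted row
    row_id_map.insert(row_addr(3, 2), Some(row_addr(7, 1)));

    assert_eq!(row_id_map[&row_addr(3, 0)], Some(row_addr(7, 0)));
    assert_eq!(row_id_map[&row_addr(3, 1)], None);
    println!("mapped {} old row addresses", row_id_map.len());
}
```

This is the mapping an index remapper consumes: entries with Some point an index at the row's new location, and entries with None tell it to drop the row.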
Usage Examples
use lance::Dataset;
use lance::dataset::optimize::{plan_compaction, CompactionOptions};

async fn distributed_compaction(dataset: &Dataset) -> lance::Result<()> {
    let options = CompactionOptions::default();
    let plan = plan_compaction(dataset, &options).await?;
    let mut results = Vec::new();
    for task in plan.compaction_tasks() {
        // In a real system, serialize `task` and send it to a worker.
        // On the worker:
        let result = task.execute(dataset).await?;
        println!(
            "Task rewrote {} fragments into {} new fragments",
            result.metrics.fragments_removed,
            result.metrics.fragments_added
        );
        results.push(result);
    }
    // The collected results are then passed to commit_compaction().
    Ok(())
}
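Before the commit phase, the coordinator can fold the per-task metrics into a single summary. A sketch using a hypothetical simplified mirror of CompactionMetrics (the field names follow the example above; the Metrics struct itself is an assumption, not the Lance type):

```rust
// Hypothetical simplified mirror of Lance's CompactionMetrics, for illustration.
#[derive(Default)]
struct Metrics {
    fragments_removed: usize,
    fragments_added: usize,
    files_removed: usize,
    files_added: usize,
}

/// Fold per-task metrics into one summary for the whole compaction run.
fn aggregate(all: &[Metrics]) -> Metrics {
    all.iter().fold(Metrics::default(), |mut acc, m| {
        acc.fragments_removed += m.fragments_removed;
        acc.fragments_added += m.fragments_added;
        acc.files_removed += m.files_removed;
        acc.files_added += m.files_added;
        acc
    })
}

fn main() {
    // Two workers each compacted several small fragments into one larger one.
    let per_task = vec![
        Metrics { fragments_removed: 4, fragments_added: 1, files_removed: 4, files_added: 1 },
        Metrics { fragments_removed: 3, fragments_added: 1, files_removed: 3, files_added: 1 },
    ];
    let total = aggregate(&per_task);
    assert_eq!(total.fragments_removed, 7);
    assert_eq!(total.fragments_added, 2);
    println!(
        "compaction: {} fragments rewritten into {}",
        total.fragments_removed, total.fragments_added
    );
}
```

A summary like this is useful for logging and for sanity-checking that every planned task reported back before the single commit is attempted.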