Implementation:Lance format Lance Commit Compaction
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Storage_Optimization |
| Last Updated | 2026-02-08 19:00 GMT |
Overview
Concrete tool for committing the results of compaction tasks to a Lance dataset, atomically replacing old fragments with new ones, provided by the Lance library.
Description
commit_compaction takes a vector of RewriteResult objects produced by compaction task execution and commits them as a single Operation::Rewrite transaction. The function:
- Returns early with empty metrics if no tasks are provided.
- Iterates all completed tasks, building RewriteGroup structures and accumulating CompactionMetrics.
- If the dataset does not use stable row IDs and index remap is not deferred, aggregates the row ID map from all tasks and passes it to the IndexRemapper to produce RewrittenIndex entries.
- If defer_index_remap is enabled, collects FragReuseGroup entries and builds a fragment reuse index for later remapping.
- If neither remapping nor deferral is needed (stable row IDs), reserves fragment IDs for all new fragments.
- Creates and applies a Transaction with Operation::Rewrite, producing a new dataset version.
The convenience function compact_files at L537-L545 combines all three phases (plan, execute, commit) into a single call.
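The branching described in the list above can be modeled with a small, standard-library-only sketch. It is not the Lance implementation: CommitPath and plan_commit are hypothetical names introduced for illustration, and the precedence when both stable row IDs and defer_index_remap are set is an assumption taken from the order of the bullets.

```rust
/// Hypothetical stand-in for the three post-rewrite paths described above.
#[derive(Debug, PartialEq, Eq)]
enum CommitPath {
    /// Aggregate the row ID map and remap indices as part of the commit.
    RemapIndices,
    /// Record fragment reuse groups so indices can be remapped later.
    DeferRemap,
    /// Stable row IDs: only reserve fragment IDs for the new fragments.
    ReserveFragmentIds,
}

/// Returns `None` for the early-return case (no completed tasks),
/// otherwise the path the commit would take under the stated assumptions.
fn plan_commit(
    num_completed_tasks: usize,
    uses_stable_row_ids: bool,
    defer_index_remap: bool,
) -> Option<CommitPath> {
    if num_completed_tasks == 0 {
        // Nothing to commit: the described function returns empty metrics here.
        return None;
    }
    let needs_remapping = !uses_stable_row_ids && !defer_index_remap;
    let path = if needs_remapping {
        CommitPath::RemapIndices
    } else if defer_index_remap {
        // Assumption: deferral wins when stable row IDs are also enabled.
        CommitPath::DeferRemap
    } else {
        CommitPath::ReserveFragmentIds
    };
    Some(path)
}
```

Whichever path is taken, the description above states that the outcome is a single Operation::Rewrite transaction, so the sketch only models which bookkeeping accompanies that commit.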
Usage
Call commit_compaction after collecting RewriteResult objects from distributed compaction workers. Pass every successfully completed result; results from failed tasks can be omitted, because completed_tasks may be a subset of the planned tasks.
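One way to separate successful worker outcomes from failed ones before committing is sketched below. The helper split_outcomes is hypothetical and uses only the standard library; in a real workflow T would be RewriteResult and E the worker's error type, which is an assumption about how the workers report their outcomes.

```rust
/// Splits worker outcomes into successful results (to pass to the commit)
/// and errors (to log or retry). Order within each group is preserved.
fn split_outcomes<T, E>(outcomes: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>) {
    let mut succeeded = Vec::new();
    let mut failed = Vec::new();
    for outcome in outcomes {
        match outcome {
            Ok(result) => succeeded.push(result),
            Err(error) => failed.push(error),
        }
    }
    (succeeded, failed)
}
```

The succeeded vector is what would be handed to commit_compaction as completed_tasks; the failed vector can be inspected to decide whether to re-plan those tasks in a later compaction run.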
Code Reference
Source Location
- Repository: Lance
- File: rust/lance/src/dataset/optimize.rs
- Lines: L1299-L1394
Signature
pub async fn commit_compaction(
    dataset: &mut Dataset,
    completed_tasks: Vec<RewriteResult>,
    remap_options: Arc<dyn IndexRemapperOptions>,
    options: &CompactionOptions,
) -> Result<CompactionMetrics>
Import
use lance::dataset::optimize::{commit_compaction, CompactionMetrics, CompactionOptions, RewriteResult};
use lance::dataset::optimize::remapping::IndexRemapperOptions;
use std::sync::Arc;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dataset | &mut Dataset | Yes | Mutable reference to the dataset. Will be updated to the new version after commit. |
| completed_tasks | Vec<RewriteResult> | Yes | The results from executing compaction tasks. May be a subset of all planned tasks. |
| remap_options | Arc<dyn IndexRemapperOptions> | Yes | Factory for creating an IndexRemapper. Use Arc::new(IgnoreRemap {}) to skip index remapping. |
| options | &CompactionOptions | Yes | The compaction options (same as used during planning and execution). Used to determine whether index remap is deferred. |
Outputs
| Name | Type | Description |
|---|---|---|
| CompactionMetrics | struct | Aggregated metrics from all committed tasks, returned as Result<CompactionMetrics>. |
CompactionMetrics fields:
| Field | Type | Description |
|---|---|---|
| fragments_removed | usize | Number of old fragments that were replaced. |
| fragments_added | usize | Number of new fragments that were written. |
| files_removed | usize | Number of old files removed (data files + deletion files). |
| files_added | usize | Number of new files added. |
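The aggregation of per-task metrics into one total can be illustrated with a local mirror of the four fields above. MetricsSketch and aggregate are hypothetical names, not the library's types; the sketch only shows field-wise summation, which is an assumption about what "aggregated" means here.

```rust
use std::ops::AddAssign;

/// Local, hypothetical mirror of the four counters listed in the table above.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct MetricsSketch {
    fragments_removed: usize,
    fragments_added: usize,
    files_removed: usize,
    files_added: usize,
}

impl AddAssign for MetricsSketch {
    /// Adds each counter of `rhs` to the matching counter of `self`.
    fn add_assign(&mut self, rhs: Self) {
        self.fragments_removed += rhs.fragments_removed;
        self.fragments_added += rhs.fragments_added;
        self.files_removed += rhs.files_removed;
        self.files_added += rhs.files_added;
    }
}

/// Sums per-task metrics; an empty slice yields all-zero metrics,
/// matching the "empty metrics" early return described earlier.
fn aggregate(per_task: &[MetricsSketch]) -> MetricsSketch {
    let mut total = MetricsSketch::default();
    for metrics in per_task {
        total += *metrics;
    }
    total
}
```

For example, a task that merges four fragments (with six files, counting deletion files) into one and a task that merges three fragments into one would sum to seven fragments removed and two added.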
Usage Examples
use std::sync::Arc;
use lance::Dataset;
use lance::dataset::optimize::{
    plan_compaction, commit_compaction, CompactionOptions, IgnoreRemap,
};

async fn full_compaction_workflow(dataset: &mut Dataset) -> lance::Result<()> {
    let options = CompactionOptions::default();
    let plan = plan_compaction(dataset, &options).await?;

    // Execute all tasks (could be distributed)
    let mut results = Vec::new();
    for task in plan.compaction_tasks() {
        let result = task.execute(dataset).await?;
        results.push(result);
    }

    // Commit all results at once
    let metrics = commit_compaction(
        dataset,
        results,
        Arc::new(IgnoreRemap {}),
        &options,
    ).await?;

    println!(
        "Compaction complete: removed {} fragments, added {}",
        metrics.fragments_removed,
        metrics.fragments_added,
    );
    Ok(())
}

// Or use the convenience function:
use lance::dataset::optimize::compact_files;

async fn simple_compaction(dataset: &mut Dataset) -> lance::Result<()> {
    let metrics = compact_files(dataset, Default::default(), None).await?;
    println!("Removed {} fragments", metrics.fragments_removed);
    Ok(())
}