Implementation:Treeverse LakeFS External Scheduler Configuration
| Knowledge Sources | |
|---|---|
| Domains | Storage_Management, REST_API |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
External scheduler configuration defines the orchestration patterns and tools used to automate the lakeFS garbage collection pipeline on a recurring schedule, using systems external to lakeFS such as cron, Apache Airflow, AWS Step Functions, or Kubernetes CronJobs.
Description
lakeFS does not include a built-in job scheduler for garbage collection. Instead, operators configure an external orchestration tool to periodically execute the full GC pipeline:
- (Optional) Update retention rules via setGCRules
- Prepare GC metadata via prepareGarbageCollectionCommits
- Run the Spark GC job via spark-submit (typically in Docker)
- Verify results via listObjects with presigned URL checks
Each orchestration tool has different strengths in terms of retry logic, dependency management, monitoring, and alerting. This page documents configuration patterns for the most commonly used tools.
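The steps listed above run strictly in sequence, with the run ID from the prepare step feeding the Spark job. The following is a minimal sketch of that ordering, not lakeFS code: `run_gc_pipeline` and its callable parameters are hypothetical names, and each callable stands in for whatever mechanism (HTTP call, subprocess, operator) the chosen scheduler uses.

```python
from typing import Callable, Optional


def run_gc_pipeline(
    prepare: Callable[[], str],
    run_spark: Callable[[str], None],
    verify: Callable[[], None],
    update_rules: Optional[Callable[[], None]] = None,
) -> str:
    """Run the GC steps in order and return the run ID from the prepare step.

    All four callables are placeholders supplied by the caller (assumption).
    """
    if update_rules is not None:
        update_rules()      # optional step: setGCRules
    run_id = prepare()      # prepareGarbageCollectionCommits
    if not run_id:
        # Stop early so the Spark job never starts without a run ID.
        raise RuntimeError("prepare step returned no run ID")
    run_spark(run_id)       # spark-submit, typically inside Docker
    verify()                # listObjects with presigned URL checks
    return run_id
```

Because any exception stops the sequence, a failed prepare step never launches the Spark job, which is the behavior the scheduler-specific examples on this page also aim for.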
Usage
Use this pattern when:
- Automating GC for one or more lakeFS repositories in production
- Integrating GC into an existing data engineering workflow orchestration platform
- Setting up GC with monitoring, alerting, and retry capabilities
Code Reference
Source Location
- Source: N/A — external to lakeFS codebase
- Tools: cron, Apache Airflow, AWS Step Functions, Kubernetes CronJob
- Pattern: Schedule the full GC pipeline as a periodic job, calling lakeFS APIs and running the Spark GC job in sequence
Signature
# Conceptual pipeline definition
GC_Pipeline:
  steps:
    - name: prepare_metadata
      api: POST /api/v1/repositories/{repository}/gc/prepare_commits
      output: run_id
    - name: run_spark_gc
      command: spark-submit --class io.treeverse.gc.GarbageCollection ...
      input: run_id
    - name: verify_results
      api: GET /api/v1/repositories/{repository}/refs/{ref}/objects/ls?presign=true
      check: presigned URL returns 404 for expired objects
  schedule: "0 2 * * 0"  # Weekly at 2 AM on Sunday
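To make the schedule expression concrete, the sketch below computes the next firing time for a weekly schedule such as `0 2 * * 0`. It is an illustration only: `next_weekly_run` is a hypothetical helper, it handles only the fixed weekday-and-hour case (not general cron syntax), and it ignores time zones.

```python
from datetime import datetime, timedelta


def next_weekly_run(now: datetime, weekday: int = 6, hour: int = 2) -> datetime:
    """Return the first run strictly after `now` for a weekly schedule.

    `weekday` follows datetime.weekday(): Monday is 0 and Sunday is 6.
    Cron numbers Sunday as 0, so cron's "0 2 * * 0" maps to weekday=6, hour=2.
    """
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    # Move forward to the requested weekday (0 to 6 days ahead).
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # This week's slot has already passed; use next week's.
        candidate += timedelta(days=7)
    return candidate
```

For example, calling it with a Wednesday timestamp returns 02:00 on the following Sunday.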
Import
# For cron-based scheduling, no special import is needed
# For Airflow, install the apache-airflow package
pip install apache-airflow
# For Kubernetes CronJob, ensure kubectl is configured
kubectl version --client
I/O Contract
Inputs
| Parameter | Type | Required | Description |
|---|---|---|---|
| repository | string | Yes | The lakeFS repository name to run GC against |
| lakefs_url | string | Yes | lakeFS API base URL (e.g., http://localhost:8000/api/v1) |
| access_key | string | Yes | lakeFS access key ID |
| secret_key | string | Yes | lakeFS secret access key |
| spark_image | string | Yes | Docker image for Spark execution (e.g., treeverse/bitnami-spark:3.3) |
| schedule | string | Yes | Cron expression or equivalent schedule definition |
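Since every input above is required, a wrapper can validate them once before any step runs. The sketch below is one way to do that, assuming the values arrive as environment variables; the `GCPipelineConfig` class and the `LAKEFS_GC_*` variable names are hypothetical and not part of lakeFS.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping


@dataclass(frozen=True)
class GCPipelineConfig:
    """The six required pipeline inputs from the table above."""

    repository: str
    lakefs_url: str
    access_key: str
    secret_key: str
    spark_image: str
    schedule: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "GCPipelineConfig":
        """Build a config from LAKEFS_GC_<FIELD> variables (assumed naming)."""
        values = {}
        missing = []
        for field in fields(cls):
            key = f"LAKEFS_GC_{field.name.upper()}"
            value = env.get(key, "").strip()
            if not value:
                missing.append(key)
            values[field.name] = value
        if missing:
            # Report every missing input at once rather than one per run.
            raise ValueError("missing required settings: " + ", ".join(missing))
        return cls(**values)
```

Failing fast here keeps a misconfigured job from calling the lakeFS API or starting Spark with empty credentials.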
Outputs
| Output | Description |
|---|---|
| Pipeline success | All steps completed: metadata prepared, Spark job finished, verification passed |
| Pipeline failure | One or more steps failed; alert triggered; retry scheduled (if configured) |
| Logs | Per-step execution logs for debugging and auditing |
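The failure outcome above combines alerting with optional retries. Schedulers such as Airflow provide this natively; for plain cron it has to be written by hand. The following is a minimal sketch under that assumption, where `run_with_retries`, `on_failure`, and the injectable `sleep` parameter are hypothetical names chosen for illustration.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(
    step: Callable[[], T],
    retries: int = 2,
    delay_seconds: float = 600.0,
    on_failure: Optional[Callable[[int, Exception], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `step`, retrying up to `retries` times after the first failure.

    `on_failure(attempt, exc)` is called after every failed attempt and is
    where an alert (email, chat message, pager) would be sent.
    """
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        try:
            return step()
        except Exception as exc:
            if on_failure is not None:
                on_failure(attempt, exc)
            if attempt == attempts:
                raise  # out of retries: surface the failure to the scheduler
            sleep(delay_seconds)
    raise AssertionError("unreachable")
```

The defaults mirror the two retries and ten-minute delay used in the Airflow example on this page; passing a no-op `sleep` makes the helper easy to exercise in tests.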
Usage Examples
Crontab Configuration
#!/bin/bash
# /opt/lakefs/gc_pipeline.sh
# Full GC pipeline script for cron execution
set -euo pipefail

LAKEFS_URL="http://localhost:8000/api/v1"
REPO="my-repo"
# Example placeholder credentials; replace with real keys kept out of version control
ACCESS_KEY="AKIAIOSFODNN7EXAMPLE"
SECRET_KEY="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
SPARK_IMAGE="treeverse/bitnami-spark:3.3"
LOG_DIR="/var/log/lakefs-gc"
LOG_FILE="${LOG_DIR}/gc_$(date +%Y%m%d_%H%M%S).log"

# Make sure the log directory exists before the first tee
mkdir -p "$LOG_DIR"
echo "[$(date)] Starting GC pipeline for repository: $REPO" | tee -a "$LOG_FILE"

# Step 1: Prepare GC metadata
# --fail makes curl exit non-zero on HTTP errors so that set -e stops the script
echo "[$(date)] Step 1: Preparing GC metadata..." | tee -a "$LOG_FILE"
PREPARE_RESPONSE=$(curl -sS --fail -X POST \
    "${LAKEFS_URL}/repositories/${REPO}/gc/prepare_commits" \
    -u "${ACCESS_KEY}:${SECRET_KEY}")
RUN_ID=$(echo "$PREPARE_RESPONSE" | jq -r '.run_id')
# jq prints the string "null" when the field is absent
if [ -z "$RUN_ID" ] || [ "$RUN_ID" = "null" ]; then
    echo "[$(date)] Preparation returned no run_id; aborting." | tee -a "$LOG_FILE"
    exit 1
fi
echo "[$(date)] Preparation complete. Run ID: $RUN_ID" | tee -a "$LOG_FILE"

# Step 2: Run Spark GC job
echo "[$(date)] Step 2: Running Spark GC job..." | tee -a "$LOG_FILE"
docker run --rm \
    --network host \
    "$SPARK_IMAGE" \
    spark-submit \
    --class io.treeverse.gc.GarbageCollection \
    --master "local[*]" \
    --conf "spark.hadoop.lakefs.api.url=${LAKEFS_URL}" \
    --conf "spark.hadoop.lakefs.api.access_key=${ACCESS_KEY}" \
    --conf "spark.hadoop.lakefs.api.secret_key=${SECRET_KEY}" \
    /opt/metaclient/client.jar \
    "$REPO" \
    "$RUN_ID" \
    2>&1 | tee -a "$LOG_FILE"

echo "[$(date)] GC pipeline complete." | tee -a "$LOG_FILE"
# Crontab entry: Run GC every Sunday at 2:00 AM
# crontab -e
0 2 * * 0 /opt/lakefs/gc_pipeline.sh >> /var/log/lakefs-gc/cron.log 2>&1
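When the same job is launched from Python instead of a shell script, the `docker run` invocation can be assembled as an argument list, which avoids shell quoting issues. The sketch below mirrors the command in the script above; `build_spark_gc_command` is a hypothetical helper name, and the class name, JAR path, and positional arguments are copied from this page rather than verified against a specific lakeFS release.

```python
from typing import List


def build_spark_gc_command(
    spark_image: str,
    lakefs_url: str,
    access_key: str,
    secret_key: str,
    repository: str,
    run_id: str,
) -> List[str]:
    """Return the docker/spark-submit argument list used by the cron script."""
    return [
        "docker", "run", "--rm",
        "--network", "host",
        spark_image,
        "spark-submit",
        "--class", "io.treeverse.gc.GarbageCollection",
        "--master", "local[*]",
        "--conf", f"spark.hadoop.lakefs.api.url={lakefs_url}",
        "--conf", f"spark.hadoop.lakefs.api.access_key={access_key}",
        "--conf", f"spark.hadoop.lakefs.api.secret_key={secret_key}",
        "/opt/metaclient/client.jar",
        repository,
        run_id,
    ]
```

The resulting list can be passed directly to `subprocess.run(cmd, check=True)`. Note that credentials passed this way are visible in the host's process list, so this is suited to illustration more than to hardened deployments.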
Apache Airflow DAG
"""
Apache Airflow DAG for lakeFS Garbage Collection Pipeline.
Runs weekly on Sunday at 02:00 UTC.
"""
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

LAKEFS_URL = Variable.get("lakefs_url", default_var="http://localhost:8000/api/v1")
LAKEFS_ACCESS_KEY = Variable.get("lakefs_access_key")
LAKEFS_SECRET_KEY = Variable.get("lakefs_secret_key")
REPOSITORY = Variable.get("lakefs_gc_repository", default_var="my-repo")
SPARK_IMAGE = Variable.get("lakefs_spark_image", default_var="treeverse/bitnami-spark:3.3")

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@example.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
}


def prepare_gc_metadata(**context):
    """Call prepareGarbageCollectionCommits and push run_id to XCom."""
    response = requests.post(
        f"{LAKEFS_URL}/repositories/{REPOSITORY}/gc/prepare_commits",
        auth=(LAKEFS_ACCESS_KEY, LAKEFS_SECRET_KEY),
        timeout=300,  # avoid hanging the task if lakeFS is unreachable
    )
    response.raise_for_status()
    result = response.json()
    run_id = result["run_id"]
    context["ti"].xcom_push(key="gc_run_id", value=run_id)
    context["ti"].xcom_push(key="gc_commits_location", value=result["gc_commits_location"])
    print(f"GC metadata prepared. Run ID: {run_id}")
    return run_id


def verify_gc_results(**context):
    """Report the HTTP status of each listed object's presigned URL."""
    response = requests.get(
        f"{LAKEFS_URL}/repositories/{REPOSITORY}/refs/main/objects/ls",
        params={"presign": "true", "amount": 100},
        auth=(LAKEFS_ACCESS_KEY, LAKEFS_SECRET_KEY),
        timeout=60,
    )
    response.raise_for_status()
    objects = response.json().get("results", [])
    for obj in objects:
        # With presign=true, physical_address holds a presigned URL
        url = obj.get("physical_address", "")
        if url:
            check = requests.get(url, timeout=60)
            print(f"{obj['path']}: HTTP {check.status_code}")


with DAG(
    dag_id="lakefs_garbage_collection",
    default_args=default_args,
    description="lakeFS Garbage Collection Pipeline",
    # Weekly on Sunday at 2 AM UTC. On Airflow 2.4+, `schedule_interval` is
    # deprecated in favor of the `schedule` argument.
    schedule_interval="0 2 * * 0",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["lakefs", "gc", "storage-management"],
) as dag:
    prepare_metadata = PythonOperator(
        task_id="prepare_gc_metadata",
        python_callable=prepare_gc_metadata,
    )

    # The run ID is pulled from XCom at render time via Jinja templating
    run_spark_gc = BashOperator(
        task_id="run_spark_gc",
        bash_command="""
        RUN_ID={{ ti.xcom_pull(task_ids='prepare_gc_metadata', key='gc_run_id') }}
        docker run --rm \
            --network host \
            {{ var.value.lakefs_spark_image }} \
            spark-submit \
            --class io.treeverse.gc.GarbageCollection \
            --master "local[*]" \
            --conf "spark.hadoop.lakefs.api.url={{ var.value.lakefs_url }}" \
            --conf "spark.hadoop.lakefs.api.access_key={{ var.value.lakefs_access_key }}" \
            --conf "spark.hadoop.lakefs.api.secret_key={{ var.value.lakefs_secret_key }}" \
            /opt/metaclient/client.jar \
            {{ var.value.lakefs_gc_repository }} \
            "$RUN_ID"
        """,
    )

    verify_results = PythonOperator(
        task_id="verify_gc_results",
        python_callable=verify_gc_results,
    )

    # Enforce the step order: prepare, then Spark GC, then verification
    prepare_metadata >> run_spark_gc >> verify_results
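The `verify_gc_results` task above only prints each status code. To let the task pass or fail on the outcome, the codes can first be grouped into categories. The sketch below shows one way to do this; `summarize_checks` is a hypothetical helper, and treating HTTP 404 as "deleted" follows the check described in the conceptual pipeline on this page.

```python
from typing import Dict, Iterable, List, Tuple


def summarize_checks(checks: Iterable[Tuple[str, int]]) -> Dict[str, List[str]]:
    """Group object paths by the HTTP status of their presigned URL.

    `checks` is an iterable of (path, status_code) pairs collected by the
    verification step.
    """
    summary: Dict[str, List[str]] = {"present": [], "deleted": [], "other": []}
    for path, status in checks:
        if status == 200:
            summary["present"].append(path)   # object still readable
        elif status == 404:
            summary["deleted"].append(path)   # object removed from storage
        else:
            summary["other"].append(path)     # e.g. 403 or 5xx: needs a look
    return summary
```

A task could then raise an exception when the `other` list is non-empty, so that Airflow's retry and email settings apply to verification problems as well.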
Kubernetes CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: lakefs-gc-pipeline
  namespace: data-engineering
spec:
  schedule: "0 2 * * 0"  # Weekly on Sunday at 2 AM
  concurrencyPolicy: Forbid  # Skip a run if the previous one is still active
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: gc-pipeline
              image: treeverse/bitnami-spark:3.3
              command:
                - /bin/bash
                - -c
                - |
                  # Stop on the first failing command so Spark never runs without a run ID
                  set -euo pipefail
                  # Prepare GC metadata
                  RESPONSE=$(curl -sS --fail -X POST \
                    "${LAKEFS_URL}/repositories/${REPOSITORY}/gc/prepare_commits" \
                    -u "${LAKEFS_ACCESS_KEY}:${LAKEFS_SECRET_KEY}")
                  RUN_ID=$(echo "$RESPONSE" | jq -r '.run_id')
                  echo "Run ID: $RUN_ID"
                  # Run Spark GC
                  spark-submit \
                    --class io.treeverse.gc.GarbageCollection \
                    --master "local[*]" \
                    --conf "spark.hadoop.lakefs.api.url=${LAKEFS_URL}" \
                    --conf "spark.hadoop.lakefs.api.access_key=${LAKEFS_ACCESS_KEY}" \
                    --conf "spark.hadoop.lakefs.api.secret_key=${LAKEFS_SECRET_KEY}" \
                    /opt/metaclient/client.jar \
                    "${REPOSITORY}" \
                    "$RUN_ID"
              env:
                - name: LAKEFS_URL
                  valueFrom:
                    secretKeyRef:
                      name: lakefs-credentials
                      key: api-url
                - name: LAKEFS_ACCESS_KEY
                  valueFrom:
                    secretKeyRef:
                      name: lakefs-credentials
                      key: access-key
                - name: LAKEFS_SECRET_KEY
                  valueFrom:
                    secretKeyRef:
                      name: lakefs-credentials
                      key: secret-key
                - name: REPOSITORY
                  value: "my-repo"
              resources:
                requests:
                  memory: "4Gi"
                  cpu: "2"
                limits:
                  memory: "8Gi"
                  cpu: "4"