Implementation:Treeverse LakeFS External Scheduler Configuration

From Leeroopedia


Knowledge Sources
Domains Storage_Management, REST_API
Last Updated 2026-02-08 00:00 GMT

Overview

External scheduler configuration defines the orchestration patterns and tools used to automate the lakeFS garbage collection pipeline on a recurring schedule, using systems external to lakeFS such as cron, Apache Airflow, AWS Step Functions, or Kubernetes CronJobs.

Description

lakeFS does not include a built-in job scheduler for garbage collection. Instead, operators configure an external orchestration tool to periodically execute the full GC pipeline:

  1. (Optional) Update retention rules via setGCRules
  2. Prepare GC metadata via prepareGarbageCollectionCommits
  3. Run the Spark GC job via spark-submit (typically in Docker)
  4. Verify results via listObjects with presigned URL checks
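Steps 1 and 2 are plain REST calls. The sketch below builds them with only the Python standard library. The helper names are hypothetical. The `settings/gc_rules` path and the `default_retention_days`/`branches` body follow the lakeFS setGCRules API as understood for recent releases (older releases used a different path), so check both against the API reference for your lakeFS version.

```python
import base64
import json
import urllib.request
from typing import Dict, Optional

# Hypothetical helpers; endpoint paths and field names should be checked
# against the lakeFS API reference for your version.


def _basic_auth(access_key: str, secret_key: str) -> str:
    token = base64.b64encode(f"{access_key}:{secret_key}".encode()).decode()
    return f"Basic {token}"


def build_set_gc_rules_request(lakefs_url: str, repo: str, access_key: str,
                               secret_key: str, default_retention_days: int,
                               branch_retention: Optional[Dict[str, int]] = None
                               ) -> urllib.request.Request:
    """Step 1 (optional): replace the repository's GC retention rules."""
    body = {
        "default_retention_days": default_retention_days,
        "branches": [{"branch_id": branch, "retention_days": days}
                     for branch, days in (branch_retention or {}).items()],
    }
    return urllib.request.Request(
        f"{lakefs_url}/repositories/{repo}/settings/gc_rules",
        data=json.dumps(body).encode(),
        method="PUT",
        headers={"Content-Type": "application/json",
                 "Authorization": _basic_auth(access_key, secret_key)},
    )


def build_prepare_commits_request(lakefs_url: str, repo: str, access_key: str,
                                  secret_key: str) -> urllib.request.Request:
    """Step 2: ask lakeFS to prepare GC metadata; the response holds run_id."""
    return urllib.request.Request(
        f"{lakefs_url}/repositories/{repo}/gc/prepare_commits",
        method="POST",
        headers={"Authorization": _basic_auth(access_key, secret_key)},
    )


def parse_run_id(response_body: bytes) -> str:
    """Extract run_id, failing loudly instead of passing 'null' downstream."""
    run_id = json.loads(response_body).get("run_id")
    if not run_id:
        raise ValueError("prepare_commits response contains no run_id")
    return run_id


def send(request: urllib.request.Request, timeout: float = 60.0) -> bytes:
    """Perform the call; urlopen raises HTTPError on 4xx/5xx responses."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read()
```

For example, `parse_run_id(send(build_prepare_commits_request(url, repo, key, secret)))` yields the run ID that the Spark step consumes.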

Each orchestration tool offers different retry logic, dependency management, monitoring, and alerting. This page documents configuration patterns for cron, Apache Airflow, and Kubernetes CronJobs; other orchestrators, such as AWS Step Functions, can run the same sequence of steps.
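Plain cron has no retry logic of its own. A minimal, hypothetical helper that retries one pipeline step with exponential backoff, roughly what Airflow's `retries` and `retry_delay` settings (with `retry_exponential_backoff`) provide:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(step: Callable[[], T], retries: int = 2,
                     delay_seconds: float = 600.0, backoff: float = 2.0,
                     sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `step`; on an exception, retry up to `retries` more times.

    The wait grows as delay_seconds * backoff**attempt (600 s, then 1200 s
    with the defaults). The last exception is re-raised so the caller sees
    the failure.
    """
    attempt = 0
    while True:
        try:
            return step()
        except Exception:
            if attempt >= retries:
                raise
            sleep(delay_seconds * backoff ** attempt)
            attempt += 1
```

The injectable `sleep` keeps the helper testable; in production the default `time.sleep` is used.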

Usage

Use this pattern when:

  • Automating GC for one or more lakeFS repositories in production
  • Integrating GC into an existing data engineering workflow orchestration platform
  • Setting up GC with monitoring, alerting, and retry capabilities

Code Reference

Source Location

  • Source: N/A — external to lakeFS codebase
  • Tools: cron, Apache Airflow, AWS Step Functions, Kubernetes CronJob
  • Pattern: Schedule the full GC pipeline as a periodic job, calling lakeFS APIs and running the Spark GC job in sequence
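When the Spark step is launched from Python, an argument list is safer than a shell string. The sketch below reproduces the `docker run ... spark-submit` invocation used in the examples on this page. The positional arguments after the JAR (repository, then run ID) and the `/opt/metaclient/client.jar` path are copied from those examples, not from the lakeFS Spark client documentation, so confirm them for your client version. Both function names are hypothetical.

```python
import shlex
from typing import List


def build_spark_gc_command(lakefs_url: str, access_key: str, secret_key: str,
                           repo: str, run_id: str, image: str,
                           jar_path: str = "/opt/metaclient/client.jar") -> List[str]:
    """Return the argv for the Docker-wrapped spark-submit call used on this page."""
    return [
        "docker", "run", "--rm", "--network", "host", image,
        "spark-submit",
        "--class", "io.treeverse.gc.GarbageCollection",
        "--master", "local[*]",
        "--conf", f"spark.hadoop.lakefs.api.url={lakefs_url}",
        "--conf", f"spark.hadoop.lakefs.api.access_key={access_key}",
        "--conf", f"spark.hadoop.lakefs.api.secret_key={secret_key}",
        jar_path, repo, run_id,
    ]


def redacted(cmd: List[str], secret: str) -> str:
    """Shell-quoted rendering of `cmd` for logs, with `secret` masked."""
    masked = [arg.replace(secret, "****") if secret else arg for arg in cmd]
    return shlex.join(masked)
```

A caller would run `subprocess.run(cmd, check=True)` and log `redacted(cmd, secret_key)` instead of the raw command.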

Signature

# Conceptual pipeline definition
GC_Pipeline:
  steps:
    - name: set_gc_rules  # optional
      api: PUT /api/v1/repositories/{repository}/settings/gc_rules
    - name: prepare_metadata
      api: POST /api/v1/repositories/{repository}/gc/prepare_commits
      output: run_id
    - name: run_spark_gc
      command: spark-submit --class io.treeverse.gc.GarbageCollection ...
      input: run_id
    - name: verify_results
      api: GET /api/v1/repositories/{repository}/refs/{ref}/objects/ls?presign=true
      check: presigned URLs of expired objects (listed at a ref older than the retention period) return 404
  schedule: "0 2 * * 0"  # Weekly at 2 AM on Sunday
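Every example on this page uses the schedule `0 2 * * 0`: minute 0, hour 2, any day of the month, any month, day of week 0 (Sunday). A deliberately simplified matcher makes that reading checkable. It handles only numbers, `*` and comma lists; real cron also supports ranges, steps, `7` for Sunday, and ORs the two day fields when both are restricted.

```python
from datetime import datetime, timedelta


def _field_matches(field: str, value: int) -> bool:
    # Simplified: "*" or comma-separated integers only (no ranges, steps, names).
    return field == "*" or value in {int(part) for part in field.split(",")}


def cron_matches(expr: str, when: datetime) -> bool:
    """True if `when` falls on a minute selected by the 5-field `expr`."""
    minute, hour, day_of_month, month, day_of_week = expr.split()
    cron_weekday = when.isoweekday() % 7  # cron numbers Sunday as 0
    return (_field_matches(minute, when.minute)
            and _field_matches(hour, when.hour)
            and _field_matches(day_of_month, when.day)
            and _field_matches(month, when.month)
            # Real cron ORs the two day fields when both are restricted;
            # this sketch ANDs them, which is equivalent when one is "*".
            and _field_matches(day_of_week, cron_weekday))


def next_run(expr: str, after: datetime) -> datetime:
    """First whole minute strictly after `after` that matches `expr`."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):
        if cron_matches(expr, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"{expr!r} matches no minute within a year")
```

For example, `next_run("0 2 * * 0", datetime(2026, 2, 1, 3, 0))` returns `datetime(2026, 2, 8, 2, 0)`, the following Sunday at 02:00. Which clock that is depends on the scheduler: cron uses the host's local time, Airflow uses UTC by default.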

Import

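# For cron-based scheduling, no Python package is needed; the host needs curl, jq and Docker
# For Airflow, install the apache-airflow package (Airflow recommends pinning it with its published constraints file)
pip install apache-airflow

# For Kubernetes CronJob, ensure kubectl is installed and pointed at the target cluster
kubectl version --client
kubectl config current-context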
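A small preflight check can fail fast when the tools a scheduler relies on are missing. The tool lists below are an assumption derived from the examples on this page, and `missing_tools` is a hypothetical helper:

```python
import shutil
from typing import Callable, Dict, List, Optional

# Assumed requirements, taken from the examples on this page.
REQUIRED_TOOLS: Dict[str, List[str]] = {
    "cron": ["curl", "jq", "docker"],
    "airflow": ["airflow", "docker"],
    "kubernetes": ["kubectl"],
}


def missing_tools(scheduler: str,
                  which: Callable[[str], Optional[str]] = shutil.which) -> List[str]:
    """Names of the CLI tools required for `scheduler` that are not on PATH."""
    return [tool for tool in REQUIRED_TOOLS[scheduler] if which(tool) is None]
```

A wrapper script could exit with an error listing `missing_tools("cron")` before attempting any API call.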

I/O Contract

Inputs

Parameter | Type | Required | Description
repository | string | Yes | The lakeFS repository name to run GC against
lakefs_url | string | Yes | lakeFS API base URL (e.g., http://localhost:8000/api/v1)
access_key | string | Yes | lakeFS access key ID
secret_key | string | Yes | lakeFS secret access key
spark_image | string | Yes | Docker image for Spark execution (e.g., treeverse/bitnami-spark:3.3)
schedule | string | Yes | Cron expression or equivalent schedule definition
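The inputs above can be collected into one validated object before any step runs. In this sketch the environment variable names are hypothetical, and the `/api/v1` suffix check only encodes the URL form used in the examples on this page:

```python
import os
from dataclasses import dataclass, field
from typing import Mapping


@dataclass(frozen=True)
class GCPipelineConfig:
    repository: str
    lakefs_url: str
    access_key: str
    secret_key: str = field(repr=False)  # keep the secret out of logs
    spark_image: str
    schedule: str


# Hypothetical environment variable names for each input.
ENV_VARS = {
    "repository": "LAKEFS_GC_REPOSITORY",
    "lakefs_url": "LAKEFS_URL",
    "access_key": "LAKEFS_ACCESS_KEY",
    "secret_key": "LAKEFS_SECRET_KEY",
    "spark_image": "LAKEFS_SPARK_IMAGE",
    "schedule": "LAKEFS_GC_SCHEDULE",
}


def load_config(env: Mapping[str, str] = os.environ) -> GCPipelineConfig:
    """Build the config, rejecting missing or obviously malformed inputs."""
    missing = [name for name in ENV_VARS.values() if not env.get(name)]
    if missing:
        raise ValueError(f"missing required settings: {', '.join(missing)}")
    values = {attr: env[name] for attr, name in ENV_VARS.items()}
    values["lakefs_url"] = values["lakefs_url"].rstrip("/")
    if not values["lakefs_url"].endswith("/api/v1"):
        raise ValueError("lakefs_url should be the API base URL, e.g. .../api/v1")
    if len(values["schedule"].split()) != 5:
        raise ValueError("schedule should be a 5-field cron expression")
    return GCPipelineConfig(**values)
```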

Outputs

Output | Description
Pipeline success | All steps completed: metadata prepared, Spark job finished, verification passed
Pipeline failure | One or more steps failed; alert triggered; retry scheduled (if configured)
Logs | Per-step execution logs for debugging and auditing
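The output contract (all steps succeed, or the run fails at a specific step, with per-step logs either way) can be sketched as a sequential runner that feeds each step's output to the next, as `run_id` flows from preparation to the Spark job. Alerting and retries are left to the scheduler; the step names and callables are placeholders:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Tuple

Step = Tuple[str, Callable[[Any], Any]]


@dataclass
class PipelineResult:
    succeeded: bool = False
    failed_step: Optional[str] = None
    logs: List[str] = field(default_factory=list)


def run_pipeline(steps: List[Step]) -> PipelineResult:
    """Run (name, callable) steps in order; each receives the previous output.

    Stops at the first exception, so success means every step completed.
    """
    result = PipelineResult()
    value: Any = None
    for name, step in steps:
        try:
            value = step(value)
        except Exception as exc:
            result.logs.append(f"{name}: failed: {exc}")
            result.failed_step = name
            return result
        result.logs.append(f"{name}: ok")
    result.succeeded = True
    return result
```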

Usage Examples

Crontab Configuration

#!/bin/bash
# /opt/lakefs/gc_pipeline.sh
# Full GC pipeline script for cron execution

set -euo pipefail

# cron starts jobs with a minimal environment, so settings are loaded from a file.
# /etc/lakefs-gc/env is a hypothetical location; it should export
# LAKEFS_ACCESS_KEY and LAKEFS_SECRET_KEY (never hard-code credentials here).
if [ -f /etc/lakefs-gc/env ]; then
  . /etc/lakefs-gc/env
fi

LAKEFS_URL="${LAKEFS_URL:-http://localhost:8000/api/v1}"
REPO="${LAKEFS_GC_REPOSITORY:-my-repo}"
ACCESS_KEY="${LAKEFS_ACCESS_KEY:?LAKEFS_ACCESS_KEY is not set}"
SECRET_KEY="${LAKEFS_SECRET_KEY:?LAKEFS_SECRET_KEY is not set}"
SPARK_IMAGE="${LAKEFS_SPARK_IMAGE:-treeverse/bitnami-spark:3.3}"
LOG_DIR="/var/log/lakefs-gc"
mkdir -p "$LOG_DIR"
LOG_FILE="${LOG_DIR}/gc_$(date +%Y%m%d_%H%M%S).log"

echo "[$(date)] Starting GC pipeline for repository: $REPO" | tee -a "$LOG_FILE"

# Step 1: Prepare GC metadata (-f makes curl fail on HTTP errors)
echo "[$(date)] Step 1: Preparing GC metadata..." | tee -a "$LOG_FILE"
PREPARE_RESPONSE=$(curl -fsS -X POST \
  "${LAKEFS_URL}/repositories/${REPO}/gc/prepare_commits" \
  -u "${ACCESS_KEY}:${SECRET_KEY}")

RUN_ID=$(echo "$PREPARE_RESPONSE" | jq -r '.run_id // empty')
if [ -z "$RUN_ID" ]; then
  echo "[$(date)] ERROR: no run_id in response: $PREPARE_RESPONSE" | tee -a "$LOG_FILE"
  exit 1
fi
echo "[$(date)] Preparation complete. Run ID: $RUN_ID" | tee -a "$LOG_FILE"

# Step 2: Run Spark GC job (pipefail propagates a spark-submit failure through tee)
echo "[$(date)] Step 2: Running Spark GC job..." | tee -a "$LOG_FILE"
docker run --rm \
  --network host \
  "$SPARK_IMAGE" \
  spark-submit \
    --class io.treeverse.gc.GarbageCollection \
    --master "local[*]" \
    --conf "spark.hadoop.lakefs.api.url=${LAKEFS_URL}" \
    --conf "spark.hadoop.lakefs.api.access_key=${ACCESS_KEY}" \
    --conf "spark.hadoop.lakefs.api.secret_key=${SECRET_KEY}" \
    /opt/metaclient/client.jar \
    "$REPO" \
    "$RUN_ID" \
  2>&1 | tee -a "$LOG_FILE"

echo "[$(date)] GC pipeline complete." | tee -a "$LOG_FILE"

Crontab entry (add with crontab -e); cron evaluates the schedule in the host's local time zone. This runs GC every Sunday at 2:00 AM:

0 2 * * 0 /opt/lakefs/gc_pipeline.sh >> /var/log/lakefs-gc/cron.log 2>&1
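Unlike the Kubernetes example, which sets `concurrencyPolicy: Forbid`, cron starts a new run even if the previous one is still going. Wrapping the crontab command in util-linux `flock -n <lockfile>` is one common remedy. The Python equivalent, as a minimal sketch with a hypothetical lock file location, takes a non-blocking exclusive `fcntl.flock` lock (Unix only):

```python
import fcntl
import os
import tempfile
from typing import Optional, TextIO

# Hypothetical default location for the lock file.
DEFAULT_LOCK_PATH = os.path.join(tempfile.gettempdir(), "lakefs-gc.lock")


def try_acquire_lock(path: str = DEFAULT_LOCK_PATH) -> Optional[TextIO]:
    """Return the open lock file if no other run holds the lock, else None."""
    handle = open(path, "a")
    try:
        # Exclusive and non-blocking: fails at once if another run holds it.
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        handle.close()
        return None
    return handle  # keep it open for the whole run; the lock lives with it


def release_lock(handle: TextIO) -> None:
    """Release the lock (closing the file would also release it)."""
    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
    handle.close()
```

A wrapper would call `try_acquire_lock()` first, exit quietly when it returns `None`, and call `release_lock()` once the pipeline finishes.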

Apache Airflow DAG

"""
Apache Airflow DAG for lakeFS Garbage Collection Pipeline.
Runs weekly on Sunday at 02:00 UTC (Airflow's default time zone).
Targets Airflow 2.4+, where the `schedule` argument replaces `schedule_interval`.
"""
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

DEFAULT_LAKEFS_URL = "http://localhost:8000/api/v1"
DEFAULT_REPOSITORY = "my-repo"
HTTP_TIMEOUT = 60  # seconds; keeps tasks from hanging on an unresponsive endpoint

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@example.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
}


def _lakefs_settings():
    """Read Airflow Variables when a task runs, not every time the DAG file is parsed."""
    lakefs_url = Variable.get("lakefs_url", default_var=DEFAULT_LAKEFS_URL)
    repository = Variable.get("lakefs_gc_repository", default_var=DEFAULT_REPOSITORY)
    auth = (Variable.get("lakefs_access_key"), Variable.get("lakefs_secret_key"))
    return lakefs_url, repository, auth


def prepare_gc_metadata(**context):
    """Call prepareGarbageCollectionCommits and push run_id to XCom."""
    lakefs_url, repository, auth = _lakefs_settings()
    response = requests.post(
        f"{lakefs_url}/repositories/{repository}/gc/prepare_commits",
        auth=auth,
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    result = response.json()
    run_id = result["run_id"]
    context["ti"].xcom_push(key="gc_run_id", value=run_id)
    context["ti"].xcom_push(key="gc_commits_location", value=result["gc_commits_location"])
    print(f"GC metadata prepared. Run ID: {run_id}")
    return run_id


def verify_gc_results(**context):
    """Print the HTTP status of each object's presigned URL (first 100 objects).

    Objects reachable from the branch HEAD must survive GC, so 2xx is expected
    here; to confirm deletions, list an older commit whose retention period has
    expired and expect 404 (or 403, depending on the object store's permissions).
    """
    lakefs_url, repository, auth = _lakefs_settings()
    response = requests.get(
        f"{lakefs_url}/repositories/{repository}/refs/main/objects/ls",
        params={"presign": "true", "amount": 100},
        auth=auth,
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    for obj in response.json().get("results", []):
        url = obj.get("physical_address", "")
        if url.startswith("http"):
            # stream=True reads the status without downloading the object body
            with requests.get(url, stream=True, timeout=HTTP_TIMEOUT) as check:
                print(f"{obj['path']}: HTTP {check.status_code}")


with DAG(
    dag_id="lakefs_garbage_collection",
    default_args=default_args,
    description="lakeFS Garbage Collection Pipeline",
    schedule="0 2 * * 0",  # Weekly on Sunday at 2 AM UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["lakefs", "gc", "storage-management"],
) as dag:

    prepare_metadata = PythonOperator(
        task_id="prepare_gc_metadata",
        python_callable=prepare_gc_metadata,
    )

    # Template defaults mirror the Python defaults above.
    run_spark_gc = BashOperator(
        task_id="run_spark_gc",
        bash_command="""
        RUN_ID="{{ ti.xcom_pull(task_ids='prepare_gc_metadata', key='gc_run_id') }}"
        docker run --rm \
          --network host \
          "{{ var.value.get('lakefs_spark_image', 'treeverse/bitnami-spark:3.3') }}" \
          spark-submit \
            --class io.treeverse.gc.GarbageCollection \
            --master "local[*]" \
            --conf "spark.hadoop.lakefs.api.url={{ var.value.get('lakefs_url', 'http://localhost:8000/api/v1') }}" \
            --conf "spark.hadoop.lakefs.api.access_key={{ var.value.lakefs_access_key }}" \
            --conf "spark.hadoop.lakefs.api.secret_key={{ var.value.lakefs_secret_key }}" \
            /opt/metaclient/client.jar \
            "{{ var.value.get('lakefs_gc_repository', 'my-repo') }}" \
            "$RUN_ID"
        """,
    )

    verify_results = PythonOperator(
        task_id="verify_gc_results",
        python_callable=verify_gc_results,
    )

    prepare_metadata >> run_spark_gc >> verify_results
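The verification task reads only the first 100 objects and only prints status codes. Two helpers could tighten it. The first pages through listObjects responses, which (per the lakeFS API as understood here) carry `pagination.has_more` and `pagination.next_offset`, the latter passed back as the `after` query parameter. The second flags objects whose presigned URL still answers 2xx. `fetch_page` is a hypothetical callable, and the survivor check is only meaningful for objects listed from a ref whose retention period has expired, since objects reachable from a branch HEAD must survive GC.

```python
from typing import Callable, Dict, Iterator, List


def iter_objects(fetch_page: Callable[[str], dict]) -> Iterator[dict]:
    """Yield every object across all pages of a listObjects response.

    `fetch_page(after)` is a hypothetical callable that performs
    GET .../objects/ls?presign=true&after=<after> and returns the parsed JSON.
    """
    after = ""
    while True:
        page = fetch_page(after)
        yield from page.get("results", [])
        pagination = page.get("pagination", {})
        if not pagination.get("has_more"):
            return
        after = pagination["next_offset"]


def unexpected_survivors(status_by_path: Dict[str, int]) -> List[str]:
    """Paths whose presigned URL still answered 2xx, i.e. were not collected."""
    return sorted(path for path, status in status_by_path.items()
                  if 200 <= status < 300)
```

A stricter verification task could raise when `unexpected_survivors(...)` is non-empty, so that Airflow marks the run failed and sends its failure email.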

Kubernetes CronJob

apiVersion: batch/v1
kind: CronJob
metadata:
  name: lakefs-gc-pipeline
  namespace: data-engineering
spec:
  schedule: "0 2 * * 0"  # Weekly on Sunday at 2 AM (kube-controller-manager's time zone unless spec.timeZone is set)
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      template:
        spec:
          restartPolicy: OnFailure
          containers:
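            - name: gc-pipeline
              # Assumes this image also provides curl and jq
              image: treeverse/bitnami-spark:3.3
              command:
                - /bin/bash
                - -c
                - |
                  set -euo pipefail
                  # Prepare GC metadata (-f makes curl fail on HTTP errors)
                  RESPONSE=$(curl -fsS -X POST \
                    "${LAKEFS_URL}/repositories/${REPOSITORY}/gc/prepare_commits" \
                    -u "${LAKEFS_ACCESS_KEY}:${LAKEFS_SECRET_KEY}")
                  RUN_ID=$(echo "$RESPONSE" | jq -r '.run_id // empty')
                  if [ -z "$RUN_ID" ]; then echo "No run_id in response" >&2; exit 1; fi
                  echo "Run ID: $RUN_ID"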

                  # Run Spark GC
                  spark-submit \
                    --class io.treeverse.gc.GarbageCollection \
                    --master "local[*]" \
                    --conf "spark.hadoop.lakefs.api.url=${LAKEFS_URL}" \
                    --conf "spark.hadoop.lakefs.api.access_key=${LAKEFS_ACCESS_KEY}" \
                    --conf "spark.hadoop.lakefs.api.secret_key=${LAKEFS_SECRET_KEY}" \
                    /opt/metaclient/client.jar \
                    "${REPOSITORY}" \
                    "$RUN_ID"
              env:
                - name: LAKEFS_URL
                  valueFrom:
                    secretKeyRef:
                      name: lakefs-credentials
                      key: api-url
                - name: LAKEFS_ACCESS_KEY
                  valueFrom:
                    secretKeyRef:
                      name: lakefs-credentials
                      key: access-key
                - name: LAKEFS_SECRET_KEY
                  valueFrom:
                    secretKeyRef:
                      name: lakefs-credentials
                      key: secret-key
                - name: REPOSITORY
                  value: "my-repo"
              resources:
                requests:
                  memory: "4Gi"
                  cpu: "2"
                limits:
                  memory: "8Gi"
                  cpu: "4"
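The CronJob reads its settings from a Secret named `lakefs-credentials` with keys `api-url`, `access-key` and `secret-key`. A sketch that builds that Secret as a manifest (Secret `data` values must be base64-encoded); `kubectl create secret generic lakefs-credentials --from-literal=...` achieves the same from the command line. The function names are hypothetical.

```python
import base64
import json


def lakefs_credentials_secret(api_url: str, access_key: str, secret_key: str,
                              namespace: str = "data-engineering") -> dict:
    """Build the Secret referenced by the CronJob's secretKeyRef entries."""

    def b64(value: str) -> str:
        # Secret `data` values must be base64-encoded strings.
        return base64.b64encode(value.encode()).decode()

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": "lakefs-credentials", "namespace": namespace},
        "type": "Opaque",
        "data": {
            "api-url": b64(api_url),
            "access-key": b64(access_key),
            "secret-key": b64(secret_key),
        },
    }


def to_manifest_json(secret: dict) -> str:
    """Serialize for `kubectl apply -f -`, which accepts JSON as well as YAML."""
    return json.dumps(secret, indent=2)
```

Printing `to_manifest_json(lakefs_credentials_secret(...))` and piping it into `kubectl apply -f -` creates the Secret in the CronJob's namespace.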
