
Implementation:Treeverse LakeFS S3 PutObject

From Leeroopedia


Knowledge Sources
Domains: S3_Compatibility, REST_API
Last Updated: 2026-02-08 00:00 GMT

Overview

Wrapper for standard S3 write operations (PutObject, CopyObject, DeleteObject, multipart upload) via the lakeFS S3 gateway.

Description

This implementation wraps the S3 write operations that the lakeFS S3 gateway translates into lakeFS staging operations. Every write stages a change on the target branch; nothing becomes part of the branch history until an explicit commit is made through the lakeFS REST API. The supported operations are:

  • PutObject -- Upload a single object with content type and user metadata
  • CopyObject -- Server-side copy within or between repositories
  • DeleteObject -- Remove a single object (staged as a tombstone)
  • DeleteObjects -- Bulk delete multiple objects
  • CreateMultipartUpload / UploadPart / CompleteMultipartUpload -- Upload large objects in parts
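Because writes only stage changes, a typical workflow follows the S3 write with a commit through the lakeFS REST API. A minimal sketch, assuming a local lakeFS server at `http://localhost:8000` and omitting authentication (real calls need basic auth with your lakeFS access key):

```python
import json
import urllib.request

LAKEFS_URL = "http://localhost:8000"  # assumed local lakeFS endpoint

def commit_request(repo: str, branch: str, message: str) -> urllib.request.Request:
    """Build a POST request for the lakeFS commits API (auth headers omitted)."""
    url = f"{LAKEFS_URL}/api/v1/repositories/{repo}/branches/{branch}/commits"
    body = json.dumps({"message": message}).encode()
    return urllib.request.Request(
        url, data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# After staging writes via the S3 gateway, commit them on the branch:
req = commit_request("my-repo", "main", "Import daily sales data")
# urllib.request.urlopen(req)  # requires a running lakeFS server and credentials
```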

Usage

Use this implementation when:

  • Writing data to lakeFS through any S3-compatible tool or SDK
  • Uploading large files that exceed the single-request size limit (use multipart upload)
  • Copying data between branches or repositories through the S3 protocol
  • Deleting objects from a branch via S3 tools

Code Reference

Source Location

  • File: esti/s3_gateway_test.go
    • Lines: L126-197 (TestS3UploadAndDownload -- PutObject)
    • Lines: L786-859 (TestS3CopyObjectMultipart -- multipart copy)
    • Lines: L860-996 (TestS3CopyObject -- CopyObject with metadata)
    • Lines: L1217-1310 (TestDeleteObjects -- bulk delete)
  • File: esti/multipart_test.go
    • Lines: L1-227 (TestMultipartUpload -- multipart upload)

Signature

// PutObject: Upload a single object
_, err := clt.PutObject(ctx, repo, "main/data/file.csv",
    strings.NewReader(content), int64(len(content)),
    minio.PutObjectOptions{
        ContentType:  "text/csv",
        UserMetadata: map[string]string{"Key1": "value1"},
    })

// CopyObject: Server-side copy within the same repository
_, err := clt.CopyObject(ctx,
    minio.CopyDestOptions{Bucket: repo, Object: "main/data/dest.csv"},
    minio.CopySrcOptions{Bucket: repo, Object: "main/data/source.csv"})

// CopyObject: Cross-repository copy with metadata replacement
_, err := clt.CopyObject(ctx,
    minio.CopyDestOptions{
        Bucket:          destRepo,
        Object:          destPath,
        UserMetadata:    map[string]string{"Key1": "newvalue"},
        ReplaceMetadata: true,
    },
    minio.CopySrcOptions{Bucket: srcRepo, Object: srcPath})

// DeleteObject: Remove a single object
err := clt.RemoveObject(ctx, repo, "main/data/old.csv", minio.RemoveObjectOptions{})

// DeleteObjects: Bulk delete via channel
errChan := clt.RemoveObjects(ctx, repo, objectsCh, minio.RemoveObjectsOptions{})

// Multipart Upload (AWS SDK v2)
createResp, err := svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
    Bucket: aws.String(repo),
    Key:    aws.String("main/multipart_file"),
})

uploadResp, err := svc.UploadPart(ctx, &s3.UploadPartInput{
    Bucket:     aws.String(repo),
    Key:        aws.String("main/multipart_file"),
    UploadId:   createResp.UploadId,
    PartNumber: aws.Int32(1),
    Body:       bytes.NewReader(partData),
})

_, err = svc.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
    Bucket:   aws.String(repo),
    Key:      aws.String("main/multipart_file"),
    UploadId: createResp.UploadId,
    MultipartUpload: &types.CompletedMultipartUpload{
        Parts: completedParts,
    },
})

Import

import boto3

s3 = boto3.client('s3',
    endpoint_url='http://localhost:8000',
    aws_access_key_id='AKIAIOSFDNN7EXAMPLEQ',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
)

I/O Contract

Inputs

Parameter       Type                 Required            Description
Bucket          string               Yes                 lakeFS repository name
Key             string               Yes                 Object key in the format {branch}/{path}
Body            byte stream          Yes (PutObject)     Object content to upload
ContentType     string               No                  MIME type of the object (e.g., text/csv, application/parquet)
Metadata        map[string]string    No                  User-defined metadata key-value pairs (x-amz-meta-*)
If-None-Match   string               No                  Set to * for conditional create (fails with HTTP 412 if the object exists)
CopySource      string               Yes (CopyObject)    Source bucket/key for server-side copy
UploadId        string               Yes (multipart)     Upload ID returned by CreateMultipartUpload
PartNumber      integer              Yes (UploadPart)    Part number (1 to 10,000)
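User metadata travels as `x-amz-meta-*` headers on the wire, and S3-compatible servers return the keys lowercased. A small sketch of that mapping (the helper name is illustrative):

```python
def to_amz_headers(metadata: dict) -> dict:
    """Map user metadata keys to their x-amz-meta-* wire-format headers."""
    return {f"x-amz-meta-{key.lower()}": value for key, value in metadata.items()}

headers = to_amz_headers({"Source": "etl-pipeline", "Version": "2"})
# -> {'x-amz-meta-source': 'etl-pipeline', 'x-amz-meta-version': '2'}
```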

Outputs

Output          Type      Description
ETag            string    Entity tag of the uploaded/copied object
VersionId       string    Version identifier (if applicable)
UploadId        string    Multipart upload ID (CreateMultipartUpload response)
HTTP 200/204    status    Success for PutObject, CopyObject, DeleteObject
HTTP 412        status    Precondition Failed (If-None-Match violation)
HTTP 403        status    Forbidden (write to a read-only repository)

Usage Examples

Python boto3: Upload an object

import boto3

s3 = boto3.client('s3',
    endpoint_url='http://localhost:8000',
    aws_access_key_id='AKIAIOSFDNN7EXAMPLEQ',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
)

# Upload a CSV file to the main branch
s3.put_object(
    Bucket='my-repo',
    Key='main/data/sales.csv',
    Body=b'date,amount\n2026-01-01,100\n2026-01-02,200\n',
    ContentType='text/csv',
    Metadata={'source': 'etl-pipeline', 'version': '2'}
)

Python boto3: Multipart upload for large files

from boto3.s3.transfer import TransferConfig

# Configure multipart upload thresholds
config = TransferConfig(
    multipart_threshold=5 * 1024 * 1024,   # 5 MiB
    multipart_chunksize=5 * 1024 * 1024,    # 5 MiB per part
)

# Upload a large file using multipart upload
s3.upload_file(
    Filename='/path/to/large_dataset.parquet',
    Bucket='my-repo',
    Key='main/data/large_dataset.parquet',
    Config=config
)
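boto3's `upload_file` handles part splitting automatically; with the low-level API (as in the Go SDK signature above) you split the object yourself. A sketch of the part layout, with comments showing the boto3 calls each part would feed (sizes and key names are illustrative):

```python
MIN_PART = 5 * 1024 * 1024  # S3 minimum part size (applies to all parts except the last)

def split_parts(total_size: int, part_size: int = MIN_PART):
    """Yield (part_number, offset, length) tuples covering total_size bytes."""
    number, offset = 1, 0
    while offset < total_size:
        length = min(part_size, total_size - offset)
        yield number, offset, length
        number += 1
        offset += length

parts = list(split_parts(12 * 1024 * 1024))  # three parts: 5 MiB, 5 MiB, 2 MiB

# Each part then maps onto the low-level calls:
#   up = s3.create_multipart_upload(Bucket='my-repo', Key='main/data/big.bin')
#   s3.upload_part(Bucket='my-repo', Key='main/data/big.bin',
#                  UploadId=up['UploadId'], PartNumber=n, Body=chunk)
#   s3.complete_multipart_upload(Bucket='my-repo', Key='main/data/big.bin',
#                                UploadId=up['UploadId'],
#                                MultipartUpload={'Parts': completed})
```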

Python boto3: Copy object between branches

# Copy an object from main to a feature branch
s3.copy_object(
    CopySource={'Bucket': 'my-repo', 'Key': 'main/data/model.pkl'},
    Bucket='my-repo',
    Key='feature-branch/data/model.pkl'
)
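To replace user metadata during a copy (the boto3 counterpart of the Go `ReplaceMetadata` example above), pass `MetadataDirective='REPLACE'`; without it, S3 copies the source object's metadata. Key and metadata values here are illustrative:

```python
copy_kwargs = dict(
    CopySource={'Bucket': 'my-repo', 'Key': 'main/data/model.pkl'},
    Bucket='my-repo',
    Key='feature-branch/data/model.pkl',
    Metadata={'stage': 'candidate'},
    MetadataDirective='REPLACE',  # discard source metadata, use Metadata above
)
# s3.copy_object(**copy_kwargs)  # requires a running lakeFS gateway
```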

Python boto3: Delete objects

# Delete a single object
s3.delete_object(Bucket='my-repo', Key='main/data/old_file.csv')

# Bulk delete multiple objects
s3.delete_objects(
    Bucket='my-repo',
    Delete={
        'Objects': [
            {'Key': 'main/data/temp1.csv'},
            {'Key': 'main/data/temp2.csv'},
            {'Key': 'main/data/temp3.csv'},
        ]
    }
)
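`delete_objects` is not atomic: some keys may fail while others succeed, so inspect the response. A sketch with an illustrative helper and sample response (the response shape follows the S3 DeleteObjects API):

```python
def failed_keys(response: dict) -> list:
    """Return the keys from a delete_objects response that were not deleted."""
    return [error['Key'] for error in response.get('Errors', [])]

# resp = s3.delete_objects(Bucket='my-repo', Delete={'Objects': [...]})
sample = {
    'Deleted': [{'Key': 'main/data/temp1.csv'}, {'Key': 'main/data/temp3.csv'}],
    'Errors': [{'Key': 'main/data/temp2.csv', 'Code': 'AccessDenied',
                'Message': 'Access Denied'}],
}
# failed_keys(sample) -> ['main/data/temp2.csv']
```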

Python boto3: Conditional write (If-None-Match)

import botocore

# Only create the object if it does not already exist
try:
    s3.put_object(
        Bucket='my-repo',
        Key='main/data/unique_file.csv',
        Body=b'data',
        IfNoneMatch='*'
    )
    print("Object created successfully")
except botocore.exceptions.ClientError as e:
    if e.response['ResponseMetadata']['HTTPStatusCode'] == 412:
        print("Object already exists -- skipped")
    else:
        raise

AWS CLI: Upload and delete

# Upload a file
aws --endpoint-url http://localhost:8000 s3 cp \
    ./data/file.csv s3://my-repo/main/data/file.csv

# Upload an entire directory
aws --endpoint-url http://localhost:8000 s3 sync \
    ./data/ s3://my-repo/main/data/

# Delete an object
aws --endpoint-url http://localhost:8000 s3 rm \
    s3://my-repo/main/data/old_file.csv
