Implementation:Treeverse LakeFS S3 PutObject
| Knowledge Sources | |
|---|---|
| Domains | S3_Compatibility, REST_API |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Wrapper for standard S3 write operations (PutObject, CopyObject, DeleteObject, multipart upload) via the lakeFS S3 gateway.
Description
This implementation wraps S3 write operations that are translated by the lakeFS S3 gateway into lakeFS staging operations. All writes stage changes on the target branch; they are not committed until an explicit commit is made via the lakeFS REST API. The supported operations are:
- PutObject -- Upload a single object with content type and user metadata
- CopyObject -- Server-side copy within or between repositories
- DeleteObject -- Remove a single object (staged as a tombstone)
- DeleteObjects -- Bulk delete multiple objects
- CreateMultipartUpload / UploadPart / CompleteMultipartUpload -- Upload large objects in parts
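Because the gateway maps the first path segment of the S3 key to a branch, client code often needs to split keys into (branch, path) pairs before or after a write. A minimal sketch of that convention (the helper name split_branch_key is illustrative, not part of any lakeFS SDK):

```python
def split_branch_key(key: str) -> tuple[str, str]:
    """Split an S3 key sent to the lakeFS gateway into (branch, path).

    The gateway treats the first path segment as the branch name and the
    remainder as the object path within that branch.
    """
    branch, _, path = key.partition("/")
    if not branch or not path:
        raise ValueError(f"expected '{{branch}}/{{path}}', got: {key!r}")
    return branch, path

# 'main/data/file.csv' writes 'data/file.csv' on branch 'main'
branch, path = split_branch_key("main/data/file.csv")
```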
Usage
Use this implementation when:
- Writing data to lakeFS through any S3-compatible tool or SDK
- Uploading large files that exceed the single-request size limit (use multipart upload)
- Copying data between branches or repositories through the S3 protocol
- Deleting objects from a branch via S3 tools
Code Reference
Source Location
- File: esti/s3_gateway_test.go
  - Lines: L126-197 (TestS3UploadAndDownload -- PutObject)
  - Lines: L786-859 (TestS3CopyObjectMultipart -- multipart copy)
  - Lines: L860-996 (TestS3CopyObject -- CopyObject with metadata)
  - Lines: L1217-1310 (TestDeleteObjects -- bulk delete)
- File: esti/multipart_test.go
  - Lines: L1-227 (TestMultipartUpload -- multipart upload)
Signature
// PutObject: Upload a single object
_, err := clt.PutObject(ctx, repo, "main/data/file.csv",
strings.NewReader(content), int64(len(content)),
minio.PutObjectOptions{
ContentType: "text/csv",
UserMetadata: map[string]string{"Key1": "value1"},
})
// CopyObject: Server-side copy within the same repository
_, err := clt.CopyObject(ctx,
minio.CopyDestOptions{Bucket: repo, Object: "main/data/dest.csv"},
minio.CopySrcOptions{Bucket: repo, Object: "main/data/source.csv"})
// CopyObject: Cross-repository copy with metadata replacement
_, err := clt.CopyObject(ctx,
minio.CopyDestOptions{
Bucket: destRepo,
Object: destPath,
UserMetadata: map[string]string{"Key1": "newvalue"},
ReplaceMetadata: true,
},
minio.CopySrcOptions{Bucket: srcRepo, Object: srcPath})
// DeleteObject: Remove a single object
err := clt.RemoveObject(ctx, repo, "main/data/old.csv", minio.RemoveObjectOptions{})
// DeleteObjects: Bulk delete via channel
errChan := clt.RemoveObjects(ctx, repo, objectsCh, minio.RemoveObjectsOptions{})
// Multipart Upload (AWS SDK v2)
createResp, err := svc.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(repo),
Key: aws.String("main/multipart_file"),
})
uploadResp, err := svc.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(repo),
Key: aws.String("main/multipart_file"),
UploadId: createResp.UploadId,
PartNumber: aws.Int32(1),
Body: bytes.NewReader(partData),
})
// Collect each part's uploadResp.ETag and part number into completedParts,
// then finalize the upload:
_, err = svc.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(repo),
Key: aws.String("main/multipart_file"),
UploadId: createResp.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts,
},
})
Import
import boto3
s3 = boto3.client('s3',
endpoint_url='http://localhost:8000',
aws_access_key_id='AKIAIOSFDNN7EXAMPLEQ',
aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
)
I/O Contract
Inputs
| Parameter | Type | Required | Description |
|---|---|---|---|
| Bucket | string | Yes | lakeFS repository name |
| Key | string | Yes | Object key in the format {branch}/{path} |
| Body | byte stream | Yes (PutObject) | Object content to upload |
| ContentType | string | No | MIME type of the object (e.g., text/csv, application/parquet) |
| Metadata | map[string]string | No | User-defined metadata key-value pairs (x-amz-meta-*) |
| If-None-Match | string | No | Set to * for a conditional create; the request fails with HTTP 412 if the object already exists |
| CopySource | string | Yes (CopyObject) | Source bucket/key for server-side copy |
| UploadId | string | Yes (multipart) | Upload ID returned by CreateMultipartUpload |
| PartNumber | integer | Yes (UploadPart) | Part number (1 to 10,000) |
Outputs
| Output | Type | Description |
|---|---|---|
| ETag | string | Entity tag of the uploaded/copied object |
| VersionId | string | Version identifier (if applicable) |
| UploadId | string | Multipart upload ID (CreateMultipartUpload response) |
| HTTP 200/204 | status | Success for PutObject, CopyObject, DeleteObject |
| HTTP 412 | status | Precondition Failed (If-None-Match violation) |
| HTTP 403 | status | Forbidden (write to read-only repository) |
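Client code typically branches on the HTTP status from the table above when a write fails. A small sketch of that dispatch (the function and its outcome labels are illustrative, not part of boto3 or lakeFS):

```python
def classify_write_error(status_code: int) -> str:
    """Map a gateway HTTP status to a coarse outcome for a write request."""
    if status_code in (200, 204):
        return "success"
    if status_code == 412:
        # Precondition Failed: If-None-Match='*' and the object already exists
        return "precondition-failed"
    if status_code == 403:
        # Forbidden, e.g. a write against a read-only repository
        return "forbidden"
    return "error"
```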
Usage Examples
Python boto3: Upload an object
import boto3
s3 = boto3.client('s3',
endpoint_url='http://localhost:8000',
aws_access_key_id='AKIAIOSFDNN7EXAMPLEQ',
aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
)
# Upload a CSV file to the main branch
s3.put_object(
Bucket='my-repo',
Key='main/data/sales.csv',
Body=b'date,amount\n2026-01-01,100\n2026-01-02,200\n',
ContentType='text/csv',
Metadata={'source': 'etl-pipeline', 'version': '2'}
)
Python boto3: Multipart upload for large files
from boto3.s3.transfer import TransferConfig
# Configure multipart upload thresholds
config = TransferConfig(
multipart_threshold=5 * 1024 * 1024, # 5 MiB
multipart_chunksize=5 * 1024 * 1024, # 5 MiB per part
)
# Upload a large file using multipart upload
s3.upload_file(
Filename='/path/to/large_dataset.parquet',
Bucket='my-repo',
Key='main/data/large_dataset.parquet',
Config=config
)
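S3 multipart uploads are limited to 10,000 parts, so for very large files the chunk size must grow with the total size. A rough sizing sketch under that assumption (the helper is illustrative; TransferConfig handles this internally):

```python
MAX_PARTS = 10_000
MIN_CHUNK = 5 * 1024 * 1024  # 5 MiB minimum part size (all parts but the last)

def choose_chunk_size(total_size: int, preferred: int = MIN_CHUNK) -> int:
    """Return a part size >= preferred that keeps the upload under MAX_PARTS."""
    chunk = max(preferred, MIN_CHUNK)
    # Double the chunk until the whole file fits in at most MAX_PARTS parts.
    while (total_size + chunk - 1) // chunk > MAX_PARTS:
        chunk *= 2
    return chunk
```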
Python boto3: Copy object between branches
# Copy an object from main to a feature branch
s3.copy_object(
CopySource={'Bucket': 'my-repo', 'Key': 'main/data/model.pkl'},
Bucket='my-repo',
Key='feature-branch/data/model.pkl'
)
Python boto3: Delete objects
# Delete a single object
s3.delete_object(Bucket='my-repo', Key='main/data/old_file.csv')
# Bulk delete multiple objects
s3.delete_objects(
Bucket='my-repo',
Delete={
'Objects': [
{'Key': 'main/data/temp1.csv'},
{'Key': 'main/data/temp2.csv'},
{'Key': 'main/data/temp3.csv'},
]
}
)
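The S3 DeleteObjects API accepts at most 1,000 keys per request, so larger deletions must be split into batches. A sketch of the batching logic only (the delete call itself is commented out because it needs a live endpoint):

```python
def batch_keys(keys: list[str], batch_size: int = 1000):
    """Yield lists of at most batch_size keys, the S3 DeleteObjects limit."""
    for i in range(0, len(keys), batch_size):
        yield keys[i:i + batch_size]

keys = [f"main/data/part-{n:05d}.csv" for n in range(2500)]
batches = list(batch_keys(keys))
# Each batch becomes one delete_objects call:
# for batch in batches:
#     s3.delete_objects(Bucket='my-repo',
#                       Delete={'Objects': [{'Key': k} for k in batch]})
```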
Python boto3: Conditional write (If-None-Match)
import botocore
# Only create the object if it does not already exist
try:
s3.put_object(
Bucket='my-repo',
Key='main/data/unique_file.csv',
Body=b'data',
IfNoneMatch='*'
)
print("Object created successfully")
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == '412':
print("Object already exists -- skipped")
else:
raise
AWS CLI: Upload and delete
# Upload a file
aws --endpoint-url http://localhost:8000 s3 cp \
./data/file.csv s3://my-repo/main/data/file.csv
# Upload an entire directory
aws --endpoint-url http://localhost:8000 s3 sync \
./data/ s3://my-repo/main/data/
# Delete an object
aws --endpoint-url http://localhost:8000 s3 rm \
s3://my-repo/main/data/old_file.csv
Related Pages
Implements Principle
Requires Environment
Uses Heuristic