

Implementation:Datahub project Datahub EntityClient Upsert

From Leeroopedia


Field | Value
--- | ---
Implementation Name | EntityClient Upsert
Type | API Doc
Status | Active
Last Updated | 2026-02-10
Repository | Datahub_project_Datahub
Source File | metadata-integration/java/datahub-client/src/main/java/datahub/client/v2/operations/EntityClient.java (Lines 82-195)

Overview

The upsert() method on EntityClient is the primary mechanism for persisting entity metadata to DataHub. It binds the entity to the client, emits all cached aspects and pending MCPs as full upserts, waits for their completion, then emits accumulated patches with version-aware transformation and retry logic.

Source Reference

File: metadata-integration/java/datahub-client/src/main/java/datahub/client/v2/operations/EntityClient.java (Lines 82-195)

public <T extends Entity> void upsert(@Nonnull T entity)
    throws IOException, ExecutionException, InterruptedException {

    // Step 1: Bind entity to this client (for lazy loading)
    entity.bindToClient(this, config.getMode());

    List<Future<MetadataWriteResponse>> futures = new ArrayList<>();

    // Step 2: Emit cached aspects (from builder) - full aspects for initial creation
    List<MetadataChangeProposalWrapper> mcps = entity.toMCPs();
    if (!mcps.isEmpty()) {
        for (MetadataChangeProposalWrapper mcp : mcps) {
            futures.add(emitter.emit(mcp));
        }
    }

    // Step 3: Emit pending full aspect MCPs (from set*() methods)
    if (entity.hasPendingMCPs()) {
        List<MetadataChangeProposalWrapper> pendingMCPs = entity.getPendingMCPs();
        for (MetadataChangeProposalWrapper mcp : pendingMCPs) {
            futures.add(emitter.emit(mcp));
        }
        entity.clearPendingMCPs();
    }

    // Step 4: Wait for all full aspect writes to complete before emitting patches
    if (!futures.isEmpty()) {
        for (int i = 0; i < futures.size(); i++) {
            MetadataWriteResponse response = futures.get(i).get();
            if (!response.isSuccess()) {
                throw new IOException("Failed to emit full aspect write: "
                    + response.getResponseContent());
            }
        }
    }

    // Step 5: Emit all pending patches with version-aware transformation
    if (entity.hasPendingPatches()) {
        List<MetadataChangeProposal> allPatches = entity.getPendingPatches();

        ServerConfig serverConfig = client.getServerConfig();
        List<TransformResult> transformResults =
            patchTransformer.transform(allPatches, serverConfig);

        for (TransformResult result : transformResults) {
            emitWithRetry(result);
        }

        entity.clearPatchBuilders();
        entity.clearPendingPatches();
    }

    // Step 6: Clear dirty flag
    entity.clearDirty();
}

Method Signature

public <T extends Entity> void upsert(@Nonnull T entity)
    throws IOException, ExecutionException, InterruptedException

Accessed via:

client.entities().upsert(entity);

I/O Contract

Input:

  • entity -- An Entity subclass instance (e.g., Dataset, Chart, Dashboard) with zero or more pending mutations:
    • Cached aspects from builder (dirty aspects in WriteTrackingAspectCache)
    • Pending full aspect MCPs from set*() methods
    • Accumulated patch builders from add*() / remove*() methods
    • Legacy pending patch MCPs

Output:

  • void -- no return value on success
  • All pending mutations are emitted to the DataHub server
  • Entity's dirty flag is cleared
  • Patch builders and pending MCPs/patches are cleared
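To make the contract concrete, here is a toy model of the bookkeeping an entity performs between calls. It is a sketch, not the SDK: `ToyEntity`, `ToyUpsert`, and the string "proposals" are hypothetical stand-ins. It assumes, as the Input list describes, that set*() calls queue a full-aspect replacement while add*() calls accumulate patch operations, and that a successful upsert drains both queues and clears the dirty flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an SDK entity's mutation tracking (not the DataHub API).
class ToyEntity {
    final List<String> pendingMcps = new ArrayList<>();    // full-aspect replacements (set*())
    final List<String> pendingPatches = new ArrayList<>(); // incremental edits (add*()/remove*())
    boolean dirty = false;

    // set*(): queue a full replacement of the aspect
    void setTags(List<String> tags) {
        pendingMcps.add("UPSERT globalTags=" + tags);
        dirty = true;
    }

    // add*(): accumulate a patch operation instead of replacing the aspect
    void addTag(String tag) {
        pendingPatches.add("PATCH add tag " + tag);
        dirty = true;
    }
}

class ToyUpsert {
    // Emits full-aspect writes first, then patches; returns the emission order.
    static List<String> upsert(ToyEntity e) {
        List<String> emitted = new ArrayList<>(e.pendingMcps);
        e.pendingMcps.clear();
        emitted.addAll(e.pendingPatches);
        e.pendingPatches.clear();
        e.dirty = false; // entity is clean once everything has been emitted
        return emitted;
    }

    public static void main(String[] args) {
        ToyEntity e = new ToyEntity();
        e.addTag("pii");
        e.setTags(List.of("verified"));
        System.out.println(upsert(e));
        System.out.println("dirty=" + e.dirty);
    }
}
```

Note the emission order: the addTag() patch goes out after the setTags() replacement even though it was called first. The Execution Flow below gives upsert() the same ordering, so unless the SDK reconciles the two elsewhere (not shown on this page), a patch queued before a set*() call is applied on top of the replacement.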

Exceptions:

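  • IOException -- if the server rejects a full-aspect write, or if a patch write fails with a non-retryable error or still conflicts after retries are exhausted
  • ExecutionException -- if the Future of an emitted MCP completes exceptionally (rethrown by Future.get())
  • InterruptedException -- if the thread is interrupted while waiting on an emission or during retry backoff

In the code shown above, pending full-aspect MCPs are cleared in Step 3 before Step 4 confirms them, and clearDirty() runs only at the very end. If upsert() throws, those MCPs are therefore no longer queued on the entity, and the entity is not marked clean.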
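Callers have to deal with all three checked exceptions. Below is one possible wrapper, sketched under assumptions: `UpsertCall` and `upsertOrWrap` are hypothetical names, with `UpsertCall` standing in for `client.entities().upsert(entity)`. Restoring the interrupt flag on InterruptedException is general Java practice, not something this page says the SDK requires.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;

// Hypothetical functional stand-in for client.entities().upsert(entity).
@FunctionalInterface
interface UpsertCall {
    void run() throws IOException, ExecutionException, InterruptedException;
}

class UpsertErrors {
    // Runs the upsert and converts its checked exceptions into unchecked ones.
    static void upsertOrWrap(UpsertCall call) {
        try {
            call.run();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve interrupt status for callers
            throw new IllegalStateException("upsert interrupted", ie);
        } catch (ExecutionException ee) {
            // An emit Future completed exceptionally; surface the underlying cause.
            throw new IllegalStateException("emit failed", ee.getCause());
        } catch (IOException ioe) {
            // The server rejected a write, or patch retries were exhausted.
            throw new UncheckedIOException(ioe);
        }
    }

    public static void main(String[] args) {
        try {
            upsertOrWrap(() -> { throw new IOException("422 version conflict"); });
        } catch (UncheckedIOException e) {
            System.out.println("wrapped: " + e.getCause().getMessage());
        }
    }
}
```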

Execution Flow

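The upsert method follows a strict ordering: every full-aspect write must be acknowledged before any patch is emitted, so patches are applied on top of the state those writes establish rather than racing with them.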

Step | Operation | Source | Change Type | Notes
--- | --- | --- | --- | ---
1 | Bind entity to client | -- | -- | Sets operation mode (SDK/INGESTION) and client reference for lazy loading
2 | Emit cached aspects | entity.toMCPs() | UPSERT | Aspects from builder (e.g., DatasetProperties set during construction)
3 | Emit pending MCPs | entity.getPendingMCPs() | UPSERT | Full aspect replacements from setTags(), setOwners(), etc.
4 | Wait for completion | -- | -- | Blocks until every full-aspect write completes; throws IOException on the first unsuccessful response (checked in submission order), before any patch is sent
5 | Emit patches | entity.getPendingPatches() | PATCH (transformed) | Accumulated patches from addTag(), addOwner(), etc.; passed through VersionAwarePatchTransformer, then each result is sent with emitWithRetry()
6 | Clear dirty flag | entity.clearDirty() | -- | Marks entity as clean (no pending mutations)
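The Step 4 barrier is what keeps patches from racing the full-aspect writes. The sketch below models it with CompletableFuture; `FakeEmitter`, `OrderedUpsert`, the string proposals, and the simulated latency are assumptions for illustration, not SDK types. Like the quoted source, it fires all full writes first, checks their futures in submission order, and throws an IOException on the first unsuccessful response before any patch is sent.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical async emitter: records completions; any proposal containing "FAIL" is rejected.
class FakeEmitter {
    final List<String> completed = Collections.synchronizedList(new ArrayList<>());

    Future<Boolean> emit(String proposal) {
        return CompletableFuture.supplyAsync(() -> {
            sleepQuietly(20); // simulated network latency
            completed.add(proposal);
            return !proposal.contains("FAIL");
        });
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class OrderedUpsert {
    static void upsert(FakeEmitter emitter, List<String> fullAspects, List<String> patches)
            throws IOException, ExecutionException, InterruptedException {
        // Steps 2-3: fire all full-aspect writes without waiting on each one
        List<Future<Boolean>> futures = new ArrayList<>();
        for (String mcp : fullAspects) {
            futures.add(emitter.emit(mcp));
        }

        // Step 4: barrier; block on each future in submission order, fail on the first rejection
        for (Future<Boolean> f : futures) {
            if (!f.get()) {
                throw new IOException("Failed to emit full aspect write");
            }
        }

        // Step 5: patches start only after every full write has completed
        for (String patch : patches) {
            if (!emitter.emit(patch).get()) {
                throw new IOException("Failed to emit patch");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FakeEmitter emitter = new FakeEmitter();
        upsert(emitter, List.of("UPSERT datasetProperties", "UPSERT globalTags"),
                List.of("PATCH ownership"));
        System.out.println(emitter.completed);
    }
}
```

The two full writes may complete in either order, but the patch always completes last.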

Retry Logic

The emitWithRetry() method (Lines 628-686) handles version conflict retries:

private void emitWithRetry(@Nonnull TransformResult result)
    throws IOException, ExecutionException, InterruptedException {
    int maxRetries = 3;
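    long[] backoffDelays = {100L, 200L, 400L};
    // `mcp` holds the proposal carried by `result`; its declaration is elided in this excerpt.

    for (int attempt = 0; attempt <= maxRetries; attempt++) {
        Future<MetadataWriteResponse> future = emitter.emit(mcp, null);
        MetadataWriteResponse response = future.get();

        if (response.isSuccess()) { return; }

        // Retry only when a version conflict was detected, the transformer supplied
        // a retry function, and retries remain; any other failure throws below.
        VersionConflictInfo conflict = parseVersionConflict(response, mcp);
        if (conflict != null && result.hasRetryFunction() && attempt < maxRetries) {
            Thread.sleep(backoffDelays[attempt]); // 100, 200, then 400 ms
            mcp = result.getRetryFunction().apply(conflict); // rebuild the proposal from the conflict info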
        } else {
            throw new IOException("Failed to emit MCP: " + response.getResponseContent());
        }
    }
}

Retry parameters:

  • Maximum retries: 3 (so at most 4 attempts, including the initial one)
  • Backoff delays: 100ms, 200ms, 400ms (exponential)
  • Conflict detection: HTTP 422 with version mismatch pattern
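The retry policy can be exercised in isolation. This sketch mirrors the loop shape of emitWithRetry() (up to 1 + 3 attempts, sleeping 100, 200, then 400 ms between them, and retrying only when a conflict is detected and a retry function exists), but everything else is hypothetical: `RetrySketch`, its `Response` class (a non-null `conflictVersion` stands in for the HTTP 422 version-mismatch check), the `Function`-based emitter and retry function, and the `Sleeper` hook that lets a test record delays instead of sleeping.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class RetrySketch {
    // Hypothetical server reply: success, or failure with an optional conflicting version.
    static final class Response {
        final boolean success;
        final Long conflictVersion; // non-null stands in for "HTTP 422 with version mismatch"
        final String body;

        Response(boolean success, Long conflictVersion, String body) {
            this.success = success;
            this.conflictVersion = conflictVersion;
            this.body = body;
        }
    }

    // Hook so a caller (or test) can record delays instead of really sleeping.
    interface Sleeper {
        void sleep(long ms) throws InterruptedException;
    }

    static final int MAX_RETRIES = 3;
    static final long[] BACKOFF_MS = {100L, 200L, 400L};

    // Sends `proposal`; on a conflict, backs off, rebuilds it with retryFn, and tries again.
    // retryFn == null models a TransformResult without a retry function.
    static void emitWithRetry(String proposal, Function<String, Response> emit,
                              Function<Long, String> retryFn, Sleeper sleeper)
            throws IOException, InterruptedException {
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            Response r = emit.apply(proposal);
            if (r.success) {
                return;
            }
            if (r.conflictVersion != null && retryFn != null && attempt < MAX_RETRIES) {
                sleeper.sleep(BACKOFF_MS[attempt]);
                proposal = retryFn.apply(r.conflictVersion);
            } else {
                throw new IOException("Failed to emit MCP: " + r.body);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Long> delays = new ArrayList<>();
        int[] calls = {0};
        // Simulated server: conflicts on the first two attempts, then accepts.
        emitWithRetry("patch@v1",
                p -> ++calls[0] <= 2
                        ? new Response(false, 1L + calls[0], "422 version conflict")
                        : new Response(true, null, "ok"),
                v -> "patch@v" + v,
                ms -> delays.add(ms));
        System.out.println(calls[0] + " attempts, backoff delays " + delays);
    }
}
```

Running main prints `3 attempts, backoff delays [100, 200]`. Injecting the sleeper keeps the schedule testable without up to 700 ms (100 + 200 + 400) of real waiting; outside a test it would simply be `Thread::sleep`.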

Usage Examples

Create New Entity

// Assumes `client` is an already-initialized DataHub client (construction not shown)
Dataset dataset = Dataset.builder()
    .platform("snowflake")
    .name("my_table")
    .description("Customer data")
    .build();

dataset.addTag("pii");
dataset.addOwner("urn:li:corpuser:johndoe", OwnershipType.DATA_OWNER);

client.entities().upsert(dataset);

Update Existing Entity

// Assumes `client` is initialized and `urn` identifies an existing dataset
Dataset dataset = client.entities().get(urn, Dataset.class);
Dataset mutable = dataset.mutable();

mutable.addTag("verified");
mutable.setDescription("Updated description");

client.entities().upsert(mutable);

Related

Domains

Data_Integration, Metadata_Management, Java_SDK
