
Implementation:Apache Beam InMemoryJobService Prepare

From Leeroopedia


Property | Value
Implementation Name | InMemoryJobService Prepare
Category | Pipeline_Orchestration, Job_Management
Related Principle | Principle:Apache_Beam_Job_Preparation
Source File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java
Language | Java
last_updated | 2026-02-09 04:00 GMT

Overview

InMemoryJobService is a gRPC JobService implementation that keeps all job state in memory. Its prepare() method handles the server side of a pipeline submission: it generates a unique preparation ID, stores the submitted pipeline and options, registers the pipeline's artifact dependencies with the artifact staging service, and returns the staging endpoint and a staging session token to the client.

Code Reference

Source Location

Property | Value
File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java
Package | org.apache.beam.runners.jobsubmission
Class Declaration | L81: public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService
Prepare Method | Lines L168-216
Factory Methods | Lines L96-107, L119-127

Factory Methods

// Primary factory (L96-107)
public static InMemoryJobService create(
    GrpcFnServer<ArtifactStagingService> stagingService,
    Function<String, String> stagingServiceTokenProvider,
    ThrowingConsumer<Exception, String> cleanupJobFn,
    JobInvoker invoker) {
    return new InMemoryJobService(
        stagingService, stagingServiceTokenProvider,
        cleanupJobFn, invoker, DEFAULT_MAX_INVOCATION_HISTORY);
}

// Factory with custom history size (L119-127)
public static InMemoryJobService create(
    GrpcFnServer<ArtifactStagingService> stagingService,
    Function<String, String> stagingServiceTokenProvider,
    ThrowingConsumer<Exception, String> cleanupJobFn,
    JobInvoker invoker,
    int maxInvocationHistory) {
    return new InMemoryJobService(
        stagingService, stagingServiceTokenProvider,
        cleanupJobFn, invoker, maxInvocationHistory);
}

Prepare Method Signature

@Override
public void prepare(
    PrepareJobRequest request,
    StreamObserver<PrepareJobResponse> responseObserver)   // L168-216

Internal State

// Concurrent maps for thread-safe state management
private final ConcurrentHashMap<String, JobPreparation> preparations;       // L130
private final ConcurrentHashMap<String, String> stagingSessionTokens;       // L132
private final ConcurrentHashMap<String, JobInvocation> invocations;         // L134
private final ConcurrentLinkedDeque<String> completedInvocationsIds;        // L136

private final GrpcFnServer<ArtifactStagingService> stagingService;         // L138
private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;     // L139
private final Function<String, String> stagingServiceTokenProvider;         // L140
private final ThrowingConsumer<Exception, String> cleanupJobFn;             // L141
private final JobInvoker invoker;                                           // L142
private final int maxInvocationHistory;                                     // L145

I/O Contract

Inputs

Input | Type | Description
request.jobName | String | Human-readable name for the job
request.pipeline | RunnerApi.Pipeline | The translated pipeline protobuf
request.pipelineOptions | Struct | Pipeline options serialized as a protobuf Struct
responseObserver | StreamObserver<PrepareJobResponse> | gRPC response callback

Outputs

Output | Type | Description
response.preparationId | String | Unique ID in the format {jobName}_{UUID}
response.artifactStagingEndpoint | ApiServiceDescriptor | gRPC endpoint of the artifact staging service
response.stagingSessionToken | String | Token identifying this job's artifact staging session

Prepare Method Walkthrough

The prepare() method at L168-216 performs the following steps:

Step 1: Generate Preparation ID (L173-174)

String preparationId =
    String.format("%s_%s", request.getJobName(), UUID.randomUUID().toString());

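The preparation ID combines the job name with a random (version 4) UUID. The job name keeps the ID human-readable, while the 122 random bits of the UUID make a collision between two preparations practically impossible, though not strictly guaranteed.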
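As a rough illustration of the ID format, the following JDK-only sketch builds IDs the same way as Step 1. The class name and the jobNameOf() helper are hypothetical and not part of Beam; jobNameOf() relies on the fact that UUID.toString() contains only hex digits and hyphens, so the last underscore always separates the job name from the UUID, even when the job name itself contains underscores.

```java
import java.util.UUID;

public class PreparationIdSketch {
    // Builds an ID in the same "{jobName}_{UUID}" shape as Step 1.
    static String newPreparationId(String jobName) {
        return String.format("%s_%s", jobName, UUID.randomUUID().toString());
    }

    // Hypothetical helper (not in Beam): recovers the job name from an ID.
    // UUID.toString() has no underscores, so the last '_' is the separator.
    static String jobNameOf(String preparationId) {
        return preparationId.substring(0, preparationId.lastIndexOf('_'));
    }

    public static void main(String[] args) {
        String a = newPreparationId("word_count");
        String b = newPreparationId("word_count");
        System.out.println(a);            // e.g. word_count_<36-character UUID>
        System.out.println(a.equals(b));  // false: each call draws a fresh random UUID
        System.out.println(jobNameOf(a)); // word_count
    }
}
```

The same job name can therefore be submitted many times; only the UUID suffix distinguishes the preparations.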

Step 2: Validate Pipeline Options (L175-178)

Struct pipelineOptions = request.getPipelineOptions();
if (pipelineOptions == null) {
    throw new NullPointerException("Encountered null pipeline options.");
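}

Because protobuf-java getters for message fields return the default instance rather than null when the field is unset, this check is defensive: an omitted options field arrives as an empty Struct. If the NullPointerException were ever thrown, the catch block described under Error Handling would report it as INTERNAL.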

Step 3: Create and Store JobPreparation (L180-194)

JobPreparation preparation =
    JobPreparation.builder()
        .setId(preparationId)
        .setPipeline(request.getPipeline())
        .setOptions(pipelineOptions)
        .build();
JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
if (previous != null) {
    String errMessage =
        String.format("A job with the preparation ID \"%s\" already exists.", preparationId);
    StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException();
    responseObserver.onError(exception);
    return;
}

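Because ConcurrentHashMap.putIfAbsent() is atomic, two concurrent requests can never both store a preparation under the same ID: exactly one call sees null, and any other sees the existing entry. That branch is only reachable through a UUID collision combined with an identical job name, which is practically impossible. Note that the service reports this case with Status.NOT_FOUND, although ALREADY_EXISTS would describe it more accurately.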
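The sketch below, which uses only the JDK, illustrates why putIfAbsent() is sufficient here: when several threads race to register the same ID, exactly one of them sees a null return value. The Preparation record and register() method are hypothetical stand-ins for JobPreparation and the Step 3 check, and the fixed ID simulates the collision case that random UUIDs make practically unreachable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PutIfAbsentSketch {
    // Hypothetical stand-in for JobPreparation; only the ID matters here.
    record Preparation(String id, String submitter) {}

    private final ConcurrentHashMap<String, Preparation> preparations = new ConcurrentHashMap<>();

    // Mirrors the Step 3 check: true if this call stored the preparation,
    // false if another call got there first (the branch that sends an error).
    boolean register(Preparation p) {
        Preparation previous = preparations.putIfAbsent(p.id(), p);
        return previous == null;
    }

    // Races `threads` registrations of one fixed ID and counts the winners.
    static int race(int threads) throws InterruptedException {
        PutIfAbsentSketch service = new PutIfAbsentSketch();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            String submitter = "client-" + i;
            pool.submit(() -> {
                start.await(); // release all threads at once
                if (service.register(new Preparation("job_fixed-id", submitter))) {
                    winners.incrementAndGet();
                }
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return winners.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race(8)); // 1: exactly one caller wins the ID
    }
}
```

A separate containsKey() check followed by put() would not give this guarantee, since another thread could insert between the two calls.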

Step 4: Generate Staging Token and Register Job (L196-201)

String stagingSessionToken = stagingServiceTokenProvider.apply(preparationId);
stagingSessionTokens.putIfAbsent(preparationId, stagingSessionToken);

stagingService.getService()
    .registerJob(stagingSessionToken, extractDependencies(request.getPipeline()));

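The extractDependencies() helper (see Dependency Extraction below) collects the ArtifactInformation dependencies of every environment in the pipeline, expanding AnyOf environments into their sub-environments. registerJob() then records that dependency map with the artifact staging service under the staging session token, so that artifacts staged later in that session can be associated with this job. The token itself is saved in stagingSessionTokens under the preparation ID.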
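A minimal sketch of the Step 4 data flow, assuming simplified types: StagingRegistry is a hypothetical stand-in for ArtifactStagingService, and dependencies are plain strings instead of RunnerApi.ArtifactInformation. It shows the two mappings the service ends up holding: preparation ID to token, and token to registered dependencies.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class StagingTokenSketch {
    // Hypothetical stand-in for ArtifactStagingService: remembers which
    // dependencies were registered under each staging session token.
    static class StagingRegistry {
        final Map<String, Map<String, List<String>>> jobs = new ConcurrentHashMap<>();

        void registerJob(String token, Map<String, List<String>> dependencies) {
            jobs.put(token, dependencies);
        }
    }

    final Function<String, String> tokenProvider;
    final ConcurrentHashMap<String, String> stagingSessionTokens = new ConcurrentHashMap<>();
    final StagingRegistry staging = new StagingRegistry();

    StagingTokenSketch(Function<String, String> tokenProvider) {
        this.tokenProvider = tokenProvider;
    }

    // Step 4 in miniature: derive the token, remember it for this preparation,
    // and register the job's dependencies under that token.
    String prepareStaging(String preparationId, Map<String, List<String>> deps) {
        String token = tokenProvider.apply(preparationId);
        stagingSessionTokens.putIfAbsent(preparationId, token);
        staging.registerJob(token, deps);
        return token;
    }

    public static void main(String[] args) {
        StagingTokenSketch s = new StagingTokenSketch(id -> "token-" + id);
        String token = s.prepareStaging("job_1234", Map.of("0:env", List.of("app.jar")));
        System.out.println(token);                     // token-job_1234
        System.out.println(s.staging.jobs.get(token)); // {0:env=[app.jar]}
    }
}
```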

Step 5: Build and Send Response (L204-211)

PrepareJobResponse response =
    PrepareJobResponse.newBuilder()
        .setPreparationId(preparationId)
        .setArtifactStagingEndpoint(stagingServiceDescriptor)
        .setStagingSessionToken(stagingSessionToken)
        .build();
responseObserver.onNext(response);
responseObserver.onCompleted();

Error Handling (L212-215)

try {
    // Steps 1-5 above
} catch (Exception e) {
    LOG.error("Could not prepare job with name {}", request.getJobName(), e);
    responseObserver.onError(Status.INTERNAL.withCause(e).asException());
}

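Any exception thrown during steps 1 to 5, such as the NullPointerException from Step 2 or a failure inside registerJob(), is logged on the server and returned to the client as a gRPC INTERNAL status. The duplicate-ID case in Step 3 does not pass through this block; it calls onError() directly and returns. Because gRPC does not transmit a Status cause over the wire and no description is set, the client receives a bare INTERNAL status, and the details are available only in the server log.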
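To make the response contract concrete, the following sketch reproduces the control flow of prepare() with a hypothetical, simplified Observer interface in place of gRPC's StreamObserver and a plain RuntimeException in place of a StatusException. Each call ends with either onNext() followed by onCompleted(), or a single onError().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ErrorMappingSketch {
    // Hypothetical, simplified stand-in for gRPC's StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Records every callback so the sequence can be inspected.
    static class RecordingObserver implements Observer<String> {
        final List<String> events = new ArrayList<>();
        public void onNext(String v) { events.add("next:" + v); }
        public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
        public void onCompleted() { events.add("completed"); }
    }

    // Same shape as prepare(): do the work inside try; on success send one
    // response and complete, on failure send exactly one error.
    static void handle(Supplier<String> work, Observer<String> observer) {
        try {
            String response = work.get();
            observer.onNext(response);
            observer.onCompleted();
        } catch (Exception e) {
            // Stand-in for Status.INTERNAL.withCause(e).asException()
            observer.onError(new RuntimeException("INTERNAL", e));
        }
    }

    public static void main(String[] args) {
        RecordingObserver ok = new RecordingObserver();
        handle(() -> "prep_1", ok);
        System.out.println(ok.events); // [next:prep_1, completed]

        RecordingObserver failed = new RecordingObserver();
        handle(() -> { throw new NullPointerException("Encountered null pipeline options."); }, failed);
        System.out.println(failed.events); // [error:INTERNAL]
    }
}
```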

Dependency Extraction

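The extractDependencies() helper method at L286-297 maps each (sub-)environment of the pipeline to its list of ArtifactInformation dependencies. Keys have the form {index}:{environmentId}: a regular environment expands to itself and appears once as 0:{environmentId}, while an AnyOf environment contributes one entry per alternative, numbered from 0.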

private Map<String, List<RunnerApi.ArtifactInformation>> extractDependencies(
    RunnerApi.Pipeline pipeline) {
    Map<String, List<RunnerApi.ArtifactInformation>> dependencies = new HashMap<>();
    for (Map.Entry<String, RunnerApi.Environment> entry :
        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
        List<RunnerApi.Environment> subEnvs =
            Environments.expandAnyOfEnvironments(entry.getValue());
        for (int i = 0; i < subEnvs.size(); i++) {
            dependencies.put(i + ":" + entry.getKey(), subEnvs.get(i).getDependenciesList());
        }
    }
    return dependencies;
}
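The keying scheme is easiest to see on a small example. This sketch uses a hypothetical Env record instead of RunnerApi.Environment and its own expand() method, which is assumed to behave like Environments.expandAnyOfEnvironments(); it uses a TreeMap rather than a HashMap only so that the printed order is stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DependencyKeySketch {
    // Hypothetical model: an environment either has its own dependencies
    // (no alternatives) or is an AnyOf wrapper around alternative environments.
    record Env(List<String> dependencies, List<Env> alternatives) {
        static Env plain(String... deps) { return new Env(List.of(deps), List.of()); }
        static Env anyOfEnv(Env... options) { return new Env(List.of(), List.of(options)); }
    }

    // Assumed to mirror expandAnyOfEnvironments(): a plain environment expands
    // to itself; an AnyOf expands (recursively) to its alternatives.
    static List<Env> expand(Env env) {
        if (env.alternatives().isEmpty()) return List.of(env);
        List<Env> out = new ArrayList<>();
        for (Env sub : env.alternatives()) out.addAll(expand(sub));
        return out;
    }

    // Same keying scheme as extractDependencies(): "{index}:{environmentId}".
    static Map<String, List<String>> extract(Map<String, Env> environments) {
        Map<String, List<String>> deps = new TreeMap<>();
        for (Map.Entry<String, Env> e : environments.entrySet()) {
            List<Env> subEnvs = expand(e.getValue());
            for (int i = 0; i < subEnvs.size(); i++) {
                deps.put(i + ":" + e.getKey(), subEnvs.get(i).dependencies());
            }
        }
        return deps;
    }

    public static void main(String[] args) {
        Map<String, Env> envs = Map.of(
            "java", Env.plain("app.jar"),
            "py", Env.anyOfEnv(Env.plain("docker.tar"), Env.plain("process.tar")));
        System.out.println(extract(envs));
        // {0:java=[app.jar], 0:py=[docker.tar], 1:py=[process.tar]}
    }
}
```

The index prefix keeps the alternatives of one AnyOf environment from overwriting each other in the map, since they all share the same environment ID.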

Usage Examples

Server-Side Setup

// Creating the InMemoryJobService
GrpcFnServer<ArtifactStagingService> stagingService = /* ... */;
Function<String, String> tokenProvider = preparationId -> "token-" + preparationId;
ThrowingConsumer<Exception, String> cleanupFn = token -> { /* cleanup staging dir */ };
JobInvoker invoker = /* runner-specific invoker */;

InMemoryJobService jobService = InMemoryJobService.create(
    stagingService, tokenProvider, cleanupFn, invoker);

Client-Side Preparation

// Simplified from PortableRunner.run(); jobServiceStub is a JobServiceGrpc blocking stub
PrepareJobRequest request = PrepareJobRequest.newBuilder()
    .setJobName("my-pipeline-job")
    .setPipeline(pipelineProto)
    .setPipelineOptions(PipelineOptionsTranslation.toProto(options))
    .build();

PrepareJobResponse response = jobServiceStub
    .withDeadlineAfter(60, TimeUnit.SECONDS)
    .withWaitForReady()
    .prepare(request);

String preparationId = response.getPreparationId();
String stagingToken = response.getStagingSessionToken();
ApiServiceDescriptor stagingEndpoint = response.getArtifactStagingEndpoint();
