Implementation:Apache Beam InMemoryJobService Prepare
| Property | Value |
|---|---|
| Implementation Name | InMemoryJobService Prepare |
| Category | Pipeline_Orchestration, Job_Management |
| Related Principle | Principle:Apache_Beam_Job_Preparation |
| Source File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java |
| Language | Java |
| last_updated | 2026-02-09 04:00 GMT |
Overview
InMemoryJobService is the concrete tool for preparing a pipeline job for execution on the in-memory job service. The prepare() method handles the server-side reception of a pipeline submission, generating a unique preparation ID, registering the pipeline with the staging service, and returning artifact staging credentials to the client.
Code Reference
Source Location
| Property | Value |
|---|---|
| File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java |
| Package | org.apache.beam.runners.jobsubmission |
| Class Declaration | L81: public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService |
| Prepare Method | Lines L168-216 |
| Factory Methods | Lines L96-107, L119-127 |
Factory Methods
// Primary factory (L96-107)
public static InMemoryJobService create(
GrpcFnServer<ArtifactStagingService> stagingService,
Function<String, String> stagingServiceTokenProvider,
ThrowingConsumer<Exception, String> cleanupJobFn,
JobInvoker invoker) {
return new InMemoryJobService(
stagingService, stagingServiceTokenProvider,
cleanupJobFn, invoker, DEFAULT_MAX_INVOCATION_HISTORY);
}
// Factory with custom history size (L119-127)
public static InMemoryJobService create(
GrpcFnServer<ArtifactStagingService> stagingService,
Function<String, String> stagingServiceTokenProvider,
ThrowingConsumer<Exception, String> cleanupJobFn,
JobInvoker invoker,
int maxInvocationHistory) {
return new InMemoryJobService(
stagingService, stagingServiceTokenProvider,
cleanupJobFn, invoker, maxInvocationHistory);
}
Prepare Method Signature
@Override
public void prepare(
PrepareJobRequest request,
StreamObserver<PrepareJobResponse> responseObserver) // L168-216
Internal State
// Concurrent maps for thread-safe state management
private final ConcurrentHashMap<String, JobPreparation> preparations; // L130
private final ConcurrentHashMap<String, String> stagingSessionTokens; // L132
private final ConcurrentHashMap<String, JobInvocation> invocations; // L134
private final ConcurrentLinkedDeque<String> completedInvocationsIds; // L136
private final GrpcFnServer<ArtifactStagingService> stagingService; // L138
private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor; // L139
private final Function<String, String> stagingServiceTokenProvider; // L140
private final ThrowingConsumer<Exception, String> cleanupJobFn; // L141
private final JobInvoker invoker; // L142
private final int maxInvocationHistory; // L145
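The pairing of completedInvocationsIds with maxInvocationHistory suggests a bounded-history pattern: completed job IDs are queued and the oldest entries are evicted once the cap is reached. The sketch below is a simplified, JDK-only illustration of that pattern, not the class's actual eviction code; the class and method names are invented for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class BoundedHistorySketch {
    private final ConcurrentHashMap<String, String> invocations = new ConcurrentHashMap<>();
    private final ConcurrentLinkedDeque<String> completedInvocationsIds = new ConcurrentLinkedDeque<>();
    private final int maxInvocationHistory;

    BoundedHistorySketch(int maxInvocationHistory) {
        this.maxInvocationHistory = maxInvocationHistory;
    }

    // Record a completed invocation, evicting the oldest once the cap is exceeded.
    void onCompleted(String invocationId, String finalState) {
        invocations.put(invocationId, finalState);
        completedInvocationsIds.addLast(invocationId);
        while (completedInvocationsIds.size() > maxInvocationHistory) {
            String evicted = completedInvocationsIds.pollFirst();
            if (evicted != null) {
                invocations.remove(evicted);
            }
        }
    }

    int historySize() {
        return completedInvocationsIds.size();
    }

    public static void main(String[] args) {
        BoundedHistorySketch history = new BoundedHistorySketch(2);
        history.onCompleted("job-1", "DONE");
        history.onCompleted("job-2", "DONE");
        history.onCompleted("job-3", "FAILED");
        System.out.println(history.historySize()); // 2: job-1 was evicted
    }
}
```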
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| request.jobName | String | Human-readable name for the job |
| request.pipeline | RunnerApi.Pipeline | The translated pipeline protobuf |
| request.pipelineOptions | Struct | Pipeline options serialized as a protobuf Struct |
| responseObserver | StreamObserver<PrepareJobResponse> | gRPC response callback |
Outputs
| Output | Type | Description |
|---|---|---|
| response.preparationId | String | Unique ID in format {jobName}_{UUID} |
| response.artifactStagingEndpoint | ApiServiceDescriptor | gRPC endpoint for uploading artifacts |
| response.stagingSessionToken | String | Authorization token for artifact staging |
Prepare Method Walkthrough
The prepare() method at L168-216 performs the following steps:
Step 1: Generate Preparation ID (L173-174)
String preparationId =
String.format("%s_%s", request.getJobName(), UUID.randomUUID().toString());
The preparation ID combines the job name with a random UUID, ensuring uniqueness while remaining human-readable.
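The same ID scheme can be reproduced with only the JDK. This sketch mirrors the format string above; the class and method names are illustrative, not part of the Beam API.

```java
import java.util.UUID;

public class PreparationIdExample {
    // Mirrors prepare()'s ID scheme: {jobName}_{random UUID}.
    static String newPreparationId(String jobName) {
        return String.format("%s_%s", jobName, UUID.randomUUID());
    }

    public static void main(String[] args) {
        String id = newPreparationId("my-pipeline-job");
        // The suffix after the job-name prefix parses back as a UUID.
        UUID.fromString(id.substring("my-pipeline-job_".length()));
        System.out.println(id);
    }
}
```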
Step 2: Validate Pipeline Options (L175-178)
Struct pipelineOptions = request.getPipelineOptions();
if (pipelineOptions == null) {
throw new NullPointerException("Encountered null pipeline options.");
}
Step 3: Create and Store JobPreparation (L180-194)
JobPreparation preparation =
JobPreparation.builder()
.setId(preparationId)
.setPipeline(request.getPipeline())
.setOptions(pipelineOptions)
.build();
JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
if (previous != null) {
String errMessage =
String.format("A job with the preparation ID \"%s\" already exists.", preparationId);
StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException();
responseObserver.onError(exception);
return;
}
The putIfAbsent() call atomically registers the preparation in the concurrent map: if another preparation already holds the same ID (a UUID collision, which is vanishingly unlikely), the existing entry is left untouched and the client receives an error response instead.
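The check-then-act semantics of putIfAbsent() can be seen with a plain ConcurrentHashMap from the JDK; the keys and values below are illustrative stand-ins for the preparation ID and JobPreparation object.

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutIfAbsentExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> preparations = new ConcurrentHashMap<>();

        // First registration: no prior mapping, so putIfAbsent returns null.
        String first = preparations.putIfAbsent("job_abc", "preparation-1");
        System.out.println(first); // null

        // A colliding ID: the existing value is returned and left untouched,
        // which is what triggers the error branch in prepare().
        String previous = preparations.putIfAbsent("job_abc", "preparation-2");
        System.out.println(previous);                    // preparation-1
        System.out.println(preparations.get("job_abc")); // preparation-1
    }
}
```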
Step 4: Generate Staging Token and Register Job (L196-201)
String stagingSessionToken = stagingServiceTokenProvider.apply(preparationId);
stagingSessionTokens.putIfAbsent(preparationId, stagingSessionToken);
stagingService.getService()
.registerJob(stagingSessionToken, extractDependencies(request.getPipeline()));
The extractDependencies() helper method iterates over all environments in the pipeline and extracts their ArtifactInformation dependencies, including handling AnyOf environments by expanding sub-environments.
Step 5: Build and Send Response (L204-211)
PrepareJobResponse response =
PrepareJobResponse.newBuilder()
.setPreparationId(preparationId)
.setArtifactStagingEndpoint(stagingServiceDescriptor)
.setStagingSessionToken(stagingSessionToken)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
Error Handling (L212-215)
catch (Exception e) {
LOG.error("Could not prepare job with name {}", request.getJobName(), e);
responseObserver.onError(Status.INTERNAL.withCause(e).asException());
}
Any unexpected exception is caught and returned as a gRPC INTERNAL status error.
Dependency Extraction
The extractDependencies() helper method at L286-297 extracts artifact information from each pipeline environment:
private Map<String, List<RunnerApi.ArtifactInformation>> extractDependencies(
RunnerApi.Pipeline pipeline) {
Map<String, List<RunnerApi.ArtifactInformation>> dependencies = new HashMap<>();
for (Map.Entry<String, RunnerApi.Environment> entry :
pipeline.getComponents().getEnvironmentsMap().entrySet()) {
List<RunnerApi.Environment> subEnvs =
Environments.expandAnyOfEnvironments(entry.getValue());
for (int i = 0; i < subEnvs.size(); i++) {
dependencies.put(i + ":" + entry.getKey(), subEnvs.get(i).getDependenciesList());
}
}
return dependencies;
}
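The resulting map keys prefix each environment ID with the index of its expanded sub-environment. The JDK-only sketch below reproduces that key scheme with plain collections standing in for the protobuf types (the environment ID, dependency strings, and helper signature are all illustrative, not Beam's real API).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DependencyKeySketch {
    // Stand-in for expandAnyOfEnvironments + getDependenciesList: each
    // environment ID maps to the dependency lists of its sub-environments.
    static Map<String, List<String>> extractDependencies(
            Map<String, List<List<String>>> envToSubEnvDeps) {
        Map<String, List<String>> dependencies = new HashMap<>();
        for (Map.Entry<String, List<List<String>>> entry : envToSubEnvDeps.entrySet()) {
            List<List<String>> subEnvs = entry.getValue();
            for (int i = 0; i < subEnvs.size(); i++) {
                // Same "{index}:{environmentId}" key format as the real helper.
                dependencies.put(i + ":" + entry.getKey(), subEnvs.get(i));
            }
        }
        return dependencies;
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> envs = new LinkedHashMap<>();
        // An AnyOf environment that expanded into two sub-environments.
        envs.put("docker_env", Arrays.asList(
            Arrays.asList("pipeline.jar"),
            Arrays.asList("pipeline.jar", "extra.so")));
        Map<String, List<String>> deps = extractDependencies(envs);
        System.out.println(deps.containsKey("0:docker_env")); // true
        System.out.println(deps.containsKey("1:docker_env")); // true
    }
}
```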
Usage Examples
Server-Side Setup
// Creating the InMemoryJobService
GrpcFnServer<ArtifactStagingService> stagingService = /* ... */;
Function<String, String> tokenProvider = preparationId -> "token-" + preparationId;
ThrowingConsumer<Exception, String> cleanupFn = token -> { /* cleanup staging dir */ };
JobInvoker invoker = /* runner-specific invoker */;
InMemoryJobService jobService = InMemoryJobService.create(
stagingService, tokenProvider, cleanupFn, invoker);
Client-Side Preparation
// From PortableRunner.run()
PrepareJobRequest request = PrepareJobRequest.newBuilder()
.setJobName("my-pipeline-job")
.setPipeline(pipelineProto)
.setPipelineOptions(PipelineOptionsTranslation.toProto(options))
.build();
PrepareJobResponse response = jobServiceStub
.withDeadlineAfter(60, TimeUnit.SECONDS)
.withWaitForReady()
.prepare(request);
String preparationId = response.getPreparationId();
String stagingToken = response.getStagingSessionToken();
ApiServiceDescriptor stagingEndpoint = response.getArtifactStagingEndpoint();
Related Pages
- Principle:Apache_Beam_Job_Preparation -- The principle this implementation realizes
- Implementation:Apache_Beam_PortableRunner_Run -- The client-side runner that calls prepare
- Implementation:Apache_Beam_InMemoryJobService_Run -- The run method in the same class, invoked after staging
- Implementation:Apache_Beam_PortablePipelineJarCreator -- Alternative staging path that creates a JAR
- Environment:Apache_Beam_Portable_Runner_Environment -- Portability framework runtime with gRPC Job Service and Docker/loopback SDK harness.