Implementation:Apache Beam InMemoryJobService Run
| Property | Value |
|---|---|
| Implementation Name | InMemoryJobService Run |
| Category | Pipeline_Orchestration, Job_Management |
| Related Principle | Principle:Apache_Beam_Job_Execution |
| Source File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java |
| Also | JobInvocation.java, JobInvoker.java |
| Language | Java |
| last_updated | 2026-02-09 04:00 GMT |
Overview
InMemoryJobService.run() is the gRPC handler that starts a previously prepared pipeline job in the in-memory job service. It does five things in order:

- looks up the preparation created by the prepare RPC
- validates the pipeline
- asks the JobInvoker to create a JobInvocation
- starts that invocation asynchronously
- returns the invocation's job ID to the client
Code Reference
Source Location
| Property | Value |
|---|---|
| InMemoryJobService | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java |
| Run Method | L219-284 |
| JobInvocation | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java, L51-271 |
| JobInvoker | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java, L34-60 |
Run Method Signature
```java
@Override
public void run(
    RunJobRequest request,
    StreamObserver<RunJobResponse> responseObserver) // L219-284
```
JobInvocation Class
```java
public class JobInvocation {
  // Constructor (L67-83)
  public JobInvocation(
      JobInfo jobInfo,
      ListeningExecutorService executorService,
      Pipeline pipeline,
      PortablePipelineRunner pipelineRunner);

  // Lifecycle methods
  public synchronized void start();            // L90-157
  public synchronized void cancel();           // L165-191
  public String getId();                       // L160-162
  public JobState.Enum getState();             // L201-203
  public JobStateEvent getStateEvent();        // L206-208
  public RunnerApi.Pipeline getPipeline();     // L211-213
  public JobApi.MetricResults getMetrics();    // L193-198

  // Observer methods
  public synchronized void addStateListener(
      Consumer<JobStateEvent> stateStreamObserver);   // L216-221
  public synchronized void addMessageListener(
      Consumer<JobMessage> messageStreamObserver);    // L224-229

  // Terminal state check
  public static Boolean isTerminated(Enum state);     // L260-270
}
```
JobInvoker Abstract Class
```java
public abstract class JobInvoker {
  // Factory method for creating invocations
  JobInvocation invoke(
      RunnerApi.Pipeline pipeline,
      Struct options,
      @Nullable String retrievalToken) throws IOException;   // L46-49

  // Abstract method implemented by runner-specific invokers
  protected abstract JobInvocation invokeWithExecutor(
      RunnerApi.Pipeline pipeline,
      Struct options,
      @Nullable String retrievalToken,
      ListeningExecutorService executorService) throws IOException;   // L39-44

  // Constructor creates a cached thread pool executor
  protected JobInvoker(String name);   // L57-59
}
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| request.preparationId | String | The ID returned by the prepare RPC |
| request.retrievalToken | String | Optional token for retrieving staged artifacts |
| responseObserver | StreamObserver<RunJobResponse> | gRPC response callback |
Outputs
| Output | Type | Description |
|---|---|---|
| response.jobId | String | Unique invocation ID for the running job |
| Side effect | Running JobInvocation | Pipeline executing asynchronously in the executor pool |
| Side effect | Preparation removed | The preparation record is removed from the map after a successful run |
Run Method Walkthrough
Step 1: Retrieve Preparation (L222-231)
```java
String preparationId = request.getPreparationId();
JobPreparation preparation = preparations.get(preparationId);
if (preparation == null) {
  String errMessage = String.format("Unknown Preparation Id \"%s\".", preparationId);
  StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException();
  responseObserver.onError(exception);
  return;
}
```
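Step 1 follows the usual gRPC unary-handler contract. When the lookup fails, the handler reports exactly one error and returns, and it never calls onNext or onCompleted afterwards. The sketch below isolates that pattern. It makes two assumptions. First, RecordingObserver is a hypothetical, minimal stand-in for StreamObserver that records calls. Second, the "NOT_FOUND:" prefix imitates the status code with a plain string rather than using gRPC's Status class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal stand-in for gRPC's StreamObserver that just records calls.
final class RecordingObserver {
  final List<String> events = new ArrayList<>();

  void onNext(String value) { events.add("next:" + value); }
  void onError(String description) { events.add("error:" + description); }
  void onCompleted() { events.add("completed"); }
}

// Step 1 in isolation: look up the preparation, or fail fast with a NOT_FOUND-style error.
final class PreparationLookup {
  final Map<String, String> preparations = new ConcurrentHashMap<>();

  void run(String preparationId, RecordingObserver responseObserver) {
    String preparation = preparations.get(preparationId);
    if (preparation == null) {
      responseObserver.onError(
          String.format("NOT_FOUND: Unknown Preparation Id \"%s\".", preparationId));
      return; // onError is terminal: no onNext or onCompleted may follow.
    }
    // Placeholder for Steps 2-6: report a value and complete the call.
    responseObserver.onNext(preparation);
    responseObserver.onCompleted();
  }
}
```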
Step 2: Validate Pipeline (L232-238)
```java
try {
  PipelineValidator.validate(preparation.pipeline());
} catch (Exception e) {
  LOG.warn("Encountered Unexpected Exception during validation", e);
  responseObserver.onError(
      new StatusRuntimeException(Status.INVALID_ARGUMENT.withCause(e)));
  return;
}
```
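Step 2 catches every exception from validation and reports it as INVALID_ARGUMENT, so a malformed pipeline never escapes the RPC handler as an unhandled error. The sketch below mirrors that catch-and-translate shape. The validation rules (non-empty transform list, no blank transform names) are hypothetical placeholders for whatever PipelineValidator.validate() actually checks. The Code enum is likewise a stand-in for gRPC status codes.

```java
import java.util.List;

// Hypothetical sketch of Step 2: run a validator and translate any exception into an
// INVALID_ARGUMENT-style result instead of letting it escape the RPC handler.
final class ValidateStep {
  enum Code { OK, INVALID_ARGUMENT }

  // Placeholder rules standing in for PipelineValidator.validate(); NOT Beam's real checks.
  static void validate(List<String> transforms) {
    if (transforms.isEmpty()) {
      throw new IllegalArgumentException("pipeline has no transforms");
    }
    for (String name : transforms) {
      if (name.isBlank()) {
        throw new IllegalArgumentException("transform with a blank name");
      }
    }
  }

  static Code check(List<String> transforms) {
    try {
      validate(transforms);
      return Code.OK;
    } catch (Exception e) {
      // As in Step 2, any exception is reported as INVALID_ARGUMENT.
      return Code.INVALID_ARGUMENT;
    }
  }
}
```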
Step 3: Resolve Dependencies and Create Invocation (L241-246)
```java
JobInvocation invocation =
    invoker.invoke(
        resolveDependencies(preparation.pipeline(), stagingSessionTokens.get(preparationId)),
        preparation.options(),
        request.getRetrievalToken());
String invocationId = invocation.getId();
```
The resolveDependencies() method at L299-334 replaces the original artifact references in the pipeline with the actual staged artifact locations from the staging service.
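Conceptually, dependency resolution is a rewrite. Each artifact reference in the pipeline is swapped for the location where the staging service stored it, and the result is a new pipeline value. The sketch below models that idea with plain Java types. Several parts are hypothetical and do not reflect the protobuf structure Beam actually uses: the Environment record, the String references, the stagedLocations map, and the decision to fail fast on an unstaged artifact.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of dependency resolution: each environment lists artifact
// references, and the staging session maps each reference to its staged location.
final class DependencyResolver {
  record Environment(String id, List<String> artifacts) {}

  static List<Environment> resolve(List<Environment> envs, Map<String, String> stagedLocations) {
    List<Environment> resolved = new ArrayList<>();
    for (Environment env : envs) {
      List<String> artifacts = new ArrayList<>();
      for (String ref : env.artifacts()) {
        String staged = stagedLocations.get(ref);
        if (staged == null) {
          // Sketch choice: fail fast rather than run with a dangling reference.
          throw new IllegalStateException("Artifact was not staged: " + ref);
        }
        artifacts.add(staged);
      }
      // Build a new value instead of mutating the input, as one would with immutable protos.
      resolved.add(new Environment(env.id(), List.copyOf(artifacts)));
    }
    return resolved;
  }
}
```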
Step 4: Register State Listener for Cleanup (L248-265)
```java
invocation.addStateListener(
    event -> {
      if (!JobInvocation.isTerminated(event.getState())) {
        return;
      }
      String stagingSessionToken = stagingSessionTokens.get(preparationId);
      stagingSessionTokens.remove(preparationId);
      try {
        if (cleanupJobFn != null) {
          cleanupJobFn.accept(stagingSessionToken);
        }
      } catch (Exception e) {
        LOG.warn("Failed to remove job staging directory for token {}.",
            stagingSessionToken, e);
      } finally {
        onFinishedInvocationCleanup(invocationId);
      }
    });
```
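The cleanup listener:
- Ignores every state event until the job reaches a terminal state (DONE, FAILED, CANCELLED, or DRAINED, as decided by JobInvocation.isTerminated())
- Removes the staging session token for the preparation
- Runs the cleanup function if one is configured (e.g., deleting staging directories), logging any failure instead of propagating it
- Always calls onFinishedInvocationCleanup() in a finally block to update the bounded invocation history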
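Several details of this listener are easy to miss. It fires on every transition but acts only on terminal ones. It consumes the staging token exactly once. It records the finished invocation even when cleanup throws. The self-contained sketch below reproduces that logic so it can be exercised directly. The State enum, the finishedInvocations list, and the use of a plain Consumer for the cleanup function are hypothetical simplifications. Beam uses JobState.Enum, JobStateEvent, and onFinishedInvocationCleanup() instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, self-contained sketch of the Step 4 cleanup listener.
final class CleanupListenerSketch {
  enum State { STOPPED, STARTING, RUNNING, DONE, FAILED, CANCELLED, DRAINED }

  // Same terminal set as listed above.
  static boolean isTerminated(State s) {
    return s == State.DONE || s == State.FAILED || s == State.CANCELLED || s == State.DRAINED;
  }

  final Map<String, String> stagingSessionTokens = new ConcurrentHashMap<>();
  final List<String> finishedInvocations = new ArrayList<>(); // Stand-in for the history.

  Consumer<State> listenerFor(
      String preparationId, String invocationId, Consumer<String> cleanupJobFn) {
    return state -> {
      if (!isTerminated(state)) {
        return; // Non-terminal transitions are ignored.
      }
      // remove() returns the old value, combining the get-then-remove of the original.
      String token = stagingSessionTokens.remove(preparationId);
      try {
        if (cleanupJobFn != null) {
          cleanupJobFn.accept(token);
        }
      } catch (RuntimeException e) {
        // The original logs a warning here; the sketch just swallows the failure.
      } finally {
        finishedInvocations.add(invocationId); // Recorded even if cleanup threw.
      }
    };
  }
}
```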
Step 5: Start Execution (L267-268)
```java
invocation.start();
invocations.put(invocationId, invocation);
```
Step 6: Clean Up Preparation and Respond (L270-274)
```java
preparations.remove(preparationId);
RunJobResponse response = RunJobResponse.newBuilder().setJobId(invocationId).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
```
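The preparation is removed because it has become an active invocation. A preparation is therefore single-use. Once run() has started the job, the same preparation ID cannot be run again, and running the pipeline again (for example, after the job fails) requires a new prepare call. The early return in Step 2 happens before this removal, so a preparation that fails validation stays in the map.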
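The single-use prepare/run handshake can be modelled with two maps: a preparation is consumed by a successful run, and a second run with the same ID is treated like an unknown ID. This is a minimal sketch. SingleUsePreparations, the UUID-based IDs, and the Optional return type are hypothetical choices. The real service stores JobPreparation and JobInvocation objects and answers through a StreamObserver.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the prepare/run handshake: a successful run consumes its preparation.
final class SingleUsePreparations {
  private final Map<String, String> preparations = new ConcurrentHashMap<>();
  private final Map<String, String> invocations = new ConcurrentHashMap<>();

  String prepare(String pipeline) {
    String preparationId = UUID.randomUUID().toString();
    preparations.put(preparationId, pipeline);
    return preparationId;
  }

  /** Returns a new job ID, or empty if the preparation is unknown (never prepared or already run). */
  Optional<String> run(String preparationId) {
    String pipeline = preparations.get(preparationId);
    if (pipeline == null) {
      return Optional.empty(); // The real service answers NOT_FOUND here (Step 1).
    }
    String jobId = UUID.randomUUID().toString();
    invocations.put(jobId, pipeline);   // Register the running job (Step 5) ...
    preparations.remove(preparationId); // ... then consume the preparation (Step 6).
    return Optional.of(jobId);
  }
}
```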
JobInvocation.start() Detailed Walkthrough
The start() method at L90-157 orchestrates the asynchronous execution:
```java
public synchronized void start() {
  LOG.info("Starting job invocation {}", getId());
  if (getState() != JobState.Enum.STOPPED) {
    throw new IllegalStateException(
        String.format("Job %s already running.", getId()));
  }
  setState(JobState.Enum.STARTING);
  // Run the pipeline on the invoker's executor; RUNNING is reported as soon as it is submitted.
  invocationFuture = executorService.submit(this::runPipeline);
  setState(JobState.Enum.RUNNING);
  Futures.addCallback(
      invocationFuture,
      new FutureCallback<PortablePipelineResult>() {
        @Override
        public void onSuccess(PortablePipelineResult pipelineResult) {
          if (pipelineResult != null) {
            PipelineResult.State state = pipelineResult.getState();
            // Finished pipelines keep their metrics; unfinished ones keep a result handle.
            if (state.isTerminal()) {
              metrics = pipelineResult.portableMetrics();
            } else {
              resultHandle = pipelineResult;
            }
            // Map the SDK's PipelineResult.State onto the Job API state enum.
            switch (state) {
              case DONE: setState(Enum.DONE); break;
              case RUNNING: setState(Enum.RUNNING); break;
              case CANCELLED: setState(Enum.CANCELLED); break;
              case FAILED: setState(Enum.FAILED); break;
              default: setState(JobState.Enum.UNSPECIFIED);
            }
          } else {
            setState(JobState.Enum.UNSPECIFIED);
          }
        }

        @Override
        public void onFailure(Throwable throwable) {
          // A cancelled future is not an error: just record CANCELLED.
          if (throwable instanceof CancellationException) {
            setState(JobState.Enum.CANCELLED);
            return;
          }
          // Full stack trace at DEBUG, root cause at ERROR, then mark the job FAILED.
          sendMessage(JobMessage.newBuilder()
              .setMessageText(getStackTraceAsString(throwable))
              .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
              .build());
          sendMessage(JobMessage.newBuilder()
              .setMessageText(getRootCause(throwable).toString())
              .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
              .build());
          setState(JobState.Enum.FAILED);
        }
      },
      executorService);
}
```
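The state machine in start() can be reproduced with JDK types alone. The caller's thread moves the job STOPPED → STARTING → RUNNING. A callback registered on the pipeline's future later sets the terminal state. In this sketch, CompletableFuture.handleAsync plays the role of Guava's Futures.addCallback. The State enum, the transitions and errorMessages lists, and the awaitCompletion() test helper are hypothetical additions. Metrics handling and the DEBUG stack-trace message are omitted.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical, JDK-only sketch of the JobInvocation.start() state machine.
final class InvocationSketch {
  enum State { STOPPED, STARTING, RUNNING, DONE, FAILED, CANCELLED, UNSPECIFIED }

  private final Executor executor;
  private final Supplier<State> runPipeline; // Returns the pipeline's final state.
  private State state = State.STOPPED;
  private volatile CompletableFuture<Void> completion;
  final List<State> transitions = new CopyOnWriteArrayList<>();
  final List<String> errorMessages = new CopyOnWriteArrayList<>();

  InvocationSketch(Executor executor, Supplier<State> runPipeline) {
    this.executor = executor;
    this.runPipeline = runPipeline;
  }

  synchronized State getState() { return state; }

  private synchronized void setState(State newState) {
    state = newState;
    transitions.add(newState);
  }

  synchronized void start() {
    if (state != State.STOPPED) {
      throw new IllegalStateException("Job already running.");
    }
    setState(State.STARTING);
    CompletableFuture<State> future = CompletableFuture.supplyAsync(runPipeline, executor);
    setState(State.RUNNING); // Reported as soon as the work is submitted.
    // Registered after RUNNING, so the terminal state is always the last transition.
    completion = future.handleAsync((finalState, error) -> {
      if (error == null) {
        setState(finalState != null ? finalState : State.UNSPECIFIED);
        return null;
      }
      Throwable cause =
          error instanceof CompletionException && error.getCause() != null ? error.getCause() : error;
      if (cause instanceof CancellationException) {
        setState(State.CANCELLED); // Cancellation is not reported as an error.
        return null;
      }
      while (cause.getCause() != null) {
        cause = cause.getCause(); // Walk to the root cause.
      }
      errorMessages.add(cause.toString());
      setState(State.FAILED);
      return null;
    }, executor);
  }

  /** Test helper (not in the original): blocks until the terminal state is recorded. */
  void awaitCompletion() {
    completion.join();
  }
}
```

Registering the callback only after RUNNING has been set guarantees that a fast pipeline can never finish and then have its terminal state overwritten by RUNNING.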
Invocation History Management
The onFinishedInvocationCleanup() method at L539-547 manages a bounded history of completed invocations:
```java
private void onFinishedInvocationCleanup(String invocationId) {
  completedInvocationsIds.addLast(invocationId);
  while (completedInvocationsIds.size() > maxInvocationHistory) {
    invocations.remove(completedInvocationsIds.removeFirst());
  }
}
```
The default maximum history is 10 invocations (DEFAULT_MAX_INVOCATION_HISTORY).
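The eviction rule is a FIFO over completed job IDs. Only jobs that have finished are eligible, and the oldest finished job is dropped first once more than maxInvocationHistory have completed. Running jobs are never evicted because their IDs are not in the deque yet. The sketch below wraps the method shown above in a small class so it can be exercised. The register() helper and the placeholder string values are hypothetical.

```java
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical sketch of the bounded invocation history; map values are placeholder strings.
final class InvocationHistory {
  static final int DEFAULT_MAX_INVOCATION_HISTORY = 10;

  private final int maxInvocationHistory;
  final Map<String, String> invocations = new ConcurrentHashMap<>();
  private final Deque<String> completedInvocationsIds = new ConcurrentLinkedDeque<>();

  InvocationHistory() {
    this(DEFAULT_MAX_INVOCATION_HISTORY);
  }

  InvocationHistory(int maxInvocationHistory) {
    this.maxInvocationHistory = maxInvocationHistory;
  }

  // Called when a job starts (Step 5).
  void register(String invocationId) {
    invocations.put(invocationId, "invocation:" + invocationId);
  }

  // Called once per job when it reaches a terminal state (Step 4's finally block).
  void onFinishedInvocationCleanup(String invocationId) {
    completedInvocationsIds.addLast(invocationId);
    // Evict the oldest completed jobs until at most maxInvocationHistory remain.
    while (completedInvocationsIds.size() > maxInvocationHistory) {
      invocations.remove(completedInvocationsIds.removeFirst());
    }
  }
}
```

For example, with a limit of 2, finishing jobs a, b, and c in that order evicts only a. Any job still running stays in the map.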
Usage Examples
Client-Side Run Request
```java
// From PortableRunner.run() at L208-219
RunJobRequest runJobRequest =
    RunJobRequest.newBuilder()
        .setPreparationId(prepareJobResponse.getPreparationId())
        .build();
RunJobResponse runJobResponse = jobService.run(runJobRequest);
ByteString jobId = runJobResponse.getJobIdBytes();
```
Querying Job State After Submission
```java
// The InMemoryJobService also serves state queries for the job ID returned by run()
GetJobStateRequest stateRequest =
    GetJobStateRequest.newBuilder().setJobId(runJobResponse.getJobId()).build();
JobStateEvent stateEvent = jobService.getState(stateRequest);
```
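A client that only has the unary state query can wait for completion by polling until it sees a terminal state. The helper below is a hypothetical sketch of that loop, not part of Beam's client API. The state source is abstracted as a Supplier<String>; in a real client it might wrap something like `jobService.getState(stateRequest).getState().name()`. The interval and the poll limit are arbitrary parameters chosen by the caller.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical client-side helper: poll a state source until it reports a terminal state.
final class StatePoller {
  // Terminal state names, matching JobInvocation.isTerminated().
  static final Set<String> TERMINAL = Set.of("DONE", "FAILED", "CANCELLED", "DRAINED");

  static String waitUntilTerminal(Supplier<String> stateSource, Duration interval, int maxPolls) {
    for (int i = 0; i < maxPolls; i++) {
      String state = stateSource.get();
      if (TERMINAL.contains(state)) {
        return state;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
        throw new IllegalStateException("Interrupted while polling job state", e);
      }
    }
    throw new IllegalStateException("No terminal state after " + maxPolls + " polls");
  }
}
```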
Cancelling a Running Job
```java
CancelJobRequest cancelRequest =
    CancelJobRequest.newBuilder().setJobId(runJobResponse.getJobId()).build();
CancelJobResponse cancelResponse = jobService.cancel(cancelRequest);
```
Related Pages
- Principle:Apache_Beam_Job_Execution -- The principle this implementation realizes
- Implementation:Apache_Beam_InMemoryJobService_Prepare -- The prepare phase that precedes run
- Implementation:Apache_Beam_PortableRunner_Run -- The client-side runner that sends the run request
- Implementation:Apache_Beam_JobServicePipelineResult -- The result object used for monitoring after run
- Implementation:Apache_Beam_PortablePipelineJarCreator -- An alternative PortablePipelineRunner that JobInvocation can use; it packages the pipeline into a jar instead of executing it
- Environment:Apache_Beam_Portable_Runner_Environment -- Portability framework runtime with gRPC Job Service and Docker/loopback SDK harness.