

Implementation:Apache Beam InMemoryJobService Run

From Leeroopedia


Property | Value
Implementation Name | InMemoryJobService Run
Category | Pipeline_Orchestration, Job_Management
Related Principle | Principle:Apache_Beam_Job_Execution
Source File | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java
Related Files | JobInvocation.java, JobInvoker.java
Language | Java
Last Updated | 2026-02-09 04:00 GMT

Overview

InMemoryJobService.run() implements the Run RPC of the in-memory job service: it executes a job that was previously set up by the Prepare RPC. It looks up the stored preparation, validates the pipeline, creates a JobInvocation through the JobInvoker, starts execution, and returns the job ID to the client.

Code Reference

Source Location

Component | Source Location
InMemoryJobService | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/InMemoryJobService.java
run() method | InMemoryJobService.java, L219-284
JobInvocation | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java, L51-271
JobInvoker | runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java, L34-60

Run Method Signature

@Override
public void run(
    RunJobRequest request,
    StreamObserver<RunJobResponse> responseObserver)   // L219-284

JobInvocation Class

public class JobInvocation {

    // Constructor (L67-83)
    public JobInvocation(
        JobInfo jobInfo,
        ListeningExecutorService executorService,
        Pipeline pipeline,
        PortablePipelineRunner pipelineRunner);

    // Lifecycle methods
    public synchronized void start();                   // L90-157
    public synchronized void cancel();                  // L165-191
    public String getId();                              // L160-162
    public JobState.Enum getState();                    // L201-203
    public JobStateEvent getStateEvent();               // L206-208
    public RunnerApi.Pipeline getPipeline();            // L211-213
    public JobApi.MetricResults getMetrics();           // L193-198

    // Observer methods
    public synchronized void addStateListener(
        Consumer<JobStateEvent> stateStreamObserver);   // L216-221
    public synchronized void addMessageListener(
        Consumer<JobMessage> messageStreamObserver);    // L224-229

    // Terminal state check
    public static Boolean isTerminated(Enum state);     // L260-270
}

JobInvoker Abstract Class

public abstract class JobInvoker {

    // Factory method for creating invocations
    JobInvocation invoke(
        RunnerApi.Pipeline pipeline,
        Struct options,
        @Nullable String retrievalToken) throws IOException;       // L46-49

    // Abstract method implemented by runner-specific invokers
    protected abstract JobInvocation invokeWithExecutor(
        RunnerApi.Pipeline pipeline,
        Struct options,
        @Nullable String retrievalToken,
        ListeningExecutorService executorService) throws IOException; // L39-44

    // Constructor creates a cached thread pool executor
    protected JobInvoker(String name);                                // L57-59
}
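To make the template-method split between invoke() and invokeWithExecutor() concrete, here is a minimal sketch using only JDK types. SketchJobInvoker, SketchInvocation, and EchoInvoker are hypothetical names, a String stands in for RunnerApi.Pipeline, and the options and retrieval-token parameters are omitted. What it illustrates is that the base class owns one cached thread pool and hands that same executor to every runner-specific invocation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for JobInvocation: an ID plus the executor it will run on.
final class SketchInvocation {
  final String id;
  final ExecutorService executor;

  SketchInvocation(String id, ExecutorService executor) {
    this.id = id;
    this.executor = executor;
  }
}

// Hypothetical analogue of JobInvoker: the base class owns one executor for all jobs.
abstract class SketchJobInvoker {
  private final ExecutorService executorService;

  protected SketchJobInvoker(String name) {
    AtomicInteger threadCount = new AtomicInteger();
    ThreadFactory threads = runnable -> {
      Thread t = new Thread(runnable, name + "-" + threadCount.getAndIncrement());
      t.setDaemon(true); // idle job threads should not keep the JVM alive
      return t;
    };
    // Cached pool: threads are created on demand and reused once idle.
    this.executorService = Executors.newCachedThreadPool(threads);
  }

  // Runner-specific hook, mirroring invokeWithExecutor(...).
  protected abstract SketchInvocation invokeWithExecutor(
      String pipeline, ExecutorService executorService);

  // Entry point used by the job service, mirroring invoke(...):
  // it always passes the shared executor to the runner-specific hook.
  final SketchInvocation invoke(String pipeline) {
    return invokeWithExecutor(pipeline, executorService);
  }
}

// A toy "runner" that derives invocation IDs from the pipeline name.
final class EchoInvoker extends SketchJobInvoker {
  private final AtomicInteger nextId = new AtomicInteger();

  EchoInvoker() {
    super("echo-job");
  }

  @Override
  protected SketchInvocation invokeWithExecutor(String pipeline, ExecutorService executorService) {
    return new SketchInvocation(pipeline + "_" + nextId.getAndIncrement(), executorService);
  }
}
```

With this shape, a runner implements only invokeWithExecutor(), and every job it creates shares the invoker's thread pool.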

I/O Contract

Inputs

Input | Type | Description
request.preparationId | String | The ID returned by the Prepare RPC
request.retrievalToken | String | Optional token for retrieving staged artifacts
responseObserver | StreamObserver<RunJobResponse> | gRPC response callback

Outputs

Output | Type | Description
response.jobId | String | Unique invocation ID of the running job
Side effect | Running JobInvocation | The pipeline executes asynchronously on the invoker's executor pool
Side effect | Preparation removed | The preparation record is removed from the preparations map once the invocation has started

Run Method Walkthrough

Step 1: Retrieve Preparation (L222-231)

String preparationId = request.getPreparationId();
JobPreparation preparation = preparations.get(preparationId);
if (preparation == null) {
    String errMessage = String.format("Unknown Preparation Id \"%s\".", preparationId);
    StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException();
    responseObserver.onError(exception);
    return;
}

Step 2: Validate Pipeline (L232-238)

try {
    PipelineValidator.validate(preparation.pipeline());
} catch (Exception e) {
    LOG.warn("Encountered Unexpected Exception during validation", e);
    responseObserver.onError(
        new StatusRuntimeException(Status.INVALID_ARGUMENT.withCause(e)));
    return;
}
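Steps 1 and 2 are early-return guards: a failure is reported once through onError, and run() returns without creating an invocation. The sketch below models both guards under simplifying assumptions: ErrorSink is a hypothetical stand-in for StreamObserver, gRPC status codes are plain strings, a String stands in for the pipeline, and the validator is any Consumer that throws on an invalid pipeline (not Beam's PipelineValidator).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified error channel in place of StreamObserver<RunJobResponse>.
interface ErrorSink {
  void onError(String status, String description);
}

final class RunGuards {
  // preparationId -> pipeline (a String stands in for RunnerApi.Pipeline).
  final Map<String, String> preparations = new HashMap<>();
  private final Consumer<String> validator;

  RunGuards(Consumer<String> validator) {
    this.validator = validator;
  }

  // Returns the pipeline if both guards pass; otherwise reports one error and returns null.
  String checkPreparation(String preparationId, ErrorSink sink) {
    String pipeline = preparations.get(preparationId);
    if (pipeline == null) {
      // Step 1: unknown preparation ID -> NOT_FOUND.
      sink.onError("NOT_FOUND", String.format("Unknown Preparation Id \"%s\".", preparationId));
      return null;
    }
    try {
      validator.accept(pipeline);
    } catch (RuntimeException e) {
      // Step 2: any validation failure -> INVALID_ARGUMENT, carrying the cause's message.
      sink.onError("INVALID_ARGUMENT", e.getMessage());
      return null;
    }
    return pipeline;
  }
}

// Records every reported status so the behavior can be inspected.
final class RecordingSink implements ErrorSink {
  final List<String> events = new ArrayList<>();

  @Override
  public void onError(String status, String description) {
    events.add("error:" + status);
  }
}
```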

Step 3: Resolve Dependencies and Create Invocation (L241-246)

JobInvocation invocation =
    invoker.invoke(
        resolveDependencies(preparation.pipeline(), stagingSessionTokens.get(preparationId)),
        preparation.options(),
        request.getRetrievalToken());
String invocationId = invocation.getId();

The resolveDependencies() method at L299-334 replaces the original artifact references in the pipeline with the actual staged artifact locations from the staging service.
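Below is a rough model of that substitution. It assumes, as a simplification, that artifact references are grouped per environment ID and that the staging service reports staged locations in the same shape; DependencyResolver and the map-of-lists representation are hypothetical and do not reproduce the RunnerApi protos.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DependencyResolver {
  private DependencyResolver() {}

  // declared: environment ID -> artifact references written into the pipeline.
  // stagedByEnvironment: environment ID -> locations reported by staging (null if nothing was staged).
  static Map<String, List<String>> resolve(
      Map<String, List<String>> declared, Map<String, List<String>> stagedByEnvironment) {
    Map<String, List<String>> resolved = new HashMap<>();
    for (Map.Entry<String, List<String>> env : declared.entrySet()) {
      List<String> dependencies = env.getValue();
      if (dependencies.isEmpty()) {
        // No artifacts declared for this environment: keep it unchanged.
        resolved.put(env.getKey(), dependencies);
        continue;
      }
      List<String> staged =
          stagedByEnvironment == null ? null : stagedByEnvironment.get(env.getKey());
      if (staged == null) {
        // Declared artifacts that were never staged cannot be resolved.
        throw new IllegalStateException("Dependencies declared but not staged for " + env.getKey());
      }
      // Replace the declared references with the staged locations.
      resolved.put(env.getKey(), List.copyOf(staged));
    }
    return resolved;
  }
}
```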

Step 4: Register State Listener for Cleanup (L248-265)

invocation.addStateListener(
    event -> {
        if (!JobInvocation.isTerminated(event.getState())) {
            return;
        }
        String stagingSessionToken = stagingSessionTokens.get(preparationId);
        stagingSessionTokens.remove(preparationId);
        try {
            if (cleanupJobFn != null) {
                cleanupJobFn.accept(stagingSessionToken);
            }
        } catch (Exception e) {
            LOG.warn("Failed to remove job staging directory for token {}.",
                     stagingSessionToken, e);
        } finally {
            onFinishedInvocationCleanup(invocationId);
        }
    });

The cleanup listener:

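  • Ignores non-terminal events and acts only once the job reaches a terminal state (DONE, FAILED, CANCELLED, DRAINED)
  • Removes the job's staging session token from stagingSessionTokens
  • Runs the cleanup function, if one is configured (e.g., deleting the staging directory); a failure here is logged, not propagated
  • Always calls onFinishedInvocationCleanup() (in the finally block) to record the invocation in the bounded history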
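The listener's behavior can be sketched with JDK types alone. SketchState, CleanupDemo, and the Consumer-based cleanupJobFn are hypothetical simplifications; the terminal set matches the four states listed above, and a failing cleanup is reported on stderr instead of through a logger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified job states (hypothetical; the real type is JobState.Enum).
enum SketchState {
  STOPPED, STARTING, RUNNING, DONE, FAILED, CANCELLED, DRAINED, UNSPECIFIED;

  // Same terminal set as JobInvocation.isTerminated() on this page.
  boolean isTerminal() {
    return this == DONE || this == FAILED || this == CANCELLED || this == DRAINED;
  }
}

final class CleanupDemo {
  final Map<String, String> stagingSessionTokens = new HashMap<>();
  final List<String> finishedInvocations = new ArrayList<>();
  private final Consumer<String> cleanupJobFn; // hypothetical, e.g. deletes a staging directory

  CleanupDemo(Consumer<String> cleanupJobFn) {
    this.cleanupJobFn = cleanupJobFn;
  }

  // Builds a listener with the same shape as the one registered in Step 4.
  Consumer<SketchState> cleanupListener(String preparationId, String invocationId) {
    return state -> {
      if (!state.isTerminal()) {
        return; // STARTING, RUNNING, ... are ignored
      }
      String token = stagingSessionTokens.remove(preparationId);
      try {
        if (cleanupJobFn != null) {
          cleanupJobFn.accept(token);
        }
      } catch (RuntimeException e) {
        // Cleanup failures are reported but do not escape the listener.
        System.err.println("Failed to remove job staging directory for token " + token + ": " + e);
      } finally {
        finishedInvocations.add(invocationId); // stands in for onFinishedInvocationCleanup()
      }
    };
  }
}
```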

Step 5: Start Execution (L267-268)

invocation.start();
invocations.put(invocationId, invocation);

Step 6: Clean Up Preparation and Respond (L270-274)

preparations.remove(preparationId);
RunJobResponse response = RunJobResponse.newBuilder().setJobId(invocationId).build();
responseObserver.onNext(response);
responseObserver.onCompleted();

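The preparation is removed because the job is now tracked as an active invocation. A preparation that has been started cannot be run again: running the pipeline once more (for example, after the job fails) requires a new Prepare call.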
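The single-use contract can be illustrated with a toy in-memory service. SingleUsePreparations, its job-ID scheme, and the null-for-NOT_FOUND convention are hypothetical; only the get-then-remove ordering mirrors the run() walkthrough.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class SingleUsePreparations {
  private final Map<String, String> preparations = new HashMap<>();
  private final Map<String, String> invocations = new HashMap<>();

  // Prepare: store the pipeline under a fresh preparation ID.
  String prepare(String pipeline) {
    String preparationId = UUID.randomUUID().toString();
    preparations.put(preparationId, pipeline);
    return preparationId;
  }

  // Run: returns a new job ID, or null if the preparation ID is unknown
  // (the real service reports NOT_FOUND through the StreamObserver instead).
  String run(String preparationId) {
    String pipeline = preparations.get(preparationId);
    if (pipeline == null) {
      return null;
    }
    String jobId = "job-" + UUID.randomUUID();
    invocations.put(jobId, pipeline);   // now tracked as an active invocation
    preparations.remove(preparationId); // consumed: a second run() will not find it
    return jobId;
  }

  int activeInvocations() {
    return invocations.size();
  }
}
```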

JobInvocation.start() Detailed Walkthrough

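The start() method at L90-157 launches the pipeline asynchronously: it rejects a second start, submits runPipeline to the executor, moves the job through STARTING to RUNNING, and registers a callback that maps the eventual result (or failure) to a job state.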

public synchronized void start() {
    LOG.info("Starting job invocation {}", getId());
    if (getState() != JobState.Enum.STOPPED) {
        throw new IllegalStateException(
            String.format("Job %s already running.", getId()));
    }
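    setState(JobState.Enum.STARTING);
    // Run the pipeline asynchronously on the invoker's executor.
    invocationFuture = executorService.submit(this::runPipeline);
    // RUNNING is set right after submission, not when the runner reports that it started.
    setState(JobState.Enum.RUNNING);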

    Futures.addCallback(invocationFuture,
        new FutureCallback<PortablePipelineResult>() {
            @Override
            public void onSuccess(PortablePipelineResult pipelineResult) {
                if (pipelineResult != null) {
                    PipelineResult.State state = pipelineResult.getState();
                    if (state.isTerminal()) {
                        metrics = pipelineResult.portableMetrics();
                    } else {
                        resultHandle = pipelineResult;
                    }
                    switch (state) {
                        case DONE:      setState(Enum.DONE); break;
                        case RUNNING:   setState(Enum.RUNNING); break;
                        case CANCELLED: setState(Enum.CANCELLED); break;
                        case FAILED:    setState(Enum.FAILED); break;
                        default:        setState(JobState.Enum.UNSPECIFIED);
                    }
                } else {
                    setState(JobState.Enum.UNSPECIFIED);
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                // Cancellation of the invocation future surfaces here; record CANCELLED, not FAILED.
                if (throwable instanceof CancellationException) {
                    setState(JobState.Enum.CANCELLED);
                    return;
                }
                sendMessage(JobMessage.newBuilder()
                    .setMessageText(getStackTraceAsString(throwable))
                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
                    .build());
                sendMessage(JobMessage.newBuilder()
                    .setMessageText(getRootCause(throwable).toString())
                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
                    .build());
                setState(JobState.Enum.FAILED);
            }
        }, executorService);
}
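The same lifecycle can be approximated with CompletableFuture in place of Guava's ListenableFuture and FutureCallback. This is a sketch under stated assumptions: RunState and SketchJobInvocation are hypothetical, the runner is a Supplier that returns a state directly instead of a PortablePipelineResult, and job messages are reduced to a single lastError string. Because supplyAsync wraps exceptions thrown by the supplier in a CompletionException, the handler unwraps it before checking for cancellation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Simplified job states (hypothetical; the real type is JobState.Enum).
enum RunState { STOPPED, STARTING, RUNNING, DONE, FAILED, CANCELLED, UNSPECIFIED }

final class SketchJobInvocation {
  private final ExecutorService executor;
  private final Supplier<RunState> runPipeline; // returns the state the runner reports
  private volatile RunState state = RunState.STOPPED;
  private volatile String lastError;
  private volatile CompletableFuture<Void> callbackDone;

  SketchJobInvocation(ExecutorService executor, Supplier<RunState> runPipeline) {
    this.executor = executor;
    this.runPipeline = runPipeline;
  }

  synchronized void start() {
    if (state != RunState.STOPPED) {
      throw new IllegalStateException("Job already running.");
    }
    state = RunState.STARTING;
    CompletableFuture<RunState> future = CompletableFuture.supplyAsync(runPipeline, executor);
    state = RunState.RUNNING; // set as soon as the work is submitted
    callbackDone = future.handle((result, error) -> {
      if (error == null) {
        state = fromRunnerState(result); // onSuccess branch
      } else {
        Throwable cause =
            error instanceof CompletionException && error.getCause() != null ? error.getCause() : error;
        if (cause instanceof CancellationException) {
          state = RunState.CANCELLED;
        } else {
          lastError = cause.toString(); // stands in for the JOB_MESSAGE_ERROR message
          state = RunState.FAILED;
        }
      }
      return null;
    });
  }

  // Mirrors the onSuccess switch: only DONE, RUNNING, CANCELLED, FAILED are copied.
  private static RunState fromRunnerState(RunState reported) {
    if (reported == null) {
      return RunState.UNSPECIFIED;
    }
    switch (reported) {
      case DONE:
      case RUNNING:
      case CANCELLED:
      case FAILED:
        return reported;
      default:
        return RunState.UNSPECIFIED;
    }
  }

  void awaitCallback() {
    callbackDone.join();
  }

  RunState getState() {
    return state;
  }

  String getLastError() {
    return lastError;
  }
}
```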

Invocation History Management

The onFinishedInvocationCleanup() method at L539-547 manages a bounded history of completed invocations:

private void onFinishedInvocationCleanup(String invocationId) {
    completedInvocationsIds.addLast(invocationId);
    while (completedInvocationsIds.size() > maxInvocationHistory) {
        invocations.remove(completedInvocationsIds.removeFirst());
    }
}

The default maximum history is 10 invocations (DEFAULT_MAX_INVOCATION_HISTORY).
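The sketch below applies the same eviction policy with a deliberately small limit so the effect is visible. InvocationHistory is a hypothetical class, and a String stands in for JobInvocation. Because only finished invocation IDs enter the deque, a job that is still running is never evicted, however many other jobs complete.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class InvocationHistory {
  private final int maxInvocationHistory;
  // invocationId -> job (a String stands in for JobInvocation).
  final Map<String, String> invocations = new HashMap<>();
  // Finished invocation IDs, oldest first.
  private final Deque<String> completedInvocationsIds = new ArrayDeque<>();

  InvocationHistory(int maxInvocationHistory) {
    this.maxInvocationHistory = maxInvocationHistory;
  }

  synchronized void onFinishedInvocationCleanup(String invocationId) {
    completedInvocationsIds.addLast(invocationId);
    // Evict the oldest finished jobs once the history exceeds its bound.
    while (completedInvocationsIds.size() > maxInvocationHistory) {
      invocations.remove(completedInvocationsIds.removeFirst());
    }
  }
}
```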

Usage Examples

Client-Side Run Request

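// From PortableRunner.run() at L208-219.
// jobService is the client's blocking gRPC stub for the job server (JobServiceBlockingStub).
RunJobRequest runJobRequest =
    RunJobRequest.newBuilder()
        .setPreparationId(prepareJobResponse.getPreparationId())
        .build();

RunJobResponse runJobResponse = jobService.run(runJobRequest);
// The job ID is read as raw bytes; getJobId() returns the same value as a String.
ByteString jobId = runJobResponse.getJobIdBytes();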

Querying Job State After Submission

// The InMemoryJobService also answers state queries for a submitted job,
// using the job ID returned in RunJobResponse.
GetJobStateRequest stateRequest =
    GetJobStateRequest.newBuilder().setJobId(runJobResponse.getJobId()).build();
JobStateEvent stateEvent = jobService.getState(stateRequest);
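Clients that need the final outcome can poll GetState until a terminal state appears. The helper below is a hypothetical sketch: it takes the state as a Supplier<String> so it runs without a job server. Wired to a real stub, it might be called as waitForTerminal(() -> jobService.getState(stateRequest).getState().name(), ...); that adaptation is an assumption about caller code, not part of Beam.

```java
import java.util.Set;
import java.util.function.Supplier;

final class StatePoller {
  private StatePoller() {}

  // Terminal states as listed on this page.
  private static final Set<String> TERMINAL = Set.of("DONE", "FAILED", "CANCELLED", "DRAINED");

  // Polls until a terminal state is observed or maxPolls is reached;
  // returns the last state seen (possibly non-terminal if the limit was hit).
  static String waitForTerminal(Supplier<String> getState, int maxPolls, long sleepMillis) {
    String state = null;
    for (int i = 0; i < maxPolls; i++) {
      state = getState.get();
      if (TERMINAL.contains(state)) {
        return state;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
        break;
      }
    }
    return state;
  }
}
```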

Cancelling a Running Job

CancelJobRequest cancelRequest =
    CancelJobRequest.newBuilder().setJobId(runJobResponse.getJobId()).build();
CancelJobResponse cancelResponse = jobService.cancel(cancelRequest);
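On the server side, cancellation ends up in the onFailure branch of start() shown earlier, where a CancellationException becomes CANCELLED. The sketch below models that path with a plain java.util.concurrent.Future, assuming cancellation is requested by cancelling the invocation's future with interruption; CancellableJobs and its string states are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancellableJobs {
  private final ExecutorService executor = Executors.newCachedThreadPool();
  private final Map<String, Future<?>> jobs = new ConcurrentHashMap<>();

  void submit(String jobId, Callable<?> work) {
    jobs.put(jobId, executor.submit(work));
  }

  // Hypothetical analogue of cancelling an invocation: interrupt the running work.
  // Returns false for unknown jobs or jobs that already finished.
  boolean cancel(String jobId) {
    Future<?> future = jobs.get(jobId);
    return future != null && future.cancel(true);
  }

  // Blocks until the job ends and maps the outcome like start()'s callback does.
  String awaitState(String jobId) {
    Future<?> future = jobs.get(jobId);
    try {
      future.get();
      return "DONE";
    } catch (CancellationException e) {
      return "CANCELLED";
    } catch (ExecutionException e) {
      return "FAILED";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "UNSPECIFIED";
    }
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```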
