
Principle:Apache Beam Job Execution

From Leeroopedia


Property                Value
Principle Name          Job Execution
Category                Pipeline_Orchestration, Job_Management
Related Implementation  Implementation:Apache_Beam_InMemoryJobService_Run
Source Reference        runners/java-job-service/.../InMemoryJobService.java L219-284
last_updated            2026-02-09 04:00 GMT

Overview

Job Execution is the process of starting a prepared pipeline job on the runner backend, managing its lifecycle from submission through completion. It is the second phase of the two-phase submission protocol, following job preparation and artifact staging.

Description

Job execution takes a pipeline that has already been prepared and starts running it. When a run request arrives, the server performs the following steps:

  1. Retrieve the preparation -- The server looks up the JobPreparation record by the preparation ID from the run request.
  2. Validate the pipeline -- PipelineValidator.validate() checks the pipeline proto for structural correctness.
  3. Resolve staged artifacts -- The dependencies declared during preparation are replaced with the locations of the artifacts that were actually staged.
  4. Create a JobInvocation -- The JobInvoker creates a JobInvocation object that wraps the execution.
  5. Register state listeners -- A cleanup listener is attached to handle staging token cleanup and invocation history management when the job terminates.
  6. Start execution -- invocation.start() submits the pipeline to a ListeningExecutorService for asynchronous execution.
  7. Return the job ID -- The server responds with a RunJobResponse containing the unique job invocation ID.
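The run steps above can be sketched as a small, dependency-free Java class. Everything in it is hypothetical: PreparationRecord, SketchInvocation and SketchJobService are illustrative stand-ins, not Beam classes. Validation is a placeholder for PipelineValidator.validate(), artifact resolution is skipped, and the cleanup hook simply drops the preparation record, which is an assumption about what cleanup involves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a JobPreparation record (not Beam's class). */
final class PreparationRecord {
    final String id;
    final String pipeline; // placeholder for the pipeline proto

    PreparationRecord(String id, String pipeline) {
        this.id = id;
        this.pipeline = pipeline;
    }
}

/** Hypothetical stand-in for a JobInvocation: records start() and runs hooks on termination. */
final class SketchInvocation {
    final String id;
    private final List<Runnable> onTerminate = new ArrayList<>();
    private boolean started;

    SketchInvocation(String id) { this.id = id; }

    synchronized void addTerminationHook(Runnable hook) { onTerminate.add(hook); }
    synchronized void start() { started = true; } // a real start() would submit to an executor
    synchronized boolean isStarted() { return started; }
    synchronized void terminate() { onTerminate.forEach(Runnable::run); }
}

/** Sketch of the run-request steps, under the simplifications stated above. */
final class SketchJobService {
    private final Map<String, PreparationRecord> preparations = new ConcurrentHashMap<>();
    private final Map<String, SketchInvocation> invocations = new ConcurrentHashMap<>();

    void prepare(PreparationRecord prep) { preparations.put(prep.id, prep); }
    boolean hasPreparation(String id) { return preparations.containsKey(id); }
    SketchInvocation invocation(String jobId) { return invocations.get(jobId); }

    String run(String preparationId) {
        // 1. Retrieve the preparation by ID.
        PreparationRecord prep = preparations.get(preparationId);
        if (prep == null) {
            throw new IllegalArgumentException("Unknown preparation ID: " + preparationId);
        }
        // 2. Validate (placeholder check standing in for PipelineValidator.validate()).
        if (prep.pipeline == null || prep.pipeline.isEmpty()) {
            throw new IllegalArgumentException("Pipeline is empty");
        }
        // 3. Artifact resolution is omitted in this sketch.
        // 4. Create the invocation under a unique job ID.
        String jobId = preparationId + "_" + UUID.randomUUID();
        SketchInvocation invocation = new SketchInvocation(jobId);
        invocations.put(jobId, invocation);
        // 5. Register cleanup for termination (assumption: it drops the preparation record).
        invocation.addTerminationHook(() -> preparations.remove(preparationId));
        // 6. Start execution.
        invocation.start();
        // 7. Return the job ID to the client.
        return jobId;
    }
}
```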

Job State Machine

JobInvocation tracks each job through the following states:

State      Description                                 Terminal?
STOPPED    Initial state after construction            No
STARTING   start() called; submitting to the executor  No
RUNNING    Pipeline submitted and executing            No
DONE       Pipeline completed successfully             Yes
FAILED     Pipeline encountered an error               Yes
CANCELLED  Pipeline was cancelled by client or system  Yes
DRAINED    Streaming pipeline was drained              Yes
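The Terminal? column reduces to a single predicate. The sketch below mirrors the table with a hypothetical SketchJobState enum; Beam's real states come from the generated JobState.Enum proto type, which may include values not listed here, and its predicate is JobInvocation.isTerminated().

```java
/** Hypothetical mirror of the states in the table (not Beam's generated JobState.Enum). */
enum SketchJobState {
    STOPPED, STARTING, RUNNING, DONE, FAILED, CANCELLED, DRAINED;

    /** True once the job can no longer change state, in the spirit of JobInvocation.isTerminated(). */
    static boolean isTerminated(SketchJobState state) {
        switch (state) {
            case DONE:
            case FAILED:
            case CANCELLED:
            case DRAINED:
                return true;
            default:
                return false;
        }
    }
}
```

Cleanup code can then branch on this one predicate instead of listing terminal states at every call site.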

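start() is synchronized and performs the STOPPED -> STARTING -> RUNNING transitions on the calling thread; the terminal transition happens later, in a callback that fires when the pipeline's future completes. Observers are notified after each transition: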

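// JobInvocation.start() at L90-157 (abridged)
public synchronized void start() {
    // A job can be started only once, from its initial STOPPED state.
    if (getState() != JobState.Enum.STOPPED) {
        throw new IllegalStateException(String.format("Job %s already running.", getId()));
    }
    setState(JobState.Enum.STARTING);
    // submit() returns immediately; runPipeline executes on the ListeningExecutorService.
    invocationFuture = executorService.submit(this::runPipeline);
    // RUNNING is set as soon as the work is submitted, not when the runner reports progress.
    setState(JobState.Enum.RUNNING);
    // The terminal state is set later, when invocationFuture completes.
    Futures.addCallback(invocationFuture, new FutureCallback<PortablePipelineResult>() {
        @Override
        public void onSuccess(PortablePipelineResult pipelineResult) {
            // Map the state reported in pipelineResult to the matching JobState
            // (DONE, FAILED, CANCELLED, ...).
        }
        @Override
        public void onFailure(Throwable throwable) {
            // Guava reports a cancelled future here with a CancellationException.
            if (throwable instanceof CancellationException) {
                setState(JobState.Enum.CANCELLED);
                return;
            }
            setState(JobState.Enum.FAILED);
        }
    }, executorService);
}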
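To see the same control flow without Guava or Beam on the classpath, the sketch below reproduces it with java.util.concurrent.CompletableFuture standing in for ListenableFuture and FutureCallback. MiniInvocation, its State enum, and modeling the pipeline as a Supplier<Boolean> that returns true on success are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/** Hypothetical, dependency-free analogue of JobInvocation.start(); not Beam code. */
final class MiniInvocation {
    enum State { STOPPED, STARTING, RUNNING, DONE, FAILED, CANCELLED }

    private final ExecutorService executor;
    private final Supplier<Boolean> pipeline; // assumption: returns true on success
    private final List<State> history = new ArrayList<>();
    private State state;
    private CompletableFuture<Boolean> future;
    private CompletableFuture<Boolean> done; // completes after the state callback has run

    MiniInvocation(ExecutorService executor, Supplier<Boolean> pipeline) {
        this.executor = executor;
        this.pipeline = pipeline;
        setState(State.STOPPED); // initial state after construction
    }

    synchronized void start() {
        if (state != State.STOPPED) {
            throw new IllegalStateException("Job already running.");
        }
        setState(State.STARTING);
        future = CompletableFuture.supplyAsync(pipeline, executor);
        setState(State.RUNNING); // set right after submission, as in the excerpt
        // Registered after RUNNING, so the terminal state is always recorded last.
        done = future.whenComplete((ok, err) -> {
            if (err == null) {
                setState(Boolean.TRUE.equals(ok) ? State.DONE : State.FAILED);
            } else if (unwrap(err) instanceof CancellationException) {
                setState(State.CANCELLED);
            } else {
                setState(State.FAILED);
            }
        });
    }

    synchronized void cancel() {
        if (future != null) {
            future.cancel(true); // completes the future; does not interrupt the supplier
        }
    }

    /** Blocks until the terminal state has been set, ignoring the job's own failure. */
    void awaitTerminalState() {
        done.exceptionally(e -> null).join();
    }

    synchronized State getState() { return state; }

    synchronized List<State> getHistory() { return new ArrayList<>(history); }

    private synchronized void setState(State next) {
        state = next;
        history.add(next);
    }

    /** Exceptions thrown by the supplier arrive wrapped in CompletionException. */
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

One difference from the Guava version: CompletableFuture.cancel(true) completes the future with a CancellationException but does not interrupt the thread running the supplier, so a cancelled pipeline body keeps running until it returns on its own.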

Observer Pattern

JobInvocation supports two types of observers:

  • State listeners -- Receive a JobStateEvent on every state transition. A newly added listener is first replayed every state recorded so far.
  • Message listeners -- Receive a JobMessage for each log message or error. A newly added listener is first replayed every message recorded so far.

// Register a state listener: it first receives every past JobStateEvent, then each new one.
invocation.addStateListener(event -> {
    if (JobInvocation.isTerminated(event.getState())) {
        // Perform cleanup
    }
});
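Replay-on-subscribe can be captured in a small generic class. ReplayingEventLog is a hypothetical name; the sketch illustrates the pattern described for state and message listeners, not JobInvocation's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical replay-on-subscribe event log; illustrates the pattern, not Beam's class. */
final class ReplayingEventLog<E> {
    private final List<E> history = new ArrayList<>();
    private final List<Consumer<E>> listeners = new ArrayList<>();

    /** Records an event and delivers it to every registered listener. */
    synchronized void publish(E event) {
        history.add(event);
        for (Consumer<E> listener : listeners) {
            listener.accept(event);
        }
    }

    /**
     * Replays the full history to the new listener, then registers it.
     * Holding the lock across both steps means no event is missed or delivered twice.
     */
    synchronized void addListener(Consumer<E> listener) {
        for (E past : history) {
            listener.accept(past);
        }
        listeners.add(listener);
    }
}
```

Holding the lock across replay and registration is the key design choice: without it, an event published between the two steps could be skipped or delivered twice. The cost is that listeners run under the lock, so they should return quickly.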

Usage

Job execution happens after job preparation and artifact staging. Understanding job execution helps with debugging startup failures and monitoring job state transitions.

Execution Flow

Step  Component               Description
1     PortableRunner          Sends RunJobRequest with preparationId
2     InMemoryJobService      Looks up preparation, validates pipeline
3     JobInvoker              Creates JobInvocation with executor and runner
4     JobInvocation           Transitions STOPPED -> STARTING -> RUNNING
5     PortablePipelineRunner  Executes the pipeline (delegated)
6     JobInvocation           Transitions to terminal state (DONE/FAILED/CANCELLED)
7     InMemoryJobService      Cleans up preparation, manages invocation history

Cleanup and History Management

InMemoryJobService keeps a bounded history of completed invocations (maxInvocationHistory, default 10). When a job terminates:

  • The staging session token is removed
  • The cleanup function runs (e.g., removing staging directories)
  • The invocation is added to the completed history deque
  • If the history exceeds maxInvocationHistory, the oldest completed invocation is removed
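The eviction rule in the last bullet can be sketched as follows. CompletedHistory is a hypothetical class that tracks only job IDs; staging-token removal and the cleanup function are left out. A ConcurrentLinkedDeque has no capacity limit of its own, so the bound has to be enforced by the code that adds to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical bounded history of completed job IDs; a sketch, not InMemoryJobService itself. */
final class CompletedHistory {
    private final int maxInvocationHistory;
    private final ConcurrentLinkedDeque<String> completed = new ConcurrentLinkedDeque<>();

    CompletedHistory(int maxInvocationHistory) {
        if (maxInvocationHistory < 0) {
            throw new IllegalArgumentException("maxInvocationHistory must be >= 0");
        }
        this.maxInvocationHistory = maxInvocationHistory;
    }

    /**
     * Records a terminated job and evicts the oldest entries beyond the limit.
     * Synchronized so that the size check and eviction act as one step.
     */
    synchronized List<String> onTerminated(String jobId) {
        completed.addLast(jobId);
        List<String> evicted = new ArrayList<>();
        while (completed.size() > maxInvocationHistory) {
            evicted.add(completed.pollFirst());
        }
        return evicted;
    }

    /** Weakly consistent snapshot, oldest first; safe to call without the lock. */
    List<String> snapshot() {
        return new ArrayList<>(completed);
    }
}
```

A caller would then drop each evicted job ID from whatever map holds its invocation, so memory stays bounded by maxInvocationHistory rather than by the total number of jobs ever run.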

Theoretical Basis

Job Execution combines several established design patterns:

  • State Machine Formalism -- Jobs move through a fixed set of well-defined states. The isTerminated() method gives a single predicate for terminal states, which keeps shutdown and cleanup logic simple.
  • Observer Pattern -- State and message observers implement the classic GoF Observer pattern. The replay-on-subscribe behavior ensures that late-joining observers receive the full event history, preventing information loss.
  • Future-Based Concurrency -- ListenableFuture and FutureCallback let the job service run multiple jobs concurrently: start() returns as soon as the pipeline is submitted, and completion is handled by a callback rather than by a blocked thread.
  • Bounded History -- Completed invocations are kept in a ConcurrentLinkedDeque. Because that deque is unbounded, the service enforces maxInvocationHistory itself by evicting the oldest entry, giving a bounded buffer that prevents unbounded memory growth from accumulating completed job records.
