Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Apache Beam JobServiceGrpc Connection

From Leeroopedia


Property Value
Implementation Name JobServiceGrpc Connection (Wrapper Doc)
Category Distributed_Systems, Networking
Related Principle Principle:Apache_Beam_Job_Service_Connection
Source File runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java
Language Java
Type Wrapper Doc (documents usage of external gRPC dependency: beam-vendor-grpc)
last_updated 2026-02-09 04:00 GMT

Overview

This is a Wrapper Doc for the gRPC ManagedChannel and JobServiceGrpc classes provided by the beam-vendor-grpc external dependency. It documents how Apache Beam establishes gRPC connections to the job service using ManagedChannelFactory and creates blocking stubs for synchronous RPC communication.

Code Reference

Source Location

Property Value
File runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java
Connection Code Lines L174-177
Resource Wrapping Lines L178-179
Stub Transfer Line L222

Signature

The connection is established inline within PortableRunner.run(). There is no standalone connection class; the pattern is composed of gRPC primitives:

// Step 1: Create ManagedChannel from endpoint URL (L174-175)
ManagedChannel jobServiceChannel =
    channelFactory.forDescriptor(ApiServiceDescriptor.newBuilder().setUrl(endpoint).build());

// Step 2: Create blocking stub for synchronous RPCs (L177)
JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub(jobServiceChannel);

// Step 3: Wrap in CloseableResource for lifecycle management (L178-179)
try (CloseableResource<JobServiceBlockingStub> wrappedJobService =
    CloseableResource.of(jobService, unused -> jobServiceChannel.shutdown())) {

    // Step 4: Use stub with deadline and wait-for-ready (L182-186)
    final int jobServerTimeout = options.as(PortablePipelineOptions.class).getJobServerTimeout();
    PrepareJobResponse prepareJobResponse =
        jobService.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
            .withWaitForReady()
            .prepare(prepareJobRequest);

    // ... artifact staging and run ...

    // Step 5: Transfer ownership to result object (L222)
    return new JobServicePipelineResult(jobId, jobServerTimeout,
        wrappedJobService.transfer(), cleanup);
}

Key External Classes

Class Package Role
ManagedChannel org.apache.beam.vendor.grpc.v1p69p0.io.grpc gRPC channel managing the TCP connection to the job service
JobServiceGrpc org.apache.beam.model.jobmanagement.v1 Generated gRPC service class for the Beam Job API
JobServiceBlockingStub org.apache.beam.model.jobmanagement.v1.JobServiceGrpc Synchronous RPC stub providing blocking method calls
ManagedChannelFactory org.apache.beam.sdk.fn.channel Beam utility for creating channels from ApiServiceDescriptor
ApiServiceDescriptor org.apache.beam.model.pipeline.v1.Endpoints Protobuf message wrapping a service endpoint URL
CloseableResource org.apache.beam.runners.portability Beam utility for RAII-style resource management with ownership transfer

I/O Contract

Inputs

Input Type Source
Job service endpoint URL String PortablePipelineOptions.getJobEndpoint()
Channel factory ManagedChannelFactory Injected via PortableRunner.create() or default
Job server timeout int (seconds) PortablePipelineOptions.getJobServerTimeout()

Outputs

Output Type Description
Job service stub JobServiceBlockingStub Live gRPC stub connected to the remote job service
Wrapped resource CloseableResource<JobServiceBlockingStub> Auto-closeable wrapper that shuts down the channel on close

Connection Lifecycle

The connection has a well-defined lifecycle with ownership transfer:

Phase 1: Creation in PortableRunner.run()

// Channel is created and immediately used for prepare + staging + run
ManagedChannel jobServiceChannel =
    channelFactory.forDescriptor(ApiServiceDescriptor.newBuilder().setUrl(endpoint).build());
JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub(jobServiceChannel);

Phase 2: Ownership Transfer

After the run RPC succeeds, ownership of the stub is transferred from PortableRunner to JobServicePipelineResult:

// wrappedJobService.transfer() moves ownership to the result object
return new JobServicePipelineResult(jobId, jobServerTimeout,
    wrappedJobService.transfer(), cleanup);

The transfer() method prevents the try-with-resources block from closing the channel, since JobServicePipelineResult needs the connection for ongoing state polling and metrics collection.

Phase 3: Cleanup in JobServicePipelineResult.close()

// JobServicePipelineResult.close() at L130-141
public void close() {
    try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
        // Fetch final metrics
        jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
        // Run cleanup (e.g., stop loopback worker)
        if (cleanup != null) { cleanup.run(); }
    }
    // Channel is shut down by CloseableResource close handler
}

Artifact Staging Channel

A separate gRPC channel is created for artifact staging, using the endpoint returned by the prepare response:

// L193-195: Separate channel for artifact staging
ApiServiceDescriptor artifactStagingEndpoint = prepareJobResponse.getArtifactStagingEndpoint();
try (CloseableResource<ManagedChannel> artifactChannel =
    CloseableResource.of(
        channelFactory.forDescriptor(artifactStagingEndpoint), ManagedChannel::shutdown)) {

    ArtifactStagingService.offer(
        new ArtifactRetrievalService(),
        ArtifactStagingServiceGrpc.newStub(artifactChannel.get()),
        stagingSessionToken);
}

This channel is short-lived and closed immediately after artifact staging completes.

Usage Examples

Configuring the Job Endpoint

// Via command-line arguments
PipelineOptions options = PipelineOptionsFactory.fromArgs(
    "--runner=PortableRunner",
    "--jobEndpoint=localhost:8099",
    "--jobServerTimeout=60"
).create();

// Programmatically
options.as(PortablePipelineOptions.class).setJobEndpoint("my-job-service:8099");

Testing with Custom Channel Factory

// For unit testing, inject a custom channel factory
ManagedChannelFactory mockFactory = mock(ManagedChannelFactory.class);
PortableRunner runner = PortableRunner.create(options, mockFactory);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment