Implementation:Apache Beam JobServiceGrpc Connection
| Property | Value |
|---|---|
| Implementation Name | JobServiceGrpc Connection (Wrapper Doc) |
| Category | Distributed_Systems, Networking |
| Related Principle | Principle:Apache_Beam_Job_Service_Connection |
| Source File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java |
| Language | Java |
| Type | Wrapper Doc (documents usage of external gRPC dependency: beam-vendor-grpc) |
| last_updated | 2026-02-09 04:00 GMT |
Overview
This is a Wrapper Doc for the gRPC ManagedChannel and JobServiceGrpc classes provided by the beam-vendor-grpc external dependency. It documents how Apache Beam establishes gRPC connections to the job service using ManagedChannelFactory and creates blocking stubs for synchronous RPC communication.
Code Reference
Source Location
| Property | Value |
|---|---|
| File | runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java |
| Connection Code | Lines L174-177 |
| Resource Wrapping | Lines L178-179 |
| Stub Transfer | Line L222 |
Signature
The connection is established inline within PortableRunner.run(). There is no standalone connection class; the pattern is composed of gRPC primitives:
// Step 1: Create ManagedChannel from endpoint URL (L174-175)
ManagedChannel jobServiceChannel =
    channelFactory.forDescriptor(ApiServiceDescriptor.newBuilder().setUrl(endpoint).build());
// Step 2: Create blocking stub for synchronous RPCs (L177)
JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub(jobServiceChannel);
// Step 3: Wrap in CloseableResource for lifecycle management (L178-179)
try (CloseableResource<JobServiceBlockingStub> wrappedJobService =
    CloseableResource.of(jobService, unused -> jobServiceChannel.shutdown())) {
  // Step 4: Use stub with deadline and wait-for-ready (L182-186)
  final int jobServerTimeout = options.as(PortablePipelineOptions.class).getJobServerTimeout();
  PrepareJobResponse prepareJobResponse =
      jobService.withDeadlineAfter(jobServerTimeout, TimeUnit.SECONDS)
          .withWaitForReady()
          .prepare(prepareJobRequest);
  // ... artifact staging and run ...
  // Step 5: Transfer ownership to result object (L222)
  return new JobServicePipelineResult(jobId, jobServerTimeout,
      wrappedJobService.transfer(), cleanup);
}
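Step 4 combines a deadline with wait-for-ready: the call waits for the channel to become usable instead of failing immediately, and the deadline is what bounds that wait. The sketch below illustrates only the general idea of a blocking call bounded by a timeout, using java.util.concurrent as a stand-in. It does not use gRPC, and the class name DeadlineSketch and the method callWithDeadline are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a blocking RPC bounded by a deadline; no gRPC is involved.
final class DeadlineSketch {
  /** Blocks until the reply arrives or the timeout elapses, whichever comes first. */
  static String callWithDeadline(CompletableFuture<String> reply, long timeout, TimeUnit unit) {
    try {
      return reply.get(timeout, unit);
    } catch (TimeoutException e) {
      // gRPC reports the comparable condition with the DEADLINE_EXCEEDED status code.
      return "DEADLINE_EXCEEDED";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
      throw new IllegalStateException("interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("call failed", e.getCause());
    }
  }
}
```

A reply that is already available returns at once, while a reply that never arrives makes the caller give up after the timeout rather than block indefinitely. This is why a finite job server timeout matters when the job service is unreachable.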
Key External Classes
| Class | Package | Role |
|---|---|---|
| ManagedChannel | org.apache.beam.vendor.grpc.v1p69p0.io.grpc | gRPC channel managing the TCP connection to the job service |
| JobServiceGrpc | org.apache.beam.model.jobmanagement.v1 | Generated gRPC service class for the Beam Job API |
| JobServiceBlockingStub | org.apache.beam.model.jobmanagement.v1.JobServiceGrpc | Synchronous RPC stub providing blocking method calls |
| ManagedChannelFactory | org.apache.beam.sdk.fn.channel | Beam utility for creating channels from ApiServiceDescriptor |
| ApiServiceDescriptor | org.apache.beam.model.pipeline.v1.Endpoints | Protobuf message wrapping a service endpoint URL |
| CloseableResource | org.apache.beam.runners.portability | Beam utility for RAII-style resource management with ownership transfer |
I/O Contract
Inputs
| Input | Type | Source |
|---|---|---|
| Job service endpoint URL | String | PortablePipelineOptions.getJobEndpoint() |
| Channel factory | ManagedChannelFactory | Injected via PortableRunner.create() or default |
| Job server timeout | int (seconds) | PortablePipelineOptions.getJobServerTimeout() |
Outputs
| Output | Type | Description |
|---|---|---|
| Job service stub | JobServiceBlockingStub | Live gRPC stub connected to the remote job service |
| Wrapped resource | CloseableResource<JobServiceBlockingStub> | Auto-closeable wrapper that shuts down the channel on close |
Connection Lifecycle
The connection passes through three phases, with ownership moving from PortableRunner to the result object along the way:
Phase 1: Creation in PortableRunner.run()
// Channel is created and immediately used for prepare + staging + run
ManagedChannel jobServiceChannel =
    channelFactory.forDescriptor(ApiServiceDescriptor.newBuilder().setUrl(endpoint).build());
JobServiceBlockingStub jobService = JobServiceGrpc.newBlockingStub(jobServiceChannel);
Phase 2: Ownership Transfer
After the run RPC succeeds, ownership of the stub is transferred from PortableRunner to JobServicePipelineResult:
// wrappedJobService.transfer() moves ownership to the result object
return new JobServicePipelineResult(jobId, jobServerTimeout,
    wrappedJobService.transfer(), cleanup);
The transfer() method prevents the try-with-resources block from closing the channel, since JobServicePipelineResult needs the connection for ongoing state polling and metrics collection.
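The interaction between transfer() and try-with-resources can be sketched with a small stand-in class. This is a simplified illustration, not Beam's CloseableResource: the names OwnedResource and TransferDemo are hypothetical, the resource is a plain String instead of a gRPC stub, and the closer does not throw checked exceptions, to keep the example short.

```java
// Simplified, hypothetical stand-in for an ownership-transferring resource wrapper.
final class OwnedResource<T> implements AutoCloseable {
  /** Cleanup callback, playing the role of the channel-shutdown lambda. */
  interface Closer<T> {
    void close(T resource);
  }

  private final T resource;
  private final Closer<T> closer;
  private boolean owned = true; // becomes false once transferred or closed

  private OwnedResource(T resource, Closer<T> closer) {
    this.resource = resource;
    this.closer = closer;
  }

  static <T> OwnedResource<T> of(T resource, Closer<T> closer) {
    return new OwnedResource<>(resource, closer);
  }

  T get() {
    if (!owned) {
      throw new IllegalStateException("resource was transferred or closed");
    }
    return resource;
  }

  /** Hands the resource to a new wrapper; close() on this wrapper becomes a no-op. */
  OwnedResource<T> transfer() {
    if (!owned) {
      throw new IllegalStateException("resource was transferred or closed");
    }
    owned = false;
    return new OwnedResource<>(resource, closer);
  }

  @Override
  public void close() {
    if (owned) {
      owned = false;
      closer.close(resource);
    }
  }
}

final class TransferDemo {
  /** Opens a fake connection and hands it to the caller only if setup succeeds. */
  static OwnedResource<String> open(StringBuilder log, boolean failSetup) {
    try (OwnedResource<String> wrapped =
        OwnedResource.of("channel", c -> log.append("shutdown ").append(c))) {
      if (failSetup) {
        // The implicit wrapped.close() runs the closer before the exception propagates.
        throw new IllegalStateException("setup failed");
      }
      // The implicit wrapped.close() is a no-op because ownership has moved.
      return wrapped.transfer();
    }
  }
}
```

On the success path the closer runs only when the returned wrapper is closed, which mirrors how the result object keeps the connection alive. On the failure path the try-with-resources block still releases the resource, so a failed setup does not leak it.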
Phase 3: Cleanup in JobServicePipelineResult.close()
// JobServicePipelineResult.close() at L130-141
public void close() {
  try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
    // Fetch final metrics
    jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
    // Run cleanup (e.g., stop loopback worker)
    if (cleanup != null) {
      cleanup.run();
    }
  }
  // Channel is shut down by CloseableResource close handler
}
Artifact Staging Channel
A separate gRPC channel is created for artifact staging, using the endpoint returned by the prepare response:
// L193-195: Separate channel for artifact staging
ApiServiceDescriptor artifactStagingEndpoint = prepareJobResponse.getArtifactStagingEndpoint();
try (CloseableResource<ManagedChannel> artifactChannel =
    CloseableResource.of(
        channelFactory.forDescriptor(artifactStagingEndpoint), ManagedChannel::shutdown)) {
  ArtifactStagingService.offer(
      new ArtifactRetrievalService(),
      ArtifactStagingServiceGrpc.newStub(artifactChannel.get()),
      stagingSessionToken);
}
This channel is short-lived: the try-with-resources block shuts it down as soon as artifact staging completes.
Usage Examples
Configuring the Job Endpoint
// Via command-line arguments
PipelineOptions options = PipelineOptionsFactory.fromArgs(
    "--runner=PortableRunner",
    "--jobEndpoint=localhost:8099",
    "--jobServerTimeout=60"
).create();
// Programmatically
options.as(PortablePipelineOptions.class).setJobEndpoint("my-job-service:8099");
Testing with Custom Channel Factory
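// For unit testing, inject a custom channel factory.
// Note: an unstubbed Mockito mock returns null from forDescriptor(),
// so stub it to return a channel before calling run().
ManagedChannelFactory mockFactory = mock(ManagedChannelFactory.class);
PortableRunner runner = PortableRunner.create(options, mockFactory);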
Related Pages
- Principle:Apache_Beam_Job_Service_Connection -- The principle this implementation realizes
- Implementation:Apache_Beam_PortableRunner_Run -- The runner that creates and uses this connection
- Implementation:Apache_Beam_JobServicePipelineResult -- Receives transferred ownership for monitoring
- Implementation:Apache_Beam_InMemoryJobService_Prepare -- Server-side endpoint receiving prepare RPCs
- Environment:Apache_Beam_Portable_Runner_Environment -- Portability framework runtime with gRPC Job Service and Docker/loopback SDK harness.