Implementation:Apache Beam Pipeline Create

From Leeroopedia


Field | Value
Implementation Name | Pipeline Create
Overview | Concrete tool, provided by the Beam SDK core library, for constructing a Beam Pipeline object from PipelineOptions.
Type | Wrapper Doc (SDK-level API, not runner-specific code)
Source | sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
Implements | Principle:Apache_Beam_Pipeline_Construction
last_updated | 2026-02-09 04:00 GMT

Code Reference

Source Location

File | Lines | Description
sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L149-163 | Pipeline.create() factory methods
sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L179-196 | Pipeline.apply() root transform application
sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L216-219 | Pipeline.replaceAll() transform override application
sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L130 | Class declaration: public class Pipeline

Signature

// Factory method: create Pipeline with default PipelineOptions
public static Pipeline create()

// Factory method: create Pipeline with explicit PipelineOptions
public static Pipeline create(PipelineOptions options)

// Apply a root PTransform to the Pipeline
public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> root)

// Apply a named root PTransform to the Pipeline
public <OutputT extends POutput> OutputT apply(
    String name, PTransform<? super PBegin, OutputT> root)

// Replace transforms matching overrides (used internally by runners)
@Internal
public void replaceAll(List<PTransformOverride> overrides)

// Traverse the pipeline's transform hierarchy topologically
public void traverseTopologically(PipelineVisitor visitor)

// Run the pipeline using the configured runner
public PipelineResult run()
public PipelineResult run(PipelineOptions options)

Import

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

I/O Contract

Inputs

Parameter | Type | Description
options | PipelineOptions | (Optional) Configuration object specifying the runner class, parallelism, enforcement flags (e.g., DirectOptions.isEnforceImmutability()), and other runner-specific settings. If omitted, PipelineOptionsFactory.create() provides defaults.

Outputs

Output | Type | Description
Pipeline object | Pipeline | A new Pipeline bound to the given options. Its TransformHierarchy starts empty and is populated as user-defined transforms are applied with apply(). Once the graph is built, Pipeline.run() submits it to the runner configured in the options.

Usage Examples

Basic Pipeline Construction

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Create pipeline with options from command-line arguments
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);

// Build the DAG (ExtractWordsFn is a user-defined DoFn<String, String>, not shown)
PCollection<String> lines = p.apply("ReadInput", TextIO.read().from("gs://bucket/input.txt"));
PCollection<String> words = lines.apply("ExtractWords", ParDo.of(new ExtractWordsFn()));
PCollection<KV<String, Long>> counts = words.apply("CountWords", Count.perElement());

// TextIO.write() accepts only PCollection<String>, so format each KV as a line of text first
counts
    .apply("FormatResults", MapElements.into(TypeDescriptors.strings())
        .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
    .apply("WriteOutput", TextIO.write().to("gs://bucket/output"));

// Execute the pipeline and block until it finishes
p.run().waitUntilFinish();

Pipeline with Default Options (Direct Runner)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TypeDescriptors;

// Create pipeline with default options (uses DirectRunner when it is on the classpath)
Pipeline p = Pipeline.create();

// PrintFn is a user-defined DoFn<String, Void> that logs each element (not shown)
p.apply(Create.of("hello", "world", "beam"))
 .apply(MapElements.into(TypeDescriptors.strings())
     .via((String word) -> word.toUpperCase()))
 .apply(ParDo.of(new PrintFn()));

p.run().waitUntilFinish();

Pipeline with Multiple Roots

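import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// options is a PipelineOptions instance built as in the basic example
Pipeline p = Pipeline.create(options);

// Multiple root transforms, each applied directly to the Pipeline
PCollection<String> source1 = p.apply("ReadSource1", TextIO.read().from("file1.txt"));
PCollection<String> source2 = p.apply("ReadSource2", TextIO.read().from("file2.txt"));
PCollection<String> source3 = p.apply("CreateInMemory",
    Create.of("inline", "data").withCoder(StringUtf8Coder.of()));

// Merge the three sources into one PCollection and process it
PCollection<String> merged = PCollectionList.of(source1).and(source2).and(source3)
    .apply(Flatten.pCollections());

// ProcessFn is a user-defined DoFn (not shown)
merged.apply("Process", ParDo.of(new ProcessFn()));
p.run();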

Key Internal Details

The Pipeline.create(PipelineOptions options) method (at line 156 of Pipeline.java) performs a subtle but important step: it calls PipelineRunner.fromOptions(options) before constructing the Pipeline. Because this call instantiates the runner class named in the options, a runner that cannot be instantiated is reported when the pipeline is created rather than later, when run() is called.
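The fail-fast pattern described above can be illustrated with a small self-contained sketch. It does not use Beam: Runner, LocalRunner, SketchPipeline, and the "runner" map key are hypothetical stand-ins invented for this example, and the reflection-based lookup is only an assumption about how such validation might look, not a copy of PipelineRunner.fromOptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of "instantiate the runner before building the pipeline". Not Beam code. */
public class FailFastCreateSketch {

  /** Hypothetical stand-in for a runner type. */
  public interface Runner {}

  /** Hypothetical runner with a public no-arg constructor. */
  public static final class LocalRunner implements Runner {
    public LocalRunner() {}
  }

  /** Hypothetical stand-in for a pipeline that just remembers its options. */
  public static final class SketchPipeline {
    final Map<String, String> options;

    private SketchPipeline(Map<String, String> options) {
      this.options = options;
    }

    public static SketchPipeline create(Map<String, String> options) {
      // The returned runner is discarded; the call is made only to validate the options.
      runnerFromOptions(options);
      return new SketchPipeline(options);
    }
  }

  /** Loads and instantiates the class named under the "runner" key, or fails loudly. */
  static Runner runnerFromOptions(Map<String, String> options) {
    String className = options.get("runner");
    try {
      Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
      return (Runner) instance;
    } catch (ReflectiveOperationException | RuntimeException e) {
      throw new IllegalArgumentException("Cannot instantiate runner: " + className, e);
    }
  }

  /** Returns true when create() rejects the given runner class name. */
  public static boolean createFails(String runnerClassName) {
    Map<String, String> options = new HashMap<>();
    options.put("runner", runnerClassName);
    try {
      SketchPipeline.create(options);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(createFails(LocalRunner.class.getName()));
    System.out.println(createFails("no.such.Runner"));
  }
}
```

In this sketch a missing class, a class without a usable constructor, and a class that is not a Runner are all rejected inside create(), so no transform is ever applied to a pipeline whose runner is misconfigured.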

The internal TransformHierarchy manages the DAG structure. Each call to apply() invokes TransformHierarchy.pushNode() to register the transform, then calls PTransform.expand() to produce outputs, and finally calls TransformHierarchy.setOutput() to record the resulting PCollections.
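The push, expand, set-output sequence can be sketched with a toy hierarchy. This is a simplified model under stated assumptions, not Beam's TransformHierarchy: the HierarchySketch class, its Node type, and the use of java.util.function.Function in place of PTransform are all hypothetical, and the pop step in the finally block is added here so that sibling transforms attach to the correct parent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Toy model of how apply() could record a transform tree. Not Beam code. */
public class HierarchySketch {

  /** One node per applied transform; children are transforms applied during expansion. */
  static final class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Object output;

    Node(String name) {
      this.name = name;
    }
  }

  private final Node root = new Node("<root>");
  private final Deque<Node> stack = new ArrayDeque<>();

  public HierarchySketch() {
    stack.push(root);
  }

  /** Registers a node, expands the transform, records its output, then leaves the node. */
  public <I, O> O apply(String name, I input, Function<I, O> expand) {
    Node node = new Node(name);
    stack.peek().children.add(node); // "pushNode": register under the current parent
    stack.push(node);
    try {
      O output = expand.apply(input); // "expand": may call apply() again for sub-transforms
      node.output = output;           // "setOutput": record what the transform produced
      return output;
    } finally {
      stack.pop();                    // leave the node so later siblings attach to the parent
    }
  }

  /** Renders the recorded tree as indented transform names, one per line. */
  public String render() {
    StringBuilder sb = new StringBuilder();
    for (Node child : root.children) {
      render(child, 0, sb);
    }
    return sb.toString();
  }

  private static void render(Node node, int depth, StringBuilder sb) {
    for (int i = 0; i < depth; i++) {
      sb.append("  ");
    }
    sb.append(node.name).append('\n');
    for (Node child : node.children) {
      render(child, depth + 1, sb);
    }
  }

  static List<String> splitWords(List<String> lines) {
    List<String> words = new ArrayList<>();
    for (String line : lines) {
      words.addAll(Arrays.asList(line.split(" ")));
    }
    return words;
  }

  /** Applies a root "Read" and a composite "CountWords", then returns the rendered tree. */
  public static String demo() {
    HierarchySketch h = new HierarchySketch();
    List<String> lines = h.apply("Read", "begin", in -> Arrays.asList("a b", "b"));
    Integer total = h.apply("CountWords", lines, ls -> {
      List<String> words = h.apply("Split", ls, HierarchySketch::splitWords);
      Integer count = h.apply("Count", words, List::size);
      return count;
    });
    return h.render();
  }

  public static void main(String[] args) {
    System.out.print(demo());
  }
}
```

Because the composite "CountWords" calls apply() from inside its own expansion, "Split" and "Count" are recorded as its children, while "Read" and "CountWords" sit side by side at the top level.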
