Implementation:Apache Beam Pipeline Create
| Field | Value |
|---|---|
| Implementation Name | Pipeline Create |
| Overview | Concrete tool, provided by the Beam SDK core library, for constructing a Beam Pipeline object from PipelineOptions. |
| Type | Wrapper Doc (SDK-level API, not runner-specific code) |
| Source | sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java |
| Implements | Principle:Apache_Beam_Pipeline_Construction |
| last_updated | 2026-02-09 04:00 GMT |
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L149-163 | Pipeline.create() factory methods |
| sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L179-196 | Pipeline.apply() root transform application |
| sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L216-219 | Pipeline.replaceAll() transform override application |
| sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java | L130 | Class declaration: public class Pipeline |
Signature
```java
// Factory method: create Pipeline with default PipelineOptions
public static Pipeline create()
// Factory method: create Pipeline with explicit PipelineOptions
public static Pipeline create(PipelineOptions options)
// Apply a root PTransform to the Pipeline
public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> root)
// Apply a named root PTransform to the Pipeline
public <OutputT extends POutput> OutputT apply(
    String name, PTransform<? super PBegin, OutputT> root)
// Replace transforms matching overrides (used internally by runners)
@Internal
public void replaceAll(List<PTransformOverride> overrides)
// Traverse the pipeline's transform hierarchy topologically
public void traverseTopologically(PipelineVisitor visitor)
// Run the pipeline using the configured runner
public PipelineResult run()
public PipelineResult run(PipelineOptions options)
```
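The two apply() overloads differ only in whether the caller supplies the node name. The sketch below exercises both, plus the equivalent form of applying a root transform to the pipeline's PBegin (obtained with Pipeline.begin()). The class RootApplyForms and its method are hypothetical; the sketch assumes the DirectRunner (beam-runners-direct-java) is on the classpath, since Pipeline.create() with default options instantiates it.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical example class illustrating the root-apply forms.
public class RootApplyForms {
  // Applies three root transforms and checks that every output belongs to the same Pipeline.
  static boolean buildAndCheck() {
    // Default options; the runner defaults to DirectRunner, which must be on the classpath
    Pipeline p = Pipeline.create();
    // Named form: "CreateNames" becomes this node's name in the transform hierarchy
    PCollection<String> named = p.apply("CreateNames", Create.of("x", "y"));
    // Unnamed form: the node name is derived from the transform itself
    PCollection<Integer> unnamed = p.apply(Create.of(1, 2, 3));
    // Applying to the pipeline's PBegin is also a root application
    PCollection<Long> viaBegin = p.begin().apply("CreateLongs", Create.of(10L, 20L));
    return named.getPipeline() == p
        && unnamed.getPipeline() == p
        && viaBegin.getPipeline() == p;
  }

  public static void main(String[] args) {
    System.out.println(buildAndCheck());
  }
}
```

Named applications are generally preferable in production code, because stable, explicit names make monitoring output and pipeline updates easier to follow.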
Import
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
```
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| options | PipelineOptions | (Optional) Configuration object specifying the runner class, parallelism, enforcement flags (e.g., DirectOptions.isEnforceImmutability()), and other runner-specific settings. If omitted, PipelineOptionsFactory.create() provides defaults. |
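Besides parsing options from command-line arguments, options can be built programmatically. The sketch below sets the runner class, target parallelism, and the immutability-enforcement flag on DirectRunner options before passing them to Pipeline.create(options). It assumes the beam-runners-direct-java artifact (which provides DirectOptions and DirectRunner) is on the classpath; the class ExplicitOptions and the parallelism value 4 are illustrative choices.

```java
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical example class: builds options in code instead of from args.
public class ExplicitOptions {
  static DirectOptions buildOptions() {
    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
    options.setRunner(DirectRunner.class);  // runner used by Pipeline.run()
    options.setTargetParallelism(4);        // DirectRunner target parallelism (illustrative value)
    options.setEnforceImmutability(true);   // check that PCollection elements are not mutated
    return options;
  }

  public static void main(String[] args) {
    DirectOptions options = buildOptions();
    // Pipeline.create(options) also instantiates the runner via PipelineRunner.fromOptions
    Pipeline p = Pipeline.create(options);
    System.out.println(p.getOptions().getRunner().getSimpleName());
  }
}
```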
Outputs
| Output | Type | Description |
|---|---|---|
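| Pipeline object | Pipeline | A new pipeline bound to the given options. Its internal TransformHierarchy starts empty and is populated as user-defined transforms are applied via apply(). Once the DAG is complete, Pipeline.run() executes it on the runner configured in the options (or in the options passed to run(PipelineOptions)). |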
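The sketch below makes the "starts empty, filled by apply()" behavior visible by walking the hierarchy with traverseTopologically() and a PipelineVisitor.Defaults subclass that records each primitive node's full name. The class HierarchyInspection and its helper methods are hypothetical, the exact primitive names depend on how each transform expands, and the sketch assumes the DirectRunner is on the classpath for Pipeline.create().

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical example class for inspecting the transform hierarchy.
public class HierarchyInspection {
  // Collects the full names of all primitive transforms currently in the pipeline.
  static List<String> primitiveNames(Pipeline p) {
    List<String> names = new ArrayList<>();
    p.traverseTopologically(
        new Pipeline.PipelineVisitor.Defaults() {
          @Override
          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            names.add(node.getFullName());
          }
        });
    return names;
  }

  // Creates a pipeline and applies two named transforms to it.
  static Pipeline build() {
    Pipeline p = Pipeline.create();
    p.apply("Words", Create.of("a", "b"))
        .apply("Upper", MapElements.into(TypeDescriptors.strings())
            .via((String s) -> s.toUpperCase()));
    return p;
  }

  public static void main(String[] args) {
    // A freshly created pipeline contains no transforms yet.
    System.out.println("empty pipeline: " + primitiveNames(Pipeline.create()).size());
    // After apply(), the hierarchy holds the primitives the transforms expanded into.
    primitiveNames(build()).forEach(System.out::println);
  }
}
```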
Usage Examples
Basic Pipeline Construction
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
// Inside main(String[] args); ExtractWordsFn is a user-defined DoFn<String, String> (not shown)
// Create pipeline with options from command-line arguments
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Build the DAG
PCollection<String> lines = p.apply("ReadInput", TextIO.read().from("gs://bucket/input.txt"));
PCollection<String> words = lines.apply("ExtractWords", ParDo.of(new ExtractWordsFn()));
PCollection<KV<String, Long>> counts = words.apply("CountWords", Count.perElement());
// TextIO.write() consumes PCollection<String>, so format each KV as a line first
counts
    .apply("FormatResults", MapElements.into(TypeDescriptors.strings())
        .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
    .apply("WriteOutput", TextIO.write().to("gs://bucket/output"));
// Execute the pipeline and block until it finishes
p.run().waitUntilFinish();
```
Pipeline with Default Options (Direct Runner)
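```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TypeDescriptors;
// Default options: the runner defaults to DirectRunner, which must be on the classpath
Pipeline p = Pipeline.create();
p.apply(Create.of("hello", "world", "beam"))
    .apply(MapElements.into(TypeDescriptors.strings())
        .via((String word) -> word.toUpperCase()))
    // Print each element (output order is not guaranteed)
    .apply(ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(@Element String word) {
        System.out.println(word);
      }
    }));
p.run().waitUntilFinish();
```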
Pipeline with Multiple Roots
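```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
// `options` is built as in the first example; ProcessFn is a user-defined DoFn<String, ?> (not shown)
Pipeline p = Pipeline.create(options);
// Multiple root transforms, each applied directly to the Pipeline
PCollection<String> source1 = p.apply("ReadSource1", TextIO.read().from("file1.txt"));
PCollection<String> source2 = p.apply("ReadSource2", TextIO.read().from("file2.txt"));
PCollection<String> source3 = p.apply("CreateInMemory",
    Create.of("inline", "data").withCoder(StringUtf8Coder.of()));
// Merge the three PCollections into one, then process
PCollection<String> merged = PCollectionList.of(source1).and(source2).and(source3)
    .apply(Flatten.pCollections());
merged.apply("Process", ParDo.of(new ProcessFn()));
p.run().waitUntilFinish();
```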
Key Internal Details
Pipeline.create(PipelineOptions options) (line 156 of Pipeline.java) performs a subtle but important step: before constructing the Pipeline, it calls PipelineRunner.fromOptions(options). This instantiates the configured runner class from the given options, so configuration errors (for example, a runner class that cannot be found or constructed) surface when the pipeline is created rather than when run() is called.
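The internal TransformHierarchy manages the DAG structure. Each call to apply() invokes TransformHierarchy.pushNode() to register the transform as a child of the current node, then calls PTransform.expand() to produce outputs (any transforms applied inside expand() become nested children, which is how composite transforms are formed), records the resulting PCollections with TransformHierarchy.setOutput(), and finally calls TransformHierarchy.popNode() to return to the enclosing node.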
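The nesting produced by this push/expand/pop sequence can be observed with a small composite transform. In the sketch below, the hypothetical composite UpperAndCount applies two sub-transforms inside expand(); a visitor then collects the full names of composite nodes, where children appear with their parent's name as a prefix (e.g. "Process/Upper"). All class and method names other than the Beam APIs are illustrative, and the DirectRunner is assumed to be on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical example class showing composite-transform nesting.
public class CompositeNesting {
  // A composite transform: transforms applied inside expand() become its children.
  static class UpperAndCount
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> input) {
      return input
          .apply("Upper", MapElements.into(TypeDescriptors.strings())
              .via((String s) -> s.toUpperCase()))
          .apply("Count", Count.perElement());
    }
  }

  // Returns the full names of all composite nodes except the unnamed root.
  static List<String> compositeNames(Pipeline p) {
    List<String> names = new ArrayList<>();
    p.traverseTopologically(
        new Pipeline.PipelineVisitor.Defaults() {
          @Override
          public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(
              TransformHierarchy.Node node) {
            if (!node.isRootNode()) {
              names.add(node.getFullName());
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
          }
        });
    return names;
  }

  static Pipeline build() {
    Pipeline p = Pipeline.create();
    p.apply("Words", Create.of("a", "b", "a")).apply("Process", new UpperAndCount());
    return p;
  }

  public static void main(String[] args) {
    compositeNames(build()).forEach(System.out::println);
  }
}
```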
Related Pages
- Principle:Apache_Beam_Pipeline_Construction -- The principle describing pipeline construction via PTransform composition into a DAG.
- Environment:Apache_Beam_Java_Build_Environment -- Java build environment with JDK 8+, Gradle, and multi-language toolchain.
Sources
- Repo -- Apache Beam -- Source repository.
- Doc -- Beam Pipeline API: Creating a Pipeline -- Official documentation on pipeline creation.