Implementation:Apache Beam Twister2Runner ZipDependencies
| Attribute | Value |
|---|---|
| Implementation Name | Twister2Runner ZipDependencies |
| Domain | Packaging, HPC |
| Overview | Concrete tool for packaging pipeline classpath dependencies into ZIP archives for Twister2 cluster distribution |
| Deprecation Notice | The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0 |
| last_updated | 2026-02-09 04:00 GMT |
Overview
Twister2Runner ZipDependencies documents the private methods within Twister2Runner that package Java classpath dependencies into a ZIP archive for distribution to Twister2 cluster worker nodes. The packaging consists of two steps: preparing the list of files to stage (via Beam's PipelineResources) and creating the ZIP archive with deduplication and Twister2 JAR filtering.
Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.
Code Reference
Source Location
| File | Lines | Repository |
|---|---|---|
| runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java | L273-342 (zipFilesToStage), L195-251 (setupSystem) | GitHub |
Signature: prepareFilesToStage
/**
 * Classpath entries that are not JAR files (e.g., directories with .class
 * files, or empty directories) cause exceptions in the run log.
 */
private void prepareFilesToStage(Twister2PipelineOptions options) {
  PipelineResources.prepareFilesForStaging(options);
}
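The javadoc on prepareFilesToStage warns that non-JAR classpath entries cause failures, which is why the runner calls Beam's PipelineResources before zipping. To see which entries of your own classpath fall into that category, the following is a minimal diagnostic sketch. The class and method names (NonJarClasspathEntries, nonJarEntries) are hypothetical, it only inspects the local java.class.path, and it is not Beam's PipelineResources logic.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical diagnostic (not Beam's PipelineResources logic): list the
// classpath entries that are not .jar files, i.e. the kind of entry the
// javadoc on prepareFilesToStage warns about.
public class NonJarClasspathEntries {

  /** Returns entries of a path-separator-delimited classpath that do not end in ".jar". */
  static List<String> nonJarEntries(String classpath) {
    List<String> result = new ArrayList<>();
    for (String entry : classpath.split(File.pathSeparator)) {
      if (entry.isEmpty()) {
        continue; // ignore empty segments, e.g. from a doubled separator
      }
      if (!entry.toLowerCase(Locale.ROOT).endsWith(".jar")) {
        result.add(entry);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    for (String entry : nonJarEntries(System.getProperty("java.class.path"))) {
      File f = new File(entry);
      String kind = f.isDirectory() ? "directory" : (f.exists() ? "file" : "missing");
      System.out.println(kind + ": " + entry);
    }
  }
}
```

Entries reported as directories are typical of running from an IDE or a build tool's class output folder.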
Signature: zipFilesToStage
/**
 * Creates a single zip file from all the jar files that are listed
 * as files to stage in options.
 */
private void zipFilesToStage(Twister2PipelineOptions options) {
  File zipFile = null;
  Set<String> jarSet = new HashSet<>();
  List<String> filesToStage = options.getFilesToStage();
  List<String> trimmed = new ArrayList<>();
  // Remove twister2 jars from the list
  for (String file : filesToStage) {
    if (!file.contains("/org/twister2")) {
      trimmed.add(file);
    }
  }
  FileInputStream fis = null;
  try {
    zipFile = File.createTempFile("twister2-", ".zip");
    FileOutputStream fos = new FileOutputStream(zipFile);
    ZipOutputStream zipOut = new ZipOutputStream(fos);
    zipOut.putNextEntry(new ZipEntry("lib/"));
    for (String srcFile : trimmed) {
      File fileToZip = new File(srcFile);
      if (!jarSet.contains(fileToZip.getName())) {
        jarSet.add(fileToZip.getName());
      } else {
        continue; // skip duplicates
      }
      fis = new FileInputStream(fileToZip);
      ZipEntry zipEntry =
          new ZipEntry("lib/" + fileToZip.getName());
      zipOut.putNextEntry(zipEntry);
      byte[] bytes = new byte[1024];
      int length;
      while ((length = fis.read(bytes)) >= 0) {
        zipOut.write(bytes, 0, length);
      }
      fis.close();
    }
    zipOut.close();
    fos.close();
    zipFile.deleteOnExit();
  } catch (FileNotFoundException e) {
    LOG.info(e.getMessage());
  } catch (IOException e) {
    LOG.info(e.getMessage());
  } finally {
    if (fis != null) {
      try {
        fis.close();
      } catch (IOException e) {
        LOG.info(e.getMessage());
      }
    }
  }
  if (zipFile != null) {
    options.setJobFileZip(zipFile.getPath());
  }
}
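The runner code above logs any I/O error at INFO level and carries on. Because the temporary file has already been created by then, it still records that possibly incomplete archive on the options, and the ZIP output streams are left open on the error path. The following is a minimal sketch of the same technique (substring filter on /org/twister2, first-wins deduplication by file name, entries stored under lib/) using try-with-resources and letting I/O errors propagate to the caller. JobZipSketch and zipJars are hypothetical names; this is an illustration, not the Beam implementation.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch of the technique used by zipFilesToStage, with
// try-with-resources and I/O errors propagated instead of logged.
public class JobZipSketch {

  static File zipJars(List<String> filesToStage) throws IOException {
    File zipFile = File.createTempFile("twister2-", ".zip");
    zipFile.deleteOnExit(); // as in the runner, the archive lives only as long as the JVM
    Set<String> seenNames = new HashSet<>();
    try (ZipOutputStream zipOut =
        new ZipOutputStream(Files.newOutputStream(zipFile.toPath()))) {
      zipOut.putNextEntry(new ZipEntry("lib/")); // explicit directory entry
      zipOut.closeEntry();
      for (String path : filesToStage) {
        if (path.contains("/org/twister2")) {
          continue; // Twister2's own JARs are expected on the cluster
        }
        File src = new File(path);
        if (!seenNames.add(src.getName())) {
          continue; // a file with this name was already added: first one wins
        }
        zipOut.putNextEntry(new ZipEntry("lib/" + src.getName()));
        Files.copy(src.toPath(), zipOut); // fails with IOException if src is missing or unreadable
        zipOut.closeEntry();
      }
    }
    return zipFile;
  }

  public static void main(String[] args) throws IOException {
    // Usage: java JobZipSketch /path/a.jar /path/b.jar ...
    File zip = zipJars(Arrays.asList(args));
    try (ZipFile zf = new ZipFile(zip)) {
      zf.stream().forEach(e -> System.out.println(e.getName()));
    }
  }
}
```

Files.copy(Path, OutputStream) streams the file into the current entry without closing zipOut, so the loop can keep adding entries; the single try-with-resources block closes the archive on both the success and failure paths.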
Signature: setupSystem
private void setupSystem(Twister2PipelineOptions options) {
  prepareFilesToStage(options);
  zipFilesToStage(options);
  System.setProperty("cluster_type", options.getClusterType());
  System.setProperty("job_file", options.getJobFileZip());
  System.setProperty("job_type", options.getJobType());
  if (isLocalMode(options)) {
    System.setProperty("twister2_home",
        System.getProperty("java.io.tmpdir"));
    System.setProperty("config_dir",
        System.getProperty("java.io.tmpdir") + "/conf/");
  } else {
    System.setProperty("twister2_home",
        options.getTwister2Home());
    System.setProperty("config_dir",
        options.getTwister2Home() + "/conf/");
    // Validate required config files exist
    File cDir = new File(
        System.getProperty("config_dir"),
        options.getClusterType());
    String[] filesList = new String[] {
      "core.yaml", "network.yaml", "data.yaml",
      "resource.yaml", "task.yaml",
    };
    for (String file : filesList) {
      File toCheck = new File(cDir, file);
      if (!toCheck.exists()) {
        throw new Twister2RuntimeException(
            "Couldn't find " + file
                + " in config directory specified.");
      }
    }
    // Setup logging from logger.properties
  }
}
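In non-local mode, setupSystem() looks for five YAML files under <twister2Home>/conf/<clusterType>/ and throws on the first one that is missing. A hypothetical pre-flight helper can run the same check before submitting a job and report every missing file at once. Twister2ConfigCheck and missingConfigFiles are illustrative names, and the sketch assumes the directory layout that the code above derives from options.getTwister2Home() and options.getClusterType().

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight check mirroring the validation in setupSystem():
// in non-local mode the runner expects these files in <twister2Home>/conf/<clusterType>/.
public class Twister2ConfigCheck {

  static final List<String> REQUIRED =
      Arrays.asList("core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml");

  /** Returns the required config files that do not exist, in the runner's check order. */
  static List<String> missingConfigFiles(String twister2Home, String clusterType) {
    Path configDir = Paths.get(twister2Home, "conf", clusterType);
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED) {
      if (!Files.exists(configDir.resolve(name))) {
        missing.add(name);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Usage: java Twister2ConfigCheck /opt/twister2 standalone
    List<String> missing = missingConfigFiles(args[0], args[1]);
    if (missing.isEmpty()) {
      System.out.println("All required config files found.");
    } else {
      System.out.println("Missing: " + missing);
    }
  }
}
```

Collecting all missing files, rather than stopping at the first, saves a round trip per file when setting up a new cluster configuration.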
Import Statements
These are private methods of Twister2Runner, so they have no import of their own. The enclosing class is imported as:
import org.apache.beam.runners.twister2.Twister2Runner;
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| Classpath JARs | List<String> via options.getFilesToStage() | Files to stage; if none are set, PipelineResources detects them from the current JVM classpath and packages directory entries as JARs |
| options | Twister2PipelineOptions | Configuration including cluster type, Twister2 home, and job type |
Outputs
| Output | Type | Description |
|---|---|---|
| ZIP archive | Temporary file on disk | A ZIP file containing all non-Twister2 classpath JARs under a lib/ directory prefix; registered with deleteOnExit(), so it is removed when the JVM exits |
| options.jobFileZip | String (path) | Path to the created ZIP archive, set on the options object via setJobFileZip() |
| System properties | JVM system properties | cluster_type, job_file, job_type, twister2_home, config_dir |
Packaging Details
ZIP Archive Structure
twister2-XXXXXXXXX.zip
  lib/
    beam-runners-twister2-X.X.X.jar
    beam-sdks-java-core-X.X.X.jar
    guava-XX.X.jar
    user-pipeline.jar
    ... (all non-Twister2 classpath JARs)
Filtering Logic
Classpath entries whose path contains the substring /org/twister2 (a plain String.contains() check on the full path) are filtered out. This prevents bundling Twister2's own libraries, which are already available on the cluster via the Twister2 installation. This avoids:
- Version conflicts between the bundled and cluster-installed Twister2 libraries
- Unnecessary archive bloat
- Classpath ordering issues
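Because the filter is a substring test, whether a Twister2 JAR is dropped depends on the directory it was resolved into. In a Maven local repository the org.twister2 groupId becomes an .../org/twister2/... directory, so the test matches; a cache that keeps the groupId dotted (for example, a Gradle-style .../org.twister2/... directory) would not match. The sketch below applies the same test to three hypothetical paths; the artifact names, versions, and hash directory are placeholders, and Twister2JarFilterDemo and isFilteredOut are illustrative names.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the substring filter used by zipFilesToStage, applied to
// hypothetical example paths (artifact names and versions are placeholders).
public class Twister2JarFilterDemo {

  /** Same test as the runner: drop any path containing "/org/twister2". */
  static boolean isFilteredOut(String path) {
    return path.contains("/org/twister2");
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList(
        // Maven-style layout: groupId split into directories
        "/home/u/.m2/repository/org/twister2/example-artifact/1.0/example-artifact-1.0.jar",
        // Beam's own Twister2 runner JAR lives under org/apache/beam
        "/home/u/.m2/repository/org/apache/beam/beam-runners-twister2/X.Y.Z/beam-runners-twister2-X.Y.Z.jar",
        // Dotted groupId directory, as in a Gradle-style module cache
        "/home/u/.gradle/caches/modules-2/files-2.1/org.twister2/example-artifact/1.0/abc123/example-artifact-1.0.jar");
    for (String p : paths) {
      System.out.println((isFilteredOut(p) ? "dropped: " : "kept:    ") + p);
    }
  }
}
```

With these paths only the first, Maven-style entry is dropped; the Beam runner JAR is kept as intended, and the dotted-groupId Twister2 JAR slips through the filter.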
Deduplication Logic
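JARs are deduplicated by file name, not by full path. If two classpath entries share a file name (e.g., guava-32.1.2-jre.jar appearing under multiple paths), only the first one in filesToStage order is included. The check runs after the Twister2 filter and does not compare file contents, so two different JARs that happen to share a name also collide, and the later one is silently skipped.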
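To find out which entries the first-wins rule would skip, a hypothetical diagnostic can apply the same filter and name check and report each skipped path together with the earlier path that shadows it. DuplicateNameReport and shadowedEntries are illustrative names; the main method uses this JVM's java.class.path as an approximation, which may differ from the filesToStage list the runner actually sees.

```java
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic: apply the same Twister2 filter and first-wins
// file-name rule as zipFilesToStage, and report which entries get skipped.
public class DuplicateNameReport {

  /** Maps each skipped path to the earlier path whose file name it collides with. */
  static Map<String, String> shadowedEntries(List<String> filesToStage) {
    Map<String, String> firstByName = new HashMap<>();
    Map<String, String> shadowed = new LinkedHashMap<>();
    for (String path : filesToStage) {
      if (path.contains("/org/twister2")) {
        continue; // filtered before deduplication, as in the runner
      }
      String name = new File(path).getName();
      String winner = firstByName.putIfAbsent(name, path);
      if (winner != null) {
        shadowed.put(path, winner);
      }
    }
    return shadowed;
  }

  public static void main(String[] args) {
    // Approximation: uses this JVM's classpath, which may differ from filesToStage.
    String cp = System.getProperty("java.class.path");
    shadowedEntries(Arrays.asList(cp.split(File.pathSeparator)))
        .forEach((skipped, kept) -> System.out.println("skipped " + skipped + " (kept " + kept + ")"));
  }
}
```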
Usage Examples
Observing Packaging Behavior
Because these methods are private, they cannot be called directly. Their effect can be observed through the options object once the runner has run:
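import org.apache.beam.runners.twister2.Twister2PipelineOptions;
import org.apache.beam.runners.twister2.Twister2Runner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

Twister2PipelineOptions options =
    PipelineOptionsFactory.as(Twister2PipelineOptions.class);
options.setRunner(Twister2Runner.class); // setupSystem() runs only under Twister2Runner
options.setClusterType("standalone");
options.setTwister2Home("/opt/twister2");
Pipeline pipeline = Pipeline.create(options);
// ... apply the pipeline's transforms here ...
pipeline.run(); // setupSystem() is called internally
// After run(), options.getJobFileZip() contains the ZIP path
System.out.println(options.getJobFileZip());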
Troubleshooting Missing Dependencies
If a class is not found on a worker node, verify:
- The JAR containing the class is on the client's classpath
- The JAR's path does not contain /org/twister2 (which would filter it out)
- The JAR's file name is not a duplicate of another JAR that was included first
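To check the last two points directly, the JAR names packaged under lib/ can be listed from the archive whose path the runner stores in options.getJobFileZip(). Since the archive is registered with deleteOnExit(), it has to be inspected (or copied) while the submitting JVM is still running. The following is a minimal sketch; JobZipInspector and libJars are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper: list the JAR names stored under lib/ in a job ZIP,
// to check whether a given dependency was actually shipped.
public class JobZipInspector {

  /** Returns the sorted file names of all entries directly or indirectly under lib/. */
  static List<String> libJars(String zipPath) throws IOException {
    List<String> names = new ArrayList<>();
    try (ZipFile zip = new ZipFile(zipPath)) {
      zip.stream()
          .map(ZipEntry::getName)
          .filter(n -> n.startsWith("lib/") && n.length() > "lib/".length()) // skip the lib/ entry itself
          .forEach(n -> names.add(n.substring("lib/".length())));
    }
    Collections.sort(names);
    return names;
  }

  public static void main(String[] args) throws IOException {
    // Usage: java JobZipInspector /tmp/twister2-123.zip [jar-name-to-check]
    List<String> jars = libJars(args[0]);
    jars.forEach(System.out::println);
    if (args.length > 1) {
      System.out.println(args[1] + (jars.contains(args[1]) ? " is present" : " is MISSING"));
    }
  }
}
```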
Related Pages
- Principle:Apache_Beam_Classpath_Packaging -- The classpath packaging principle this implementation fulfills
- Implementation:Apache_Beam_Twister2Runner_Run -- The runner's run() method that invokes setupSystem()
- Implementation:Apache_Beam_BeamBatchWorker_Execute -- The worker that uses the packaged dependencies
- Implementation:Apache_Beam_Twister2PipelineOptions -- Configuration options that control packaging parameters
- Heuristic:Apache_Beam_Warning_Deprecated_Twister2_Runner