Implementation:Apache Beam Twister2Runner ZipDependencies


Attribute | Value
Implementation Name | Twister2Runner ZipDependencies
Domain | Packaging, HPC
Overview | Concrete tool for packaging pipeline classpath dependencies into ZIP archives for Twister2 cluster distribution
Deprecation Notice | The Twister2 Runner is deprecated and scheduled for removal in Apache Beam 3.0
last_updated | 2026-02-09 04:00 GMT

Overview

This page documents the private methods of Twister2Runner that package Java classpath dependencies into a single ZIP archive for distribution to Twister2 cluster worker nodes. Packaging takes two steps: prepareFilesToStage resolves the list of files to stage (via Beam's PipelineResources), and zipFilesToStage writes those files into the archive, skipping Twister2's own JARs and duplicate file names. Both steps are invoked from setupSystem, which then publishes the archive path and cluster settings as JVM system properties.

Note: The Twister2 Runner is deprecated and is scheduled for removal in Apache Beam 3.0. Users should plan migration to an actively maintained runner.

Code Reference

Source Location

File | Lines | Repository
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java | L273-342 (zipFilesToStage), L195-251 (setupSystem) | GitHub

Signature: prepareFilesToStage

/**
 * Classpath contains non jar files (eg. directories with .class files
 * or empty directories) will cause exception in running log.
 */
private void prepareFilesToStage(Twister2PipelineOptions options) {
    PipelineResources.prepareFilesForStaging(options);
}

Signature: zipFilesToStage

/**
 * Creates a single zip file from all the jar files that are listed
 * as files to stage in options.
 */
private void zipFilesToStage(Twister2PipelineOptions options) {
    File zipFile = null;
    Set<String> jarSet = new HashSet<>();
    List<String> filesToStage = options.getFilesToStage();
    List<String> trimmed = new ArrayList<>();

    // Remove twister2 jars from the list
    for (String file : filesToStage) {
        if (!file.contains("/org/twister2")) {
            trimmed.add(file);
        }
    }

    FileInputStream fis = null;
    try {
        zipFile = File.createTempFile("twister2-", ".zip");
        FileOutputStream fos = new FileOutputStream(zipFile);
        ZipOutputStream zipOut = new ZipOutputStream(fos);
        zipOut.putNextEntry(new ZipEntry("lib/"));

        for (String srcFile : trimmed) {
            File fileToZip = new File(srcFile);
            if (!jarSet.contains(fileToZip.getName())) {
                jarSet.add(fileToZip.getName());
            } else {
                continue; // skip duplicates
            }

            fis = new FileInputStream(fileToZip);
            ZipEntry zipEntry =
                new ZipEntry("lib/" + fileToZip.getName());
            zipOut.putNextEntry(zipEntry);

            byte[] bytes = new byte[1024];
            int length;
            while ((length = fis.read(bytes)) >= 0) {
                zipOut.write(bytes, 0, length);
            }
            fis.close();
        }
        zipOut.close();
        fos.close();
        zipFile.deleteOnExit();
    } catch (FileNotFoundException e) {
        LOG.info(e.getMessage());
    } catch (IOException e) {
        LOG.info(e.getMessage());
    } finally {
        if (fis != null) {
            try { fis.close(); } catch (IOException e) {
                LOG.info(e.getMessage());
            }
        }
    }
    if (zipFile != null) {
        options.setJobFileZip(zipFile.getPath());
    }
}
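
In the listing above, an IOException is logged at INFO level and the path of the (possibly incomplete) archive is still set on the options. The following is a minimal sketch, not the runner's code, of the same lib/ layout written with try-with-resources so that streams are closed on every path and failures propagate to the caller. The class and method names (ZipStagingSketch, zipUnderLib) are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper for illustration; not part of Twister2Runner. */
final class ZipStagingSketch {

    /** Writes each distinct file name in paths under lib/ in a new temporary ZIP. */
    static File zipUnderLib(List<String> paths) throws IOException {
        File zipFile = File.createTempFile("twister2-", ".zip");
        zipFile.deleteOnExit();
        Set<String> seenNames = new HashSet<>();

        // try-with-resources closes both streams even if a copy fails midway.
        try (OutputStream fos = Files.newOutputStream(zipFile.toPath());
             ZipOutputStream zipOut = new ZipOutputStream(fos)) {
            zipOut.putNextEntry(new ZipEntry("lib/"));
            zipOut.closeEntry();

            for (String path : paths) {
                File src = new File(path);
                // Set.add returns false when the name is already present,
                // so later files with the same name are skipped.
                if (!seenNames.add(src.getName())) {
                    continue;
                }
                zipOut.putNextEntry(new ZipEntry("lib/" + src.getName()));
                Files.copy(src.toPath(), zipOut); // does not close zipOut
                zipOut.closeEntry();
            }
        }
        return zipFile;
    }
}
```

Unlike the original method, this sketch throws on a missing or unreadable file instead of continuing, which surfaces staging problems at submission time rather than as a missing class on a worker.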

Signature: setupSystem

private void setupSystem(Twister2PipelineOptions options) {
    prepareFilesToStage(options);
    zipFilesToStage(options);
    System.setProperty("cluster_type", options.getClusterType());
    System.setProperty("job_file", options.getJobFileZip());
    System.setProperty("job_type", options.getJobType());

    if (isLocalMode(options)) {
        System.setProperty("twister2_home",
            System.getProperty("java.io.tmpdir"));
        System.setProperty("config_dir",
            System.getProperty("java.io.tmpdir") + "/conf/");
    } else {
        System.setProperty("twister2_home",
            options.getTwister2Home());
        System.setProperty("config_dir",
            options.getTwister2Home() + "/conf/");

        // Validate required config files exist
        File cDir = new File(
            System.getProperty("config_dir"),
            options.getClusterType());
        String[] filesList = new String[] {
            "core.yaml", "network.yaml", "data.yaml",
            "resource.yaml", "task.yaml",
        };
        for (String file : filesList) {
            File toCheck = new File(cDir, file);
            if (!toCheck.exists()) {
                throw new Twister2RuntimeException(
                    "Couldn't find " + file
                    + " in config directory specified.");
            }
        }
        // Setup logging from logger.properties
    }
}
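
Because the configuration check only runs inside setupSystem, a missing file surfaces as a Twister2RuntimeException at pipeline submission. A pre-flight check along the same lines can be sketched as follows; it mirrors the directory layout used above (<twister2Home>/conf/<clusterType>), but the class and method names are hypothetical and the code is not part of the runner.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check; mirrors the validation loop in setupSystem. */
final class ConfigCheckSketch {

    // Same file names that setupSystem requires in cluster mode.
    static final String[] REQUIRED = {
        "core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml"
    };

    /** Returns the required config files that are absent, in declaration order. */
    static List<String> missingConfigFiles(String twister2Home, String clusterType) {
        File clusterDir = new File(new File(twister2Home, "conf"), clusterType);
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!new File(clusterDir, name).exists()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Collecting every missing file, rather than failing on the first one as setupSystem does, is a design choice of this sketch: it lets all gaps in a new installation be fixed in one pass.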

Import Statements

These methods are private to Twister2Runner, so they cannot be imported or called on their own. Only the enclosing class is importable:

import org.apache.beam.runners.twister2.Twister2Runner;

I/O Contract

Inputs

Input | Type | Description
Classpath JARs | List<String> via options.getFilesToStage() | All JAR files on the current JVM classpath (prepared by PipelineResources)
options | Twister2PipelineOptions | Configuration including cluster type, Twister2 home, job type

Outputs

Output | Type | Description
ZIP archive | Temporary file on disk | A ZIP file containing all non-Twister2 classpath JARs under a lib/ directory prefix
options.jobFileZip | String (path) | Path to the created ZIP archive, set on the options object
System properties | JVM system properties | cluster_type, job_file, job_type, twister2_home, config_dir

Packaging Details

ZIP Archive Structure

twister2-XXXXXXXXX.zip
  lib/
    beam-runners-twister2-X.X.X.jar
    beam-sdks-java-core-X.X.X.jar
    guava-XX.X.jar
    user-pipeline.jar
    ... (all non-Twister2 classpath JARs)

Filtering Logic

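Any staged file whose path string contains /org/twister2 is filtered out. The match is a plain substring test on the path, not on the file name or the JAR's contents. This keeps Twister2's own libraries out of the archive, since they are already available on the cluster via the Twister2 installation. Filtering them avoids: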

  • Version conflicts between the bundled and cluster-installed Twister2 libraries
  • Unnecessary archive bloat
  • Classpath ordering issues
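
The filter can be reproduced in isolation to check how a given classpath entry would be treated. The sketch below applies the same substring test as zipFilesToStage; the class name, method name, and example paths are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper reproducing the path filter from zipFilesToStage. */
final class Twister2FilterSketch {

    /** Keeps every path that does not contain the literal substring "/org/twister2". */
    static List<String> withoutTwister2(List<String> filesToStage) {
        List<String> kept = new ArrayList<>();
        for (String path : filesToStage) {
            // Literal, case-sensitive match with forward slashes only:
            // a backslash-separated path would not match this test.
            if (!path.contains("/org/twister2")) {
                kept.add(path);
            }
        }
        return kept;
    }
}
```

For example, a hypothetical entry /repo/org/twister2/example/example-lib.jar is dropped, while /repo/org/apache/beam/beam-runners-twister2.jar is kept because "twister2" appears only in its file name, not as an /org/twister2 path segment.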

Deduplication Logic

JARs are deduplicated by file name, not by full path. If two classpath entries have the same file name (e.g., guava-32.1.2-jre.jar appearing in multiple paths), only the first occurrence is included; later ones are skipped silently, even if their contents differ.
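
Because skipped duplicates are not logged, it can help to compute them up front. The following sketch uses the same first-occurrence-wins rule to report which entries would be left out; the class and method names are hypothetical.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper reporting entries that name-based deduplication would skip. */
final class DedupSketch {

    /** Returns the paths whose file name already appeared earlier in the list. */
    static List<String> skippedDuplicates(List<String> paths) {
        Set<String> seenNames = new HashSet<>();
        List<String> skipped = new ArrayList<>();
        for (String path : paths) {
            // Only the last path segment is compared, as in zipFilesToStage.
            if (!seenNames.add(new File(path).getName())) {
                skipped.add(path);
            }
        }
        return skipped;
    }
}
```

Passing options.getFilesToStage() to such a helper before submission would list the classpath entries that lose out to an earlier JAR of the same name.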

Usage Examples

Observing Packaging Behavior

Because these methods are private, they cannot be called directly. Their effect can be observed through the options object after a pipeline run:

// The ZIP file path is stored in the options after setupSystem()
Twister2PipelineOptions options = PipelineOptionsFactory
    .as(Twister2PipelineOptions.class);
// Select the Twister2 runner; with any other runner, setupSystem() never runs
options.setRunner(Twister2Runner.class);
options.setClusterType("standalone");
options.setTwister2Home("/opt/twister2");

Pipeline pipeline = Pipeline.create(options);
pipeline.run(); // setupSystem() is called internally

// After run(), options.getJobFileZip() contains the ZIP path

Troubleshooting Missing Dependencies

If a class is not found on a worker node, verify:

  • The JAR containing the class is on the client's classpath
  • The JAR's file path does not contain /org/twister2 (which would filter it out)
  • The JAR name is not a duplicate of another JAR that was included first
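
The checks above can be confirmed against the archive itself by listing its entries. This is a minimal sketch using only java.util.zip; the class and method names are hypothetical, and the path argument is assumed to be the value returned by options.getJobFileZip().

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical helper for inspecting a staged archive. */
final class ZipInspectSketch {

    /** Returns the names of all non-directory entries, e.g. "lib/user-pipeline.jar". */
    static List<String> listJarNames(String zipPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (ZipFile zipFile = new ZipFile(zipPath)) {
            Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                // Skip the "lib/" directory entry itself.
                if (!entry.isDirectory()) {
                    names.add(entry.getName());
                }
            }
        }
        return names;
    }
}
```

If the expected JAR is absent from the returned list, it was either never on the client classpath, removed by the /org/twister2 filter, or skipped as a duplicate file name.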
