
Implementation:Heibaiying BigData Notes StreamExecutionEnvironment Execute

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Big_Data
Last Updated 2026-02-10 10:00 GMT

Overview

Concrete tool, provided by the Apache Flink Streaming API, for triggering execution of a fully constructed streaming pipeline.

Description

The StreamExecutionEnvironment.execute() method is the final call in a Flink streaming application that transitions the program from the DAG construction phase to the execution phase. It compiles the logical stream graph into an optimized job graph, submits it to the Flink runtime (local mini-cluster or remote JobManager), and blocks until the job completes or fails. The method returns a JobExecutionResult object containing execution metrics.

In the BigData-Notes repository, the KafkaStreamingJob class calls env.execute("Flink Kafka Streaming") as its final statement, triggering the entire Kafka-source-to-sink pipeline.

Usage

Call execute() exactly once at the end of your Flink streaming application, after all sources, transformations, and sinks have been registered with the environment. Provide a descriptive job name string to identify the job in the Flink Web UI, logs, and metrics. For unbounded streaming jobs, the execute() call blocks indefinitely until the job is cancelled or encounters a fatal error.
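Because execute() blocks for the lifetime of an unbounded job, Flink (1.10+) also offers a non-blocking alternative, executeAsync(), which submits the job and immediately returns a JobClient handle for monitoring or cancellation. The sketch below is not taken from the BigData-Notes repository; the class name AsyncSubmitDemo and the job name "async-demo" are illustrative, and a bounded fromElements source stands in for the Kafka source so the job can run on the local mini-cluster.

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncSubmitDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // A small bounded source so the sketch is self-contained.
        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .print();

        // Unlike execute(), executeAsync() returns as soon as the job is
        // submitted, handing back a JobClient instead of blocking.
        JobClient client = env.executeAsync("async-demo");

        // The JobClient can be used to query status or cancel the job.
        System.out.println("Submitted job " + client.getJobID());
    }
}
```

executeAsync() is useful when one process needs to submit a job and then continue doing other work, such as driving several jobs from a single main method.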

Code Reference

Source Location

  • Repository: BigData-Notes
  • File: code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java
  • Line: 43

Signature

public JobExecutionResult execute(String jobName) throws Exception

Import

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.JobExecutionResult;

I/O Contract

Inputs

  • jobName — String, optional. A human-readable name for the job, displayed in the Flink Web UI, logs, and monitoring systems. If omitted (by calling the no-argument execute() overload), a default name is generated.

Outputs

  • result — JobExecutionResult. Contains execution metadata including the job ID, net runtime duration, and values of accumulators registered during execution. For streaming jobs, this is returned only after the job terminates.
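The returned JobExecutionResult is most useful for bounded jobs, where execute() unblocks at completion. The sketch below is not from the repository; the class name, the accumulator name "processed", and the bounded fromElements source are illustrative assumptions so the job finishes on the local mini-cluster and its result can be inspected.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class ResultInspectionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded source: execute() returns once all elements are processed.
        env.fromElements(1, 2, 3)
           .map(new RichMapFunction<Integer, Integer>() {
               private final IntCounter counter = new IntCounter();

               @Override
               public void open(Configuration parameters) {
                   // Register an accumulator whose merged value appears
                   // in the JobExecutionResult.
                   getRuntimeContext().addAccumulator("processed", counter);
               }

               @Override
               public Integer map(Integer value) {
                   counter.add(1);
                   return value * 2;
               }
           })
           .print();

        JobExecutionResult result = env.execute("result-inspection-demo");

        // Inspect the metadata described in the I/O contract above.
        System.out.println("Job ID:       " + result.getJobID());
        System.out.println("Runtime (ms): " + result.getNetRuntime(TimeUnit.MILLISECONDS));
        System.out.println("Processed:    " + result.getAccumulatorResult("processed"));
    }
}
```

For the unbounded Kafka pipeline in the repository, this result is only available if the job is eventually cancelled or fails, so accumulator-based reporting of this kind is mainly a batch-style pattern.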

Usage Examples

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

public class KafkaStreamingJob {

    public static void main(String[] args) throws Exception {
        // Phase 1: Set up environment
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Phase 2: Define sources
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            "input-topic",
            new SimpleStringSchema(),
            properties
        );

        // Phase 3: Build the pipeline
        env.addSource(kafkaConsumer)
           .map(String::toUpperCase)
           .print();  // Simple console sink for demonstration

        // Phase 4: Trigger execution
        // This call blocks until the streaming job is cancelled or fails
        env.execute("Flink Kafka Streaming");
    }
}

Related Pages

Implements Principle

Requires Environment
