Implementation:Heibaiying BigData Notes FlinkToMySQLSink Implementation

From Leeroopedia


Knowledge Sources
Domains: Stream_Processing, Big_Data
Last Updated: 2026-02-10 10:00 GMT

Overview

A concrete tool for writing stream processing results to a MySQL database, provided by the custom RichSinkFunction implementation in the BigData-Notes repository.

Description

The FlinkToMySQLSink class extends Flink's RichSinkFunction<Employee> to implement a custom JDBC sink that writes Employee records to a MySQL database. It follows the open-invoke-close lifecycle pattern:

  • open(): Establishes a JDBC connection to MySQL and prepares a reusable PreparedStatement for INSERT operations.
  • invoke(): For each incoming Employee record, sets the statement parameters (name, age) and executes the INSERT.
  • close(): Closes the PreparedStatement and the JDBC Connection to release database resources.

This implementation demonstrates the fundamental pattern for building custom database sinks in Flink, applicable to any JDBC-compatible database.

Usage

Use FlinkToMySQLSink (or a similarly structured custom sink) when your streaming pipeline needs to persist results directly to a relational database. Attach it to a DataStream using stream.addSink(new FlinkToMySQLSink()). Ensure that the MySQL JDBC driver is available on the classpath and that the target database and table exist.
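The classpath requirement above can be checked before the job is submitted. The following is a minimal sketch, not part of the repository: the class name JdbcDriverCheck is hypothetical, and the driver class name com.mysql.cj.jdbc.Driver assumes MySQL Connector/J 8.x (older 5.x releases use com.mysql.jdbc.Driver).

```java
/**
 * Hypothetical pre-flight helper: reports whether a JDBC driver class
 * can be loaded from the current classpath.
 */
public final class JdbcDriverCheck {

    private JdbcDriverCheck() {
        // Utility class, not meant to be instantiated.
    }

    /** Returns true if the named class can be loaded, false otherwise. */
    public static boolean isOnClasspath(String driverClassName) {
        try {
            // Loading the class also lets a JDBC driver register itself.
            Class.forName(driverClassName);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed driver class name for MySQL Connector/J 8.x.
        String driver = "com.mysql.cj.jdbc.Driver";
        System.out.println(driver + " available: " + isOnClasspath(driver));
    }
}
```

If the check reports false, the usual remedy is to add the MySQL connector dependency to the job's build so that it is packaged with the job or present on the cluster's classpath.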

Code Reference

Source Location

  • Repository: BigData-Notes
  • File: code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQLSink.java
  • Lines: 11-43

Signature

public class FlinkToMySQLSink extends RichSinkFunction<Employee> {

    // Lifecycle: initialize JDBC connection
    @Override
    public void open(Configuration parameters) throws Exception

    // Core: write each Employee record to MySQL
    @Override
    public void invoke(Employee employee, Context context) throws Exception

    // Lifecycle: release JDBC resources
    @Override
    public void close() throws Exception
}

Import

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

I/O Contract

Inputs

Name | Type | Required | Description
employee | Employee | Yes | An Employee object containing the data to be written. The invoke() method reads its name and age fields.
context | SinkFunction.Context | Yes | Flink-provided context exposing the current processing time, the current watermark, and the timestamp of the element. Supplied automatically by the runtime.
parameters | Configuration | No | Flink configuration passed to open(). Unused in the example, which hard-codes the JDBC URL and credentials. In DataStream jobs this object is typically empty, so connection parameters are usually externalized another way, such as constructor arguments.
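The Employee type is not shown on this page. A minimal sketch of what the sink appears to need, assuming only the two accessors that invoke() calls (getName() and getAge()); the constructors, setters, and toString() are assumptions added so the class can be used as a plain Java bean.

```java
/**
 * Hypothetical stand-in for the Employee type consumed by the sink.
 * Only getName() and getAge() are taken from the page; the rest is assumed.
 */
public class Employee {

    private String name;
    private int age;

    // Public no-argument constructor, which Flink's POJO handling expects.
    public Employee() {
    }

    public Employee(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Employee{name='" + name + "', age=" + age + "}";
    }
}
```

The field types follow from how the sink binds them: name is passed to setString() and age to setInt().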

Outputs

Name | Type | Description
MySQL row | Database record | An INSERT statement is executed against the configured MySQL table for each incoming Employee. The sink does not produce a Flink DataStream output (it is a terminal operator).

Usage Examples

Basic Usage

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class FlinkToMySQLSink extends RichSinkFunction<Employee> {

    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Establish JDBC connection
        connection = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/employees",
            "root",
            "password"
        );
        ps = connection.prepareStatement(
            "INSERT INTO emp (name, age) VALUES (?, ?)"
        );
    }

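    @Override
    public void invoke(Employee employee, Context context) throws Exception {
        // Bind the record's fields to the INSERT placeholders (JDBC indexes are 1-based)
        ps.setString(1, employee.getName());
        ps.setInt(2, employee.getAge());
        // One round trip to MySQL per record
        ps.executeUpdate();
    }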

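    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement first; the finally block ensures the connection
        // is still released if closing the statement throws.
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }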
}

// Attaching the sink to a stream:
// DataStream<Employee> employeeStream = ...;
// employeeStream.addSink(new FlinkToMySQLSink());
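
The example above hard-codes the JDBC URL, user, and password. One possible way to externalize them is a small serializable settings object handed to the sink's constructor. The sketch below is an assumption, not code from the repository: the class name JdbcSettings and the property keys jdbc.url, jdbc.user, and jdbc.password are hypothetical.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;

/**
 * Hypothetical holder for JDBC connection settings. It is Serializable so
 * that a sink can keep it as a field when Flink ships the sink to workers.
 */
public final class JdbcSettings implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String url;
    private final String user;
    private final String password;

    public JdbcSettings(String url, String user, String password) {
        this.url = Objects.requireNonNull(url, "url");
        this.user = Objects.requireNonNull(user, "user");
        this.password = Objects.requireNonNull(password, "password");
    }

    /** Builds settings from the assumed keys jdbc.url, jdbc.user, jdbc.password. */
    public static JdbcSettings fromProperties(Properties props) {
        return new JdbcSettings(
            require(props, "jdbc.url"),
            require(props, "jdbc.user"),
            require(props, "jdbc.password"));
    }

    // Fails fast with a clear message when a key is absent or empty.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value;
    }

    public String getUrl() {
        return url;
    }

    public String getUser() {
        return user;
    }

    public String getPassword() {
        return password;
    }
}
```

A sink written this way would accept a JdbcSettings instance in its constructor and call DriverManager.getConnection(settings.getUrl(), settings.getUser(), settings.getPassword()) inside open(). The Connection and PreparedStatement would still be created in open() rather than in the constructor, because they are not serializable.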
