Implementation:Heibaiying BigData Notes FlinkToMySQLSink Implementation
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Big_Data |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Concrete tool for writing stream processing results to a MySQL database provided by a custom RichSinkFunction implementation in the BigData-Notes repository.
Description
The FlinkToMySQLSink class extends Flink's RichSinkFunction<Employee> to implement a custom JDBC sink that writes Employee records to a MySQL database. It follows the open-invoke-close lifecycle pattern:
- open(): Establishes a JDBC connection to MySQL and prepares a reusable PreparedStatement for INSERT operations.
- invoke(): For each incoming Employee record, sets the statement parameters (name, age) and executes the INSERT.
- close(): Closes the PreparedStatement and the JDBC Connection to release database resources.
This implementation demonstrates the fundamental pattern for building custom database sinks in Flink, applicable to any JDBC-compatible database.
Usage
Use FlinkToMySQLSink (or a similarly structured custom sink) when your streaming pipeline needs to persist results directly to a relational database. Attach it to a DataStream using stream.addSink(new FlinkToMySQLSink()). Ensure that the MySQL JDBC driver is available on the classpath and that the target database and table exist.
Code Reference
Source Location
- Repository: BigData-Notes
- File:
code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQLSink.java - Lines: 11-43
Signature
public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
// Lifecycle: initialize JDBC connection
@Override
public void open(Configuration parameters) throws Exception
// Core: write each Employee record to MySQL
@Override
public void invoke(Employee employee, Context context) throws Exception
// Lifecycle: release JDBC resources
@Override
public void close() throws Exception
}
Import
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| employee | Employee | Yes | An Employee object containing the data to be written. The invoke() method extracts name and age fields from this object. |
| context | SinkFunction.Context | Yes | Flink-provided context with metadata such as the current processing time, watermark, and timestamp of the element. Provided automatically by the runtime. |
| parameters | Configuration | No | Flink configuration passed to open(). Can be used to externalize JDBC connection parameters. |
Outputs
| Name | Type | Description |
|---|---|---|
| MySQL row | Database record | An INSERT statement is executed against the configured MySQL table for each incoming Employee. The sink does not produce a Flink DataStream output (it is a terminal operator). |
Usage Examples
Basic Usage
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
private PreparedStatement ps;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Establish JDBC connection
connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/employees",
"root",
"password"
);
ps = connection.prepareStatement(
"INSERT INTO emp (name, age) VALUES (?, ?)"
);
}
@Override
public void invoke(Employee employee, Context context) throws Exception {
ps.setString(1, employee.getName());
ps.setInt(2, employee.getAge());
ps.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
}
}
// Attaching the sink to a stream:
// DataStream<Employee> employeeStream = ...;
// employeeStream.addSink(new FlinkToMySQLSink());