
Implementation:Apache Flink FileSink ForRowFormat

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, File_IO
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete tool for constructing file sink connectors with row-wise or bulk serialization formats, provided by the Apache Flink flink-connector-files module.

Description

The FileSink class provides two static factory methods that serve as entry points for building file sinks. forRowFormat creates a builder for row-wise output using an Encoder, while forBulkFormat creates a builder for bulk output using a BulkWriter.Factory. Both return typed builder instances that allow further configuration (bucket assigner, rolling policy, output file config) before calling .build() to produce an immutable FileSink instance.
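The builder flow described above (factory method, then optional bucket assigner, rolling policy, and output file config, then .build()) can be sketched end-to-end as follows. This is a sketch assuming Flink 1.15+ on the classpath; the hourly bucket pattern, rolling thresholds, and part-file names are illustrative choices, not defaults.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

// Full builder chain: static factory -> optional configuration -> immutable sink.
FileSink<String> sink = FileSink
        .forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
        // Bucket records into hourly directories (replaces the default DateTimeBucketAssigner).
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        // Roll part files at 128 MiB or every 15 minutes, whichever comes first.
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withMaxPartSize(MemorySize.ofMebiBytes(128))
                        .withRolloverInterval(Duration.ofMinutes(15))
                        .build())
        // Control part-file naming within each bucket.
        .withOutputFileConfig(
                OutputFileConfig.builder()
                        .withPartPrefix("events")
                        .withPartSuffix(".txt")
                        .build())
        .build();
```

Every with* call returns the same typed builder, so configuration steps can be chained in any order before the final .build().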

Usage

Import these factory methods when setting up a Flink pipeline that writes data to a filesystem. Use forRowFormat for text-based outputs (CSV, JSON lines) and forBulkFormat for columnar formats (Parquet, ORC).

Code Reference

Source Location

  • Repository: Apache Flink
  • File: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
  • Lines: L203-212

Signature

public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
        final Path basePath, final Encoder<IN> encoder) {
    return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
}

public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
        final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory) {
    return new DefaultBulkFormatBuilder<>(
            basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
}

Import

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.BulkWriter;

I/O Contract

Inputs

Name Type Required Description
basePath Path Yes Base output directory for all buckets
encoder Encoder<IN> Yes (row) Row-wise serializer for individual records
bulkWriterFactory BulkWriter.Factory<IN> Yes (bulk) Factory for creating bulk writers (e.g., Parquet)

Outputs

Name Type Description
builder DefaultRowFormatBuilder<IN> or DefaultBulkFormatBuilder<IN> Configured builder for further customization before .build()
FileSink<IN> FileSink<IN> Final immutable sink (after .build())

Usage Examples

Row Format (CSV/Text)

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

// Create a row-format file sink writing strings as UTF-8
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/output/path"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withMaxPartSize(MemorySize.ofMebiBytes(256))
            .withRolloverInterval(Duration.ofMinutes(5))
            .build())
    .build();

dataStream.sinkTo(sink);

Bulk Format (Parquet)

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Create a bulk-format file sink writing Avro records as Parquet.
// `schema` is an org.apache.avro.Schema describing the records.
FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(new Path("/output/parquet"), AvroParquetWriters.forGenericRecord(schema))
    .build();

dataStream.sinkTo(sink);
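Both variants rely on Flink's checkpointing to finalize output: FileSink moves part files from in-progress to finished state only when a checkpoint completes, so with checkpointing disabled the sink never commits any files. A minimal sketch of enabling checkpointing before attaching either sink (the interval is illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// FileSink commits pending part files on checkpoint completion,
// so checkpointing must be enabled for any output to become visible.
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (illustrative)
```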

Related Pages

Implements Principle

Requires Environment
