Implementation:Apache Flink WritableSerializer

From Leeroopedia
Revision as of 14:18, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Flink_WritableSerializer.md)


Knowledge Sources
Domains: Hadoop_Compatibility, Type_System
Last Updated: 2026-02-09 00:00 GMT

Overview

WritableSerializer is a Flink TypeSerializer implementation that handles serialization, deserialization, and copying of Hadoop Writable types within Flink's data processing framework.

Description

WritableSerializer is an internal, final class that extends Flink's TypeSerializer<T>, where T extends Hadoop's Writable interface. It provides the serialization bridge between Hadoop's Writable protocol and Flink's internal data serialization system.

Key implementation details include:

  • Instance creation: The createInstance method handles NullWritable as a special case (using NullWritable.get()) and uses InstantiationUtil.instantiate for all other types.
  • Serialization: The serialize method delegates directly to Writable.write(DataOutput), writing the Writable's binary representation to a DataOutputView.
  • Deserialization: The deserialize method creates a new instance (or reuses one) and calls Writable.readFields(DataInput) to populate it from a DataInputView.
  • Copying: In-memory deep copies (copy(T) and copy(T, T)) go through the KryoUtils.copy helper, using a lazily initialized Kryo instance configured with a DefaultInstantiatorStrategy that falls back to StdInstantiatorStrategy. The stream-to-stream variant, copy(DataInputView, DataOutputView), does not use Kryo: it reads the record into a transient copyInstance and writes it back out.
  • Type characteristics: The serializer reports isImmutableType as false and getLength as -1 (variable length), since Writable types are generally mutable and variable-sized.
  • Duplication: The duplicate method returns a new WritableSerializer instance to ensure thread safety in parallel execution.
  • Snapshot configuration: The snapshotConfiguration method returns a WritableSerializerSnapshot for serializer compatibility checks during state migration and savepoint restoration.
  • WritableSerializerSnapshot: A nested static final class extending GenericTypeSerializerSnapshot that supports serializer versioning and compatibility verification.
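
The delegation pattern described in the list above can be sketched with plain JDK types. This is a minimal illustration under stated assumptions, not Flink's source: SimpleWritable, TextLike, DelegatingSerializer, and RoundTripDemo are hypothetical names standing in for Hadoop's Writable, a concrete Writable type, and WritableSerializer. java.io.DataOutput and DataInput stand in for Flink's DataOutputView and DataInputView, and instances come from a Supplier rather than InstantiationUtil.instantiate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical mutable, variable-length record type.
class TextLike implements SimpleWritable {
    String value = "";

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readUTF();
    }
}

// Hypothetical serializer mirroring the delegation described above.
class DelegatingSerializer<T extends SimpleWritable> {
    private final Supplier<T> factory;
    private T copyInstance; // scratch object for stream-to-stream copies, created lazily

    DelegatingSerializer(Supplier<T> factory) {
        this.factory = factory;
    }

    T createInstance() {
        return factory.get();
    }

    void serialize(T record, DataOutput target) throws IOException {
        record.write(target); // the type defines its own binary layout
    }

    T deserialize(DataInput source) throws IOException {
        return deserialize(createInstance(), source);
    }

    T deserialize(T reuse, DataInput source) throws IOException {
        reuse.readFields(source); // overwrites the reused object's fields in place
        return reuse;
    }

    void copy(DataInput source, DataOutput target) throws IOException {
        if (copyInstance == null) {
            copyInstance = createInstance();
        }
        copyInstance.readFields(source);
        copyInstance.write(target);
    }
}

class RoundTripDemo {
    // Serializes, copies stream-to-stream, then deserializes the copied bytes.
    static String roundTrip(String text) {
        try {
            DelegatingSerializer<TextLike> serializer = new DelegatingSerializer<>(TextLike::new);
            TextLike record = new TextLike();
            record.value = text;

            ByteArrayOutputStream written = new ByteArrayOutputStream();
            serializer.serialize(record, new DataOutputStream(written));

            ByteArrayOutputStream copied = new ByteArrayOutputStream();
            serializer.copy(
                    new DataInputStream(new ByteArrayInputStream(written.toByteArray())),
                    new DataOutputStream(copied));

            DataInput in = new DataInputStream(new ByteArrayInputStream(copied.toByteArray()));
            return serializer.deserialize(in).value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the scratch copyInstance is mutable state held by the serializer, one instance of such a class should not be shared across threads, which is consistent with duplicate() returning a fresh serializer.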

Usage

Use WritableSerializer when you need to serialize Hadoop Writable types within Flink. This serializer is typically not instantiated directly by users; it is created automatically by WritableTypeInfo.createSerializer. It is used internally by Flink's runtime for:

  • Serializing and deserializing data records during network shuffles between tasks.
  • Writing and reading data to/from managed state backends.
  • Performing deep copies of mutable Writable objects during processing.
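
The need for deep copies follows from mutability: when one object is reused as a read buffer, any code that kept a reference to it sees later overwrites. The sketch below shows the effect with JDK types only. Counter and CopyDemo are hypothetical names, and the copy is done through a byte round trip purely for illustration; it is not how WritableSerializer's Kryo-based copy works.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record following the write/readFields convention.
class Counter {
    int count;

    void write(DataOutputStream out) throws IOException {
        out.writeInt(count);
    }

    void readFields(DataInputStream in) throws IOException {
        count = in.readInt();
    }

    // Deep copy via a byte round trip (illustrative only).
    Counter deepCopy() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            write(new DataOutputStream(bytes));
            Counter copy = new Counter();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class CopyDemo {
    // Buffers three records while reusing one mutable object, with or without copying.
    static List<Integer> buffer(boolean copyBeforeBuffering) {
        Counter reused = new Counter();
        List<Counter> buffered = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.count = i; // simulates a runtime overwriting a reused object
            buffered.add(copyBeforeBuffering ? reused.deepCopy() : reused);
        }
        List<Integer> seen = new ArrayList<>();
        for (Counter c : buffered) {
            seen.add(c.count);
        }
        return seen;
    }
}
```

Without the copy, every buffered element refers to the same object, so buffer(false) yields [3, 3, 3], while buffer(true) yields [1, 2, 3].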

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
  • Lines: 1-193

Signature

@Internal
public final class WritableSerializer<T extends Writable> extends TypeSerializer<T>

Import

import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;

I/O Contract

Inputs

Name | Type | Required | Description
typeClass | Class<T extends Writable> | Yes | The class of the Hadoop Writable type to serialize

Outputs

Name | Type | Description
T instance | T extends Writable | A new or reused Writable instance created via createInstance or deserialization
serialized bytes | byte[] | Binary representation written to DataOutputView via Writable.write
TypeSerializerSnapshot<T> | WritableSerializerSnapshot<T> | Snapshot for serializer compatibility checks during state migration

Usage Examples

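import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.hadoop.io.Text;

// WritableSerializer is typically obtained from WritableTypeInfo.
// serializerConfig is assumed to be supplied by the surrounding job
// (its exact type depends on the Flink version in use).
WritableTypeInfo<Text> typeInfo = new WritableTypeInfo<>(Text.class);
TypeSerializer<Text> serializer = typeInfo.createSerializer(serializerConfig);

// Or instantiate directly (the class is marked @Internal)
WritableSerializer<Text> directSerializer = new WritableSerializer<>(Text.class);

// Create a new, empty instance
Text instance = directSerializer.createInstance();

// Serialize a record into an in-memory DataOutputView
Text record = new Text("hello");
DataOutputSerializer dataOutputView = new DataOutputSerializer(64);
directSerializer.serialize(record, dataOutputView);

// Deserialize the record from the bytes just written
DataInputDeserializer dataInputView =
        new DataInputDeserializer(dataOutputView.getCopyOfBuffer());
Text deserialized = directSerializer.deserialize(dataInputView);

// Deep copy a record
Text copied = directSerializer.copy(record);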
