Implementation:Apache Flink WritableComparator
| Knowledge Sources | |
|---|---|
| Domains | Hadoop_Compatibility, Type_System |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
WritableComparator is a Flink TypeComparator implementation for Hadoop Writable types that also implement Comparable, enabling ordering and key-based operations on Hadoop-compatible data within Flink.
Description
WritableComparator is an internal class that extends Flink's TypeComparator<T>, where T must extend both Writable and Comparable<T>. It provides the comparison logic required by Flink for operations such as sorting, grouping, and joining on Hadoop Writable types.
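The generic bound is easier to see in code. The following is a minimal, stdlib-only sketch, not Flink code. SimpleWritable is a hypothetical stand-in for Hadoop's Writable with the same write/readFields pair, and IntRecord is a hypothetical record type. DirectionalComparator mirrors only the compare and hash logic, not the full TypeComparator contract. The intersection bound `T extends SimpleWritable & Comparable<T>` is what lets one type parameter be both serializable and ordered.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods).
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical record type that satisfies both bounds.
class IntRecord implements SimpleWritable, Comparable<IntRecord> {
    int value;

    IntRecord(int value) { this.value = value; }

    @Override public void write(DataOutput out) throws IOException { out.writeInt(value); }
    @Override public void readFields(DataInput in) throws IOException { value = in.readInt(); }
    @Override public int compareTo(IntRecord other) { return Integer.compare(value, other.value); }

    @Override public boolean equals(Object o) {
        return o instanceof IntRecord && ((IntRecord) o).value == value;
    }

    @Override public int hashCode() { return Integer.hashCode(value); }
}

// Mirrors only the compare/hash logic of a Writable comparator; the
// intersection bound lets T be both serializable and ordered.
class DirectionalComparator<T extends SimpleWritable & Comparable<T>> {
    private final boolean ascending;

    DirectionalComparator(boolean ascending) { this.ascending = ascending; }

    int compare(T first, T second) {
        int comp = first.compareTo(second);
        return ascending ? comp : -comp;   // descending order negates the natural order
    }

    int hash(T record) { return record.hashCode(); }   // plain Object.hashCode() contract
}
```

A type that implements SimpleWritable but not Comparable is rejected at compile time as a type argument.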
Key implementation details include:
- Sort order: The comparator supports both ascending and descending comparison via the ascendingComparison flag. When descending, comparison results are negated.
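- Reference-based comparison: setReference stores a deep copy of the given record as the comparator's reference. The copy is made with Kryo (via KryoUtils.copy), falling back to serializing and deserializing through a WritableSerializer if Kryo cannot copy the object. equalToReference then compares a candidate against this copy using equals().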
- Serialized comparison: compareSerialized reads one record from each DataInputView into two reusable instances (via readFields) and compares them with compareTo. This avoids allocating new objects per comparison, but it is not a binary comparison: both records are fully deserialized.
- Normalized key support: If the Writable type also implements Flink's NormalizableKey, the comparator supports normalized keys, which let sorters compare records as raw bytes. supportsNormalizedKey checks for this interface, and getNormalizeKeyLen and putNormalizedKey delegate to it (getMaxNormalizedKeyLen and copyNormalizedKey). invertNormalizedKey does not delegate: it returns true when the comparator sorts in descending order.
- Kryo initialization: The Kryo instance used for reference copies is created lazily, with a DefaultInstantiatorStrategy that falls back to StdInstantiatorStrategy, and the type class is registered with it.
- Lazy instantiation: The reference and temporary reference instances are created on demand via InstantiationUtil.instantiate(type, Writable.class), which also verifies that type is a subclass of Writable.
- Key normalization with serialization: Not supported; supportsSerializationWithKeyNormalization returns false, and the corresponding read/write methods throw UnsupportedOperationException.
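The serialized comparison and lazy instantiation can be sketched without Flink as follows. Assumptions: SimpleWritable and IntRecord are hypothetical stand-ins, java.io.DataInput replaces Flink's DataInputView, and plain reflection replaces InstantiationUtil.instantiate. The pattern is what matters: two instances are created once, refilled by readFields on every call, and then compared with compareTo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical record type; the no-arg constructor enables reflective creation.
class IntRecord implements SimpleWritable, Comparable<IntRecord> {
    int value;

    IntRecord() {}
    IntRecord(int value) { this.value = value; }

    @Override public void write(DataOutput out) throws IOException { out.writeInt(value); }
    @Override public void readFields(DataInput in) throws IOException { value = in.readInt(); }
    @Override public int compareTo(IntRecord other) { return Integer.compare(value, other.value); }
}

class SerializedComparator<T extends SimpleWritable & Comparable<T>> {
    private final boolean ascending;
    private final Class<T> type;
    private T reference;      // reused across calls, created on first use
    private T tempReference;  // reused across calls, created on first use

    SerializedComparator(boolean ascending, Class<T> type) {
        this.ascending = ascending;
        this.type = type;
    }

    // Plain reflection, standing in for Flink's InstantiationUtil.instantiate.
    private T instantiate() {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }

    int compareSerialized(DataInput firstSource, DataInput secondSource) throws IOException {
        if (reference == null) reference = instantiate();
        if (tempReference == null) tempReference = instantiate();
        reference.readFields(firstSource);      // full deserialization, but no new objects
        tempReference.readFields(secondSource);
        int comp = reference.compareTo(tempReference);
        return ascending ? comp : -comp;
    }

    // Convenience wrapper over in-memory buffers.
    int compareBytes(byte[] first, byte[] second) {
        try {
            return compareSerialized(
                    new DataInputStream(new ByteArrayInputStream(first)),
                    new DataInputStream(new ByteArrayInputStream(second)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] serialize(SimpleWritable record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            record.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Reusing the two instances trades allocation for mutable state, which is one reason a comparator must not be shared between threads.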
Usage
WritableComparator provides comparison for Hadoop Writable types inside Flink. User code typically does not instantiate it directly; instead, WritableTypeInfo.createComparator creates it automatically when the Writable type implements Comparable. Flink's runtime uses it internally during:
- Sort-based operations on DataSet API pipelines.
- Key-based grouping and partitioning.
- Join operations that require key comparison.
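Sort-based grouping is where setReference and equalToReference work together: after sorting, a grouping loop can fix the first record of a group as the reference and test each following record against it. The sketch below is a stdlib-only illustration, not Flink's operator code. MutableKey, ReferenceComparator, and groupSizes are hypothetical, and a plain copy() method stands in for the Kryo deep copy. It also shows why the copy matters: the loop reuses one record instance, so storing the passed object itself would make every record look equal to the reference.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable key, overwritten in place the way object-reusing runtimes do.
class MutableKey {
    int value;

    MutableKey(int value) { this.value = value; }

    MutableKey copy() { return new MutableKey(value); }   // stands in for the Kryo deep copy

    @Override public boolean equals(Object o) {
        return o instanceof MutableKey && ((MutableKey) o).value == value;
    }

    @Override public int hashCode() { return Integer.hashCode(value); }
}

class ReferenceComparator {
    private MutableKey reference;

    // Keep a private copy: the caller may overwrite the object it passed in.
    void setReference(MutableKey toCompare) { reference = toCompare.copy(); }

    boolean equalToReference(MutableKey candidate) { return candidate.equals(reference); }

    // Returns the size of each run of equal keys in already-sorted input.
    static List<Integer> groupSizes(int[] sortedKeys) {
        ReferenceComparator cmp = new ReferenceComparator();
        MutableKey reused = new MutableKey(0);   // single instance for all records
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        for (int key : sortedKeys) {
            reused.value = key;
            if (count > 0 && cmp.equalToReference(reused)) {
                count++;                         // same group as the reference
            } else {
                if (count > 0) sizes.add(count); // close the previous group
                cmp.setReference(reused);        // start a new group
                count = 1;
            }
        }
        if (count > 0) sizes.add(count);
        return sizes;
    }
}
```

For the sorted input {1, 1, 2, 3, 3, 3} this yields group sizes [2, 1, 3]. Replacing `toCompare.copy()` with `toCompare` in setReference yields [6] instead, because the reference then aliases the reused instance.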
Code Reference
Source Location
- Repository: Apache_Flink
- File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
- Lines: 1-197
Signature
```java
@Internal
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T>
```
Import
```java
import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ascending | boolean | Yes | Whether the comparator should use ascending sort order |
| type | Class<T> | Yes | The class of the Writable and Comparable type to compare |
Outputs
| Name | Type | Description |
|---|---|---|
| hash | int | Hash code of the record, delegated to record.hashCode() |
| compare result | int | Comparison result between two Writable instances, negated for descending order |
| normalized key | bytes in a MemorySegment | Written by putNormalizedKey via NormalizableKey.copyNormalizedKey; available only if the type implements NormalizableKey |
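A minimal sketch of how normalized keys let a sorter compare records byte by byte, assuming an int-valued key. The sign-flipped, big-endian encoding is one common scheme, not necessarily what a given NormalizableKey implementation uses. java.nio.ByteBuffer stands in for Flink's MemorySegment, and IntNormalizedKey is a hypothetical name. The last method shows one way a sorter can honor invertNormalizedKey: compare the bytes as usual and negate the result for descending order.

```java
import java.nio.ByteBuffer;

// Hypothetical normalized-key helpers for an int-valued key.
// ByteBuffer stands in for Flink's MemorySegment.
class IntNormalizedKey {
    static final int MAX_LEN = 4;   // analogue of getMaxNormalizedKeyLen()

    // Big-endian with the sign bit flipped, so unsigned byte order matches
    // signed int order; bytes beyond MAX_LEN are zero-padded.
    static void copyNormalizedKey(int value, ByteBuffer target, int offset, int numBytes) {
        int flipped = value ^ Integer.MIN_VALUE;
        for (int i = 0; i < numBytes; i++) {
            byte b = i < MAX_LEN ? (byte) (flipped >>> (24 - 8 * i)) : 0;
            target.put(offset + i, b);
        }
    }

    // Sorter-side check: unsigned byte-wise comparison, negated when the
    // comparator reports that normalized keys must be inverted.
    static int compareKeys(ByteBuffer a, ByteBuffer b, int len, boolean invert) {
        for (int i = 0; i < len; i++) {
            int cmp = Integer.compare(a.get(i) & 0xFF, b.get(i) & 0xFF);
            if (cmp != 0) {
                return invert ? -cmp : cmp;
            }
        }
        return 0;
    }

    // invertNormalizedKey() is modeled as !ascending, as in WritableComparator.
    static int compareValues(int x, int y, boolean ascending) {
        ByteBuffer a = ByteBuffer.allocate(MAX_LEN);
        ByteBuffer b = ByteBuffer.allocate(MAX_LEN);
        copyNormalizedKey(x, a, 0, MAX_LEN);
        copyNormalizedKey(y, b, 0, MAX_LEN);
        return compareKeys(a, b, MAX_LEN, !ascending);
    }
}
```

Flipping the sign bit is what makes negative values sort before positive ones under unsigned byte comparison; without it, -1 (0xFFFFFFFF) would sort after every non-negative value.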
Usage Examples
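```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// WritableComparator is typically created via WritableTypeInfo
WritableTypeInfo<Text> typeInfo = new WritableTypeInfo<>(Text.class);
// Create an ascending comparator (true = ascending sort order)
TypeComparator<Text> comparator = typeInfo.createComparator(true, new ExecutionConfig());
// Compare two Text values; Text orders by its UTF-8 bytes
Text first = new Text("apple");
Text second = new Text("banana");
int result = comparator.compare(first, second); // negative value (ascending)
// Duplicate the comparator for use in parallel operations: each instance
// holds its own reference state (see setReference)
TypeComparator<Text> duplicated = comparator.duplicate();
```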
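duplicate() matters because a comparator carries mutable state: setReference stores a record inside the comparator, so parallel users must not share one instance. In the Flink source, WritableComparator.duplicate() builds a fresh comparator from the same ascending flag and type. The stdlib-only sketch below mimics that shape with a hypothetical StatefulComparator over int keys; it is an illustration, not Flink code.

```java
// Hypothetical comparator holding per-instance reference state, like the
// reference field that setReference fills in WritableComparator.
class StatefulComparator {
    private final boolean ascending;
    private Integer reference;              // set by setReference, never shared

    StatefulComparator(boolean ascending) { this.ascending = ascending; }

    int compare(int first, int second) {
        int comp = Integer.compare(first, second);
        return ascending ? comp : -comp;
    }

    void setReference(int value) { reference = value; }

    boolean equalToReference(int candidate) {
        return reference != null && reference == candidate;
    }

    // Rebuild from the constructor arguments so the copy keeps the sort
    // direction but starts with no reference state.
    StatefulComparator duplicate() { return new StatefulComparator(ascending); }
}
```

Rebuilding from constructor arguments, rather than cloning fields, guarantees that no reference state leaks from one instance into its duplicate.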