

Implementation:Apache Flink WritableComparator

From Leeroopedia


Knowledge Sources
Domains: Hadoop_Compatibility, Type_System
Last Updated: 2026-02-09 00:00 GMT

Overview

WritableComparator is a Flink TypeComparator implementation for Hadoop Writable types that also implement Comparable, enabling ordering and key-based operations on Hadoop-compatible data within Flink.

Description

WritableComparator is an internal class that extends Flink's TypeComparator<T>, where T must extend both Writable and Comparable<T>. It provides the comparison logic required by Flink for operations such as sorting, grouping, and joining on Hadoop Writable types.

Key implementation details include:

  • Sort order: The comparator supports both ascending and descending comparison via the ascendingComparison flag. When descending, comparison results are negated.
  • Reference-based comparison: setReference stores a deep copy of the given record, made with Kryo, with the WritableSerializer used as a fallback copy mechanism. equalToReference then checks a candidate against that stored copy using equals(), and compareToReference compares the references held by two comparators.
  • Serialized comparison: The compareSerialized method reads two records from DataInputView streams into two reusable instances (via readFields) and compares them with compareTo. Reusing the instances avoids per-comparison allocation, but this is not a pure binary comparison: both records are still fully deserialized.
  • Normalized key support: supportsNormalizedKey returns true only if the Writable type also implements NormalizableKey. In that case getNormalizeKeyLen returns the key's maximum normalized key length and putNormalizedKey delegates to the NormalizableKey's copyNormalizedKey. invertNormalizedKey does not delegate; it returns true for descending comparators so that sorting on normalized keys respects the sort order.
  • Kryo initialization: Kryo is lazily initialized with a DefaultInstantiatorStrategy and StdInstantiatorStrategy fallback, and the type class is registered for efficient serialization.
  • Lazy instantiation: The reference and temporary reference objects are created on demand via InstantiationUtil.instantiate, with Writable passed as the expected supertype.
  • Key normalization with serialization: Not supported; supportsSerializationWithKeyNormalization returns false, and the corresponding read/write methods throw UnsupportedOperationException.
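The sort-order, serialized-comparison, and lazy-instantiation points above can be illustrated together in plain Java. This is a minimal sketch, not Flink's code: SimpleWritable, LongRecord, and SerializedComparatorSketch are hypothetical stand-ins for Hadoop's Writable, a user record type, and WritableComparator, and a Supplier replaces InstantiationUtil.instantiate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods).
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical record type holding a single long.
final class LongRecord implements SimpleWritable, Comparable<LongRecord> {
    long value;
    LongRecord() {}
    LongRecord(long value) { this.value = value; }
    public void write(DataOutput out) throws IOException { out.writeLong(value); }
    public void readFields(DataInput in) throws IOException { value = in.readLong(); }
    public int compareTo(LongRecord o) { return Long.compare(value, o.value); }
}

// Sketch of the compareSerialized pattern: deserialize into two lazily
// created, reused instances, compare with compareTo, negate if descending.
final class SerializedComparatorSketch<T extends SimpleWritable & Comparable<T>> {
    private final boolean ascending;
    private final Supplier<T> factory; // hypothetical replacement for InstantiationUtil.instantiate
    private T reference;               // created on first use, then reused
    private T tempReference;           // created on first use, then reused

    SerializedComparatorSketch(boolean ascending, Supplier<T> factory) {
        this.ascending = ascending;
        this.factory = factory;
    }

    int compareSerialized(DataInput first, DataInput second) throws IOException {
        if (reference == null) reference = factory.get();
        if (tempReference == null) tempReference = factory.get();
        reference.readFields(first);      // full deserialization of record 1
        tempReference.readFields(second); // full deserialization of record 2
        int comp = reference.compareTo(tempReference);
        return ascending ? comp : -comp;
    }

    // Helper: serialize a record and expose its bytes as a DataInput.
    static DataInput toInput(SimpleWritable record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        record.write(new DataOutputStream(bytes));
        return new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    }

    // Demo helper: compare two longs through their serialized form.
    static int compareLongs(boolean ascending, long a, long b) {
        try {
            SerializedComparatorSketch<LongRecord> cmp =
                    new SerializedComparatorSketch<>(ascending, LongRecord::new);
            return cmp.compareSerialized(toInput(new LongRecord(a)), toInput(new LongRecord(b)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because both records pass through readFields, each comparison still pays for a full deserialization; reusing the two instances only saves allocation.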

Usage

Use WritableComparator when Flink needs to perform comparison operations on Hadoop Writable types. This comparator is typically not instantiated directly by user code; instead, it is created automatically by WritableTypeInfo.createComparator when the Writable type implements Comparable. It is used internally by Flink's runtime during:

  • Sort-based operations on DataSet API pipelines.
  • Key-based grouping and partitioning.
  • Join operations that require key comparison.
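The condition under which a comparator is created at all can be sketched as follows. ComparatorFactorySketch is hypothetical and returns a java.util.Comparator rather than a Flink TypeComparator; it only illustrates the Comparable check described above, assuming that a type failing the check is rejected with an UnsupportedOperationException.

```java
import java.util.Comparator;

// Hypothetical factory mirroring the "only if the type implements Comparable"
// rule; not Flink's WritableTypeInfo.createComparator itself.
final class ComparatorFactorySketch {
    static <T> Comparator<T> createComparator(Class<T> type, boolean ascending) {
        if (!Comparable.class.isAssignableFrom(type)) {
            throw new UnsupportedOperationException(
                    "Cannot create a comparator for " + type.getName()
                            + ": class does not implement Comparable");
        }
        // Safe after the isAssignableFrom check: every T is Comparable.
        @SuppressWarnings("unchecked")
        Comparator<T> natural = (a, b) -> ((Comparable<Object>) a).compareTo(b);
        return ascending ? natural : natural.reversed();
    }
}
```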

Code Reference

Source Location

  • Repository: Apache_Flink
  • File: flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
  • Lines: 1-197

Signature

@Internal
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T>
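The generic bound in this signature requires T to satisfy both interfaces at once. The sketch below assumes a hypothetical SimpleWritable interface in place of Hadoop's Writable and a hypothetical IntRecord type; it shows a skeleton with the same bound, where compareTo is callable because of Comparable<T>, the ascending flag negates the result, and hash delegates to the record's own hashCode().

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical stand-in for org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Skeleton with the same bound as the Flink signature: T must be a
// (stand-in) Writable AND Comparable to itself.
class BoundedComparatorSketch<T extends SimpleWritable & Comparable<T>> {
    private final boolean ascendingComparison;
    private final Class<T> type; // kept only to mirror the constructor arguments

    BoundedComparatorSketch(boolean ascending, Class<T> type) {
        this.ascendingComparison = ascending;
        this.type = type;
    }

    // compareTo is available because of the Comparable<T> bound.
    int compare(T first, T second) {
        int comp = first.compareTo(second);
        return ascendingComparison ? comp : -comp;
    }

    // Hashing uses the record's own hashCode().
    int hash(T record) {
        return record.hashCode();
    }
}

// Hypothetical record type that satisfies the bound.
final class IntRecord implements SimpleWritable, Comparable<IntRecord> {
    int value;
    IntRecord(int value) { this.value = value; }
    public void write(DataOutput out) throws IOException { out.writeInt(value); }
    public void readFields(DataInput in) throws IOException { value = in.readInt(); }
    public int compareTo(IntRecord o) { return Integer.compare(value, o.value); }
    @Override public int hashCode() { return Integer.hashCode(value); }
    @Override public boolean equals(Object o) {
        return o instanceof IntRecord && ((IntRecord) o).value == value;
    }
}
```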

Import

import org.apache.flink.api.java.typeutils.runtime.WritableComparator;

I/O Contract

Inputs

Name | Type | Required | Description
ascending | boolean | Yes | Whether the comparator uses ascending (true) or descending (false) sort order
type | Class<T> | Yes | The class of the Writable and Comparable type to compare

Outputs

Name | Type | Description
hash | int | Hash code of the record, delegated to the record's own hashCode()
compare result | int | Result of compareTo between two Writable instances, negated for descending order
normalized key | bytes in a MemorySegment | Normalized key bytes written by putNormalizedKey into the target MemorySegment; available only if the type implements NormalizableKey
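To make the normalized key row concrete, the sketch below shows one common way a key type could encode a signed int so that unsigned byte-wise comparison matches numeric order, and how inverting the comparison yields descending order. NormalizedKeySketch is hypothetical: real NormalizableKey implementations write into a MemorySegment via copyNormalizedKey rather than returning a byte[], and the exact encoding is up to each key type.

```java
// Hypothetical normalized-key encoding for a signed int.
final class NormalizedKeySketch {
    // Flip the sign bit and write big-endian: negative values now start with
    // smaller unsigned bytes than non-negative ones.
    static byte[] normalizedKey(int value) {
        int flipped = value ^ Integer.MIN_VALUE;
        return new byte[] {
            (byte) (flipped >>> 24), (byte) (flipped >>> 16),
            (byte) (flipped >>> 8), (byte) flipped
        };
    }

    // Unsigned lexicographic comparison; inverted = true reverses the order,
    // which is what a descending comparator's invertNormalizedKey signals.
    static int compareKeys(byte[] a, byte[] b, boolean inverted) {
        int cmp = 0;
        for (int i = 0; i < Math.min(a.length, b.length) && cmp == 0; i++) {
            cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
        }
        if (cmp == 0) cmp = Integer.compare(a.length, b.length);
        return inverted ? -cmp : cmp;
    }
}
```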

Usage Examples

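import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// WritableComparator is typically created via WritableTypeInfo
WritableTypeInfo<Text> typeInfo = new WritableTypeInfo<>(Text.class);
ExecutionConfig executionConfig = new ExecutionConfig();

// Create an ascending comparator
TypeComparator<Text> comparator = typeInfo.createComparator(true, executionConfig);

// Compare two Text values
Text first = new Text("apple");
Text second = new Text("banana");
int result = comparator.compare(first, second); // negative: "apple" sorts before "banana"

// Comparators hold mutable reference state, so each parallel task or thread
// should work on its own duplicate rather than share one instance
TypeComparator<Text> duplicated = comparator.duplicate();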
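The reference-based methods, and the reason duplicate() matters, can be modeled without Flink on the classpath. ReferenceComparatorSketch is a hypothetical, simplified model: it stores the reference directly instead of making a Kryo deep copy, so it assumes immutable record types. The sign convention of compareToReference follows our reading of Flink's implementation (the other comparator's reference compared against this one's) and should be checked against the TypeComparator Javadoc.

```java
// Hypothetical model of reference-based comparison; not Flink's TypeComparator.
final class ReferenceComparatorSketch<T extends Comparable<T>> {
    private final boolean ascending;
    private T reference; // mutable per-instance state

    ReferenceComparatorSketch(boolean ascending) {
        this.ascending = ascending;
    }

    // Flink deep-copies the record here; this sketch assumes T is immutable.
    void setReference(T toCompare) {
        reference = toCompare;
    }

    // Equality against the reference uses equals(), not compareTo().
    boolean equalToReference(T candidate) {
        return candidate.equals(reference);
    }

    // Sign describes the other comparator's reference relative to this one's.
    int compareToReference(ReferenceComparatorSketch<T> other) {
        int comp = other.reference.compareTo(reference);
        return ascending ? comp : -comp;
    }

    // Same configuration, fresh (empty) reference state.
    ReferenceComparatorSketch<T> duplicate() {
        return new ReferenceComparatorSketch<>(ascending);
    }
}
```

Because setReference mutates the comparator, sharing one instance across threads would let one task overwrite another's reference; a duplicate gives each task independent state.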
