Implementation:Apache Kafka CoordinatorLoaderImpl Load
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Systems, State_Management |
| Last Updated | 2026-02-09 12:00 GMT |
Overview
Concrete tool for loading coordinator state from partition logs provided by CoordinatorLoaderImpl.
Description
The CoordinatorLoaderImpl.load method reads all records from a coordinator topic-partition and replays them through a CoordinatorPlayback instance. It schedules the loading on an internal single-threaded scheduler, reads records in batches using BufferSupplier, deserializes them, and periodically commits progress. Returns a CompletableFuture<LoadSummary> with loading statistics.
Usage
Called by the CoordinatorRuntime after a partition transitions to LOADING state. Not invoked directly by user code.
Code Reference
Source Location
- Repository: Apache Kafka
- File: coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
- Lines: L104-351
Signature
public CompletableFuture<LoadSummary> load(
TopicPartition tp,
CoordinatorPlayback<T> coordinator
)
Import
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| tp | TopicPartition | Yes | Partition to load records from |
| coordinator | CoordinatorPlayback<T> | Yes | Replay target for deserialized records |
Outputs
| Name | Type | Description |
|---|---|---|
| CompletableFuture<LoadSummary> | future | Completes with startOffset, endOffset, numRecords, numBatches |
Usage Examples
// Internal usage within CoordinatorRuntime
CoordinatorLoaderImpl<Record> loader = new CoordinatorLoaderImpl<>(
time, partitionLogSupplier, partitionLogEndOffsetSupplier,
deserializer, loadBufferSize, DEFAULT_COMMIT_INTERVAL_OFFSETS
);
CompletableFuture<LoadSummary> future = loader.load(
new TopicPartition("__consumer_offsets", 0),
coordinatorPlayback
);
future.thenAccept(summary ->
log.info("Loaded {} records in {} batches from offset {} to {}",
summary.numRecords(), summary.numBatches(),
summary.startOffset(), summary.endOffset()));