Principle:Apache_Spark_Write_Ahead_Logging
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Fault_Tolerance |
| Last Updated | 2026-02-08 22:00 GMT |
Overview
Durability mechanism that persists received streaming data to reliable storage before acknowledgment, enabling recovery after driver or receiver failures.
Description
⚠️ DEPRECATED: DStream-based Spark Streaming is deprecated. Use Structured Streaming for new applications. See Heuristic:Apache_Spark_Warning_Deprecated_DStream_Streaming.
Write-Ahead Logging (WAL) is a fault tolerance technique used by Spark Streaming to ensure that data received by receivers is not lost during driver failures. Before acknowledging receipt of data from sources (Kafka, Flume, etc.), the receiver writes the data to a write-ahead log on durable storage (typically HDFS). On recovery, the WAL is replayed to reconstruct the received data that was not yet processed. The WAL interface is pluggable, allowing custom implementations for different storage backends. Record handles are serializable for inclusion in checkpoint metadata.
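The shape of the pluggable interface can be illustrated with a simplified Python analogue. The method names mirror Spark's `WriteAheadLog` abstract class (`write`, `read`, `readAll`, `clean`, `close`), but the in-memory storage and the `RecordHandle` type here are invented for illustration; real handles are storage-specific (for the HDFS-backed log, roughly a file path, offset, and length):

```python
from dataclasses import dataclass

# Hypothetical handle type, not Spark's: just enough information
# to locate a record again. Handles must be serializable so they
# can be embedded in checkpoint metadata.
@dataclass(frozen=True)
class RecordHandle:
    index: int         # position in the log
    write_time: float  # time index supplied by the writer

class InMemoryWriteAheadLog:
    """Toy WAL mirroring the shape of Spark's pluggable interface."""

    def __init__(self):
        self._records = []  # list of (write_time, payload) pairs
        self._closed = False

    def write(self, data: bytes, write_time: float) -> RecordHandle:
        # Persist before the caller acknowledges the source.
        self._records.append((write_time, data))
        return RecordHandle(len(self._records) - 1, write_time)

    def read(self, handle: RecordHandle) -> bytes:
        return self._records[handle.index][1]

    def read_all(self):
        # Recovery path: return every record that has not been cleaned.
        return [data for _, data in self._records if data is not None]

    def clean(self, thresh_time: float, wait_for_completion: bool = False):
        # Tombstone records older than the threshold instead of deleting
        # them, so existing handle indices stay valid.
        self._records = [(t, d if t >= thresh_time else None)
                         for t, d in self._records]

    def close(self) -> None:
        # Idempotent: safe to call more than once.
        self._closed = True
```

Keeping tombstones rather than compacting the list is a simplification so that old handles remain addressable; a production backend would instead delete whole log segments older than the threshold.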
Usage
Enable WAL when your streaming application requires at-least-once processing semantics and cannot afford data loss during driver failures. Configure via `spark.streaming.receiver.writeAheadLog.enable=true`. Implement the `WriteAheadLog` abstract class for custom storage backends.
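A minimal sketch of enabling the WAL in `spark-defaults.conf`; note that the application must also configure a checkpoint directory on fault-tolerant storage (via `streamingContext.checkpoint(...)`) for recovery to work:

```
spark.streaming.receiver.writeAheadLog.enable  true
```

With the WAL enabled, received data is already persisted durably, so the input stream's storage level typically does not need in-memory replication (e.g. `StorageLevel.MEMORY_AND_DISK_SER` rather than a `_2` replicated level).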
Theoretical Basis
Write-ahead logging follows the classic write-before-acknowledge durability pattern from database systems:
- Write Phase: Serialize incoming data to a ByteBuffer and persist to durable storage with a time index
- Handle Return: Return a serializable handle containing all information needed to locate the record
- Read Phase: On recovery, use handles or scan the entire log to retrieve unprocessed records
- Cleanup Phase: Periodically remove records older than a threshold (already processed batches)
- Idempotent Close: Release resources safely with idempotent close semantics
Pseudo-code Logic:
# Abstract algorithm description
# Write path (receiver side)
handle = wal.write(serialize(data), current_time)
store_handle_in_metadata(handle)
acknowledge_source(data)
# Recovery path (driver restart)
for record in wal.readAll():
    reprocess(record)
# Cleanup path (periodic)
wal.clean(oldest_completed_batch_time, wait=True)
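The three paths above can be simulated end to end with a self-contained toy example. Nothing here is a Spark API; the "WAL" is an append-only list of `(time, serialized-record)` pairs standing in for durable storage that survives a driver crash:

```python
import pickle

def simulate():
    wal = []    # stands in for durable storage (e.g. HDFS); survives the crash
    acked = []  # records acknowledged back to the source

    # Write path (receiver side): persist first, acknowledge second.
    for t, record in enumerate(["batch-0", "batch-1", "batch-2"]):
        wal.append((t, pickle.dumps(record)))  # write-before-acknowledge
        acked.append(record)                   # ack only after the write succeeds

    # --- driver crashes here; only `wal` survives ---

    # Recovery path (driver restart): replay every logged record.
    recovered = [pickle.loads(data) for _, data in wal]

    # Cleanup path (periodic): drop records no newer than the last
    # completed batch time (here, batches 0 and 1 are done).
    oldest_completed_batch_time = 1
    wal[:] = [(t, d) for t, d in wal if t > oldest_completed_batch_time]

    return recovered, [pickle.loads(d) for _, d in wal]
```

Running `simulate()` shows all three records recovered after the crash, with only the unfinished `"batch-2"` left in the log after cleanup.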