Implementation:Apache Paimon ThreadPoolUtils
| Knowledge Sources | |
|---|---|
| Domains | Concurrency, Thread Management |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
ThreadPoolUtils provides utilities for creating bounded thread pools and for running tasks in parallel, including batched execution patterns that limit memory use.
Description
ThreadPoolUtils is a utility class offering factory methods and execution patterns for concurrent task processing in Apache Paimon. The primary factory method, createCachedThreadPool(), creates a customized ThreadPoolExecutor that addresses a limitation of Java's standard Executors.newCachedThreadPool(), whose thread count is effectively unbounded: it caps the pool at threadNum threads while still letting idle threads time out and terminate. The implementation uses daemon threads (via ThreadUtils.newDaemonThreadFactory()), so pool threads do not prevent JVM shutdown, and enables core thread timeout, so inactive threads are cleaned up automatically.
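The configuration described above can be approximated with plain JDK classes. The following is a minimal sketch, not Paimon's actual source: the class name BoundedCachedPoolSketch, the inline daemon thread factory (standing in for ThreadUtils.newDaemonThreadFactory()), the default unbounded queue, and the 60-second keep-alive are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a bounded "cached" pool; not Paimon's implementation. */
final class BoundedCachedPoolSketch {

    private BoundedCachedPoolSketch() {}

    static ThreadPoolExecutor create(int threadNum, String namePrefix) {
        // Assumption: an unbounded queue when the caller supplies none.
        return create(threadNum, namePrefix, new LinkedBlockingQueue<>());
    }

    static ThreadPoolExecutor create(
            int threadNum, String namePrefix, BlockingQueue<Runnable> workQueue) {
        AtomicInteger counter = new AtomicInteger();
        // Daemon threads do not keep the JVM alive at shutdown.
        ThreadFactory daemonFactory = runnable -> {
            Thread thread = new Thread(runnable, namePrefix + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        // corePoolSize == maximumPoolSize caps the pool at threadNum threads.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threadNum,
                threadNum,
                60L, TimeUnit.SECONDS, // keep-alive value is an assumption
                workQueue,
                daemonFactory);
        // Allow idle core threads to time out, so an unused pool shrinks to zero threads.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

Setting the core and maximum sizes to the same value is what bounds the pool, while allowCoreThreadTimeOut(true) recovers the idle-thread cleanup that makes a cached pool attractive in the first place.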
The class also provides execution patterns for parallel processing with memory control. The sequentialBatchedExecute() method partitions the input into batches of queueSize items, processes each batch in parallel, and returns the results in order through an Iterable whose iterator advances one batch at a time. This avoids materializing all results at once, which is useful for large datasets where holding every result in memory is impractical.
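The batching idea can be sketched with standard JDK types. This is an illustrative approximation under stated assumptions, not Paimon's code: the class BatchedExecuteSketch and the method batched() are hypothetical names, and the sketch simply starts the next batch only once the consumer has drained the current one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch of batch-wise parallel execution with in-order, lazy results. */
final class BatchedExecuteSketch {

    private BatchedExecuteSketch() {}

    static <T, U> Iterable<T> batched(
            ExecutorService executor, Function<U, List<T>> processor, List<U> input, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return () -> new Iterator<T>() {
            private int nextStart = 0;
            private Iterator<T> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Start the next batch only after the current one is fully consumed.
                while (!current.hasNext() && nextStart < input.size()) {
                    int end = Math.min(nextStart + batchSize, input.size());
                    current = runBatch(input.subList(nextStart, end)).iterator();
                    nextStart = end;
                }
                return current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }

            // Runs one batch in parallel and collects results in submission order.
            private List<T> runBatch(List<U> batch) {
                List<Future<List<T>>> futures = new ArrayList<>(batch.size());
                for (U item : batch) {
                    Callable<List<T>> task = () -> processor.apply(item);
                    futures.add(executor.submit(task));
                }
                List<T> results = new ArrayList<>();
                for (Future<List<T>> future : futures) {
                    try {
                        results.addAll(future.get());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
                return results;
            }
        };
    }
}
```

In this sketch, at most one batch of results is held at a time, so the memory ceiling is governed by batchSize rather than by the total input size.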
Additional utilities include randomlyOnlyExecute() for parallel execution of tasks that return no result, randomlyExecuteSequentialReturn() for parallel processing with sequential result collection, and the helper methods submitAllTasks() and awaitAllFutures() for manual task management. All methods propagate the caller's thread context ClassLoader to the worker threads, ensuring proper class resolution in complex classloader hierarchies. The implementations handle InterruptedException by restoring the interrupt status and wrap exceptions in RuntimeException, so call sites need no checked-exception handling.
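The ClassLoader propagation and interrupt handling described above might look roughly like the following. This is a hedged sketch using only JDK classes; TaskSubmissionSketch, submitAll(), and awaitAll() are hypothetical names chosen for illustration and are not the Paimon methods themselves.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical sketch of submit-then-await with context ClassLoader propagation. */
final class TaskSubmissionSketch {

    private TaskSubmissionSketch() {}

    static <U> List<Future<?>> submitAll(
            ExecutorService executor, Consumer<U> processor, Collection<U> input) {
        // Capture the caller's context ClassLoader before crossing the thread boundary.
        ClassLoader callerLoader = Thread.currentThread().getContextClassLoader();
        List<Future<?>> futures = new ArrayList<>(input.size());
        for (U item : input) {
            futures.add(executor.submit(() -> {
                // Install the caller's loader on the worker thread before running the task.
                Thread.currentThread().setContextClassLoader(callerLoader);
                processor.accept(item);
            }));
        }
        return futures;
    }

    static void awaitAll(List<Future<?>> futures) {
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                // Surface task failures as an unchecked exception.
                throw new RuntimeException(e);
            }
        }
    }
}
```

Restoring the interrupt status before rethrowing matters because catching InterruptedException clears the thread's interrupt flag; without the restore, code higher in the call stack could not tell that cancellation was requested.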
Usage
Use ThreadPoolUtils when you need controlled parallel execution with thread limits, when processing large datasets in batches to avoid memory issues, or when coordinating multiple parallel operations with proper error handling and resource cleanup.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-api/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
Signature
public class ThreadPoolUtils {
    public static ThreadPoolExecutor createCachedThreadPool(int threadNum, String namePrefix)
    public static ThreadPoolExecutor createCachedThreadPool(int threadNum,
            String namePrefix,
            BlockingQueue<Runnable> workQueue)
    public static <T, U> Iterable<T> sequentialBatchedExecute(ExecutorService executor,
            Function<U, List<T>> processor,
            List<U> input,
            int queueSize)
    public static <U> void randomlyOnlyExecute(ExecutorService executor,
            Consumer<U> processor,
            Collection<U> input)
    public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(ExecutorService executor,
            Function<U, List<T>> processor,
            Collection<U> input)
    public static <U> List<Future<?>> submitAllTasks(ExecutorService executor,
            Consumer<U> processor,
            Collection<U> input)
    public static void awaitAllFutures(List<Future<?>> futures)
}
Import
import org.apache.paimon.utils.ThreadPoolUtils;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| threadNum | int | Yes | Maximum number of threads in pool |
| namePrefix | String | Yes | Prefix for thread names (for debugging) |
| workQueue | BlockingQueue<Runnable> | No | Task queue for the pool; accepted only by the three-argument createCachedThreadPool overload |
| executor | ExecutorService | Yes | Executor service for task execution |
| processor | Function/Consumer | Yes | Function to process each input item |
| input | List/Collection | Yes | Items to process in parallel |
| queueSize | int | Only for sequentialBatchedExecute | Number of input items per batch; limits how many results are held in memory at once |
Outputs
| Name | Type | Description |
|---|---|---|
| Thread pool | ThreadPoolExecutor | Thread pool capped at threadNum threads |
| Results | Iterable/Iterator | Results of parallel processing, returned sequentially (Iterable from sequentialBatchedExecute, Iterator from randomlyExecuteSequentialReturn) |
| Futures | List<Future<?>> | Future objects for manual result retrieval |
Usage Examples
// Create thread pool with limits
ThreadPoolExecutor pool = ThreadPoolUtils.createCachedThreadPool(
    10,               // max 10 threads
    "paimon-worker-"  // thread name prefix
);
// Create with custom work queue
ThreadPoolExecutor customPool = ThreadPoolUtils.createCachedThreadPool(
    20,
    "custom-worker-",
    new LinkedBlockingQueue<>(100)  // bounded queue: at most 100 pending tasks
);
// Sequential batched execution (memory-controlled)
List<String> files = Arrays.asList("file1", "file2", /* ... */ "file1000");
Iterable<Record> records = ThreadPoolUtils.sequentialBatchedExecute(
    pool,
    file -> readRecordsFromFile(file), // returns List<Record>
    files,
    10 // process 10 files at a time
);
// Results are produced lazily
for (Record record : records) {
    processRecord(record);
    // Memory used: only ~10 files' worth of records at a time
}
// Parallel execution without return values
List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
ThreadPoolUtils.randomlyOnlyExecute(
    pool,
    id -> {
        System.out.println("Processing " + id);
        performWork(id);
    },
    ids
);
// Parallel execution with sequential results
Iterator<String> results = ThreadPoolUtils.randomlyExecuteSequentialReturn(
    pool,
    partition -> processPartition(partition), // returns List<String>
    partitions
);
while (results.hasNext()) {
    String result = results.next();
    handleResult(result);
}
// Manual task submission and waiting
List<String> tasks = Arrays.asList("task1", "task2", "task3");
List<Future<?>> futures = ThreadPoolUtils.submitAllTasks(
    pool,
    task -> performTask(task),
    tasks
);
// Do other work...
// Wait for all tasks to complete
ThreadPoolUtils.awaitAllFutures(futures);
System.out.println("All tasks completed");
// Example: Parallel file processing with memory control
public void processLargeDataset(List<String> filePaths) {
    ThreadPoolExecutor executor = ThreadPoolUtils.createCachedThreadPool(
        Runtime.getRuntime().availableProcessors(),
        "file-processor-"
    );
    try {
        Iterable<DataBatch> batches = ThreadPoolUtils.sequentialBatchedExecute(
            executor,
            filePath -> {
                // Each file returns multiple batches
                return readBatches(filePath);
            },
            filePaths,
            5 // Process 5 files concurrently
        );
        for (DataBatch batch : batches) {
            writeBatch(batch);
        }
    } finally {
        executor.shutdown();
    }
}
// Example: Parallel validation
List<Schema> schemas = getSchemas();
ThreadPoolUtils.randomlyOnlyExecute(
    pool,
    schema -> {
        ValidationResult result = validate(schema);
        if (!result.isValid()) {
            logError(schema, result);
        }
    },
    schemas
);
// Cleanup
pool.shutdown();