Implementation:Apache Paimon ThreadPoolUtils

From Leeroopedia


Knowledge Sources
Domains: Concurrency, Thread Management
Last Updated: 2026-02-08 00:00 GMT

Overview

ThreadPoolUtils provides factory methods for size-capped thread pools and helpers for running tasks in parallel, including a batched mode that keeps memory use bounded.

Description

ThreadPoolUtils is a utility class offering factory methods and execution patterns for concurrent task processing in Apache Paimon. The primary factory method, createCachedThreadPool(), builds a customized ThreadPoolExecutor that addresses a limitation of Java's standard Executors.newCachedThreadPool(): the standard cached pool places no upper bound on the number of threads, whereas createCachedThreadPool() caps the pool at threadNum threads while still letting idle threads time out and terminate. The pool uses daemon threads (created via ThreadUtils.newDaemonThreadFactory()), so its threads do not prevent JVM shutdown, and it enables core thread timeout, so inactive threads are cleaned up automatically.
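The pool configuration described above can be approximated with plain java.util.concurrent. The sketch below is not Paimon's source: the class name, the one-minute keep-alive, the unbounded LinkedBlockingQueue, and the daemonThreadFactory helper (a hypothetical stand-in for ThreadUtils.newDaemonThreadFactory()) are assumptions chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedCachedPoolSketch {

    // Hypothetical stand-in for ThreadUtils.newDaemonThreadFactory(namePrefix):
    // names threads "<prefix>1", "<prefix>2", ... and marks each one as a daemon thread.
    static ThreadFactory daemonThreadFactory(String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, namePrefix + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    static ThreadPoolExecutor createCachedThreadPool(int threadNum, String namePrefix) {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        threadNum,                    // core size == max size: hard cap of threadNum threads
                        threadNum,
                        1, TimeUnit.MINUTES,          // assumed idle keep-alive
                        new LinkedBlockingQueue<>(),  // unbounded: surplus tasks wait instead of being rejected
                        daemonThreadFactory(namePrefix));
        // Let core threads time out too, so an idle pool shrinks back to zero threads.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = createCachedThreadPool(4, "paimon-worker-");
        for (int i = 0; i < 100; i++) {
            pool.execute(() -> Thread.yield());
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // prints "max=4 coreTimeout=true"; getLargestPoolSize() never exceeds 4
        System.out.println("max=" + pool.getMaximumPoolSize()
                + " coreTimeout=" + pool.allowsCoreThreadTimeOut());
    }
}
```

Setting the core size equal to the maximum is what makes the cap work with an unbounded queue: ThreadPoolExecutor only starts threads beyond the core size when the queue refuses a task, which an unbounded LinkedBlockingQueue never does. allowCoreThreadTimeOut(true) then lets even the core threads exit when idle, recovering the "cached" behavior.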

The class also provides execution patterns for parallel processing with memory control. The sequentialBatchedExecute() method partitions the input into batches of queueSize items, processes the items of each batch in parallel, and returns the results sequentially through an iterator. Because results are never materialized for the whole input at once, memory use stays bounded to roughly one batch's worth of results, which matters for large datasets where holding every result in memory is impractical.
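A minimal sketch of the batching pattern follows, assuming batches are processed strictly one after another and results are handed out in input order. The class name is hypothetical, the parameter called queueSize in Paimon's signature is named batchSize here, and Paimon's actual implementation may overlap batches or differ in other details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class SequentialBatchedSketch {

    // Splits input into consecutive batches of batchSize items. The items of one batch run in
    // parallel on the executor; the next batch is only submitted after the current batch's
    // results have been handed out, so at most one batch of results is buffered.
    static <T, U> Iterable<T> sequentialBatchedExecute(
            ExecutorService executor, Function<U, List<T>> processor, List<U> input, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return () -> new Iterator<T>() {
            private int nextBatchStart = 0; // index of the first item of the next batch
            private Iterator<T> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // A batch may yield nothing if every processor call returned an empty list,
                // so keep advancing until an element appears or the input is exhausted.
                while (!current.hasNext() && nextBatchStart < input.size()) {
                    current = runBatch().iterator();
                }
                return current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }

            private List<T> runBatch() {
                int end = Math.min(nextBatchStart + batchSize, input.size());
                List<Future<List<T>>> futures = new ArrayList<>();
                for (U item : input.subList(nextBatchStart, end)) {
                    futures.add(executor.submit(() -> processor.apply(item)));
                }
                nextBatchStart = end;
                List<T> results = new ArrayList<>();
                for (Future<List<T>> future : futures) { // collect in submission (input) order
                    try {
                        results.addAll(future.get());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException(e.getCause());
                    }
                }
                return results;
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Each item i yields [i, i * 10]; at most 3 items are processed per batch.
        Iterable<Integer> results =
                sequentialBatchedExecute(pool, i -> Arrays.asList(i, i * 10), input, 3);
        List<Integer> collected = new ArrayList<>();
        for (Integer value : results) {
            collected.add(value);
        }
        pool.shutdown();
        System.out.println(collected); // prints [1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 6, 60, 7, 70]
    }
}
```

Because a batch is only submitted when the caller asks for more elements, the consumer's pace throttles the work, which is where the memory bound comes from.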

Additional utilities include randomlyOnlyExecute(), which applies a Consumer to every input item in parallel, in no guaranteed order, and returns once all items have been processed; randomlyExecuteSequentialReturn(), which processes items in parallel and returns their results one at a time through an Iterator; and the helper methods submitAllTasks() and awaitAllFutures() for manual task management. The execution helpers preserve the caller's thread context ClassLoader across thread boundaries, ensuring proper class resolution in complex classloader hierarchies. They handle InterruptedException by restoring the thread's interrupt status, and they wrap checked exceptions in RuntimeException so call sites need no try/catch.
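The ClassLoader propagation and exception handling described above follow a common pattern, sketched below under the assumption that randomlyOnlyExecute() is simply awaitAllFutures(submitAllTasks(...)). This is an illustrative reimplementation that reuses the method names; the class name is hypothetical and the code is not Paimon's source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class TaskHelpersSketch {

    // Submits one task per input item. The caller's context ClassLoader is captured here,
    // on the submitting thread, and installed on the worker for the duration of each task.
    static <U> List<Future<?>> submitAllTasks(
            ExecutorService executor, Consumer<U> processor, Collection<U> input) {
        ClassLoader callerLoader = Thread.currentThread().getContextClassLoader();
        List<Future<?>> futures = new ArrayList<>(input.size());
        for (U item : input) {
            futures.add(executor.submit(() -> {
                Thread worker = Thread.currentThread();
                ClassLoader previous = worker.getContextClassLoader();
                worker.setContextClassLoader(callerLoader);
                try {
                    processor.accept(item);
                } finally {
                    worker.setContextClassLoader(previous); // pooled threads are reused
                }
            }));
        }
        return futures;
    }

    // Blocks until every future completes. Interruption restores the interrupt flag;
    // task failures are rethrown unchecked so call sites need no try/catch.
    static void awaitAllFutures(List<Future<?>> futures) {
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
    }

    // Runs processor over every item in no guaranteed order and returns once all are done.
    static <U> void randomlyOnlyExecute(
            ExecutorService executor, Consumer<U> processor, Collection<U> input) {
        awaitAllFutures(submitAllTasks(executor, processor, input));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger sum = new AtomicInteger();
        // Items may run in any order; the call returns only after all five have been processed.
        randomlyOnlyExecute(pool, item -> sum.addAndGet(item), Arrays.asList(1, 2, 3, 4, 5));
        pool.shutdown();
        System.out.println(sum.get()); // prints 15
    }
}
```

Capturing the ClassLoader on the submitting thread matters because pooled worker threads are created once and reused, so whatever context ClassLoader they inherited at creation time may not match the caller's.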

Usage

Use ThreadPoolUtils when you need controlled parallel execution with thread limits, when processing large datasets in batches to avoid memory issues, or when coordinating multiple parallel operations with proper error handling and resource cleanup.

Code Reference

Source Location

Signature

public class ThreadPoolUtils {

    public static ThreadPoolExecutor createCachedThreadPool(int threadNum, String namePrefix)

    public static ThreadPoolExecutor createCachedThreadPool(int threadNum,
                                                            String namePrefix,
                                                            BlockingQueue<Runnable> workQueue)

    public static <T, U> Iterable<T> sequentialBatchedExecute(ExecutorService executor,
                                                               Function<U, List<T>> processor,
                                                               List<U> input,
                                                               int queueSize)

    public static <U> void randomlyOnlyExecute(ExecutorService executor,
                                               Consumer<U> processor,
                                               Collection<U> input)

    public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(ExecutorService executor,
                                                                      Function<U, List<T>> processor,
                                                                      Collection<U> input)

    public static <U> List<Future<?>> submitAllTasks(ExecutorService executor,
                                                      Consumer<U> processor,
                                                      Collection<U> input)

    public static void awaitAllFutures(List<Future<?>> futures)
}

Import

import org.apache.paimon.utils.ThreadPoolUtils;

I/O Contract

Inputs

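Name | Type | Required | Description
threadNum | int | Yes (createCachedThreadPool) | Maximum number of threads in the pool
namePrefix | String | Yes (createCachedThreadPool) | Prefix for thread names, useful when debugging
workQueue | BlockingQueue<Runnable> | Three-argument createCachedThreadPool overload only | Queue that holds tasks waiting for a free thread
executor | ExecutorService | Yes (execution helpers) | Executor that runs the tasks
processor | Function or Consumer | Yes (execution helpers) | Function or Consumer applied to each input item
input | List or Collection | Yes (execution helpers) | Items to process in parallel
queueSize | int | sequentialBatchedExecute only | Number of input items per batch; bounds how many results are held in memory at once
futures | List<Future<?>> | awaitAllFutures only | Futures returned by submitAllTasks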

Outputs

Name | Type | Description
Thread pool | ThreadPoolExecutor | Pool capped at threadNum daemon threads; idle threads time out
Results | Iterable<T> or Iterator<T> | Results of parallel processing, consumed one element at a time
Futures | List<Future<?>> | One Future per submitted task, for awaitAllFutures() or manual retrieval

Usage Examples

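import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.paimon.utils.ThreadPoolUtils;

// Create thread pool with limits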
ThreadPoolExecutor pool = ThreadPoolUtils.createCachedThreadPool(
    10,                // max 10 threads
    "paimon-worker-"   // thread name prefix
);

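// Create with a custom work queue. This queue is bounded: once all 20 threads are busy and
// 100 tasks are waiting, further submissions are rejected (RejectedExecutionException,
// assuming the pool keeps ThreadPoolExecutor's default AbortPolicy).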
ThreadPoolExecutor customPool = ThreadPoolUtils.createCachedThreadPool(
    20,
    "custom-worker-",
    new LinkedBlockingQueue<>(100)
);

// Sequential batched execution (memory-controlled)
List<String> files = IntStream.rangeClosed(1, 1000)
    .mapToObj(i -> "file" + i)   // "file1" .. "file1000"
    .collect(Collectors.toList());
Iterable<Record> records = ThreadPoolUtils.sequentialBatchedExecute(
    pool,
    file -> readRecordsFromFile(file),  // placeholder helper returning List<Record>
    files,
    10  // batch size: 10 files are read in parallel per batch
);

// Results are produced lazily
for (Record record : records) {
    processRecord(record);
    // Roughly one batch (about 10 files) of records is held in memory at a time
}

// Parallel execution without results; blocks until every item has been processed
List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
ThreadPoolUtils.randomlyOnlyExecute(
    pool,
    id -> {
        System.out.println("Processing " + id);
        performWork(id);
    },
    ids
);

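// Parallel execution with sequential results
List<String> partitions = Arrays.asList("p1", "p2", "p3");  // placeholder partition names
Iterator<String> results = ThreadPoolUtils.randomlyExecuteSequentialReturn(
    pool,
    partition -> processPartition(partition),  // placeholder helper returning List<String>
    partitions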
);

while (results.hasNext()) {
    String result = results.next();
    handleResult(result);
}

// Manual task submission and waiting
List<String> tasks = Arrays.asList("task1", "task2", "task3");
List<Future<?>> futures = ThreadPoolUtils.submitAllTasks(
    pool,
    task -> performTask(task),
    tasks
);

// Do other work...

// Wait for all tasks to complete
ThreadPoolUtils.awaitAllFutures(futures);
System.out.println("All tasks completed");

// Example: Parallel file processing with memory control
public void processLargeDataset(List<String> filePaths) {
    ThreadPoolExecutor executor = ThreadPoolUtils.createCachedThreadPool(
        Runtime.getRuntime().availableProcessors(),
        "file-processor-"
    );

    try {
        Iterable<DataBatch> batches = ThreadPoolUtils.sequentialBatchedExecute(
            executor,
            filePath -> {
                // Each file returns multiple batches
                return readBatches(filePath);
            },
            filePaths,
            5  // Process 5 files concurrently
        );

        for (DataBatch batch : batches) {
            writeBatch(batch);
        }
    } finally {
        executor.shutdown();
    }
}

// Example: Parallel validation
List<Schema> schemas = getSchemas();
ThreadPoolUtils.randomlyOnlyExecute(
    pool,
    schema -> {
        ValidationResult result = validate(schema);
        if (!result.isValid()) {
            logError(schema, result);
        }
    },
    schemas
);

// Cleanup
pool.shutdown();
