

Implementation:Duckdb Duckdb ConcurrentQueue

From Leeroopedia
Revision as of 14:50, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Duckdb_Duckdb_ConcurrentQueue.md)


Knowledge Sources
Domains Concurrency, Third_Party
Last Updated 2026-02-07 12:00 GMT

Overview

ConcurrentQueue is a lock-free, multi-producer, multi-consumer queue library by Cameron Desrochers (moodycamel) that provides both non-blocking and blocking variants for high-throughput concurrent data exchange between threads.

Description

The library consists of three main components, all residing in the duckdb_moodycamel namespace:

  • ConcurrentQueue -- A lock-free multi-producer, multi-consumer queue implemented using per-producer sub-queues to minimize contention. It supports both implicit (automatic) and explicit (token-based) producer/consumer registration. The queue uses block-based memory allocation with configurable block sizes and capacity thresholds.
  • BlockingConcurrentQueue -- A wrapper around ConcurrentQueue that adds blocking dequeue semantics via an internal semaphore. It provides wait_dequeue() and wait_dequeue_timed() methods in addition to the non-blocking try_dequeue() interface. The blocking queue signals the semaphore on every enqueue and waits on it during dequeues.
  • LightweightSemaphore -- A cross-platform semaphore implementation adapted from Jeff Preshing's portable semaphore. It uses platform-specific primitives (Win32 semaphores, Mach semaphores, or POSIX semaphores) with a fast-path atomic spin to avoid kernel calls when contention is low.
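The interplay between the blocking wrapper and the semaphore can be sketched with standard-library pieces only. This is a simplified model under stated assumptions, not the library's code: `SketchSemaphore`, `SketchBlockingQueue`, and `sum_through_queue` are hypothetical names, the spin count of 1000 is arbitrary, a mutex-guarded `std::deque` stands in for the lock-free inner queue, and a mutex plus condition variable stands in for the platform semaphore.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for LightweightSemaphore (not the library's code).
// count_ > 0: tokens available; count_ < 0: that many waiters are asleep.
class SketchSemaphore {
public:
    // Fast path: take a token with a CAS, never blocking.
    bool try_wait() {
        std::int64_t old = count_.load(std::memory_order_relaxed);
        while (old > 0) {
            if (count_.compare_exchange_weak(old, old - 1,
                                             std::memory_order_acquire,
                                             std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }

    void wait() {
        // Spin briefly in user space before paying for a blocking wait.
        for (int spin = 0; spin < 1000; ++spin) {
            if (try_wait()) return;
        }
        // Register as a waiter; a positive old value means a token arrived.
        if (count_.fetch_sub(1, std::memory_order_acquire) > 0) return;
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return wakeups_ > 0; });
        --wakeups_;
    }

    void signal() {
        // Touch the slow path only when a waiter is actually asleep.
        if (count_.fetch_add(1, std::memory_order_release) < 0) {
            { std::lock_guard<std::mutex> lock(mutex_); ++wakeups_; }
            cv_.notify_one();
        }
    }

private:
    std::atomic<std::int64_t> count_{0};
    std::mutex mutex_;              // slow-path state, assumed stand-in
    std::condition_variable cv_;
    std::int64_t wakeups_ = 0;
};

// Hypothetical wrapper: one signal per enqueue, one wait per dequeue.
template <typename T>
class SketchBlockingQueue {
public:
    void enqueue(T value) {
        { std::lock_guard<std::mutex> lock(mutex_); items_.push_back(std::move(value)); }
        sema_.signal();
    }
    void wait_dequeue(T& out) {
        sema_.wait();   // returns only once an item is guaranteed to exist
        pop(out);
    }
    bool try_dequeue(T& out) {
        if (!sema_.try_wait()) return false;
        pop(out);
        return true;
    }

private:
    void pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        out = std::move(items_.front());
        items_.pop_front();
    }
    std::mutex mutex_;
    std::deque<T> items_;           // stand-in for the lock-free inner queue
    SketchSemaphore sema_;
};

// One producer thread feeding a consumer on the calling thread.
long long sum_through_queue(int n) {
    SketchBlockingQueue<int> queue;
    std::thread producer([&queue, n] {
        for (int i = 1; i <= n; ++i) queue.enqueue(i);
    });
    long long sum = 0;
    for (int i = 0; i < n; ++i) {
        int item = 0;
        queue.wait_dequeue(item);
        sum += item;
    }
    producer.join();
    return sum;
}
```

Because the semaphore is signaled only after an item has been stored, a successful wait always finds an item to take. That is the property that lets the blocking wrapper sit on top of a non-blocking queue.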

The queue is not sequentially consistent; items enqueued by different producers may be dequeued in any relative order, but items from the same producer are always dequeued in FIFO order.
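The ordering guarantee can be illustrated with a single-threaded toy model of per-producer sub-queues. Everything here is an assumption made for illustration: `SubQueueModel`, `Item`, and `per_producer_fifo` are hypothetical names, and the round-robin choice of sub-queue is a simplification rather than the library's actual consumer policy.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

struct Item {
    std::size_t producer;  // which producer enqueued the item
    int seq;               // position within that producer's stream
};

// Toy model: one FIFO sub-queue per producer, drained round-robin.
class SubQueueModel {
public:
    explicit SubQueueModel(std::size_t producers) : subs_(producers) {}

    void enqueue(std::size_t producer, int seq) {
        subs_[producer].push_back(Item{producer, seq});
    }

    bool try_dequeue(Item& out) {
        for (std::size_t i = 0; i < subs_.size(); ++i) {
            std::deque<Item>& q = subs_[(next_ + i) % subs_.size()];
            if (!q.empty()) {
                out = q.front();
                q.pop_front();
                next_ = (next_ + i + 1) % subs_.size();  // rotate (assumed policy)
                return true;
            }
        }
        return false;  // every sub-queue was empty
    }

private:
    std::vector<std::deque<Item>> subs_;
    std::size_t next_ = 0;
};

// True if each producer's items appear in increasing seq order.
bool per_producer_fifo(const std::vector<Item>& drained, std::size_t producers) {
    std::vector<int> last(producers, -1);
    for (const Item& it : drained) {
        if (it.seq <= last[it.producer]) return false;
        last[it.producer] = it.seq;
    }
    return true;
}
```

If producer 0 enqueues three items and producer 1 then enqueues three more, this model hands out producer 1's first item second, ahead of producer 0's remaining items. Global enqueue order is lost, while each producer's own sequence stays intact.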

Usage

DuckDB uses ConcurrentQueue for inter-thread communication in its parallel execution engine. The blocking variant is used when worker threads need to wait for tasks from the scheduler, and the non-blocking variant is used in scenarios where polling or batching is preferred. The queue is used in the task scheduler, pipeline execution, and parallel data exchange operators.
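A batching consumer of the kind mentioned above might look like the following sketch. It is not taken from DuckDB: `drain_in_batches` and `StandInQueue` are hypothetical names, and the helper only assumes a queue type that exposes `try_dequeue_bulk(It, size_t)` returning the number of items written, as in the signature listed on this page. The element type `T` is assumed to be default-constructible.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Repeatedly pulls up to batch_size items and hands each batch to handle.
// Stops when the queue appears empty; returns the total number processed.
template <typename T, typename Queue, typename Handler>
std::size_t drain_in_batches(Queue& queue, std::size_t batch_size, Handler handle) {
    std::vector<T> batch(batch_size);  // reusable buffer for every batch
    std::size_t total = 0;
    for (;;) {
        std::size_t n = queue.try_dequeue_bulk(batch.begin(), batch_size);
        if (n == 0) break;             // nothing available right now
        handle(batch.data(), n);       // only the first n slots are valid
        total += n;
    }
    return total;
}

// Single-threaded stand-in so the sketch runs without the library.
struct StandInQueue {
    std::deque<int> items;

    template <typename It>
    std::size_t try_dequeue_bulk(It first, std::size_t max) {
        std::size_t n = 0;
        while (n < max && !items.empty()) {
            *first++ = items.front();
            items.pop_front();
            ++n;
        }
        return n;
    }
};
```

With 25 queued items and a batch size of 10, the handler in this model sees batches of 10, 10, and 5. With a concurrent queue, a return value of 0 only means the queue appeared empty at that moment, so a real worker would typically retry or fall back to a blocking wait.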

Code Reference

Source Location

Signature

namespace duckdb_moodycamel {

// Lock-free concurrent queue
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue {
public:
    explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE);
    bool enqueue(T const& item);
    bool enqueue(T&& item);
    template<typename It> bool enqueue_bulk(It itemFirst, size_t count);
    template<typename U> bool try_dequeue(U& item);
    template<typename It> size_t try_dequeue_bulk(It itemFirst, size_t max);
    size_t size_approx() const;
};

// Blocking variant with semaphore-backed waits
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class BlockingConcurrentQueue {
public:
    explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE);
    bool enqueue(T const& item);
    bool enqueue(T&& item);
    bool try_enqueue(T const& item);
    bool try_enqueue(T&& item);
    template<typename It> bool enqueue_bulk(It itemFirst, size_t count);
    template<typename It> bool try_enqueue_bulk(It itemFirst, size_t count);
    template<typename U> bool try_dequeue(U& item);
    template<typename U> void wait_dequeue(U& item);
    template<typename U> bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs);
    template<typename U, typename Rep, typename Period>
    bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout);
    template<typename It> size_t try_dequeue_bulk(It itemFirst, size_t max);
};

// Cross-platform lightweight semaphore
class LightweightSemaphore {
public:
    LightweightSemaphore(ssize_t initialCount = 0);
    bool tryWait();
    bool wait();
    bool wait(std::int64_t timeout_usecs);
    ssize_t tryWaitMany(ssize_t max);
    void signal(ssize_t count = 1);
};

} // namespace duckdb_moodycamel

Import

#include "concurrentqueue/concurrentqueue.h"
#include "concurrentqueue/blockingconcurrentqueue.h"
#include "concurrentqueue/lightweightsemaphore.h"

I/O Contract

Inputs

Name | Type | Required | Description
item | T const& or T&& | Yes | Element to enqueue into the queue (by copy or move)
itemFirst | It (iterator) | Yes (bulk) | Iterator to the first element in a range to enqueue
count | size_t | Yes (bulk) | Number of elements to enqueue in a bulk operation
capacity | size_t | No | Initial capacity hint (default: 6 * BLOCK_SIZE)
timeout_usecs | std::int64_t | No | Timeout in microseconds for timed dequeue operations

Outputs

Name | Type | Description
return (enqueue) | bool | True if the item was successfully enqueued; false on allocation failure or capacity exceeded
item (dequeue) | U& | The dequeued element, written by reference on successful dequeue
return (try_dequeue) | bool | True if an item was successfully dequeued; false if the queue appeared empty
return (try_dequeue_bulk) | size_t | Number of items actually dequeued (0 if queue appeared empty)
return (wait_dequeue_timed) | bool | True if an item was dequeued before the timeout expired; false otherwise
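The timed-dequeue contract can be modelled with a condition variable. This is a minimal sketch, not the library's implementation: `TimedQueueSketch` is a hypothetical class that reuses the documented method name `wait_dequeue_timed`, and it relies on a mutex where the real queue is lock-free. It shows the two outcomes described above: `false` with the output left untouched on timeout, `true` with the item written by reference otherwise.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in mirroring the documented return values.
template <typename T>
class TimedQueueSketch {
public:
    bool enqueue(T value) {
        { std::lock_guard<std::mutex> lock(mutex_); items_.push_back(std::move(value)); }
        cv_.notify_one();
        return true;  // this model never fails to allocate
    }

    // Microsecond overload, forwarding to the chrono overload.
    bool wait_dequeue_timed(T& out, std::int64_t timeout_usecs) {
        return wait_dequeue_timed(out, std::chrono::microseconds(timeout_usecs));
    }

    template <typename Rep, typename Period>
    bool wait_dequeue_timed(T& out, std::chrono::duration<Rep, Period> const& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for returns false if the predicate is still false at timeout.
        if (!cv_.wait_for(lock, timeout, [this] { return !items_.empty(); })) {
            return false;  // timed out; out is left unmodified
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<T> items_;
};
```

A caller would normally branch on the boolean result rather than inspect the output variable, since the output is only meaningful when the call returns `true`.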

Usage Examples

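#include <chrono>   // std::chrono::milliseconds, used by the timed dequeue below
#include <cstddef>  // size_t

#include "concurrentqueue/blockingconcurrentqueue.h"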

// Create a blocking queue for task distribution
duckdb_moodycamel::BlockingConcurrentQueue<int> task_queue;

// Producer thread: enqueue work items
task_queue.enqueue(42);

// Consumer thread: block until an item is available
int item;
task_queue.wait_dequeue(item);

// Consumer thread: dequeue with a 100ms timeout
bool got_item = task_queue.wait_dequeue_timed(item, std::chrono::milliseconds(100));

// Non-blocking dequeue attempt
if (task_queue.try_dequeue(item)) {
    // Process item
}

// Bulk operations
int items[10] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
task_queue.enqueue_bulk(items, 10);

int results[10];
size_t count = task_queue.try_dequeue_bulk(results, 10);
