Implementation:Duckdb Duckdb ConcurrentQueue
| Knowledge Sources | |
|---|---|
| Domains | Concurrency, Third_Party |
| Last Updated | 2026-02-07 12:00 GMT |
Overview
ConcurrentQueue is a lock-free, multi-producer, multi-consumer queue library by Cameron Desrochers (moodycamel) that provides both non-blocking and blocking variants for high-throughput concurrent data exchange between threads.
Description
The library consists of three main components, all residing in the duckdb_moodycamel namespace:
- ConcurrentQueue -- A lock-free multi-producer, multi-consumer queue implemented using per-producer sub-queues to minimize contention. It supports both implicit (automatic) and explicit (token-based) producer/consumer registration. The queue uses block-based memory allocation with configurable block sizes and capacity thresholds.
- BlockingConcurrentQueue -- A wrapper around ConcurrentQueue that adds blocking dequeue semantics via an internal semaphore. It provides wait_dequeue() and wait_dequeue_timed() methods in addition to the non-blocking try_dequeue() interface. Every successful enqueue signals the semaphore once per item. wait_dequeue() and wait_dequeue_timed() block on the semaphore, while try_dequeue() only attempts a non-blocking acquire.
- LightweightSemaphore -- A cross-platform semaphore implementation adapted from Jeff Preshing's portable semaphore. It uses platform-specific primitives (Win32 semaphores, Mach semaphores, or POSIX semaphores) with a fast-path atomic spin to avoid kernel calls when contention is low.
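The fast-path idea behind LightweightSemaphore can be made concrete with a minimal sketch. The semaphore below keeps its count in a std::atomic and falls back to a blocking wait only when the count would go negative. It follows the general shape of a Preshing-style semaphore but is not the library's code. The class names, the spin budget of 10000 iterations, and the use of std::mutex plus std::condition_variable in place of a Win32, Mach, or POSIX semaphore are all assumptions for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Stand-in for the OS semaphore (Win32, Mach or POSIX) that the real class
// wraps; here it is built from std::mutex and std::condition_variable.
class SlowPathSemaphore {
public:
    void signal(std::int64_t n) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            count_ += n;
        }
        cv_.notify_all();
    }
    bool try_wait() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }
    bool wait_for(std::chrono::microseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return count_ > 0; })) return false;
        --count_;
        return true;
    }
private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::int64_t count_ = 0;
};

// Fast-path semaphore sketch: count_ > 0 means free units, count_ < 0 means
// that many threads have committed to blocking on the slow path.
class SpinThenBlockSemaphore {
public:
    explicit SpinThenBlockSemaphore(std::int64_t initial = 0) : count_(initial) {
        assert(initial >= 0);
    }

    // Take a unit only if one is free; never blocks and never touches slow_.
    bool try_wait() {
        std::int64_t old = count_.load(std::memory_order_relaxed);
        while (old > 0) {
            if (count_.compare_exchange_weak(old, old - 1, std::memory_order_acquire,
                                             std::memory_order_relaxed))
                return true;
        }
        return false;
    }

    // Spin briefly, then block for at most `timeout`. Returns false on timeout.
    bool wait(std::chrono::microseconds timeout) {
        for (int spin = 0; spin < kMaxSpins; ++spin) {
            if (try_wait()) return true;
        }
        if (count_.fetch_sub(1, std::memory_order_acquire) > 0) return true;
        if (slow_.wait_for(timeout)) return true;
        // Timed out while still counted as a waiter: withdraw if count_ < 0, or,
        // if a signal() already covered us (count_ >= 0), take the unit it posted.
        while (true) {
            std::int64_t old = count_.load(std::memory_order_acquire);
            if (old >= 0 && slow_.try_wait()) return true;
            if (old < 0 && count_.compare_exchange_strong(old, old + 1, std::memory_order_relaxed))
                return false;
        }
    }

    // Add n units; only the share that covers blocked waiters reaches the slow path.
    void signal(std::int64_t n = 1) {
        assert(n >= 0);
        std::int64_t old = count_.fetch_add(n, std::memory_order_release);
        std::int64_t to_release = -old < n ? -old : n;
        if (to_release > 0) slow_.signal(to_release);
    }

private:
    static constexpr int kMaxSpins = 10000;  // assumed spin budget
    std::atomic<std::int64_t> count_;
    SlowPathSemaphore slow_;
};
```

With this design, signal() only touches the slow path when the count was negative, that is, when some thread actually committed to blocking. An uncontended signal/wait pair therefore costs a few atomic operations and no kernel call.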
The queue is not sequentially consistent; items enqueued by different producers may be dequeued in any relative order, but items from the same producer are always dequeued in FIFO order.
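This ordering guarantee follows from the per-producer sub-queue layout. The sketch below models that layout with one mutex-protected std::deque per producer, so it is not lock-free, and with a consumer that scans sub-queues round-robin. The class name, the explicit producer index, and the round-robin policy are hypothetical. The real library uses its own heuristics to choose which producer to dequeue from.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Simplified model (NOT lock-free): one FIFO sub-queue per producer, each
// guarded by its own mutex, so producers never contend with one another.
template <typename T>
class PerProducerQueue {
public:
    explicit PerProducerQueue(std::size_t producers) : subqueues_(producers) {
        assert(producers > 0);
    }

    // Each producer only ever appends to its own sub-queue.
    void enqueue(std::size_t producer, T item) {
        assert(producer < subqueues_.size());
        SubQueue& q = subqueues_[producer];
        std::lock_guard<std::mutex> lock(q.mutex);
        q.items.push_back(std::move(item));
    }

    // Each call starts scanning at a different sub-queue, so items from
    // different producers come out interleaved, while every sub-queue is
    // still drained front to back (per-producer FIFO).
    bool try_dequeue(T& out) {
        const std::size_t n = subqueues_.size();
        const std::size_t start = next_.fetch_add(1) % n;
        for (std::size_t i = 0; i < n; ++i) {
            SubQueue& q = subqueues_[(start + i) % n];
            std::lock_guard<std::mutex> lock(q.mutex);
            if (!q.items.empty()) {
                out = std::move(q.items.front());
                q.items.pop_front();
                return true;
            }
        }
        return false;  // every sub-queue appeared empty
    }

private:
    struct SubQueue {
        std::mutex mutex;
        std::deque<T> items;
    };
    std::vector<SubQueue> subqueues_;
    std::atomic<std::size_t> next_{0};
};
```

For example, if producer 0 enqueues 1, 2, 3 and producer 1 then enqueues 10 and 20, draining this model from a single thread yields 1, 10, 2, 20, 3. Producer 0's items keep their relative order, but the global order differs from the order of the five enqueue calls.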
Usage
DuckDB uses ConcurrentQueue for inter-thread communication in its parallel execution engine. The blocking variant is used when worker threads need to wait for tasks from the scheduler, and the non-blocking variant is used in scenarios where polling or batching is preferred. The queue is used in the task scheduler, pipeline execution, and parallel data exchange operators.
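The blocking pattern described above, where worker threads park until work arrives, comes down to pairing a non-blocking queue with a counting semaphore that is signalled once per enqueued item. The sketch below shows that composition with standard-library stand-ins. CountingSemaphore, PollingQueue, and BlockingQueueSketch are hypothetical names. This is neither DuckDB's scheduler code nor the library's implementation. It only illustrates the control flow.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Stand-in for LightweightSemaphore: a plain counting semaphore.
class CountingSemaphore {
public:
    void signal() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    bool try_wait() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }
private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t count_ = 0;
};

// Stand-in for the non-blocking ConcurrentQueue: enqueue and try_dequeue only.
template <typename T>
class PollingQueue {
public:
    void enqueue(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(item));
    }
    bool try_dequeue(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<T> items_;
};

// Blocking wrapper: one semaphore unit per published item, one unit consumed
// per dequeue, so a consumer holding a unit knows an item is in the queue.
template <typename T>
class BlockingQueueSketch {
public:
    void enqueue(T item) {
        inner_.enqueue(std::move(item));
        sema_.signal();  // publish first, then wake one waiting worker
    }
    void wait_dequeue(T& out) {
        sema_.wait();  // a worker parks here while there is no work
        while (!inner_.try_dequeue(out)) std::this_thread::yield();  // defensive retry
    }
    bool try_dequeue(T& out) {
        if (!sema_.try_wait()) return false;  // never blocks
        while (!inner_.try_dequeue(out)) std::this_thread::yield();
        return true;
    }
private:
    PollingQueue<T> inner_;
    CountingSemaphore sema_;
};
```

The retry loop after acquiring a unit is defensive. With the mutex-based stand-in, the first try_dequeue always succeeds, but a lock-free inner queue may not guarantee that an item is immediately visible to the consumer that was woken.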
Code Reference
Source Location
- Repository: Duckdb_Duckdb
- Files:
Signature
```cpp
namespace duckdb_moodycamel {

// Lock-free concurrent queue
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue {
public:
    explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE);
    bool enqueue(T const& item);
    bool enqueue(T&& item);
    template<typename It> bool enqueue_bulk(It itemFirst, size_t count);
    template<typename U> bool try_dequeue(U& item);
    template<typename It> size_t try_dequeue_bulk(It itemFirst, size_t max);
    size_t size_approx() const;
};

// Blocking variant with semaphore-backed waits
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class BlockingConcurrentQueue {
public:
    explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE);
    bool enqueue(T const& item);
    bool enqueue(T&& item);
    bool try_enqueue(T const& item);
    bool try_enqueue(T&& item);
    template<typename It> bool enqueue_bulk(It itemFirst, size_t count);
    template<typename It> bool try_enqueue_bulk(It itemFirst, size_t count);
    template<typename U> bool try_dequeue(U& item);
    template<typename U> void wait_dequeue(U& item);
    template<typename U> bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs);
    template<typename U, typename Rep, typename Period>
    bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout);
    template<typename It> size_t try_dequeue_bulk(It itemFirst, size_t max);
};

// Cross-platform lightweight semaphore
class LightweightSemaphore {
public:
    LightweightSemaphore(ssize_t initialCount = 0);
    bool tryWait();
    bool wait();
    bool wait(std::int64_t timeout_usecs);
    ssize_t tryWaitMany(ssize_t max);
    void signal(ssize_t count = 1);
};

} // namespace duckdb_moodycamel
```
Import
```cpp
#include "concurrentqueue/concurrentqueue.h"
#include "concurrentqueue/blockingconcurrentqueue.h"
#include "concurrentqueue/lightweightsemaphore.h"
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| item | T const& or T&& | Yes | Element to enqueue into the queue (by copy or move) |
| itemFirst | It (iterator) | Yes (bulk) | Iterator to the first element in a range to enqueue |
| count | size_t | Yes (bulk) | Number of elements to enqueue in a bulk operation |
| capacity | size_t | No | Initial capacity hint (default: 6 * BLOCK_SIZE) |
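| timeout_usecs | std::int64_t | No | Timeout in microseconds for timed dequeue operations |
| timeout | std::chrono::duration<Rep, Period> | No | Timeout as a std::chrono duration (alternative wait_dequeue_timed overload) |
| max | size_t | Yes (bulk dequeue) | Maximum number of elements to dequeue in a bulk operation |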
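As a worked example of the capacity input, here is a minimal sketch. It assumes the constructor converts the requested capacity into whole blocks by rounding up, which is our reading of the moodycamel source, and that the default traits' BLOCK_SIZE is 32. initial_block_count is a hypothetical helper, not a library function. The upstream header also notes that blocks are not shared between producers, so the number of elements that fit without further allocation depends on how many producers are active.

```cpp
#include <cstddef>

// Hypothetical helper: blocks pre-allocated for a requested capacity, i.e.
// capacity / block_size rounded up. block_size must be a power of two, so the
// remainder test can use a bit mask.
constexpr std::size_t initial_block_count(std::size_t capacity, std::size_t block_size) {
    return capacity / block_size + ((capacity & (block_size - 1)) == 0 ? 0 : 1);
}

// Assuming BLOCK_SIZE == 32, the default capacity of 6 * BLOCK_SIZE
// corresponds to 6 blocks, i.e. 192 element slots.
static_assert(initial_block_count(6 * 32, 32) == 6, "default capacity: 6 blocks");
// A capacity that is not a multiple of the block size is rounded up.
static_assert(initial_block_count(100, 32) == 4, "100 slots need 4 blocks");
```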
Outputs
| Name | Type | Description |
|---|---|---|
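| return (enqueue) | bool | True if the item was enqueued. enqueue/enqueue_bulk allocate memory as needed and fail only if allocation fails or a configured size limit would be exceeded; try_enqueue/try_enqueue_bulk never allocate and also fail when no pre-allocated space is available |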
| item (dequeue) | U& | The dequeued element, written by reference on successful dequeue |
| return (try_dequeue) | bool | True if an item was successfully dequeued; false if the queue appeared empty |
| return (try_dequeue_bulk) | size_t | Number of items actually dequeued (0 if queue appeared empty) |
| return (wait_dequeue_timed) | bool | True if an item was dequeued before the timeout expired; false otherwise |
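The try_dequeue_bulk contract above (returns the number of items dequeued, 0 if the queue appeared empty) suggests a common consumer loop: dequeue into a fixed buffer until a call returns 0. Below is a minimal sketch written as a template over any queue type exposing try_dequeue_bulk(It, size_t). drain_queue and FakeQueue are hypothetical names. FakeQueue is a single-threaded stand-in so the example runs without the DuckDB headers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Repeatedly bulk-dequeue into `buffer` and hand each item to `fn`.
// Stops at the first call that returns 0 ("queue appeared empty").
template <typename Queue, typename T, typename Fn>
std::size_t drain_queue(Queue& queue, std::vector<T>& buffer, Fn fn) {
    assert(!buffer.empty());
    std::size_t total = 0;
    while (true) {
        std::size_t n = queue.try_dequeue_bulk(buffer.begin(), buffer.size());
        if (n == 0) break;
        for (std::size_t i = 0; i < n; ++i) fn(buffer[i]);
        total += n;
    }
    return total;
}

// Hypothetical single-threaded stand-in with the same try_dequeue_bulk shape.
template <typename T>
struct FakeQueue {
    std::deque<T> items;

    template <typename It>
    std::size_t try_dequeue_bulk(It out, std::size_t max) {
        std::size_t n = 0;
        while (n < max && !items.empty()) {
            *out++ = items.front();
            items.pop_front();
            ++n;
        }
        return n;
    }
};
```

Because the loop stops at the first call that returns 0, items enqueued by other threads after that call are left for a later drain. Under contention, a return value of 0 only means the queue appeared empty when it was checked.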
Usage Examples
```cpp
#include <chrono>
#include <cstddef>
#include <thread>

#include "concurrentqueue/blockingconcurrentqueue.h"

int main() {
    // Create a blocking queue for task distribution
    duckdb_moodycamel::BlockingConcurrentQueue<int> task_queue;

    // Producer thread: enqueue a work item
    std::thread producer([&task_queue] { task_queue.enqueue(42); });

    // Consumer (main thread): block until an item is available
    int item = 0;
    task_queue.wait_dequeue(item);
    producer.join();

    // Dequeue with a 100 ms timeout; the queue is now empty, so this returns false
    bool got_item = task_queue.wait_dequeue_timed(item, std::chrono::milliseconds(100));

    // Non-blocking dequeue attempt
    if (task_queue.try_dequeue(item)) {
        // Process item
    }

    // Bulk operations
    int items[10] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    task_queue.enqueue_bulk(items, 10);
    int results[10];
    std::size_t count = task_queue.try_dequeue_bulk(results, 10);

    (void)got_item;
    (void)count;
    return 0;
}
```