Implementation:Google deepmind Mujoco Thread Queue
| Knowledge Sources | |
|---|---|
| Domains | Concurrency, Lock-Free Data Structures, Threading |
| Last Updated | 2026-02-15 04:00 GMT |
Overview
Template implementation of a lock-free multi-producer multi-consumer (MPMC) queue using a ring buffer for inter-thread communication in MuJoCo's thread pool.
Description
This header implements LocklessQueue<T, buffer_capacity>, a fixed-size ring buffer queue that supports concurrent pushes and pops from multiple threads without locks. The implementation uses atomic compare-and-exchange operations for both the write cursor (producer side) and read cursor (consumer side), with a separate maximum-read cursor to ensure completed writes are visible before reads. Push blocks (spins) when the queue is full, and pop blocks (yields) when empty. The ring buffer capacity is one greater than the template parameter to distinguish full from empty states.
Usage
Used internally by the thread pool implementation (thread_pool.cc) to dispatch mjTask pointers from the main thread to worker threads. The queue is instantiated with a capacity of kThreadPoolQueueSize (640) to buffer pending tasks.
Code Reference
Source Location
- Repository: Google_deepmind_Mujoco
- File: src/thread/thread_queue.h
- Lines: 1-152
Key Functions
template <typename T, size_t buffer_capacity>
class LocklessQueue {
public:
// Check if the queue is full
bool full() const;
// Check if the queue is empty
bool empty() const;
// Push an element into the queue (blocks if full)
void push(const T& input);
// Pop an element from the queue (blocks if empty)
T pop();
private:
size_t convert_to_index(size_t input) const;
size_t get_next_cursor(size_t input) const;
bool full_internal(size_t read_index, size_t write_index) const;
bool empty_internal(size_t read_index, size_t write_index) const;
std::atomic<size_t> read_cursor_;
std::atomic<size_t> write_cursor_;
std::atomic<size_t> maximum_read_cursor_;
std::atomic<T> buffer_[(buffer_capacity + 1)];
};
Import
#include "thread/thread_queue.h"
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| T | template type | Yes | Element type stored in the queue (must be trivially copyable for atomics) |
| buffer_capacity | size_t | Yes | Maximum number of elements the queue can hold |
| input | const T& | Yes (for push) | Element to enqueue |
Outputs
| Name | Type | Description |
|---|---|---|
| return value (pop) | T | The dequeued element |
| return value (full) | bool | True if the queue has no available slots |
| return value (empty) | bool | True if the queue has no elements to consume |