Implementation:Google deepmind Mujoco Thread Queue

From Leeroopedia
Knowledge Sources
Domains: Concurrency, Lock-Free Data Structures, Threading
Last Updated: 2026-02-15 04:00 GMT

Overview

Template implementation of a lock-free multi-producer multi-consumer (MPMC) queue using a ring buffer for inter-thread communication in MuJoCo's thread pool.

Description

This header implements LocklessQueue<T, buffer_capacity>, a fixed-size ring-buffer queue that supports concurrent pushes and pops from multiple threads without locks. Producers advance the write cursor and consumers advance the read cursor, each with an atomic compare-and-exchange. A separate maximum-read cursor tracks how far writes have completed, so a consumer never reads a slot whose write is still in progress. Push blocks (spins) while the queue is full, and pop blocks (yields) while it is empty. The ring buffer has one more slot than the buffer_capacity template parameter so that a full queue can be distinguished from an empty one.
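The three-cursor scheme described above can be sketched as follows. This is a simplified illustration, not MuJoCo's source: the class name SketchQueue, its member names, the use of monotonically increasing cursors, the default sequentially consistent memory order, and yielding in both wait loops are all assumptions made for the example.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical sketch of a three-cursor MPMC ring buffer (not MuJoCo's code).
// Cursors only ever increase and are mapped to slots with a modulo; cursor
// overflow of size_t is ignored here because it is unreachable in practice.
template <typename T, std::size_t Capacity>
class SketchQueue {
 public:
  bool full() const { return is_full(read_.load(), write_.load()); }
  bool empty() const { return read_.load() == max_read_.load(); }

  void push(const T& value) {
    std::size_t w;
    for (;;) {
      w = write_.load();
      if (is_full(read_.load(), w)) {  // no free slot: wait and retry
        std::this_thread::yield();
        continue;
      }
      // Reserve slot w; fails if another producer reserved it first.
      if (write_.compare_exchange_weak(w, w + 1)) break;
    }
    buffer_[index(w)].store(value);
    // Publish in reservation order: wait until all earlier writes are
    // published, then move the maximum-read cursor past our slot.
    std::size_t expected = w;
    while (!max_read_.compare_exchange_weak(expected, w + 1)) {
      expected = w;
      std::this_thread::yield();
    }
  }

  T pop() {
    for (;;) {
      std::size_t r = read_.load();
      if (r == max_read_.load()) {  // nothing published yet: wait and retry
        std::this_thread::yield();
        continue;
      }
      // Read first, then claim. If the claim fails, the value is discarded.
      T value = buffer_[index(r)].load();
      if (read_.compare_exchange_weak(r, r + 1)) return value;
    }
  }

 private:
  // One extra slot so that "full" and "empty" are different cursor states.
  static constexpr std::size_t kSlots = Capacity + 1;
  static std::size_t index(std::size_t cursor) { return cursor % kSlots; }
  static bool is_full(std::size_t read, std::size_t write) {
    return index(write + 1) == index(read);
  }

  std::atomic<std::size_t> read_{0};
  std::atomic<std::size_t> write_{0};
  std::atomic<std::size_t> max_read_{0};
  std::atomic<T> buffer_[kSlots];
};
```

In this sketch the slots are std::atomic<T> because a consumer may read a slot speculatively and then lose the race to claim it. The atomic slot keeps that speculative read well defined even if a producer is overwriting the slot at the same moment.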

Usage

Used internally by the thread pool implementation (thread_pool.cc) to dispatch mjTask pointers from the main thread to worker threads. The queue is instantiated with a capacity of kThreadPoolQueueSize (640) to buffer pending tasks.

Code Reference

Source Location

Key Functions

template <typename T, size_t buffer_capacity>
class LocklessQueue {
 public:
  // Check if the queue is full
  bool full() const;

  // Check if the queue is empty
  bool empty() const;

  // Push an element into the queue (blocks if full)
  void push(const T& input);

  // Pop an element from the queue (blocks if empty)
  T pop();

 private:
  size_t convert_to_index(size_t input) const;
  size_t get_next_cursor(size_t input) const;
  bool full_internal(size_t read_index, size_t write_index) const;
  bool empty_internal(size_t read_index, size_t write_index) const;

  std::atomic<size_t> read_cursor_;
  std::atomic<size_t> write_cursor_;
  std::atomic<size_t> maximum_read_cursor_;
  std::atomic<T> buffer_[(buffer_capacity + 1)];
};

Import

#include "thread/thread_queue.h"

I/O Contract

Inputs

Name | Type | Required | Description
T | template type | Yes | Element type stored in the queue (must be trivially copyable for atomics)
buffer_capacity | size_t | Yes | Maximum number of elements the queue can hold
input | const T& | Yes (for push) | Element to enqueue
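The trivially-copyable requirement on T comes from std::atomic<T>, which the buffer uses for its slots. A minimal compile-time check is sketched below. ExampleTask and fits_atomic_slot are hypothetical names introduced for illustration and are not part of MuJoCo.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <type_traits>

// Hypothetical stand-in for a task record; this is not MuJoCo's mjTask.
struct ExampleTask {
  int id;
};

// Hypothetical helper: true if T meets the trivially-copyable requirement
// that std::atomic<T> places on its element type.
template <typename T>
constexpr bool fits_atomic_slot() {
  return std::is_trivially_copyable<T>::value;
}

// Whether atomic operations on a task pointer avoid an internal lock is
// platform dependent, so it is queried at run time rather than assumed.
inline bool pointer_slot_is_lock_free() {
  std::atomic<ExampleTask*> slot(nullptr);
  return slot.is_lock_free();
}

// Raw pointers and plain integers qualify; std::string does not.
static_assert(fits_atomic_slot<ExampleTask*>(), "pointers are usable as T");
static_assert(fits_atomic_slot<int>(), "integers are usable as T");
static_assert(!fits_atomic_slot<std::string>(), "std::string is not usable");
```

Storing pointers, as the thread pool does with mjTask pointers, satisfies the requirement regardless of how large or complex the pointed-to object is.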

Outputs

Name | Type | Description
return value (pop) | T | The dequeued element
return value (full) | bool | True if the queue has no available slots
return value (empty) | bool | True if the queue has no elements to consume
