Implementation:CARLA simulator Carla RecurrentSharedFuture
| Knowledge Sources | |
|---|---|
| Domains | Concurrency, Synchronization |
| Last Updated | 2026-02-15 05:00 GMT |
Overview
RecurrentSharedFuture is a reusable shared future that allows a value to be set and consumed any number of times, enabling multiple threads to wait for recurring events with timeout support and exception propagation.
Description
The RecurrentSharedFuture template class in the carla namespace provides a synchronization primitive similar to std::shared_future but designed for repeated value publication. Unlike standard futures which are single-use, this class allows SetValue to be called multiple times, with each call waking all waiting threads.
Thread-local wait tracking:
The class uses a std::map keyed by a thread_local tag pointer (detail::thread_tag) to track per-thread wait state. Each thread that calls WaitFor gets its own entry with a should_wait flag and a std::variant<SharedException, T> for the result value.
WaitFor(timeout):
Acquires the mutex, sets should_wait = true for the calling thread, and blocks on a std::condition_variable with the specified timeout. Returns std::nullopt if the timeout expires. If the value variant holds a SharedException (index 0), the exception is thrown via throw_exception. Otherwise, returns the stored value of type T.
SetValue(value):
Acquires the mutex, iterates over all entries in the map, sets should_wait = false and stores the value in each entry, then calls notify_all on the condition variable.
SetException(exception):
Wraps the exception in a SharedException (which holds a shared_ptr<std::exception>) and stores it as the value, causing all waiting threads to throw the exception.
SharedException:
A nested class in carla::detail that wraps a shared_ptr<std::exception> and inherits from std::exception. It forwards what() to the wrapped exception. The default constructor creates a runtime_error("uninitialized SharedException").
Usage
RecurrentSharedFuture is used in CARLA's client-server synchronization, particularly for tick synchronization where the client needs to wait for each new simulation tick from the server. It enables multiple client threads to wait for the same recurring event.
Code Reference
Source Location
- Repository: CARLA
- File:
LibCarla/source/carla/RecurrentSharedFuture.h
Signature
template <typename T>
class RecurrentSharedFuture {
public:
using SharedException = detail::SharedException;
std::optional<T> WaitFor(time_duration timeout);
template <typename T2>
void SetValue(const T2 &value);
template <typename ExceptionT>
void SetException(ExceptionT &&exception);
private:
std::mutex _mutex;
std::condition_variable _cv;
struct mapped_type {
bool should_wait;
std::variant<SharedException, T> value;
};
std::map<const char *, mapped_type> _map;
};
Import
#include "carla/RecurrentSharedFuture.h"
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| timeout | time_duration | Yes (WaitFor) | Maximum time to wait for a value |
| value | const T2& | Yes (SetValue) | Value to publish to all waiting threads |
| exception | ExceptionT&& | Yes (SetException) | Exception to propagate to all waiting threads |
Outputs
| Name | Type | Description |
|---|---|---|
| WaitFor() | std::optional<T> | The published value, or std::nullopt on timeout |
Usage Examples
carla::RecurrentSharedFuture<uint64_t> tickFuture;
// Producer thread (server tick):
tickFuture.SetValue(currentFrame);
// Consumer thread 1 (client waiting for tick):
auto frame = tickFuture.WaitFor(carla::time_duration::seconds(10));
if (frame.has_value()) {
// Process frame *frame
} else {
// Timeout occurred
}
// Consumer thread 2 (another client waiting simultaneously):
auto frame2 = tickFuture.WaitFor(carla::time_duration::seconds(5));
// Error propagation:
tickFuture.SetException(std::runtime_error("simulation crashed"));
// All waiting threads will throw SharedException