Implementation:InternLM Lmdeploy ThreadComm
| Knowledge Sources | |
|---|---|
| Domains | Communication, Multi_Threading |
| Last Updated | 2026-02-07 15:00 GMT |
Overview
Implements thread-based host communication for multi-rank in-process collective operations using lock-free atomic channels and barriers.
Description
ThreadCommImpl is a concrete implementation of the HostCommImpl interface. It provides inter-thread collective communication primitives (Broadcast, AllGather, AllReduce, Sync) for ranks running within the same process. Ranks exchange data through a shared State object that holds an NxN grid of std::atomic<void*> channels, which avoids kernel-level and network-level communication overhead. The ThreadGroupId class initializes the shared state and creates communicator instances, using std::call_once for thread-safe setup. AllReduce supports the Sum, Min, and Max reduction operations for integer data types (int32, int64, uint32, uint64).
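To make the channel-grid idea concrete, here is a minimal sketch of an all-gather over an NxN grid of std::atomic<void*> slots. It is an illustration under assumptions, not the code in thread_comm.cc: the names ToyState, ToyAllGather, and RunToyAllGather are hypothetical, and the publish/copy/acknowledge protocol shown here is only one plausible way to use such a grid.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical shared state: channel(from, to) carries a pointer that
// rank `from` publishes for rank `to`. nullptr means "empty".
struct ToyState {
    explicit ToyState(int n_ranks)
        : n(n_ranks), channels(static_cast<std::size_t>(n_ranks) * n_ranks) {
        for (auto& c : channels) c.store(nullptr, std::memory_order_relaxed);
    }
    std::atomic<void*>& channel(int from, int to) { return channels[from * n + to]; }
    int n;
    std::vector<std::atomic<void*>> channels;
};

// Each rank owns data[rank] on entry; on exit data[0..n) holds every rank's value.
void ToyAllGather(ToyState& s, int rank, int* data) {
    // 1. Publish a pointer to our own slot to every peer.
    for (int peer = 0; peer < s.n; ++peer) {
        if (peer != rank) s.channel(rank, peer).store(&data[rank], std::memory_order_release);
    }
    // 2. Wait for each peer's pointer, copy the value, then clear the slot as an ack.
    for (int peer = 0; peer < s.n; ++peer) {
        if (peer == rank) continue;
        void* p = nullptr;
        while ((p = s.channel(peer, rank).load(std::memory_order_acquire)) == nullptr) {
            std::this_thread::yield();
        }
        data[peer] = *static_cast<int*>(p);
        s.channel(peer, rank).store(nullptr, std::memory_order_release);
    }
    // 3. Wait until every peer has acked, so our buffer is no longer being read.
    for (int peer = 0; peer < s.n; ++peer) {
        if (peer == rank) continue;
        while (s.channel(rank, peer).load(std::memory_order_acquire) != nullptr) {
            std::this_thread::yield();
        }
    }
}

// Spawns one thread per rank; rank r contributes the value 100 + r.
std::vector<std::vector<int>> RunToyAllGather(int n_ranks) {
    ToyState state(n_ranks);
    std::vector<std::vector<int>> results(n_ranks, std::vector<int>(n_ranks, -1));
    std::vector<std::thread> threads;
    for (int r = 0; r < n_ranks; ++r) {
        threads.emplace_back([&, r] {
            results[r][r] = 100 + r;
            ToyAllGather(state, r, results[r].data());
        });
    }
    for (auto& t : threads) t.join();
    return results;
}
```

Calling RunToyAllGather(4) returns one row per rank, and every row should contain all four contributed values. The sketch passes only pointers through the channels, so no data is copied until the receiving rank reads it.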
Usage
ThreadCommImpl is used internally when multiple inference worker threads within a single process need to perform collective operations (broadcast, all-gather, all-reduce). To obtain a communicator, call CreateThreadGroupId() and then call CreateCommunicator() on the returned group ID.
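The group-ID-then-communicator flow can be sketched with std::call_once, so that whichever thread arrives first allocates the shared state and every other thread reuses it. This is a hedged illustration only: ToySharedState, ToyGroupId, ToyComm, CreateToyCommunicator, and AllRanksShareState are hypothetical names, and the real ThreadGroupId may perform its one-time setup at a different point.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the shared state that all ranks use.
struct ToySharedState {
    int n_ranks;
};

// Hypothetical group ID: hands out one shared state, created exactly once.
class ToyGroupId {
public:
    std::shared_ptr<ToySharedState> GetState(int n_ranks) {
        // Only the first caller runs the lambda; concurrent callers block
        // until it completes, then observe the fully constructed state.
        std::call_once(flag_, [&] {
            state_ = std::make_shared<ToySharedState>(ToySharedState{n_ranks});
        });
        return state_;
    }

private:
    std::once_flag flag_;
    std::shared_ptr<ToySharedState> state_;
};

// Hypothetical communicator handle: a rank plus a reference to the shared state.
struct ToyComm {
    int rank = -1;
    std::shared_ptr<ToySharedState> state;
};

ToyComm CreateToyCommunicator(ToyGroupId& gid, int n_ranks, int rank) {
    return ToyComm{rank, gid.GetState(n_ranks)};
}

// Creates one communicator per thread and checks they all share one state.
bool AllRanksShareState(int n_ranks) {
    ToyGroupId gid;
    std::vector<ToyComm> comms(n_ranks);
    std::vector<std::thread> threads;
    for (int r = 0; r < n_ranks; ++r) {
        threads.emplace_back([&, r] { comms[r] = CreateToyCommunicator(gid, n_ranks, r); });
    }
    for (auto& t : threads) t.join();
    for (int r = 0; r < n_ranks; ++r) {
        if (comms[r].rank != r || comms[r].state != comms[0].state) return false;
    }
    return comms[0].state != nullptr && comms[0].state->n_ranks == n_ranks;
}
```

Because std::call_once synchronizes the one-time initialization with every later caller, no additional lock is needed around the read of the shared pointer.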
Code Reference
Source Location
- Repository: InternLM_Lmdeploy
- File: src/turbomind/comm/thread_comm.cc
- Lines: 1-364
Signature
namespace turbomind::comm {

struct ThreadCommImpl : public HostCommImpl {
    ThreadCommImpl(int n_ranks, std::shared_ptr<State> state, int rank);

    int rank() const override;
    int n_ranks() const override;
    bool is_same_process() const override;

    std::shared_ptr<HostCommImpl> Split(int color, int key) override;

    void Sync(bool blocking) override;
    void Broadcast(void* data, int count, DataType dtype, int root,
                   copy_fn copy, ser_fn ser, des_fn des) override;
    void AllGather(void* data, int count, DataType dtype,
                   copy_fn copy, ser_fn ser, des_fn des) override;
    void AllReduce(void* data, int count, DataType dtype, RedOp red_op) override;
};

class ThreadGroupId : public HostGroupId {
    void Initialize() override;
    void Export(std::ostream& os) override;
    void Import(std::istream& is) override;
    HostComm CreateCommunicator(int n_ranks, int rank, int node_rank = 0) override;
};

std::unique_ptr<HostGroupId> CreateThreadGroupId();

}  // namespace turbomind::comm
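The AllReduce entry in the signature above takes a reduction operator. As a rough illustration of what an elementwise Sum, Min, or Max reduction over integer buffers computes, the sketch below combines per-rank buffers sequentially. ToyRedOp, ReduceInto, and ToyAllReduce are hypothetical names. The real RedOp and DataType enums live in the library headers and are not reproduced here, and the actual implementation reduces across threads rather than over a vector of vectors.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the library's reduction-operator enum.
enum class ToyRedOp { kSum, kMin, kMax };

// Combines src into dst elementwise according to op.
template <class T>
void ReduceInto(T* dst, const T* src, int count, ToyRedOp op) {
    for (int i = 0; i < count; ++i) {
        switch (op) {
            case ToyRedOp::kSum: dst[i] = static_cast<T>(dst[i] + src[i]); break;
            case ToyRedOp::kMin: dst[i] = std::min(dst[i], src[i]); break;
            case ToyRedOp::kMax: dst[i] = std::max(dst[i], src[i]); break;
        }
    }
}

// Returns the result every rank would hold after an all-reduce.
// Assumes all per-rank buffers have the same length.
template <class T>
std::vector<T> ToyAllReduce(const std::vector<std::vector<T>>& per_rank, ToyRedOp op) {
    std::vector<T> acc = per_rank.at(0);
    for (std::size_t r = 1; r < per_rank.size(); ++r) {
        ReduceInto(acc.data(), per_rank[r].data(), static_cast<int>(acc.size()), op);
    }
    return acc;
}
```

For example, with three ranks holding {1, 5}, {4, 2}, and {3, 9}, the Sum reduction yields {8, 16}, Min yields {1, 2}, and Max yields {4, 9}.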
Import
#include "src/turbomind/comm/host_comm.h"
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| n_ranks | int | Yes | Total number of ranks in the communicator group |
| rank | int | Yes | The rank index of this thread (0-based) |
| state | std::shared_ptr<State> | Yes | Shared state containing atomic channels and barrier |
Outputs
| Name | Type | Description |
|---|---|---|
| HostComm | HostComm | A communicator handle wrapping the ThreadCommImpl |
| HostGroupId | std::unique_ptr<HostGroupId> | Factory object for creating thread-based communicators |
Usage Examples
// Create a thread group ID and initialize
auto group_id = turbomind::comm::CreateThreadGroupId();
group_id->Initialize();
// Each thread creates its own communicator
// (n_ranks and rank are supplied by the calling thread)
turbomind::comm::HostComm comm = group_id->CreateCommunicator(n_ranks, rank);
// Perform collective operations
int value = 42;
turbomind::comm::Broadcast(comm, value, /*root=*/0);
auto gathered = turbomind::comm::AllGather(comm, value);