

Implementation:InternLM Lmdeploy ThreadComm

From Leeroopedia
Revision as of 15:16, 16 February 2026 by Admin (auto-imported from implementations/InternLM_Lmdeploy_ThreadComm.md)


Knowledge Sources
Domains: Communication, Multi_Threading
Last Updated: 2026-02-07 15:00 GMT

Overview

Implements thread-based host communication for multi-rank in-process collective operations using lock-free atomic channels and barriers.

Description

ThreadCommImpl is a concrete implementation of the HostCommImpl interface that provides inter-thread collective communication primitives (Broadcast, AllGather, AllReduce, Sync) for ranks running as threads within the same process. It uses a shared state containing an N×N grid of std::atomic<void*> channels (N being the number of ranks) for exchanging data between ranks, which avoids kernel-level or network-level communication overhead. The ThreadGroupId class manages initialization of the shared state and creates communicator instances, using std::call_once for thread-safe setup. Reduction operations support Sum, Min, and Max for integer data types (int32, int64, uint32, uint64).
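To make the channel-grid idea concrete, here is a minimal, self-contained sketch in standard C++17. Everything in it (SpinBarrier, SketchState, SketchBroadcast, SketchAllGather) is hypothetical and only illustrates the general pattern the description names: ranks publish buffer pointers into an N×N grid of std::atomic<void*> slots, meet at a barrier, copy from their peers' buffers, and meet again before any buffer is reused. It handles only int payloads and is not turbomind's ThreadCommImpl code.

```cpp
#include <atomic>
#include <cassert>
#include <cstring>
#include <thread>
#include <vector>

// Centralized spinning barrier built from two atomics (illustrative only;
// the real State's barrier may be implemented differently).
class SpinBarrier {
public:
    explicit SpinBarrier(int n) : n_(n) {}
    void arrive_and_wait() {
        const int gen = generation_.load();
        if (count_.fetch_add(1) == n_ - 1) {
            count_.store(0);           // last arriver resets the counter...
            generation_.fetch_add(1);  // ...then releases everyone waiting
        } else {
            while (generation_.load() == gen) std::this_thread::yield();
        }
    }

private:
    const int n_;
    std::atomic<int> count_{0};
    std::atomic<int> generation_{0};
};

// Hypothetical shared state: an N x N grid of pointer slots plus a barrier.
// channel(from, to) is where rank `from` posts a buffer pointer for rank `to`.
struct SketchState {
    explicit SketchState(int n) : n_ranks(n), grid(n * n), barrier(n) {}
    std::atomic<void*>& channel(int from, int to) { return grid[from * n_ranks + to]; }

    const int n_ranks;
    std::vector<std::atomic<void*>> grid;
    SpinBarrier barrier;
};

// Root publishes its buffer; every other rank copies `count` ints from it.
void SketchBroadcast(SketchState& s, int rank, int root, int* data, int count) {
    assert(rank >= 0 && rank < s.n_ranks);
    if (rank == root)
        for (int to = 0; to < s.n_ranks; ++to) s.channel(root, to).store(data);
    s.barrier.arrive_and_wait();  // root's pointer is now visible to all ranks
    if (rank != root) {
        const int* src = static_cast<const int*>(s.channel(root, rank).load());
        std::memcpy(data, src, count * sizeof(int));
    }
    s.barrier.arrive_and_wait();  // root may now reuse its buffer
}

// `data` holds n_ranks * count ints; rank r has filled only its own slice
// data[r*count, (r+1)*count). Afterwards every rank holds all slices.
void SketchAllGather(SketchState& s, int rank, int* data, int count) {
    assert(rank >= 0 && rank < s.n_ranks);
    for (int to = 0; to < s.n_ranks; ++to) s.channel(rank, to).store(data);
    s.barrier.arrive_and_wait();  // all ranks' pointers are now visible
    for (int from = 0; from < s.n_ranks; ++from) {
        if (from == rank) continue;
        const int* src = static_cast<const int*>(s.channel(from, rank).load());
        std::memcpy(data + from * count, src + from * count, count * sizeof(int));
    }
    s.barrier.arrive_and_wait();  // peers have finished reading my slice
}
```

The second barrier in each collective is the key design point: without it, a fast rank could return and overwrite its buffer while a slower peer is still copying from it.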

Usage

Used internally when multiple inference worker threads need to perform collective operations (broadcast, all-gather, all-reduce) within a single process. A group ID is created with CreateThreadGroupId() and initialized once; each rank's thread then calls CreateCommunicator() on that group ID to obtain its own communicator.
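The description says ThreadGroupId relies on std::call_once so that shared-state setup is thread-safe. The sketch below shows that pattern in isolation with hypothetical types (MiniState, MiniComm, MiniThreadGroup). Folding the setup into CreateCommunicator, instead of a separate Initialize() call, is an assumption made to show the race that call_once resolves; it is not a claim about turbomind's code.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-ins, not turbomind's State / HostComm / ThreadGroupId.
struct MiniState {
    explicit MiniState(int n) : n_ranks(n) {}
    const int n_ranks;
};

struct MiniComm {
    int rank = -1;
    std::shared_ptr<MiniState> state;  // identical pointer on every rank
};

class MiniThreadGroup {
public:
    // Safe to call concurrently from every rank's thread.
    MiniComm CreateCommunicator(int n_ranks, int rank) {
        // The first caller builds the shared state; concurrent callers block
        // until it is done, and later callers skip the lambda entirely.
        std::call_once(once_, [&] { state_ = std::make_shared<MiniState>(n_ranks); });
        assert(rank >= 0 && rank < state_->n_ranks);
        return MiniComm{rank, state_};
    }

private:
    std::once_flag once_;
    std::shared_ptr<MiniState> state_;
};
```

Each rank's thread calls CreateCommunicator(n_ranks, rank) with its own index, and all returned handles share one MiniState, which is the property a thread-based communicator depends on.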

Code Reference

Source Location

Signature

namespace turbomind::comm {

struct ThreadCommImpl : public HostCommImpl {
    ThreadCommImpl(int n_ranks, std::shared_ptr<State> state, int rank);

    int rank() const override;
    int n_ranks() const override;
    bool is_same_process() const override;

    std::shared_ptr<HostCommImpl> Split(int color, int key) override;
    void Sync(bool blocking) override;

    void Broadcast(void* data, int count, DataType dtype, int root,
                   copy_fn copy, ser_fn ser, des_fn des) override;
    void AllGather(void* data, int count, DataType dtype,
                   copy_fn copy, ser_fn ser, des_fn des) override;
    void AllReduce(void* data, int count, DataType dtype, RedOp red_op) override;
};

class ThreadGroupId : public HostGroupId {
    void Initialize() override;
    void Export(std::ostream& os) override;
    void Import(std::istream& is) override;
    HostComm CreateCommunicator(int n_ranks, int rank, int node_rank = 0) override;
};

std::unique_ptr<HostGroupId> CreateThreadGroupId();

}  // namespace turbomind::comm
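AllReduce receives its element type and operation at run time (DataType dtype, RedOp red_op), and the description lists Sum, Min and Max over int32, int64, uint32 and uint64. One common way to handle such a runtime pair is a switch that dispatches to a typed template, sketched below with hypothetical SketchDType and SketchRedOp enums; it is not turbomind's reduction code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Hypothetical enums mirroring the types and ops named on this page;
// they are NOT turbomind's DataType / RedOp definitions.
enum class SketchDType { kInt32, kInt64, kUint32, kUint64 };
enum class SketchRedOp { kSum, kMin, kMax };

// Element-wise acc[i] = op(acc[i], src[i]) for one concrete type T.
template <class T>
void ReduceTyped(T* acc, const T* src, int count, SketchRedOp op) {
    for (int i = 0; i < count; ++i) {
        switch (op) {
            case SketchRedOp::kSum: acc[i] += src[i]; break;
            case SketchRedOp::kMin: acc[i] = std::min(acc[i], src[i]); break;
            case SketchRedOp::kMax: acc[i] = std::max(acc[i], src[i]); break;
        }
    }
}

// Runtime dtype -> static type dispatch. An in-process AllReduce could call
// this once per peer buffer after the ranks have exchanged pointers.
void ReduceInto(void* acc, const void* src, int count, SketchDType dtype, SketchRedOp op) {
    assert(count >= 0);
    switch (dtype) {
        case SketchDType::kInt32:
            return ReduceTyped(static_cast<std::int32_t*>(acc), static_cast<const std::int32_t*>(src), count, op);
        case SketchDType::kInt64:
            return ReduceTyped(static_cast<std::int64_t*>(acc), static_cast<const std::int64_t*>(src), count, op);
        case SketchDType::kUint32:
            return ReduceTyped(static_cast<std::uint32_t*>(acc), static_cast<const std::uint32_t*>(src), count, op);
        case SketchDType::kUint64:
            return ReduceTyped(static_cast<std::uint64_t*>(acc), static_cast<const std::uint64_t*>(src), count, op);
    }
    throw std::invalid_argument("unsupported dtype");
}
```

Restricting the dispatch to integer types keeps every supported operation exact, so the result does not depend on the order in which peer buffers are folded in (barring signed overflow).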

Import

#include "src/turbomind/comm/host_comm.h"

I/O Contract

Inputs

Name | Type | Required | Description
n_ranks | int | Yes | Total number of ranks in the communicator group
rank | int | Yes | Zero-based rank index of this thread
state | std::shared_ptr<State> | Yes | Shared state containing the atomic channels and barrier

Outputs

Name | Type | Description
HostComm | HostComm | Communicator handle wrapping the ThreadCommImpl
HostGroupId | std::unique_ptr<HostGroupId> | Factory object for creating thread-based communicators

Usage Examples

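#include <thread>
#include <vector>
#include "src/turbomind/comm/host_comm.h"

void RunRanks(int n_ranks) {
    // Create a thread group ID once and initialize its shared state
    auto group_id = turbomind::comm::CreateThreadGroupId();
    group_id->Initialize();

    std::vector<std::thread> workers;
    for (int rank = 0; rank < n_ranks; ++rank) {
        workers.emplace_back([&, rank] {
            // Each thread creates its own communicator
            turbomind::comm::HostComm comm = group_id->CreateCommunicator(n_ranks, rank);
            // Perform collective operations; after Broadcast every rank holds 42
            int value = (rank == 0) ? 42 : 0;
            turbomind::comm::Broadcast(comm, value, /*root=*/0);
            auto gathered = turbomind::comm::AllGather(comm, value);
        });
    }
    for (auto& t : workers) t.join();
}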
