Implementation:ClickHouse ClickHouse Poco PollSet

From Leeroopedia


base/poco/Net/src/PollSet.cpp:1-518 ClickHouse_ClickHouse ClickHouse_ClickHouse_Socket_Abstraction

Source File base/poco/Net/src/PollSet.cpp
Lines of Code 518
Principle Principle:ClickHouse_ClickHouse_Socket_Abstraction
Domain Networking, Sockets
Language C++
Last Updated 2026-02-08 00:00 GMT

Purpose

Implements I/O multiplexing: watching many sockets at once for readiness events (readable, writable, error). `PollSet` provides a unified API that delegates to a platform mechanism chosen at compile time: `epoll` on Linux, `poll` on BSD and macOS, or `select` as a fallback. This lets ClickHouse networking code wait on many sockets efficiently, without busy-polling or spawning a thread per connection.

Code Reference

Epoll Implementation (Linux)

class PollSetImpl
{
public:
    PollSetImpl():
        _epollfd(-1),
        _events(1024)
    {
        _epollfd = epoll_create(1);
        if (_epollfd < 0)
        {
            SocketImpl::error();
        }
    }

    void add(const Socket& socket, int mode)
    {
        Poco::FastMutex::ScopedLock lock(_mutex);
        SocketImpl* sockImpl = socket.impl();
        poco_socket_t fd = sockImpl->sockfd();
        struct epoll_event ev;
        ev.events = 0;
        if (mode & PollSet::POLL_READ)  ev.events |= EPOLLIN;
        if (mode & PollSet::POLL_WRITE) ev.events |= EPOLLOUT;
        if (mode & PollSet::POLL_ERROR) ev.events |= EPOLLERR;
        ev.data.ptr = socket.impl();
        int err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
        if (err)
        {
            if (errno == EEXIST) update(socket, mode);
            else SocketImpl::error();
        }
        if (_socketMap.find(sockImpl) == _socketMap.end())
            _socketMap[sockImpl] = socket;
    }

    PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
    {
        PollSet::SocketModeMap result;
        if (_socketMap.empty()) return result;
        Poco::Timespan remainingTime(timeout);
        int rc;
        do
        {
            Poco::Timestamp start;
            rc = epoll_wait(_epollfd, &_events[0], _events.size(),
                            remainingTime.totalMilliseconds());
            if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
            {
                Poco::Timestamp end;
                Poco::Timespan waited = end - start;
                if (waited < remainingTime) remainingTime -= waited;
                else remainingTime = 0;
            }
        }
        while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
        if (rc < 0) SocketImpl::error();
        // ... map events back to Socket objects
        return result;
    }
};
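
The excerpt above elides the step that maps events back to `Socket` objects. A minimal sketch of what the flag translation could look like, mirroring `add` in reverse, follows. The `sketch` namespace, the `toPollMode` helper, and the numeric values of the mode constants are assumptions made for illustration; they are not taken from the source file.

```cpp
#include <sys/epoll.h>
#include <cstdint>

namespace sketch
{
// Hypothetical stand-ins for PollSet::POLL_READ / POLL_WRITE / POLL_ERROR.
// The numeric values are assumed for this sketch only.
enum Mode : int
{
    POLL_READ  = 0x01,
    POLL_WRITE = 0x02,
    POLL_ERROR = 0x04
};

// Translate the `events` field reported by epoll_wait into a mode bitmask,
// i.e. the inverse of the flag mapping performed in add().
inline int toPollMode(std::uint32_t events)
{
    int mode = 0;
    if (events & EPOLLIN)  mode |= POLL_READ;
    if (events & EPOLLOUT) mode |= POLL_WRITE;
    if (events & EPOLLERR) mode |= POLL_ERROR;
    return mode;
}
} // namespace sketch
```

In a loop over the first `rc` entries of the event buffer, such a helper would be applied to each entry's `events` field, with `data.ptr` identifying which registered socket the result belongs to.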

Poll Implementation (BSD)

class PollSetImpl
{
public:
    void add(const Socket& socket, int mode)
    {
        Poco::FastMutex::ScopedLock lock(_mutex);
        poco_socket_t fd = socket.impl()->sockfd();
        _addMap[fd] = mode;
        _removeSet.erase(fd);
        _socketMap[fd] = socket;
    }

    PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
    {
        // Apply pending adds/removes to _pollfds vector
        // Then call ::poll(&_pollfds[0], _pollfds.size(), timeout)
        // Map revents back to Socket objects
    }

private:
    std::map<poco_socket_t, Socket> _socketMap;
    std::map<poco_socket_t, int> _addMap;
    std::set<poco_socket_t> _removeSet;
    std::vector<pollfd> _pollfds;
};
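
The `poll` body above is only outlined in comments. The deferred bookkeeping it describes might be sketched as follows, using plain file descriptors instead of `Socket` objects. `DeferredPollFds` and `applyPending` are hypothetical names, and overwriting the event mask of an already-present descriptor is an assumption of this sketch rather than confirmed behaviour of the source.

```cpp
#include <poll.h>
#include <algorithm>
#include <map>
#include <set>
#include <vector>

// Hypothetical, simplified stand-in for the poll-based backend's bookkeeping.
class DeferredPollFds
{
public:
    // Record a pending add; a later add cancels a pending remove.
    void add(int fd, short events)
    {
        _addMap[fd] = events;
        _removeSet.erase(fd);
    }

    // Record a pending remove; a later remove cancels a pending add.
    void remove(int fd)
    {
        _removeSet.insert(fd);
        _addMap.erase(fd);
    }

    // Fold pending changes into the pollfd vector. Intended to run at the
    // start of each poll call, before the vector is handed to ::poll.
    void applyPending()
    {
        _pollfds.erase(
            std::remove_if(_pollfds.begin(), _pollfds.end(),
                [this](const pollfd& p) { return _removeSet.count(p.fd) != 0; }),
            _pollfds.end());
        _removeSet.clear();

        for (const auto& entry : _addMap)
        {
            auto it = std::find_if(_pollfds.begin(), _pollfds.end(),
                [&entry](const pollfd& p) { return p.fd == entry.first; });
            if (it != _pollfds.end())
            {
                it->events = entry.second;  // already present: replace the mask
            }
            else
            {
                pollfd p{};
                p.fd = entry.first;
                p.events = entry.second;
                _pollfds.push_back(p);
            }
        }
        _addMap.clear();
    }

    const std::vector<pollfd>& fds() const { return _pollfds; }

private:
    std::map<int, short> _addMap;
    std::set<int> _removeSet;
    std::vector<pollfd> _pollfds;
};
```

Because the vector is only mutated inside `applyPending`, callers of `add` and `remove` never touch it while it may be in use by a blocking wait.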

Public API (delegates to PollSetImpl)

PollSet::PollSet(): _pImpl(new PollSetImpl) {}
PollSet::~PollSet() { delete _pImpl; }

void PollSet::add(const Socket& socket, int mode) { _pImpl->add(socket, mode); }
void PollSet::remove(const Socket& socket)        { _pImpl->remove(socket); }
void PollSet::update(const Socket& socket, int mode) { _pImpl->update(socket, mode); }
void PollSet::clear()                              { _pImpl->clear(); }

PollSet::SocketModeMap PollSet::poll(const Poco::Timespan& timeout)
{
    return _pImpl->poll(timeout);
}

I/O Contract

| Input | Output | Side Effects |
|---|---|---|
| `add(socket, mode)`, where `mode` is a bitmask of `POLL_READ`, `POLL_WRITE`, `POLL_ERROR` | None (void) | Registers the socket's file descriptor with the OS multiplexing mechanism (`epoll_ctl`, `pollfd` vector, or `fd_set`) |
| `remove(socket)` | None (void) | Removes the socket from the monitored set |
| `update(socket, mode)` | None (void) | Changes the event mask for an already-registered socket |
| `poll(timeout)` | `SocketModeMap`: map from `Socket` to triggered event bitmask | Blocks up to `timeout` waiting for I/O events; retries on `EINTR` while subtracting elapsed time from the remaining timeout |
| `clear()` | None (void) | Removes all sockets; on epoll, closes and recreates the epoll file descriptor |

Usage Examples

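#include "Poco/Net/PollSet.h"
#include "Poco/Timespan.h"

using Poco::Net::PollSet;

// serverSocket and clientSocket are assumed to be existing, open Poco::Net::Socket objects.
PollSet pollSet;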

// Register sockets for monitoring
pollSet.add(serverSocket, PollSet::POLL_READ);
pollSet.add(clientSocket, PollSet::POLL_READ | PollSet::POLL_WRITE);

// Wait up to 1 second for events
Poco::Timespan timeout(1, 0);  // 1 second
PollSet::SocketModeMap ready = pollSet.poll(timeout);

for (auto& entry : ready)
{
    if (entry.second & PollSet::POLL_READ)
    {
        // Socket is readable
    }
    if (entry.second & PollSet::POLL_WRITE)
    {
        // Socket is writable
    }
}

Internal Details

Three Platform Backends

The implementation is selected at compile time via preprocessor macros:

  1. `POCO_HAVE_FD_EPOLL` (Linux): Uses `epoll_create`, `epoll_ctl`, and `epoll_wait`. The constructor creates the epoll fd and pre-allocates a buffer for 1024 events. Each registration stores the `SocketImpl*` in `epoll_event.data.ptr` as a `void*`; the same pointer keys `_socketMap`.
  2. `POCO_HAVE_FD_POLL` (BSD/macOS): Uses `::poll` on a `std::vector<pollfd>`. Add and remove operations are deferred into `_addMap` and `_removeSet` and applied at the start of the next `poll` call, which avoids invalidating iterators into the vector.
  3. Fallback (other platforms): Uses `::select` with `fd_set`, so it can only monitor descriptors below `FD_SETSIZE`.
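
The compile-time selection described above can be pictured with a small sketch. The macro names are the ones listed here; `backendName` is a hypothetical helper added only to make the chosen branch observable, and where the macros get defined is outside the scope of this sketch.

```cpp
#include <string>

// Exactly one branch survives preprocessing, so only one backend is compiled in.
// backendName() is a hypothetical helper, not part of PollSet.
#if defined(POCO_HAVE_FD_EPOLL)
inline std::string backendName() { return "epoll"; }
#elif defined(POCO_HAVE_FD_POLL)
inline std::string backendName() { return "poll"; }
#else
inline std::string backendName() { return "select"; }
#endif
```

When compiled on its own, with neither macro defined, the fallback branch is selected. In the real file each branch would hold a complete `PollSetImpl` class rather than a one-line function.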

EINTR Handling

All three backends implement a retry loop that subtracts elapsed wall-clock time from the remaining timeout when interrupted by a signal, ensuring the total wait time does not exceed the requested timeout.

Thread Safety

All public methods acquire a `Poco::FastMutex` before modifying internal state. The epoll `poll` method locks only after `epoll_wait` returns, minimizing lock contention during the blocking wait.
