Implementation:ClickHouse ClickHouse Poco PollSet
base/poco/Net/src/PollSet.cpp:1-518
| Property | Value |
|---|---|
| Source File | `base/poco/Net/src/PollSet.cpp` |
| Lines of Code | 518 |
| Principle | Principle:ClickHouse_ClickHouse_Socket_Abstraction |
| Domain | Networking, Sockets |
| Language | C++ |
| Last Updated | 2026-02-08 00:00 GMT |
Purpose
Implements I/O multiplexing for monitoring multiple sockets for readiness events. `PollSet` provides a unified API that delegates to a platform mechanism chosen at compile time: `epoll` on Linux, `poll` on BSD/macOS, or `select` as a fallback. This allows ClickHouse networking code to wait on many sockets efficiently, without busy-polling or spawning a thread per connection.
Code Reference
Epoll Implementation (Linux)
class PollSetImpl
{
public:
    PollSetImpl():
        _epollfd(-1),
        _events(1024)
    {
        _epollfd = epoll_create(1);
        if (_epollfd < 0)
        {
            SocketImpl::error();
        }
    }

    void add(const Socket& socket, int mode)
    {
        Poco::FastMutex::ScopedLock lock(_mutex);
        SocketImpl* sockImpl = socket.impl();
        poco_socket_t fd = sockImpl->sockfd();
        struct epoll_event ev;
        ev.events = 0;
        if (mode & PollSet::POLL_READ) ev.events |= EPOLLIN;
        if (mode & PollSet::POLL_WRITE) ev.events |= EPOLLOUT;
        if (mode & PollSet::POLL_ERROR) ev.events |= EPOLLERR;
        ev.data.ptr = socket.impl();
        int err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
        if (err)
        {
            if (errno == EEXIST) update(socket, mode);
            else SocketImpl::error();
        }
        if (_socketMap.find(sockImpl) == _socketMap.end())
            _socketMap[sockImpl] = socket;
    }

    PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
    {
        PollSet::SocketModeMap result;
        if (_socketMap.empty()) return result;
        Poco::Timespan remainingTime(timeout);
        int rc;
        do
        {
            Poco::Timestamp start;
            rc = epoll_wait(_epollfd, &_events[0], _events.size(),
                            remainingTime.totalMilliseconds());
            if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
            {
                Poco::Timestamp end;
                Poco::Timespan waited = end - start;
                if (waited < remainingTime) remainingTime -= waited;
                else remainingTime = 0;
            }
        }
        while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
        if (rc < 0) SocketImpl::error();
        // ... map events back to Socket objects
        return result;
    }
};
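
The excerpt above elides how reported events are mapped back to a mode bitmask. As a minimal sketch of that translation in both directions, assuming Linux: the `Mode` enum and both function names below are hypothetical stand-ins, and the numeric values are chosen for illustration rather than taken from the source.

```cpp
#include <sys/epoll.h>  // Linux-only: EPOLLIN, EPOLLOUT, EPOLLERR
#include <cstdint>

// Hypothetical stand-ins for PollSet::POLL_READ / POLL_WRITE / POLL_ERROR.
// The numeric values are assumptions made for this sketch.
enum Mode : int
{
    MODE_READ  = 0x01,
    MODE_WRITE = 0x02,
    MODE_ERROR = 0x04
};

// Translate a mode bitmask into epoll event flags (same logic as add() above).
inline std::uint32_t toEpollEvents(int mode)
{
    std::uint32_t events = 0;
    if (mode & MODE_READ)  events |= EPOLLIN;
    if (mode & MODE_WRITE) events |= EPOLLOUT;
    if (mode & MODE_ERROR) events |= EPOLLERR;
    return events;
}

// Translate the flags reported by epoll_wait back into a mode bitmask.
// Flags without a counterpart in Mode (for example EPOLLHUP) are ignored here.
inline int fromEpollEvents(std::uint32_t events)
{
    int mode = 0;
    if (events & EPOLLIN)  mode |= MODE_READ;
    if (events & EPOLLOUT) mode |= MODE_WRITE;
    if (events & EPOLLERR) mode |= MODE_ERROR;
    return mode;
}
```

Keeping the two translations symmetric means a round trip through `toEpollEvents` and `fromEpollEvents` returns the original mask, which makes the mapping easy to check in isolation.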
Poll Implementation (BSD)
class PollSetImpl
{
public:
    void add(const Socket& socket, int mode)
    {
        Poco::FastMutex::ScopedLock lock(_mutex);
        poco_socket_t fd = socket.impl()->sockfd();
        _addMap[fd] = mode;
        _removeSet.erase(fd);
        _socketMap[fd] = socket;
    }

    PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
    {
        // Apply pending adds/removes to _pollfds vector
        // Then call ::poll(&_pollfds[0], _pollfds.size(), timeout)
        // Map revents back to Socket objects
    }

private:
    std::map<poco_socket_t, Socket> _socketMap;
    std::map<poco_socket_t, int> _addMap;
    std::set<poco_socket_t> _removeSet;
    std::vector<pollfd> _pollfds;
};
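
The first step of `poll` above is described only in a comment. One way to apply queued additions and removals to a `pollfd` vector is sketched below; `applyPending` and the `MODE_*` constants are hypothetical names for this illustration, and the actual source may order or structure the work differently.

```cpp
#include <poll.h>
#include <algorithm>
#include <map>
#include <set>
#include <vector>

// Assumed mode bits for this sketch (stand-ins for PollSet::POLL_READ / POLL_WRITE).
constexpr int MODE_READ = 1;
constexpr int MODE_WRITE = 2;

// Apply queued removals and additions to the pollfd vector, then clear both queues.
// Meant to run at the start of a poll cycle, before ::poll is called.
inline void applyPending(std::vector<pollfd>& fds,
                         std::map<int, int>& addMap,
                         std::set<int>& removeSet)
{
    // Drop every entry whose descriptor was queued for removal.
    fds.erase(std::remove_if(fds.begin(), fds.end(),
                             [&removeSet](const pollfd& p) { return removeSet.count(p.fd) != 0; }),
              fds.end());
    removeSet.clear();

    // Append queued additions, or update the mask if the fd is already present.
    for (const auto& entry : addMap)
    {
        const int fd = entry.first;
        const int mode = entry.second;

        short events = 0;
        if (mode & MODE_READ)  events |= POLLIN;
        if (mode & MODE_WRITE) events |= POLLOUT;

        auto it = std::find_if(fds.begin(), fds.end(),
                               [fd](const pollfd& p) { return p.fd == fd; });
        if (it != fds.end()) it->events = events;
        else fds.push_back(pollfd{fd, events, 0});
    }
    addMap.clear();
}
```

Because the vector is only rewritten at this one point, calls to `add` or `remove` made between poll cycles never touch the array that `::poll` is iterating over.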
Public API (delegates to PollSetImpl)
PollSet::PollSet(): _pImpl(new PollSetImpl) {}

PollSet::~PollSet() { delete _pImpl; }

void PollSet::add(const Socket& socket, int mode) { _pImpl->add(socket, mode); }

void PollSet::remove(const Socket& socket) { _pImpl->remove(socket); }

void PollSet::update(const Socket& socket, int mode) { _pImpl->update(socket, mode); }

void PollSet::clear() { _pImpl->clear(); }

PollSet::SocketModeMap PollSet::poll(const Poco::Timespan& timeout)
{
    return _pImpl->poll(timeout);
}
I/O Contract
| Input | Output | Side Effects |
|---|---|---|
| `add(socket, mode)` where `mode` is a bitmask of `POLL_READ`, `POLL_WRITE`, `POLL_ERROR` | None (void) | Registers the socket's file descriptor with the OS multiplexing mechanism (`epoll_ctl`, the `pollfd` vector, or an `fd_set`). On epoll, adding an already-registered socket (`EEXIST`) falls back to `update` |
| `remove(socket)` | None (void) | Removes the socket from the monitored set |
| `update(socket, mode)` | None (void) | Changes the event mask for an already-registered socket |
| `poll(timeout)` | `SocketModeMap` -- map from Socket to triggered event bitmask | Blocks up to `timeout` waiting for I/O events; retries on `EINTR` while subtracting elapsed time from remaining timeout |
| `clear()` | None (void) | Removes all sockets; on epoll, closes and recreates the epoll file descriptor |
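
The `clear()` row says the epoll backend closes and recreates its file descriptor. A minimal sketch of that idea, assuming Linux, is shown here; `recreateEpoll` is a hypothetical helper, not a function from the source, and it reports errors with a standard exception instead of `SocketImpl::error()`.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>
#include <system_error>

// Hypothetical helper: discard all registrations by closing the old epoll
// instance (if any) and returning a freshly created one.
inline int recreateEpoll(int oldFd)
{
    if (oldFd >= 0) ::close(oldFd);

    // The size argument is ignored by modern kernels but must be greater than zero.
    const int fd = ::epoll_create(1);
    if (fd < 0)
        throw std::system_error(errno, std::generic_category(), "epoll_create");
    return fd;
}
```

Closing the descriptor lets the kernel drop the whole interest list at once, which avoids issuing one `EPOLL_CTL_DEL` per registered socket.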
Usage Examples
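#include "Poco/Net/PollSet.h"
#include "Poco/Net/Socket.h"
#include "Poco/Timespan.h"

using Poco::Net::PollSet;

// pollOnce is an illustrative wrapper; both sockets are assumed to be open already.
void pollOnce(const Poco::Net::Socket& serverSocket, const Poco::Net::Socket& clientSocket)
{
    PollSet pollSet;

    // Register sockets for monitoring
    pollSet.add(serverSocket, PollSet::POLL_READ);
    pollSet.add(clientSocket, PollSet::POLL_READ | PollSet::POLL_WRITE);

    // Wait up to 1 second for events
    Poco::Timespan timeout(1, 0); // 1 second, 0 microseconds
    PollSet::SocketModeMap ready = pollSet.poll(timeout);

    // Each entry maps a Socket to the bitmask of events that fired on it
    for (const auto& entry : ready)
    {
        if (entry.second & PollSet::POLL_READ)
        {
            // Socket is readable
        }
        if (entry.second & PollSet::POLL_WRITE)
        {
            // Socket is writable
        }
    }
}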
Internal Details
Three Platform Backends
The implementation is selected at compile time via preprocessor macros:
- `POCO_HAVE_FD_EPOLL` (Linux): Uses `epoll_create`, `epoll_ctl`, `epoll_wait`. The constructor creates the epoll fd and pre-allocates a buffer of 1024 events for `epoll_wait`. Each registration stores the socket's `SocketImpl*` in `epoll_event.data.ptr` (a `void*`), and `_socketMap` maps that pointer back to the owning `Socket`.
- `POCO_HAVE_FD_POLL` (BSD/macOS): Uses `::poll` with a `std::vector<pollfd>`. Defers add/remove operations into `_addMap`/`_removeSet` and applies them at the start of each `poll` call to avoid iterator invalidation.
- Fallback (other platforms): Uses `::select` with `fd_set`. Limited to `FD_SETSIZE` file descriptors.
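
The compile-time selection described above can be sketched as a preprocessor chain. The `SKETCH_*` macros and `backendName` below are hypothetical; this sketch derives them from compiler-provided platform macros, which is an assumption and not necessarily how `POCO_HAVE_FD_EPOLL` and `POCO_HAVE_FD_POLL` are defined in the build.

```cpp
#include <string>

// Assumption for this sketch: derive the feature macros from compiler-provided
// platform macros. The real POCO_HAVE_FD_* macros are defined elsewhere.
#if defined(__linux__)
    #define SKETCH_HAVE_FD_EPOLL 1
#elif defined(__unix__) || defined(__APPLE__)
    #define SKETCH_HAVE_FD_POLL 1
#endif

// Exactly one definition of backendName() survives preprocessing,
// just as exactly one PollSetImpl class is compiled.
#if defined(SKETCH_HAVE_FD_EPOLL)
inline std::string backendName() { return "epoll"; }
#elif defined(SKETCH_HAVE_FD_POLL)
inline std::string backendName() { return "poll"; }
#else
inline std::string backendName() { return "select"; }
#endif
```

Since the choice is made by the preprocessor, there is no runtime dispatch cost, and the public `PollSet` class stays identical on every platform.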
EINTR Handling
All three backends implement a retry loop that subtracts elapsed wall-clock time from the remaining timeout when interrupted by a signal, ensuring the total wait time does not exceed the requested timeout.
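
The retry-with-shrinking-timeout pattern can be sketched with plain `::poll` and `std::chrono`. This is an illustration under stated assumptions, not the source code: `pollWithRetry` and `remainingAfter` are hypothetical names, and the sketch measures elapsed time with `std::chrono::steady_clock` where the excerpt above uses `Poco::Timestamp`.

```cpp
#include <poll.h>
#include <cerrno>
#include <chrono>
#include <vector>

using Millis = std::chrono::milliseconds;

// Subtract the time already waited from the remaining timeout, clamping at zero.
inline Millis remainingAfter(Millis remaining, Millis waited)
{
    return waited < remaining ? remaining - waited : Millis(0);
}

// Call ::poll, retrying after EINTR with a shrinking timeout.
// Returns the ::poll result: > 0 ready count, 0 on timeout,
// < 0 on an error other than EINTR (errno is left set by ::poll).
inline int pollWithRetry(std::vector<pollfd>& fds, Millis timeout)
{
    Millis remaining = timeout;
    for (;;)
    {
        const auto start = std::chrono::steady_clock::now();
        const int rc = ::poll(fds.data(), static_cast<nfds_t>(fds.size()),
                              static_cast<int>(remaining.count()));

        // Check errno immediately after the call, before anything can overwrite it.
        if (rc >= 0 || errno != EINTR) return rc;

        // Interrupted by a signal: charge the elapsed time against the budget.
        const auto waited = std::chrono::duration_cast<Millis>(
            std::chrono::steady_clock::now() - start);
        remaining = remainingAfter(remaining, waited);
    }
}
```

Once the budget reaches zero, the next `::poll` call returns immediately, so repeated signals cannot extend the wait beyond the requested timeout by more than the clock's rounding.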
Thread Safety
All public methods acquire a `Poco::FastMutex` before modifying internal state. The epoll `poll` method locks only after `epoll_wait` returns, minimizing lock contention during the blocking wait.
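
The locking pattern described for the epoll backend, waiting without the mutex and locking only to touch shared state, might look like the following miniature. `MiniPollSet` is a hypothetical class for illustration: it is keyed by raw file descriptor where the source keys by `SocketImpl*`, uses `std::mutex` where the source uses `Poco::FastMutex`, and omits the `EINTR` retry for brevity.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <array>
#include <cstdint>
#include <map>
#include <mutex>
#include <stdexcept>

// Hypothetical miniature of an epoll-backed poll set (Linux only).
class MiniPollSet
{
public:
    MiniPollSet() : _epollfd(::epoll_create(1))
    {
        if (_epollfd < 0) throw std::runtime_error("epoll_create failed");
    }
    ~MiniPollSet() { ::close(_epollfd); }
    MiniPollSet(const MiniPollSet&) = delete;
    MiniPollSet& operator=(const MiniPollSet&) = delete;

    void add(int fd, std::uint32_t events)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        if (::epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev) != 0)
            throw std::runtime_error("epoll_ctl failed");
        _registered[fd] = events;
    }

    // Returns a map from fd to the event flags reported for it.
    std::map<int, std::uint32_t> poll(int timeoutMs)
    {
        std::array<epoll_event, 64> events;

        // Blocking wait with no lock held, so add() on another thread is not stalled.
        const int rc = ::epoll_wait(_epollfd, events.data(),
                                    static_cast<int>(events.size()), timeoutMs);
        if (rc < 0) throw std::runtime_error("epoll_wait failed"); // EINTR retry omitted

        std::map<int, std::uint32_t> result;
        std::lock_guard<std::mutex> lock(_mutex); // lock only while reading shared state
        for (int i = 0; i < rc; ++i)
        {
            const int fd = events[i].data.fd;
            if (_registered.count(fd) != 0) result[fd] = events[i].events;
        }
        return result;
    }

private:
    std::mutex _mutex;
    int _epollfd;
    std::map<int, std::uint32_t> _registered;
};
```

The kernel allows `epoll_ctl` to be called on an epoll instance while another thread is blocked in `epoll_wait` on it, which is what makes holding the mutex during the wait unnecessary.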