Implementation:ClickHouse ClickHouse Poco SocketImpl Impl
base/poco/Net/src/SocketImpl.cpp:1-1118
ClickHouse_ClickHouse
ClickHouse_ClickHouse_Socket_Abstraction
| Source File | base/poco/Net/src/SocketImpl.cpp |
|---|---|
| Lines of Code | 1118 |
| Principle | Principle:ClickHouse_ClickHouse_Socket_Abstraction |
| Domain | Networking, Sockets |
| Language | C++ |
| Last Updated | 2026-02-08 00:00 GMT |
Purpose
Implements `SocketImpl`, the low-level socket system call wrapper that forms the foundation of the Poco socket abstraction. This class directly wraps POSIX/BSD socket APIs (`connect`, `bind`, `listen`, `accept`, `send`, `recv`, `sendto`, `recvfrom`, `poll`/`select`, `setsockopt`, `getsockopt`, `ioctl`, `fcntl`) and converts OS error codes into typed C++ exceptions. It also implements bandwidth throttling and timeout management.
Code Reference
Core Socket Operations
void SocketImpl::connect(const SocketAddress& address)
{
if (_sockfd == POCO_INVALID_SOCKET) init(address.af());
int rc;
do {
rc = ::connect(_sockfd, address.addr(), address.length());
} while (rc != 0 && lastError() == POCO_EINTR);
if (rc != 0) error(lastError(), address.toString());
}
void SocketImpl::bind(const SocketAddress& address, bool reuseAddress, bool reusePort)
{
if (_sockfd == POCO_INVALID_SOCKET) init(address.af());
if (reuseAddress) setReuseAddress(true);
if (reusePort) setReusePort(true);
int rc = ::bind(_sockfd, address.addr(), address.length());
if (rc != 0) error(address.toString());
}
/// Accepts the next incoming connection on this socket.
///
/// @param clientAddr  Receives the peer address reported by ::accept.
/// @return A newly allocated StreamSocketImpl wrapping the accepted descriptor.
/// @throws InvalidSocketException if this socket has no valid descriptor.
///         A failing ::accept (other than POCO_EINTR, which is retried)
///         is reported through error().
SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
    if (_sockfd == POCO_INVALID_SOCKET)
    {
        throw InvalidSocketException();
    }

    // sockaddr_storage is used as the buffer for the peer address.
    sockaddr_storage peerStorage;
    struct sockaddr* peer = reinterpret_cast<struct sockaddr*>(&peerStorage);
    poco_socklen_t peerLen = sizeof(peerStorage);

    poco_socket_t accepted = ::accept(_sockfd, peer, &peerLen);
    while (accepted == POCO_INVALID_SOCKET && lastError() == POCO_EINTR)
    {
        // Interrupted by a signal: try again.
        accepted = ::accept(_sockfd, peer, &peerLen);
    }

    if (accepted == POCO_INVALID_SOCKET)
    {
        error();
        return 0;
    }

    clientAddr = SocketAddress(peer, peerLen);
    return new StreamSocketImpl(accepted);
}
Send/Receive with Throttling
int SocketImpl::sendBytes(const void* buffer, int length, int flags)
{
bool blocking = _blocking && (flags & MSG_DONTWAIT) == 0;
throttleSend(length, blocking);
if (_isBrokenTimeout && blocking)
{
if (_sndTimeout.totalMicroseconds() != 0)
if (!poll(_sndTimeout, SELECT_WRITE))
throw TimeoutException();
}
int rc;
do {
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
rc = ::send(_sockfd, reinterpret_cast<const char*>(buffer), length, flags);
} while (blocking && rc < 0 && lastError() == POCO_EINTR);
if (rc < 0) {
int err = lastError();
if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking) ;
else if (err == POCO_EAGAIN || err == POCO_ETIMEDOUT) throw TimeoutException();
else error(err);
}
useSendThrottlerBudget(rc);
return rc;
}
Poll Implementation
/// Waits until the socket is ready for the requested mode or the time budget runs out.
///
/// @param remainingTime  Time budget for the wait. When ::poll is interrupted
///                       (POCO_EINTR) and the budget is still positive, the
///                       time spent waiting is subtracted from it (clamped at
///                       zero) before the call is retried, so the caller
///                       observes the reduced budget.
/// @param mode           Bitmask; SELECT_READ requests POLLIN, SELECT_WRITE
///                       requests POLLOUT.
/// @return true if ::poll reported a ready descriptor, false if it returned 0.
/// @throws InvalidSocketException if the socket descriptor is invalid on entry.
///         Other ::poll failures are reported through error().
bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
{
    // Take a snapshot of the descriptor so that the validity check and the
    // poll below operate on the same value.
    poco_socket_t sockfd = _sockfd;
    if (sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
#if defined(POCO_HAVE_FD_POLL)
    pollfd pollBuf;
    memset(&pollBuf, 0, sizeof(pollfd));
    // Poll the validated snapshot rather than re-reading _sockfd.
    pollBuf.fd = sockfd;
    if (mode & SELECT_READ) pollBuf.events |= POLLIN;
    if (mode & SELECT_WRITE) pollBuf.events |= POLLOUT;
    int rc;
    do {
        Poco::Timestamp start;
        rc = ::poll(&pollBuf, 1, remainingTime.totalMilliseconds());
        if (rc < 0 && lastError() == POCO_EINTR && remainingTime > 0)
        {
            // Interrupted by a signal: charge the elapsed time against the
            // budget before retrying.
            Poco::Timestamp end;
            Poco::Timespan waited = end - start;
            if (waited < remainingTime) remainingTime -= waited;
            else remainingTime = 0;
        }
    } while (rc < 0 && lastError() == POCO_EINTR);
    if (rc < 0) error();
    return rc > 0;
#else
// select() fallback with fd_set
#endif
}
Socket Initialization
/// Creates the underlying stream socket for address family `af`.
///
/// SOCK_CLOEXEC is added to the socket type when the platform defines it.
void SocketImpl::init(int af)
{
    int socketType = SOCK_STREAM;
#ifdef SOCK_CLOEXEC
    socketType |= SOCK_CLOEXEC;
#endif
    initSocket(af, socketType);
}
void SocketImpl::initSocket(int af, int type, int proto)
{
poco_assert(_sockfd == POCO_INVALID_SOCKET);
_sockfd = ::socket(af, type, proto);
if (_sockfd == POCO_INVALID_SOCKET) error();
#if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
setOption(SOL_SOCKET, SO_NOSIGPIPE, 1);
#endif
}
Error Code to Exception Mapping
/// Translates a socket error code into a thrown C++ exception.
///
/// @param code  Error code to translate; POCO_ENOERR returns without throwing.
/// @param arg   Context string (callers in this file pass the textual socket
///              address); forwarded to the exceptions that accept it.
void SocketImpl::error(int code, const std::string& arg)
{
switch (code)
{
// No error: nothing to report.
case POCO_ENOERR: return;
case POCO_EINTR: throw IOException("Interrupted", code);
case POCO_EACCES: throw IOException("Permission denied", code);
// Connection-level failures map to dedicated exception types.
case POCO_ECONNREFUSED: throw ConnectionRefusedException(arg, code);
case POCO_ECONNRESET: throw ConnectionResetException(code);
case POCO_ETIMEDOUT: throw TimeoutException(code);
case POCO_EADDRINUSE: throw NetException("Address already in use", arg, code);
// ... 30+ error code mappings (abbreviated in this excerpt)
}
}
I/O Contract
| Input | Output | Side Effects |
|---|---|---|
| `connect(address)` | None (void) | Calls `::connect`; initializes socket if needed; throws on error |
| `connect(address, timeout)` | None (void) | Non-blocking connect with poll-based timeout; restores blocking mode after |
| `bind(address, reuseAddress, reusePort)` | None (void) | Sets `SO_REUSEADDR`/`SO_REUSEPORT` then calls `::bind` |
| `listen(backlog)` | None (void) | Calls `::listen` on the socket |
| `acceptConnection(clientAddr)` | New `StreamSocketImpl*` for accepted connection | Calls `::accept`; populates `clientAddr` from `sockaddr_storage` |
| `sendBytes(buffer, length, flags)` | Bytes sent (int) | Applies throttling, calls `::send`; retries on EINTR in blocking mode |
| `receiveBytes(buffer, length, flags)` | Bytes received (int) | Applies throttling, calls `::recv`; retries on EINTR in blocking mode |
| `sendTo(buffer, length, address, flags)` | Bytes sent (int) | Calls `::sendto` for UDP datagrams |
| `receiveFrom(buffer, length, address, flags)` | Bytes received (int) | Calls `::recvfrom`; populates sender address |
| `poll(timeout, mode)` | True if socket is ready | Uses `::poll` or `::select` with EINTR retry and timeout adjustment |
| `setOption(level, option, value)` / `getOption(...)` | None / Option value | Wraps `::setsockopt` / `::getsockopt` |
| `close()` | None (void) | Calls `poco_closesocket` and invalidates the file descriptor |
Usage Examples
// SocketImpl is typically used indirectly through Socket/StreamSocket/DatagramSocket.
// Direct usage example:
// NOTE(review): confirm that SocketImpl can be constructed directly like this
// in application code; the snippet is illustrative.
SocketImpl impl;
SocketAddress addr("127.0.0.1", 8080);
// connect() creates the underlying socket on demand before connecting.
impl.connect(addr);
char buf[1024];
// 18 is the length of the request text in bytes, excluding the terminating NUL.
int sent = impl.sendBytes("GET / HTTP/1.0\r\n\r\n", 18, 0);
int received = impl.receiveBytes(buf, sizeof(buf), 0);
// Timed connect
SocketImpl impl2;
impl2.connect(addr, Poco::Timespan(5, 0)); // 5-second timeout
// Socket options
impl.setNoDelay(true);
impl.setKeepAlive(true);
impl.setSendBufferSize(65536);
impl.setReceiveTimeout(Poco::Timespan(10, 0)); // 10-second receive timeout
// Set bandwidth throttler
auto throttler = std::make_shared<Poco::Net::Throttler>(1048576); // 1 MB/s
impl.setSendThrottler(throttler);
Internal Details
Broken Timeout Workaround
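On platforms where `SO_RCVTIMEO`/`SO_SNDTIMEO` do not work reliably (detected via `POCO_BROKEN_TIMEOUTS`), the implementation falls back to explicit `poll` calls before each blocking send/receive to enforce timeouts. In `sendBytes`, for example, a non-zero send timeout causes the socket to be polled for writability first, and a `TimeoutException` is thrown if the poll does not report readiness.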
Bandwidth Throttling
`SocketImpl` integrates with an optional `Throttler` object for both send and receive paths. A "budget" system tracks how many bytes have been reserved from the throttler versus how many have actually been sent/received:
- `throttleSend`/`throttleRecv`: Requests permission from the throttler if the current budget is insufficient. Uses `THROTTLER_QUANTUM` as the minimum allocation unit.
- `useSendThrottlerBudget`/`useRecvThrottlerBudget`: Decrements the budget by the number of bytes actually transferred.
Non-Blocking Connect
`connectNB` sets the socket to non-blocking mode and calls `::connect`. It accepts `EINPROGRESS` and `EWOULDBLOCK` as expected results, leaving the socket in non-blocking mode for the caller to poll for completion.
`SOCK_CLOEXEC`
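Where the platform defines `SOCK_CLOEXEC` (e.g. Linux), sockets are created with that flag so that the descriptor is closed automatically on `exec`, preventing file descriptor leaks into child processes across `fork`/`exec`.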
`SO_NOSIGPIPE`
On macOS and FreeBSD, `SO_NOSIGPIPE` is set to prevent SIGPIPE signals when writing to a closed connection, ensuring consistent cross-platform exception-based error handling.
Blocking Mode
`setBlocking` uses `fcntl(F_SETFL, O_NONBLOCK)` on Unix or `ioctl(FIONBIO)` on other platforms.