Implementation:ClickHouse ClickHouse Poco SocketImpl Impl

From Leeroopedia


Source File base/poco/Net/src/SocketImpl.cpp
Lines of Code 1118
Principle Principle:ClickHouse_ClickHouse_Socket_Abstraction
Domain Networking, Sockets
Language C++
Last Updated 2026-02-08 00:00 GMT

Purpose

Implements `SocketImpl`, the low-level socket system call wrapper that forms the foundation of the Poco socket abstraction. This class directly wraps POSIX/BSD socket APIs (`connect`, `bind`, `listen`, `accept`, `send`, `recv`, `sendto`, `recvfrom`, `poll`/`select`, `setsockopt`, `getsockopt`, `ioctl`, `fcntl`) and converts OS error codes into typed C++ exceptions. It also implements bandwidth throttling and timeout management.

Code Reference

Core Socket Operations

void SocketImpl::connect(const SocketAddress& address)
{
    if (_sockfd == POCO_INVALID_SOCKET) init(address.af());
    int rc;
    do {
        rc = ::connect(_sockfd, address.addr(), address.length());
    } while (rc != 0 && lastError() == POCO_EINTR);
    if (rc != 0) error(lastError(), address.toString());
}

void SocketImpl::bind(const SocketAddress& address, bool reuseAddress, bool reusePort)
{
    if (_sockfd == POCO_INVALID_SOCKET) init(address.af());
    if (reuseAddress) setReuseAddress(true);
    if (reusePort) setReusePort(true);
    int rc = ::bind(_sockfd, address.addr(), address.length());
    if (rc != 0) error(address.toString());
}

SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
    sockaddr_storage buffer;
    struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&buffer);
    poco_socklen_t saLen = sizeof(buffer);
    poco_socket_t sd;
    do {
        sd = ::accept(_sockfd, pSA, &saLen);
    } while (sd == POCO_INVALID_SOCKET && lastError() == POCO_EINTR);
    if (sd != POCO_INVALID_SOCKET) {
        clientAddr = SocketAddress(pSA, saLen);
        return new StreamSocketImpl(sd);
    }
    error();
    return 0;
}

Send/Receive with Throttling

int SocketImpl::sendBytes(const void* buffer, int length, int flags)
{
    bool blocking = _blocking && (flags & MSG_DONTWAIT) == 0;
    throttleSend(length, blocking);

    if (_isBrokenTimeout && blocking)
    {
        if (_sndTimeout.totalMicroseconds() != 0)
            if (!poll(_sndTimeout, SELECT_WRITE))
                throw TimeoutException();
    }
    int rc;
    do {
        if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
        rc = ::send(_sockfd, reinterpret_cast<const char*>(buffer), length, flags);
    } while (blocking && rc < 0 && lastError() == POCO_EINTR);

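    if (rc < 0) {
        int err = lastError();
        // Non-blocking call that would block: not an error; rc < 0 is returned to the caller.
        if ((err == POCO_EAGAIN || err == POCO_EWOULDBLOCK) && !blocking) ;
        // Blocking call: EAGAIN or ETIMEDOUT means the send timeout expired.
        else if (err == POCO_EAGAIN || err == POCO_ETIMEDOUT) throw TimeoutException();
        else error(err);
    }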
    useSendThrottlerBudget(rc);
    return rc;
}

Poll Implementation

bool SocketImpl::pollImpl(Poco::Timespan& remainingTime, int mode)
{
    poco_socket_t sockfd = _sockfd;
    if (sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();

#if defined(POCO_HAVE_FD_POLL)
    pollfd pollBuf;
    memset(&pollBuf, 0, sizeof(pollfd));
    pollBuf.fd = _sockfd;
    if (mode & SELECT_READ)  pollBuf.events |= POLLIN;
    if (mode & SELECT_WRITE) pollBuf.events |= POLLOUT;

    int rc;
    do {
        Poco::Timestamp start;
        rc = ::poll(&pollBuf, 1, remainingTime.totalMilliseconds());
        if (rc < 0 && lastError() == POCO_EINTR && remainingTime > 0)
        {
            Poco::Timestamp end;
            Poco::Timespan waited = end - start;
            if (waited < remainingTime) remainingTime -= waited;
            else remainingTime = 0;
        }
    } while (rc < 0 && lastError() == POCO_EINTR);
    if (rc < 0) error();
    return rc > 0;
#else
    // select() fallback with fd_set
#endif
}
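
The retry-and-adjust pattern above can be reproduced with plain POSIX calls. The following is a minimal sketch, not Poco code: `waitReady` and `pipeDemo` are hypothetical names, and `std::chrono` stands in for `Poco::Timestamp`/`Poco::Timespan`. It shows why the elapsed time is subtracted after each `EINTR`: otherwise every interrupting signal would restart the full timeout.

```cpp
#include <cassert>
#include <cerrno>
#include <chrono>
#include <poll.h>
#include <system_error>
#include <unistd.h>

// Hypothetical helper (not a Poco API): waits until fd is ready for the
// requested events or the timeout elapses. Returns true if the fd is ready.
bool waitReady(int fd, short events, std::chrono::milliseconds timeout)
{
    using Clock = std::chrono::steady_clock;
    assert(fd >= 0);

    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = events;

    auto remaining = timeout;
    for (;;)
    {
        const auto start = Clock::now();
        const int rc = ::poll(&pfd, 1, static_cast<int>(remaining.count()));
        if (rc >= 0)
            return rc > 0;  // 0 means the timeout expired
        if (errno != EINTR)
            throw std::system_error(errno, std::generic_category(), "poll");

        // Interrupted by a signal: subtract the time already spent waiting so
        // that repeated interruptions cannot extend the total timeout.
        const auto waited =
            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start);
        remaining = (waited < remaining) ? remaining - waited
                                         : std::chrono::milliseconds(0);
    }
}

// Usage: the read end of a pipe becomes readable only after a byte is written.
bool pipeDemo()
{
    int fds[2];
    if (::pipe(fds) != 0) return false;
    const bool readyBefore = waitReady(fds[0], POLLIN, std::chrono::milliseconds(20));
    const bool wrote = ::write(fds[1], "x", 1) == 1;
    const bool readyAfter = waitReady(fds[0], POLLIN, std::chrono::milliseconds(20));
    ::close(fds[0]);
    ::close(fds[1]);
    return !readyBefore && wrote && readyAfter;
}
```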

Socket Initialization

void SocketImpl::init(int af)
{
#ifdef SOCK_CLOEXEC
    initSocket(af, SOCK_STREAM | SOCK_CLOEXEC);
#else
    initSocket(af, SOCK_STREAM);
#endif
}

void SocketImpl::initSocket(int af, int type, int proto)
{
    poco_assert(_sockfd == POCO_INVALID_SOCKET);
    _sockfd = ::socket(af, type, proto);
    if (_sockfd == POCO_INVALID_SOCKET) error();

#if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
    setOption(SOL_SOCKET, SO_NOSIGPIPE, 1);
#endif
}

Error Code to Exception Mapping

void SocketImpl::error(int code, const std::string& arg)
{
    switch (code)
    {
    case POCO_ENOERR:        return;
    case POCO_EINTR:         throw IOException("Interrupted", code);
    case POCO_EACCES:        throw IOException("Permission denied", code);
    case POCO_ECONNREFUSED:  throw ConnectionRefusedException(arg, code);
    case POCO_ECONNRESET:    throw ConnectionResetException(code);
    case POCO_ETIMEDOUT:     throw TimeoutException(code);
    case POCO_EADDRINUSE:    throw NetException("Address already in use", arg, code);
    // ... 30+ error code mappings
    }
}
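
The same idea can be shown without Poco. The sketch below assumes a small hypothetical exception hierarchy (`NetError`, `ConnectionRefused`, `ConnectionReset`, `TimedOut`) built on `std::system_error`; these names are illustrative stand-ins, not Poco's classes. It shows how a single switch turns raw `errno` values into types that callers can catch selectively.

```cpp
#include <cassert>
#include <cerrno>
#include <string>
#include <system_error>

// Hypothetical exception types standing in for Poco's hierarchy.
struct NetError : std::system_error
{
    NetError(int code, const std::string& what)
        : std::system_error(code, std::generic_category(), what) {}
};
struct ConnectionRefused : NetError { using NetError::NetError; };
struct ConnectionReset   : NetError { using NetError::NetError; };
struct TimedOut          : NetError { using NetError::NetError; };

// Translates an errno value into a typed exception; returns normally for 0.
void throwForErrno(int code, const std::string& arg = std::string())
{
    switch (code)
    {
    case 0:            return;
    case ECONNREFUSED: throw ConnectionRefused(code, arg);
    case ECONNRESET:   throw ConnectionReset(code, arg);
    case ETIMEDOUT:    throw TimedOut(code, arg);
    default:           throw NetError(code, arg);  // generic fallback
    }
}

// Usage: callers catch only the specific type they know how to handle.
bool isRefused(int code)
{
    try { throwForErrno(code, "127.0.0.1:9"); }
    catch (const ConnectionRefused&) { return true; }
    catch (const NetError&) { return false; }
    return false;  // code == 0: nothing was thrown
}
```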

I/O Contract

| Input | Output | Side Effects |
|---|---|---|
| `connect(address)` | None (void) | Calls `::connect`; initializes socket if needed; throws on error |
| `connect(address, timeout)` | None (void) | Non-blocking connect with poll-based timeout; restores blocking mode after |
| `bind(address, reuseAddress, reusePort)` | None (void) | Sets `SO_REUSEADDR`/`SO_REUSEPORT` then calls `::bind` |
| `listen(backlog)` | None (void) | Calls `::listen` on the socket |
| `acceptConnection(clientAddr)` | New `StreamSocketImpl*` for accepted connection | Calls `::accept`; populates `clientAddr` from `sockaddr_storage` |
| `sendBytes(buffer, length, flags)` | Bytes sent (int) | Applies throttling, calls `::send`; retries on EINTR in blocking mode |
| `receiveBytes(buffer, length, flags)` | Bytes received (int) | Applies throttling, calls `::recv`; retries on EINTR in blocking mode |
| `sendTo(buffer, length, address, flags)` | Bytes sent (int) | Calls `::sendto` for UDP datagrams |
| `receiveFrom(buffer, length, address, flags)` | Bytes received (int) | Calls `::recvfrom`; populates sender address |
| `poll(timeout, mode)` | True if socket is ready | Uses `::poll` or `::select` with EINTR retry and timeout adjustment |
| `setOption(level, option, value)` / `getOption(...)` | None / Option value | Wraps `::setsockopt` / `::getsockopt` |
| `close()` | None (void) | Calls `poco_closesocket` and invalidates the file descriptor |

Usage Examples

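// SocketImpl is typically used indirectly through Socket/StreamSocket/DatagramSocket.
// Direct usage example. SocketImpl's constructors and destructor are protected, so
// this uses the concrete StreamSocketImpl, held in a Poco::AutoPtr because the
// implementation classes are reference-counted.

Poco::AutoPtr<StreamSocketImpl> impl(new StreamSocketImpl);
SocketAddress addr("127.0.0.1", 8080);
impl->connect(addr);

char buf[1024];
const char request[] = "GET / HTTP/1.0\r\n\r\n";
int sent = impl->sendBytes(request, static_cast<int>(sizeof(request) - 1), 0);  // 18 bytes, no trailing NUL
int received = impl->receiveBytes(buf, sizeof(buf), 0);

// Timed connect
Poco::AutoPtr<StreamSocketImpl> impl2(new StreamSocketImpl);
impl2->connect(addr, Poco::Timespan(5, 0));  // 5-second timeout

// Socket options
impl->setNoDelay(true);
impl->setKeepAlive(true);
impl->setSendBufferSize(65536);
impl->setReceiveTimeout(Poco::Timespan(10, 0));

// Set bandwidth throttler (assumes a concrete throttler type that can be
// constructed from a bytes-per-second limit)
auto throttler = std::make_shared<Poco::Net::Throttler>(1048576);  // 1 MiB/s
impl->setSendThrottler(throttler);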

Internal Details

Broken Timeout Workaround

On platforms where `SO_RCVTIMEO`/`SO_SNDTIMEO` do not work reliably (detected via `POCO_BROKEN_TIMEOUTS`), the implementation enforces timeouts itself: before each blocking send or receive with a non-zero timeout, it calls `poll` and throws `TimeoutException` if the socket does not become ready in time.
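
A minimal sketch of this workaround on the receive side, using plain POSIX calls rather than Poco. `recvWithPollTimeout`, `TimeoutError`, and `timeoutDemo` are hypothetical names. For brevity, an `EINTR` during `poll` restarts the full timeout here instead of adjusting the remaining time.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <stdexcept>
#include <sys/socket.h>
#include <sys/types.h>
#include <system_error>
#include <unistd.h>

// Hypothetical stand-in for a timeout exception type.
struct TimeoutError : std::runtime_error
{
    TimeoutError() : std::runtime_error("socket timeout") {}
};

// Hypothetical helper: blocking receive whose timeout is enforced by poll()
// instead of SO_RCVTIMEO. A timeout of 0 ms skips the poll and waits
// indefinitely, mirroring the "timeout != 0" check in sendBytes.
ssize_t recvWithPollTimeout(int fd, void* buf, size_t len, int timeoutMs)
{
    assert(fd >= 0);
    if (timeoutMs != 0)
    {
        pollfd pfd{};
        pfd.fd = fd;
        pfd.events = POLLIN;
        int rc;
        do { rc = ::poll(&pfd, 1, timeoutMs); } while (rc < 0 && errno == EINTR);
        if (rc < 0) throw std::system_error(errno, std::generic_category(), "poll");
        if (rc == 0) throw TimeoutError();  // nothing arrived in time
    }
    ssize_t n;
    do { n = ::recv(fd, buf, len, 0); } while (n < 0 && errno == EINTR);
    if (n < 0) throw std::system_error(errno, std::generic_category(), "recv");
    return n;
}

// Usage: an idle socket times out; once data is queued, the call returns it.
bool timeoutDemo()
{
    int sv[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return false;
    char buf[8];
    bool timedOut = false;
    try { recvWithPollTimeout(sv[0], buf, sizeof(buf), 20); }
    catch (const TimeoutError&) { timedOut = true; }
    const bool sent = ::send(sv[1], "hi", 2, 0) == 2;
    const ssize_t got = recvWithPollTimeout(sv[0], buf, sizeof(buf), 20);
    ::close(sv[0]);
    ::close(sv[1]);
    return timedOut && sent && got == 2;
}
```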

Bandwidth Throttling

`SocketImpl` integrates with an optional `Throttler` object for both send and receive paths. A "budget" system tracks how many bytes have been reserved from the throttler versus how many have actually been sent/received:

  • `throttleSend`/`throttleRecv`: Requests permission from the throttler if the current budget is insufficient. Uses `THROTTLER_QUANTUM` as the minimum allocation unit.
  • `useSendThrottlerBudget`/`useRecvThrottlerBudget`: Decrements the budget by the number of bytes actually transferred.
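
The reserve-then-spend bookkeeping described above might look roughly like the sketch below. Everything in it is an assumption made for illustration: the `Throttler` interface with a single `acquire` method, the `SendBudget` class, and the 32 KiB value of `kQuantum` are hypothetical and do not reproduce the real throttler signature or the real value of `THROTTLER_QUANTUM`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Assumed value; stands in for THROTTLER_QUANTUM.
constexpr std::size_t kQuantum = 32 * 1024;

// Hypothetical throttler interface; a real implementation may sleep in acquire().
struct Throttler
{
    virtual ~Throttler() = default;
    virtual void acquire(std::size_t bytes) = 0;
};

// Test double that records each reservation instead of sleeping.
struct RecordingThrottler : Throttler
{
    std::vector<std::size_t> requests;
    void acquire(std::size_t bytes) override { requests.push_back(bytes); }
};

class SendBudget
{
public:
    explicit SendBudget(std::shared_ptr<Throttler> throttler)
        : _throttler(std::move(throttler)) {}

    // Before a send: ask the throttler only if the budget cannot cover `length`,
    // and never ask for less than one quantum.
    void reserve(std::size_t length)
    {
        if (!_throttler || _budget >= length) return;
        const std::size_t amount = std::max(length - _budget, kQuantum);
        _throttler->acquire(amount);
        _budget += amount;
    }

    // After a send: spend what was actually transferred (a failed call spends nothing).
    void use(long transferred)
    {
        if (transferred > 0)
            _budget -= std::min(_budget, static_cast<std::size_t>(transferred));
    }

    std::size_t budget() const { return _budget; }

private:
    std::shared_ptr<Throttler> _throttler;
    std::size_t _budget = 0;
};

// Usage: two small sends trigger only one throttler request.
bool budgetDemo()
{
    auto throttler = std::make_shared<RecordingThrottler>();
    SendBudget budget(throttler);
    budget.reserve(100);  // empty budget: reserves one quantum
    budget.use(100);      // 100 bytes actually sent
    budget.reserve(100);  // remaining budget still covers this: no new request
    return throttler->requests.size() == 1
        && throttler->requests[0] == kQuantum
        && budget.budget() == kQuantum - 100;
}
```

Reserving in quanta means many small sends share one throttler call, which keeps per-call overhead low at the cost of slightly coarser rate control.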

Non-Blocking Connect

`connectNB` sets the socket to non-blocking mode and calls `::connect`. It accepts `EINPROGRESS` and `EWOULDBLOCK` as expected results, leaving the socket in non-blocking mode for the caller to poll for completion.
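
A sketch of both halves of this protocol in plain POSIX, under the assumption that the caller completes the connect by polling for writability and then reading `SO_ERROR`. `startConnect`, `finishConnect`, and `connectDemo` are hypothetical names, and the demo assumes a usable loopback interface.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <system_error>
#include <unistd.h>

// Hypothetical helper: start a connect without blocking. Returns true if the
// connection completed immediately, false if it is still in progress.
bool startConnect(int fd, const sockaddr* addr, socklen_t len)
{
    assert(fd >= 0);
    const int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags < 0 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        throw std::system_error(errno, std::generic_category(), "fcntl");
    if (::connect(fd, addr, len) == 0) return true;
    if (errno == EINPROGRESS || errno == EWOULDBLOCK) return false;  // expected
    throw std::system_error(errno, std::generic_category(), "connect");
}

// Caller side: wait for writability, then read SO_ERROR for the real outcome.
bool finishConnect(int fd, int timeoutMs)
{
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLOUT;
    int rc;
    do { rc = ::poll(&pfd, 1, timeoutMs); } while (rc < 0 && errno == EINTR);
    if (rc <= 0) return false;  // timeout or poll failure
    int err = 0;
    socklen_t errLen = sizeof(err);
    if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errLen) != 0) return false;
    return err == 0;
}

// Usage: connect to a listener bound to an ephemeral loopback port.
bool connectDemo()
{
    const int listener = ::socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the kernel pick a free port
    socklen_t len = sizeof(addr);
    bool ok = listener >= 0
        && ::bind(listener, reinterpret_cast<sockaddr*>(&addr), len) == 0
        && ::listen(listener, 1) == 0
        && ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len) == 0;

    const int client = ::socket(AF_INET, SOCK_STREAM, 0);
    if (ok && client >= 0)
        ok = startConnect(client, reinterpret_cast<sockaddr*>(&addr), len)
          || finishConnect(client, 1000);
    else
        ok = false;

    if (client >= 0) ::close(client);
    if (listener >= 0) ::close(listener);
    return ok;
}
```

Writability alone does not prove success: a refused connection also makes the socket writable, which is why the sketch checks `SO_ERROR` afterwards.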

`SOCK_CLOEXEC`

Where `SOCK_CLOEXEC` is defined (Linux, for example), sockets are created with it so that the descriptor is closed automatically on `exec` and does not leak into programs started after a `fork`.
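
A small sketch of the close-on-exec behavior, with a way to verify it through `fcntl(F_GETFD)`. The helper names are hypothetical, and the `#else` fallback that sets `FD_CLOEXEC` in a second step is an addition for illustration; the `init` code shown earlier simply creates a plain socket in that case.

```cpp
#include <cassert>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: create a stream socket with close-on-exec set.
int openCloexecSocket(int af)
{
#ifdef SOCK_CLOEXEC
    // Atomic: the flag is applied by the same system call that creates the fd.
    return ::socket(af, SOCK_STREAM | SOCK_CLOEXEC, 0);
#else
    // Illustrative fallback: two steps, so another thread could fork and exec
    // between socket() and fcntl().
    const int fd = ::socket(af, SOCK_STREAM, 0);
    if (fd >= 0) ::fcntl(fd, F_SETFD, FD_CLOEXEC);
    return fd;
#endif
}

// Reports whether the descriptor will be closed automatically on exec.
bool hasCloexec(int fd)
{
    const int flags = ::fcntl(fd, F_GETFD);
    return flags >= 0 && (flags & FD_CLOEXEC) != 0;
}

// Usage: a freshly created socket already carries the flag.
bool cloexecDemo()
{
    const int fd = openCloexecSocket(AF_UNIX);
    if (fd < 0) return false;
    const bool flagged = hasCloexec(fd);
    ::close(fd);
    return flagged;
}
```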

`SO_NOSIGPIPE`

On macOS and FreeBSD, `SO_NOSIGPIPE` is set to prevent SIGPIPE signals when writing to a closed connection, ensuring consistent cross-platform exception-based error handling.
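
The effect can be demonstrated with a socket pair whose peer has been closed. In this sketch, `sendNoSigpipe` and `brokenPipeDemo` are hypothetical names. The `MSG_NOSIGNAL` branch is an assumption added so that the example also runs on Linux, where `SO_NOSIGPIPE` does not exist; it is not taken from the code shown on this page.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: send without raising SIGPIPE on a broken connection.
ssize_t sendNoSigpipe(int fd, const void* buf, size_t len)
{
    assert(fd >= 0);
#if defined(SO_NOSIGPIPE)
    // macOS / FreeBSD: per-socket option (normally set once at creation).
    int on = 1;
    ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
    return ::send(fd, buf, len, 0);
#elif defined(MSG_NOSIGNAL)
    // Linux: per-call flag with the same effect.
    return ::send(fd, buf, len, MSG_NOSIGNAL);
#else
    return ::send(fd, buf, len, 0);
#endif
}

// Usage: writing to a closed peer fails with EPIPE instead of killing the process.
bool brokenPipeDemo()
{
    int sv[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return false;
    ::close(sv[1]);  // the peer goes away
    const ssize_t rc = sendNoSigpipe(sv[0], "x", 1);
    const int err = errno;  // capture before close() can overwrite it
    ::close(sv[0]);
    return rc == -1 && err == EPIPE;
}
```

With the signal suppressed, the failure surfaces as an ordinary error code, which a wrapper can then translate into an exception.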

Blocking Mode

`setBlocking` toggles the `O_NONBLOCK` flag via `fcntl(F_SETFL)` on Unix and uses `ioctl(FIONBIO)` on other platforms.
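
A minimal sketch of the Unix `fcntl` path. `setBlockingMode`, `isNonBlocking`, and `blockingDemo` are hypothetical names. The existing flags are read first so that only `O_NONBLOCK` changes.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <system_error>
#include <unistd.h>

// Hypothetical helper: switch a descriptor between blocking and non-blocking mode.
void setBlockingMode(int fd, bool blocking)
{
    assert(fd >= 0);
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        throw std::system_error(errno, std::generic_category(), "fcntl(F_GETFL)");
    // Preserve every other status flag; change only O_NONBLOCK.
    flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
    if (::fcntl(fd, F_SETFL, flags) < 0)
        throw std::system_error(errno, std::generic_category(), "fcntl(F_SETFL)");
}

bool isNonBlocking(int fd)
{
    const int flags = ::fcntl(fd, F_GETFL, 0);
    return flags >= 0 && (flags & O_NONBLOCK) != 0;
}

// Usage: in non-blocking mode, reading an empty socket fails at once.
bool blockingDemo()
{
    int sv[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return false;
    setBlockingMode(sv[0], false);
    char c;
    const ssize_t rc = ::recv(sv[0], &c, 1, 0);
    const bool wouldBlock = rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
    const bool wasNonBlocking = isNonBlocking(sv[0]);
    setBlockingMode(sv[0], true);
    const bool restored = !isNonBlocking(sv[0]);
    ::close(sv[0]);
    ::close(sv[1]);
    return wouldBlock && wasNonBlocking && restored;
}
```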
