Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ClickHouse ClickHouse Poco SocketImpl

From Leeroopedia


Knowledge Sources
Domains Networking, Sockets
Last Updated 2026-02-08 00:00 GMT

Overview

Low-level socket implementation encapsulating Berkeley sockets API with reference counting and platform abstraction.

Description

The Poco `SocketImpl` class provides the actual implementation of socket operations, encapsulating the Berkeley sockets API with platform-specific differences abstracted away. It's a reference-counted object (extends `RefCountedObject`) that manages a native socket descriptor and provides methods for all socket operations: connection, binding, listening, I/O, and configuration.

The class handles both blocking and non-blocking modes, implements throttling for rate-limiting network I/O, manages socket options at the system level, and provides error handling that throws appropriate exceptions. It includes support for poll/select operations and integrates with Poco's throttling infrastructure.

Usage

ClickHouse relies on this as the underlying implementation for all socket types. While `Socket` and its subclasses provide the public API, `SocketImpl` does the actual work of interfacing with OS socket APIs.

Code Reference

Source Location

Signature

class SocketImpl : public Poco::RefCountedObject {
public:
    enum SelectMode {
        SELECT_READ = 1,
        SELECT_WRITE = 2,
        SELECT_ERROR = 4
    };

    virtual SocketImpl* acceptConnection(SocketAddress& clientAddr);
    virtual void connect(const SocketAddress& address);
    virtual void connect(const SocketAddress& address, const Poco::Timespan& timeout);
    virtual void connectNB(const SocketAddress& address);

    virtual void bind(const SocketAddress& address, bool reuseAddress = false);
    virtual void bind(const SocketAddress& address, bool reuseAddress, bool reusePort);
    virtual void bind6(const SocketAddress& address, bool reuseAddress = false, bool ipV6Only = false);
    virtual void bind6(const SocketAddress& address, bool reuseAddress, bool reusePort, bool ipV6Only);

    virtual void listen(int backlog = 64);
    virtual void close();
    virtual void shutdownReceive();
    virtual void shutdownSend();
    virtual void shutdown();

    virtual int sendBytes(const void* buffer, int length, int flags = 0);
    virtual int receiveBytes(void* buffer, int length, int flags = 0);
    virtual int sendTo(const void* buffer, int length, const SocketAddress& address, int flags = 0);
    virtual int receiveFrom(void* buffer, int length, SocketAddress& address, int flags = 0);
    virtual void sendUrgent(unsigned char data);

    virtual int available();
    virtual bool poll(const Poco::Timespan& timeout, int mode);

    virtual void setSendBufferSize(int size);
    virtual int getSendBufferSize();
    virtual void setReceiveBufferSize(int size);
    virtual int getReceiveBufferSize();

    virtual void setSendTimeout(const Poco::Timespan& timeout);
    virtual Poco::Timespan getSendTimeout();
    virtual void setReceiveTimeout(const Poco::Timespan& timeout);
    virtual Poco::Timespan getReceiveTimeout();

    virtual void setSendThrottler(const ThrottlerPtr& throttler);
    virtual ThrottlerPtr getSendThrottler();
    virtual void setReceiveThrottler(const ThrottlerPtr& throttler);
    virtual ThrottlerPtr getReceiveThrottler();

    virtual SocketAddress address();
    virtual SocketAddress peerAddress();

    void setOption(int level, int option, int value);
    void setOption(int level, int option, unsigned value);
    void setOption(int level, int option, const Poco::Timespan& value);
    void setOption(int level, int option, const IPAddress& value);
    virtual void setRawOption(int level, int option, const void* value, poco_socklen_t length);

    void getOption(int level, int option, int& value);
    void getOption(int level, int option, unsigned& value);
    void getOption(int level, int option, Poco::Timespan& value);
    void getOption(int level, int option, IPAddress& value);
    virtual void getRawOption(int level, int option, void* value, poco_socklen_t& length);

    void setLinger(bool on, int seconds);
    void getLinger(bool& on, int& seconds);
    void setNoDelay(bool flag);
    bool getNoDelay();
    void setKeepAlive(bool flag);
    bool getKeepAlive();
    void setReuseAddress(bool flag);
    bool getReuseAddress();
    void setReusePort(bool flag);
    bool getReusePort();
    void setOOBInline(bool flag);
    bool getOOBInline();
    void setBroadcast(bool flag);
    bool getBroadcast();
    virtual void setBlocking(bool flag);
    virtual bool getBlocking() const;
    virtual bool secure() const;

    int socketError();
    poco_socket_t sockfd() const;
    void ioctl(poco_ioctl_request_t request, int& arg);
    void ioctl(poco_ioctl_request_t request, void* arg);

    bool initialized() const;
    static void error(int code);

protected:
    SocketImpl();
    SocketImpl(poco_socket_t sockfd);
    virtual ~SocketImpl();

    virtual void init(int af);
    void initSocket(int af, int type, int proto = 0);
    void reset(poco_socket_t fd = POCO_INVALID_SOCKET);

    static int lastError();
    static void error();
    static void error(const std::string& arg);
    static void error(int code, const std::string& arg);

    void throttleSend(size_t length, bool blocking);
    void throttleRecv(size_t length, bool blocking);
};

Import

#include <Poco/Net/SocketImpl.h>

I/O Contract

Input Output
Address family (AF_INET, AF_INET6) Initialized native socket descriptor
`SocketAddress` for binding/connecting Bound or connected socket
Buffer and length for send/receive Bytes transferred or error
Timeout value for poll/select Boolean indicating I/O readiness

Socket Creation Parameters

Parameter Description
`af` Address family (AF_INET, AF_INET6, AF_UNIX)
`type` Socket type (SOCK_STREAM, SOCK_DGRAM, SOCK_RAW)
`proto` Protocol (usually 0 for default, or IPPROTO_*)

Usage Examples

// SocketImpl is typically used internally by Socket classes
// Direct usage example (normally not needed in application code):

// Create TCP socket implementation
class StreamSocketImpl : public SocketImpl {
protected:
    void init(int af) override {
        initSocket(af, SOCK_STREAM);
    }
};

// Create and use socket implementation
StreamSocketImpl* pImpl = new StreamSocketImpl();
pImpl->init(AF_INET);

// Bind to address
SocketAddress bindAddr("0.0.0.0", 8080);
pImpl->bind(bindAddr, true);  // reuseAddress = true

// Listen for connections
pImpl->listen(64);

// Accept connection
SocketAddress clientAddr;
SocketImpl* pClient = pImpl->acceptConnection(clientAddr);
std::cout << "Client: " << clientAddr.toString() << std::endl;

// Configure client socket
pClient->setNoDelay(true);
pClient->setKeepAlive(true);
pClient->setSendTimeout(Poco::Timespan(30, 0));
pClient->setReceiveTimeout(Poco::Timespan(30, 0));

// Set throttling
auto throttler = std::make_shared<Throttler>(1024 * 1024);
pClient->setSendThrottler(throttler);
pClient->setReceiveThrottler(throttler);

// Send data
const char* msg = "Hello, World!";
int sent = pClient->sendBytes(msg, strlen(msg));

// Receive data
char buffer[1024];
int received = pClient->receiveBytes(buffer, sizeof(buffer));

// Check for available data
int avail = pClient->available();

// Poll for readability
if (pClient->poll(Poco::Timespan(1, 0), SocketImpl::SELECT_READ)) {
    // Socket is readable
    received = pClient->receiveBytes(buffer, sizeof(buffer));
}

// Query socket addresses
SocketAddress local = pClient->address();
SocketAddress peer = pClient->peerAddress();

// Graceful shutdown
pClient->shutdownSend();
pClient->shutdownReceive();
pClient->close();

// The SocketImpl is reference-counted and will be deleted
// when the last reference is released
pImpl->release();
pClient->release();

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment