Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:ClickHouse ClickHouse Poco WebSocketImpl

From Leeroopedia
Revision as of 14:38, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/ClickHouse_ClickHouse_Poco_WebSocketImpl.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


base/poco/Net/src/WebSocketImpl.cpp:1-424 ClickHouse_ClickHouse ClickHouse_ClickHouse_WebSocket_Communication

Source File base/poco/Net/src/WebSocketImpl.cpp
Lines of Code 424
Principle Principle:ClickHouse_ClickHouse_WebSocket_Communication
Domain Networking, WebSocket
Language C++
Last Updated 2026-02-08 00:00 GMT

Purpose

Implements the WebSocket wire protocol (RFC 6455) for encoding and decoding WebSocket frames over an existing TCP stream socket. Handles frame header construction, payload length encoding (7-bit, 16-bit, 64-bit), optional XOR masking for client-to-server frames, and delegates all actual network I/O to the underlying `StreamSocketImpl`. Unsupported socket operations (bind, listen, connect, sendTo, receiveFrom) throw `InvalidAccessException` to enforce the WebSocket usage contract.

Code Reference

Frame Sending (Encoding)

int WebSocketImpl::sendBytes(const void* buffer, int length, int flags)
{
    Poco::Buffer<char> frame(length + MAX_HEADER_LENGTH);
    Poco::MemoryOutputStream ostr(frame.begin(), frame.size());
    Poco::BinaryWriter writer(ostr, Poco::BinaryWriter::NETWORK_BYTE_ORDER);

    if (flags == 0) flags = WebSocket::FRAME_BINARY;
    flags &= 0xff;
    writer << static_cast<Poco::UInt8>(flags);

    Poco::UInt8 lengthByte(0);
    if (_mustMaskPayload) lengthByte |= FRAME_FLAG_MASK;

    if (length < 126)
    {
        lengthByte |= static_cast<Poco::UInt8>(length);
        writer << lengthByte;
    }
    else if (length < 65536)
    {
        lengthByte |= 126;
        writer << lengthByte << static_cast<Poco::UInt16>(length);
    }
    else
    {
        lengthByte |= 127;
        writer << lengthByte << static_cast<Poco::UInt64>(length);
    }

    if (_mustMaskPayload)
    {
        const Poco::UInt32 mask = _rnd.next();
        const char* m = reinterpret_cast<const char*>(&mask);
        const char* b = reinterpret_cast<const char*>(buffer);
        writer.writeRaw(m, 4);
        char* p = frame.begin() + ostr.charsWritten();
        for (int i = 0; i < length; i++)
            p[i] = b[i] ^ m[i % 4];
    }
    else
    {
        std::memcpy(frame.begin() + ostr.charsWritten(), buffer, length);
    }
    _pStreamSocketImpl->sendBytes(frame.begin(),
        length + static_cast<int>(ostr.charsWritten()));
    return length;
}

Frame Receiving (Header Decoding)

int WebSocketImpl::receiveHeader(char mask[4], bool& useMask)
{
    char header[MAX_HEADER_LENGTH];
    int n = receiveNBytes(header, 2);
    if (n <= 0) { _frameFlags = 0; return n; }

    Poco::UInt8 flags = static_cast<Poco::UInt8>(header[0]);
    _frameFlags = flags;
    Poco::UInt8 lengthByte = static_cast<Poco::UInt8>(header[1]);
    useMask = ((lengthByte & FRAME_FLAG_MASK) != 0);

    int payloadLength;
    lengthByte &= 0x7f;
    if (lengthByte == 127)
    {
        n = receiveNBytes(header + 2, 8);
        // Read 64-bit payload length
        Poco::UInt64 l;
        reader >> l;
        if (l > _maxPayloadSize)
            throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
        payloadLength = static_cast<int>(l);
    }
    else if (lengthByte == 126)
    {
        n = receiveNBytes(header + 2, 2);
        // Read 16-bit payload length
        Poco::UInt16 l;
        reader >> l;
        if (l > _maxPayloadSize)
            throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
        payloadLength = static_cast<int>(l);
    }
    else
    {
        if (lengthByte > _maxPayloadSize)
            throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
        payloadLength = lengthByte;
    }

    if (useMask) { n = receiveNBytes(mask, 4); }
    return payloadLength;
}

Payload Receive and Unmask

int WebSocketImpl::receivePayload(char* buffer, int payloadLength,
                                   char mask[4], bool useMask)
{
    int received = receiveNBytes(buffer, payloadLength);
    if (received <= 0)
        throw WebSocketException("Incomplete frame received",
                                 WebSocket::WS_ERR_INCOMPLETE_FRAME);
    if (useMask)
    {
        for (int i = 0; i < received; i++)
            buffer[i] ^= mask[i % 4];
    }
    return received;
}

Buffered Receive

int WebSocketImpl::receiveSomeBytes(char* buffer, int bytes)
{
    int n = static_cast<int>(_buffer.size()) - _bufferOffset;
    if (n > 0)
    {
        if (bytes < n) n = bytes;
        std::memcpy(buffer, _buffer.begin() + _bufferOffset, n);
        _bufferOffset += n;
        return n;
    }
    else
    {
        return _pStreamSocketImpl->receiveBytes(buffer, bytes);
    }
}

I/O Contract

Input Output Side Effects
`sendBytes(buffer, length, flags)` Number of payload bytes sent (int) Constructs a WebSocket frame (header + optionally masked payload) and sends it via the underlying `StreamSocketImpl`. If flags is 0, defaults to `FRAME_BINARY`.
`receiveBytes(buffer, length, flags)` Number of payload bytes received (int) Reads a complete WebSocket frame header, validates payload size against `_maxPayloadSize`, reads and unmasks the payload. Frame flags are stored in `_frameFlags`.
`receiveBytes(Buffer<char>&, flags)` Number of payload bytes received (int) Same as above but auto-resizes the buffer to accommodate the payload.
`setMaxPayloadSize(size)` None (void) Sets the maximum allowed payload size; frames exceeding this throw `WebSocketException` with `WS_ERR_PAYLOAD_TOO_BIG`.
`close()` None (void) Delegates to `_pStreamSocketImpl->close()` and resets the socket fd.
`connect`, `bind`, `listen`, `sendTo`, `receiveFrom`, `sendUrgent`, `acceptConnection` N/A All throw `Poco::InvalidAccessException` -- these operations are not valid on a WebSocket.
`setSendTimeout`, `setReceiveTimeout`, `setSendThrottler`, `setReceiveThrottler` None (void) Delegates to the underlying `StreamSocketImpl`.

Usage Examples

// WebSocketImpl is created internally by the WebSocket class after HTTP upgrade.
// Typical usage through the public WebSocket API:

// Server-side: accept WebSocket connection
HTTPServerRequest& request = ...;
HTTPServerResponse& response = ...;
WebSocket ws(request, response);

// Send a text frame
std::string msg = "Hello, WebSocket!";
ws.sendFrame(msg.data(), msg.size(), WebSocket::FRAME_TEXT);

// Receive a frame
char buffer[4096];
int flags;
int n = ws.receiveFrame(buffer, sizeof(buffer), flags);
if (flags & WebSocket::FRAME_OP_CLOSE)
{
    // Handle close frame
}

// Set maximum payload size to prevent memory exhaustion
ws.setMaxPayloadSize(1048576);  // 1 MB

// Internal: WebSocketImpl wraps the upgraded stream socket
// WebSocketImpl* pImpl = new WebSocketImpl(pStreamImpl, session, mustMask);

Internal Details

Frame Format (RFC 6455)

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - -+
|     Extended payload length continued, if payload len == 127  |
+-+-+-+-+-------+-+-------------+-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------+ - - - - - - - - - - - - - - -+

Payload Length Encoding

  • 0-125: Stored directly in the 7-bit length field
  • 126-65535: Length field set to 126, followed by a 16-bit unsigned integer
  • 65536+: Length field set to 127, followed by a 64-bit unsigned integer

Masking

Client-to-server frames must be masked (`_mustMaskPayload = true`). A random 32-bit mask key is generated using `Poco::Random`, written into the frame header, and each payload byte is XOR'd with `mask[i % 4]`. Server-to-client frames are unmasked.

Internal Buffer

The constructor calls `session.drainBuffer(_buffer)` to consume any bytes the HTTP session has already buffered beyond the upgrade response. `receiveSomeBytes` first serves data from this buffer before reading from the underlying socket, ensuring no data is lost during the HTTP-to-WebSocket transition.

Operation Restrictions

`WebSocketImpl` overrides `connect`, `bind`, `listen`, `sendTo`, `receiveFrom`, `sendUrgent`, and `acceptConnection` to throw `InvalidAccessException`, since these low-level socket operations are not meaningful for an already-established WebSocket connection.

Delegation Pattern

Timeout, throttler, shutdown, close, and `secure` queries are all delegated to the underlying `_pStreamSocketImpl`, which holds the actual TCP connection. The `WebSocketImpl` only interposes on `sendBytes` and `receiveBytes` to add frame encoding/decoding.

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment