Implementation:ClickHouse ClickHouse Poco WebSocketImpl
base/poco/Net/src/WebSocketImpl.cpp:1-424
| Source File | base/poco/Net/src/WebSocketImpl.cpp |
|---|---|
| Lines of Code | 424 |
| Principle | Principle:ClickHouse_ClickHouse_WebSocket_Communication |
| Domain | Networking, WebSocket |
| Language | C++ |
| Last Updated | 2026-02-08 00:00 GMT |
Purpose
Implements the WebSocket wire protocol (RFC 6455) for encoding and decoding WebSocket frames over an existing TCP stream socket. It builds the frame header, encodes the payload length (7-bit, 16-bit, or 64-bit form), applies XOR masking when the socket is the client side of the connection, and delegates all actual network I/O to the underlying `StreamSocketImpl`. Socket operations that make no sense on an established WebSocket (`connect`, `bind`, `listen`, `sendTo`, `receiveFrom`, `sendUrgent`, `acceptConnection`) throw `InvalidAccessException` to enforce the WebSocket usage contract.
Code Reference
Frame Sending (Encoding)
int WebSocketImpl::sendBytes(const void* buffer, int length, int flags)
{
    // Scratch buffer large enough for the payload plus the largest possible header
    Poco::Buffer<char> frame(length + MAX_HEADER_LENGTH);
    Poco::MemoryOutputStream ostr(frame.begin(), frame.size());
    Poco::BinaryWriter writer(ostr, Poco::BinaryWriter::NETWORK_BYTE_ORDER);

    // Byte 0: FIN/RSV bits and opcode; default to a binary frame
    if (flags == 0) flags = WebSocket::FRAME_BINARY;
    flags &= 0xff;
    writer << static_cast<Poco::UInt8>(flags);

    // Byte 1: mask bit plus 7-bit length (or the 126/127 escape values)
    Poco::UInt8 lengthByte(0);
    if (_mustMaskPayload) lengthByte |= FRAME_FLAG_MASK;
    if (length < 126)
    {
        lengthByte |= static_cast<Poco::UInt8>(length);
        writer << lengthByte;
    }
    else if (length < 65536)
    {
        lengthByte |= 126;
        writer << lengthByte << static_cast<Poco::UInt16>(length);
    }
    else
    {
        lengthByte |= 127;
        writer << lengthByte << static_cast<Poco::UInt64>(length);
    }

    if (_mustMaskPayload)
    {
        // Write the 4-byte masking key, then XOR the payload into the frame
        const Poco::UInt32 mask = _rnd.next();
        const char* m = reinterpret_cast<const char*>(&mask);
        const char* b = reinterpret_cast<const char*>(buffer);
        writer.writeRaw(m, 4);
        char* p = frame.begin() + ostr.charsWritten();
        for (int i = 0; i < length; i++)
            p[i] = b[i] ^ m[i % 4];
    }
    else
    {
        std::memcpy(frame.begin() + ostr.charsWritten(), buffer, length);
    }

    // Header and payload go out in a single write on the underlying stream socket
    _pStreamSocketImpl->sendBytes(frame.begin(),
        length + static_cast<int>(ostr.charsWritten()));
    return length;
}
Frame Receiving (Header Decoding)
int WebSocketImpl::receiveHeader(char mask[4], bool& useMask)
{
    char header[MAX_HEADER_LENGTH];
    // The first two bytes carry flags/opcode and the mask bit plus 7-bit length
    int n = receiveNBytes(header, 2);
    if (n <= 0) { _frameFlags = 0; return n; }
    Poco::UInt8 flags = static_cast<Poco::UInt8>(header[0]);
    _frameFlags = flags;
    Poco::UInt8 lengthByte = static_cast<Poco::UInt8>(header[1]);
    useMask = ((lengthByte & FRAME_FLAG_MASK) != 0);
    int payloadLength;
    lengthByte &= 0x7f;
    if (lengthByte == 127)
    {
        n = receiveNBytes(header + 2, 8);
        // Read 64-bit payload length (network byte order).
        // The reader declaration is omitted in the abbreviated excerpt.
        Poco::MemoryInputStream istr(header + 2, 8);
        Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
        Poco::UInt64 l;
        reader >> l;
        if (l > _maxPayloadSize)
            throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
        payloadLength = static_cast<int>(l);
    }
    else if (lengthByte == 126)
    {
        n = receiveNBytes(header + 2, 2);
        // Read 16-bit payload length (network byte order)
        Poco::MemoryInputStream istr(header + 2, 2);
        Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
        Poco::UInt16 l;
        reader >> l;
        if (l > _maxPayloadSize)
            throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
        payloadLength = static_cast<int>(l);
    }
    else
    {
        // Lengths 0-125 are stored directly in the 7-bit field
        if (lengthByte > _maxPayloadSize)
            throw WebSocketException("Payload too big", WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
        payloadLength = lengthByte;
    }
    // The 4-byte masking key follows the length field when the mask bit is set
    if (useMask) { n = receiveNBytes(mask, 4); }
    return payloadLength;
}
Payload Receive and Unmask
int WebSocketImpl::receivePayload(char* buffer, int payloadLength,
                                  char mask[4], bool useMask)
{
    int received = receiveNBytes(buffer, payloadLength);
    if (received <= 0)
        throw WebSocketException("Incomplete frame received",
                                 WebSocket::WS_ERR_INCOMPLETE_FRAME);
    if (useMask)
    {
        for (int i = 0; i < received; i++)
            buffer[i] ^= mask[i % 4];
    }
    return received;
}
Buffered Receive
int WebSocketImpl::receiveSomeBytes(char* buffer, int bytes)
{
    int n = static_cast<int>(_buffer.size()) - _bufferOffset;
    if (n > 0)
    {
        if (bytes < n) n = bytes;
        std::memcpy(buffer, _buffer.begin() + _bufferOffset, n);
        _bufferOffset += n;
        return n;
    }
    else
    {
        return _pStreamSocketImpl->receiveBytes(buffer, bytes);
    }
}
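`receiveHeader` and `receivePayload` call `receiveNBytes`, which is not part of the excerpts on this page. The following is a minimal standalone sketch of the "read exactly N bytes" loop such a helper needs on top of a partial-read primitive like `receiveSomeBytes`. The names `ReceiveSome` and `receiveExactly` are hypothetical, and `std::runtime_error` stands in for `WebSocketException`; it is an illustration of the technique, not the Poco implementation.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for receiveSomeBytes: may return fewer bytes than
// requested, and returns 0 or a negative value on close/error.
using ReceiveSome = std::function<int(char*, int)>;

// Hypothetical helper: keeps calling the partial-read primitive until
// exactly `bytes` bytes have arrived.
int receiveExactly(const ReceiveSome& receiveSome, char* buffer, int bytes)
{
    assert(buffer != nullptr && bytes >= 0);
    int received = receiveSome(buffer, bytes);
    if (received <= 0)
        return received; // nothing arrived: let the caller decide what to do
    while (received < bytes)
    {
        const int n = receiveSome(buffer + received, bytes - received);
        if (n <= 0)
            throw std::runtime_error("Incomplete frame received");
        received += n;
    }
    return received;
}
```

The distinction between "nothing arrived" (returned to the caller) and "stream ended mid-read" (exception) mirrors how `receiveHeader` returns early when its first read yields `n <= 0`.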
I/O Contract
| Method | Return Value | Behavior / Side Effects |
|---|---|---|
| `sendBytes(buffer, length, flags)` | Number of payload bytes sent (int) | Constructs a WebSocket frame (header + optionally masked payload) and sends it via the underlying `StreamSocketImpl`. If flags is 0, defaults to `FRAME_BINARY`. |
| `receiveBytes(buffer, length, flags)` | Number of payload bytes received (int) | Reads a complete WebSocket frame header, validates payload size against `_maxPayloadSize`, reads and unmasks the payload. Frame flags are stored in `_frameFlags`. |
| `receiveBytes(Buffer<char>&, flags)` | Number of payload bytes received (int) | Same as above but auto-resizes the buffer to accommodate the payload. |
| `setMaxPayloadSize(size)` | None (void) | Sets the maximum allowed payload size; frames exceeding this throw `WebSocketException` with `WS_ERR_PAYLOAD_TOO_BIG`. |
| `close()` | None (void) | Delegates to `_pStreamSocketImpl->close()` and resets the socket fd. |
| `connect`, `bind`, `listen`, `sendTo`, `receiveFrom`, `sendUrgent`, `acceptConnection` | N/A | All throw `Poco::InvalidAccessException` -- these operations are not valid on a WebSocket. |
| `setSendTimeout`, `setReceiveTimeout`, `setSendThrottler`, `setReceiveThrottler` | None (void) | Delegates to the underlying `StreamSocketImpl`. |
Usage Examples
// WebSocketImpl is created internally by the WebSocket class after the HTTP upgrade.
// Typical usage goes through the public WebSocket API.

// Server-side: accept the WebSocket connection
HTTPServerRequest& request = ...;
HTTPServerResponse& response = ...;
WebSocket ws(request, response);

// Set the maximum payload size before receiving, to prevent memory exhaustion
ws.setMaxPayloadSize(1048576); // 1 MiB

// Send a text frame (sendFrame takes an int length)
std::string msg = "Hello, WebSocket!";
ws.sendFrame(msg.data(), static_cast<int>(msg.size()), WebSocket::FRAME_TEXT);

// Receive a frame
char buffer[4096];
int flags = 0;
int n = ws.receiveFrame(buffer, sizeof(buffer), flags);

// The opcode is the low four bits of flags. Compare after masking: a plain
// bitwise AND with FRAME_OP_CLOSE (0x08) would also match ping (0x09) and pong (0x0a).
if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
{
    // Handle close frame
}

// Internal: WebSocketImpl wraps the upgraded stream socket
// WebSocketImpl* pImpl = new WebSocketImpl(pStreamImpl, session, mustMask);
Internal Details
Frame Format (RFC 6455)
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
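To make the bit layout concrete, here is a small standalone sketch that splits the first two header bytes into their fields. The struct `FrameHeaderBits` and the function `parseFirstTwoBytes` are hypothetical names introduced for illustration; `WebSocketImpl` itself keeps the first byte whole in `_frameFlags` and tests the mask bit with `FRAME_FLAG_MASK`.

```cpp
#include <cstdint>

// Hypothetical view of the first two bytes of a WebSocket frame header.
struct FrameHeaderBits
{
    bool fin = false;          // bit 0 of byte 0: final fragment
    std::uint8_t rsv = 0;      // RSV1..RSV3 as a 3-bit value
    std::uint8_t opcode = 0;   // low four bits of byte 0
    bool masked = false;       // top bit of byte 1
    std::uint8_t length7 = 0;  // 0-125, or the escape values 126 / 127
};

// Hypothetical helper: decodes the fixed two-byte prefix of a frame header.
FrameHeaderBits parseFirstTwoBytes(std::uint8_t b0, std::uint8_t b1)
{
    FrameHeaderBits h;
    h.fin = (b0 & 0x80) != 0;
    h.rsv = static_cast<std::uint8_t>((b0 >> 4) & 0x07);
    h.opcode = static_cast<std::uint8_t>(b0 & 0x0f);
    h.masked = (b1 & 0x80) != 0;
    h.length7 = static_cast<std::uint8_t>(b1 & 0x7f);
    return h;
}
```

For example, the bytes `0x81 0x05` describe a final, unmasked text frame with a 5-byte payload, while `0x81 0x85` is the same frame with the mask bit set.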
Payload Length Encoding
- 0-125: Stored directly in the 7-bit length field
- 126-65535: Length field set to 126, followed by a 16-bit unsigned integer
- 65536+: Length field set to 127, followed by a 64-bit unsigned integer
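The three cases above can be sketched as a standalone function that emits the length portion of the header in network byte order. The name `encodePayloadLength` is hypothetical; `sendBytes` achieves the same result with `Poco::BinaryWriter` instead of manual shifting.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical helper: returns the second header byte (mask bit + 7-bit
// length field) followed by any extended length bytes, big-endian.
std::vector<std::uint8_t> encodePayloadLength(std::uint64_t length, bool masked)
{
    std::vector<std::uint8_t> out;
    const std::uint8_t maskBit = masked ? 0x80 : 0x00;
    if (length < 126)
    {
        // Length fits directly in the 7-bit field
        out.push_back(static_cast<std::uint8_t>(maskBit | length));
    }
    else if (length < 65536)
    {
        // Escape value 126, then a 16-bit unsigned length
        out.push_back(static_cast<std::uint8_t>(maskBit | 126));
        out.push_back(static_cast<std::uint8_t>((length >> 8) & 0xff));
        out.push_back(static_cast<std::uint8_t>(length & 0xff));
    }
    else
    {
        // Escape value 127, then a 64-bit unsigned length
        out.push_back(static_cast<std::uint8_t>(maskBit | 127));
        for (int shift = 56; shift >= 0; shift -= 8)
            out.push_back(static_cast<std::uint8_t>((length >> shift) & 0xff));
    }
    return out;
}
```

Together with the opcode byte and an optional 4-byte masking key, the largest header is 2 + 8 + 4 = 14 bytes, which is the worst case that a scratch buffer sized with `MAX_HEADER_LENGTH` has to accommodate.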
Masking
Client-to-server frames must be masked (`_mustMaskPayload = true`). A random 32-bit mask key is generated using `Poco::Random`, written into the frame header, and each payload byte is XOR'd with `mask[i % 4]`. Server-to-client frames are unmasked.
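Because XOR is its own inverse, the same loop both masks and unmasks a payload. The sketch below shows this on a standalone buffer; `applyMask` is a hypothetical helper name, and the key in the usage note is the sample key from the masked "Hello" example in RFC 6455, not a value taken from `Poco::Random`.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: XORs every payload byte with key[i % 4], in place.
// Applying it twice with the same key restores the original bytes.
void applyMask(std::vector<std::uint8_t>& payload,
               const std::array<std::uint8_t, 4>& key)
{
    for (std::size_t i = 0; i < payload.size(); ++i)
        payload[i] = static_cast<std::uint8_t>(payload[i] ^ key[i % 4]);
}
```

With the key `37 fa 21 3d` (hex), the payload "Hello" (`48 65 6c 6c 6f`) becomes `7f 9f 4d 51 58`, and a second call with the same key yields "Hello" again.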
Internal Buffer
The constructor calls `session.drainBuffer(_buffer)` to consume any bytes the HTTP session has already buffered beyond the upgrade response. `receiveSomeBytes` first serves data from this buffer before reading from the underlying socket, ensuring no data is lost during the HTTP-to-WebSocket transition.
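The buffer-first behavior can be illustrated without Poco. In this sketch, `BufferedReader` is a hypothetical class: its `leftover` argument plays the role of the bytes drained from the HTTP session, and the `ReadFn` callback stands in for `_pStreamSocketImpl->receiveBytes`.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <functional>
#include <string>
#include <utility>

// Hypothetical illustration of "serve leftover bytes before touching the socket".
class BufferedReader
{
public:
    using ReadFn = std::function<int(char*, int)>;

    BufferedReader(std::string leftover, ReadFn readFromSocket)
        : _buffer(std::move(leftover)), _offset(0), _read(std::move(readFromSocket))
    {
    }

    // Returns up to `bytes` bytes: from the leftover buffer while it lasts,
    // otherwise from the socket callback.
    int receiveSomeBytes(char* out, int bytes)
    {
        const int available = static_cast<int>(_buffer.size() - _offset);
        if (available > 0)
        {
            const int n = std::min(bytes, available);
            std::memcpy(out, _buffer.data() + _offset, static_cast<std::size_t>(n));
            _offset += static_cast<std::size_t>(n);
            return n;
        }
        return _read(out, bytes);
    }

private:
    std::string _buffer;   // bytes already read past the HTTP upgrade response
    std::size_t _offset;   // how much of _buffer has been consumed
    ReadFn _read;          // stand-in for the underlying stream socket
};
```

Note that a single call never mixes the two sources: if the buffer holds fewer bytes than requested, the caller gets a short read and must call again, which is why an exact-length read has to loop.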
Operation Restrictions
`WebSocketImpl` overrides `connect`, `bind`, `listen`, `sendTo`, `receiveFrom`, `sendUrgent`, and `acceptConnection` to throw `InvalidAccessException`, since these low-level socket operations are not meaningful for an already-established WebSocket connection.
Delegation Pattern
Timeout, throttler, shutdown, close, and `secure` queries are all delegated to the underlying `_pStreamSocketImpl`, which holds the actual TCP connection. The `WebSocketImpl` only interposes on `sendBytes` and `receiveBytes` to add frame encoding/decoding.
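The combination of delegation and operation restrictions can be sketched in isolation. All names below (`FakeStream`, `FramedSocket`, the millisecond-based timeout setter) are hypothetical stand-ins, and `std::logic_error` takes the place of `Poco::InvalidAccessException`; the sketch only shows the shape of the pattern, not Poco's class hierarchy.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the underlying stream socket implementation.
class FakeStream
{
public:
    void setSendTimeoutMs(int ms) { _sendTimeoutMs = ms; }
    int sendTimeoutMs() const { return _sendTimeoutMs; }
    void close() { _open = false; }
    bool isOpen() const { return _open; }

private:
    int _sendTimeoutMs = 0;
    bool _open = true;
};

// Hypothetical wrapper: forwards connection-level settings to the wrapped
// stream and rejects operations that are meaningless once connected.
class FramedSocket
{
public:
    explicit FramedSocket(FakeStream& stream) : _stream(stream) {}

    // Delegated operations: the wrapper holds no state of its own for these.
    void setSendTimeoutMs(int ms) { _stream.setSendTimeoutMs(ms); }
    void close() { _stream.close(); }

    // Restricted operations: always throw.
    void connect(const std::string&)
    {
        throw std::logic_error("Cannot connect() a FramedSocket");
    }
    void listen(int)
    {
        throw std::logic_error("Cannot listen() on a FramedSocket");
    }

private:
    FakeStream& _stream;
};
```

Throwing from the restricted operations turns a misuse into an immediate, visible error instead of silently acting on a socket whose byte stream is already in the middle of the WebSocket protocol.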