Implementation:ClickHouse ClickHouse Poco RemoteSyslogListener
base/poco/Net/src/RemoteSyslogListener.cpp:1-625
ClickHouse_ClickHouse
ClickHouse_ClickHouse_Remote_Syslog_Logging
| Source File | base/poco/Net/src/RemoteSyslogListener.cpp |
|---|---|
| Lines of Code | 625 |
| Principle | Principle:ClickHouse_ClickHouse_Remote_Syslog_Logging |
| Domain | Networking, Logging |
| Language | C++ |
| Last Updated | 2026-02-08 00:00 GMT |
Purpose
Implements a UDP syslog message receiver and parser that listens on a configurable port, receives raw syslog datagrams, parses them according to either RFC 5424 or RFC 3164 (BSD) format, and forwards the resulting `Poco::Message` objects through the Poco logging framework. Uses a producer/consumer architecture with a notification queue to decouple UDP reception from message parsing.
Code Reference
UDP Listener Thread
class RemoteUDPListener: public Poco::Runnable
{
public:
enum { WAITTIME_MILLISEC = 1000, BUFFER_SIZE = 65536 };
RemoteUDPListener(Poco::NotificationQueue& queue, Poco::UInt16 port):
_queue(queue),
_socket(Poco::Net::SocketAddress(Poco::Net::IPAddress(), port)),
_stopped(false)
{}
void run()
{
Poco::Buffer<char> buffer(BUFFER_SIZE);
Poco::Timespan waitTime(WAITTIME_MILLISEC * 1000);
while (!_stopped)
{
try
{
if (_socket.poll(waitTime, Socket::SELECT_READ))
{
Poco::Net::SocketAddress sourceAddress;
int n = _socket.receiveFrom(buffer.begin(), BUFFER_SIZE, sourceAddress);
if (n > 0)
{
_queue.enqueueNotification(
new MessageNotification(buffer.begin(), n, sourceAddress));
}
}
}
catch (...) { /* lazy exception catching */ }
}
}
void safeStop() { _stopped = true; }
};
Syslog Parser
void SyslogParser::parse(const std::string& line, Poco::Message& message)
{
std::size_t pos = 0;
RemoteSyslogChannel::Severity severity;
RemoteSyslogChannel::Facility fac;
parsePrio(line, pos, severity, fac);
// Detect format: digit after <PRI> = RFC 5424, letter = BSD
if (Poco::Ascii::isDigit(line[pos]))
parseNew(line, severity, fac, pos, message);
else
parseBSD(line, severity, fac, pos, message);
}
Priority Parsing
void SyslogParser::parsePrio(const std::string& line, std::size_t& pos,
RemoteSyslogChannel::Severity& severity,
RemoteSyslogChannel::Facility& fac)
{
poco_assert(line[pos] == '<');
++pos;
std::size_t start = pos;
while (pos < line.size() && Poco::Ascii::isDigit(line[pos])) ++pos;
poco_assert(line[pos] == '>');
std::string valStr = line.substr(start, pos - start);
++pos;
int val = Poco::NumberParser::parse(valStr);
Poco::UInt16 pri = static_cast<Poco::UInt16>(val);
severity = static_cast<RemoteSyslogChannel::Severity>(pri & 0x0007u);
fac = static_cast<RemoteSyslogChannel::Facility>(pri & 0xfff8u);
}
Listener Open/Close Lifecycle
void RemoteSyslogListener::open()
{
SplitterChannel::open();
_pParser = new SyslogParser(_queue, this);
if (_port > 0)
_pListener = new RemoteUDPListener(_queue, _port);
for (int i = 0; i < _threads; i++)
_threadPool.start(*_pParser);
if (_pListener)
_threadPool.start(*_pListener);
}
void RemoteSyslogListener::close()
{
if (_pListener) _pListener->safeStop();
if (_pParser) _pParser->safeStop();
_queue.clear();
_threadPool.joinAll();
delete _pListener;
delete _pParser;
_pListener = 0;
_pParser = 0;
SplitterChannel::close();
}
I/O Contract
| Input | Output | Side Effects |
|---|---|---|
| UDP datagrams arriving on the configured port (default 514) | Parsed `Poco::Message` objects forwarded to attached channels via `SplitterChannel::log` | Binds a UDP socket on `open`; spawns 1 listener thread + N parser threads (configurable, default 1) |
| `setProperty("port", "1514")` | None (void) | Sets the UDP port to listen on (0-65535; port 0 disables UDP listener) |
| `setProperty("threads", "4")` | None (void) | Sets the number of parser threads (1-15) |
| `processMessage(text)` | None (void) | Synchronously parses a syslog message string and logs the result |
| `enqueueMessage(text, addr)` | None (void) | Enqueues a message notification for asynchronous parsing |
Usage Examples
// Create a syslog listener on port 1514 with 2 parser threads
RemoteSyslogListener* pListener = new RemoteSyslogListener(1514, 2);
// Attach a downstream channel (e.g., ConsoleChannel, FileChannel)
pListener->addChannel(new Poco::ConsoleChannel);
// Start listening
pListener->open();
// Messages arriving on UDP port 1514 are automatically
// parsed and forwarded to the ConsoleChannel
// Programmatic injection of a message
pListener->processMessage("<14>1 2024-01-15T10:30:00.000Z host app 1234 msgid - Hello");
// Shutdown
pListener->close();
// Register with logging factory
RemoteSyslogListener::registerChannel();
Internal Details
Architecture
The implementation uses three internal classes:
- `MessageNotification`: A `Poco::Notification` subclass that wraps a raw syslog message string and the sender's `SocketAddress`.
- `RemoteUDPListener`: A `Poco::Runnable` that polls a `DatagramSocket` with a 1-second timeout, reads up to 64 KB datagrams, and enqueues `MessageNotification` objects into a `NotificationQueue`.
- `SyslogParser`: A `Poco::Runnable` that dequeues notifications, parses the syslog format, and calls `_pListener->log(message)` to forward parsed messages.
Format Auto-Detection
After extracting the priority value from `<PRI>`, the parser checks the next character:
- If it is a digit, the message is parsed as RFC 5424 (starts with version number "1").
- If it is a letter, the message is parsed as BSD/RFC 3164 (starts with month abbreviation like "Jan").
RFC 5424 Parsing
Fields parsed in order: VERSION, TIMESTAMP, HOSTNAME, APP-NAME, PROCID, MSGID, STRUCTURED-DATA, MSG. The structured data parser handles nested bracket tokens and quoted strings.
BSD Parsing
Parses the BSD timestamp format (`Mmm dd HH:MM:SS`), detecting double-spaces for single-digit day values. The year is inferred from the current system time since BSD format omits it.
Severity to Priority Conversion
// SYSLOG_EMERGENCY/ALERT -> PRIO_FATAL
// SYSLOG_CRITICAL -> PRIO_CRITICAL
// SYSLOG_ERROR -> PRIO_ERROR
// SYSLOG_WARNING -> PRIO_WARNING
// SYSLOG_NOTICE -> PRIO_NOTICE
// SYSLOG_INFORMATIONAL -> PRIO_INFORMATION
// SYSLOG_DEBUG -> PRIO_DEBUG