diff options
author | Antti Määttä <[email protected]> | 2023-06-19 12:31:06 +0300 |
---|---|---|
committer | Antti Määttä <[email protected]> | 2023-08-29 16:27:06 +0300 |
commit | 1909e1e14c130fc88d2a5d52f70877de0737f275 (patch) | |
tree | c85c2fff04afac8ee637c465c2ede4df28ee5cc1 | |
parent | 4721721f6005c90cb1760b0c6e074468aa73512f (diff) |
CTF: Add zstd support for streaming
Adds zstd support for CTF streaming. The client can select which
compression scheme to use in the trace request.
Change-Id: I21423eda2c8223fd5d23eb5adaf216eb61a25501
Reviewed-by: Hatem ElKharashy <[email protected]>
Reviewed-by: Janne Koskinen <[email protected]>
-rw-r--r-- | src/plugins/tracing/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/plugins/tracing/qctfserver.cpp | 108 | ||||
-rw-r--r-- | src/plugins/tracing/qctfserver_p.h | 20 |
3 files changed, 105 insertions, 27 deletions
diff --git a/src/plugins/tracing/CMakeLists.txt b/src/plugins/tracing/CMakeLists.txt index 3a92e16474a..b4a908c7ea9 100644 --- a/src/plugins/tracing/CMakeLists.txt +++ b/src/plugins/tracing/CMakeLists.txt @@ -22,3 +22,7 @@ qt_internal_add_plugin(QCtfTracePlugin Qt6::Core Qt6::CorePrivate Qt6::Network ) +qt_internal_extend_target(QCtfTracePlugin CONDITION QT_FEATURE_zstd + LIBRARIES + WrapZSTD::WrapZSTD +) diff --git a/src/plugins/tracing/qctfserver.cpp b/src/plugins/tracing/qctfserver.cpp index 0dc01508987..d97c345e115 100644 --- a/src/plugins/tracing/qctfserver.cpp +++ b/src/plugins/tracing/qctfserver.cpp @@ -4,31 +4,34 @@ #include <qloggingcategory.h> #include "qctfserver_p.h" +#if QT_CONFIG(zstd) +#include <zstd.h> +#endif + using namespace Qt::Literals::StringLiterals; Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg) -TracePacket &TracePacket::writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression) +#if QT_CONFIG(zstd) +static QByteArray zstdCompress(ZSTD_CCtx *&context, const QByteArray &data, int compression) { - cbor.startMap(4); - cbor.append("magic"_L1); - cbor.append(packet.PacketMagicNumber); - cbor.append("name"_L1); - cbor.append(QString::fromUtf8(packet.stream_name)); - cbor.append("flags"_L1); - cbor.append(packet.flags); - - cbor.append("data"_L1); - if (compression > 0) { - QByteArray compressed = qCompress(packet.stream_data, compression); - cbor.append(compressed); - } else { - cbor.append(packet.stream_data); + if (context == nullptr) + context = ZSTD_createCCtx(); + qsizetype size = data.size(); + size = ZSTD_COMPRESSBOUND(size); + QByteArray compressed(size, Qt::Uninitialized); + char *dst = compressed.data(); + size_t n = ZSTD_compressCCtx(context, dst, size, + data.constData(), data.size(), + compression); + if (ZSTD_isError(n)) { + qCWarning(lcCtfInfoTrace) << "Compression with zstd failed: " << QString::fromUtf8(ZSTD_getErrorName(n)); + return {}; } - - cbor.endMap(); - return packet; + compressed.truncate(n); + return compressed; } +#endif QCtfServer::QCtfServer(QObject *parent) : QThread(parent) @@ -38,7 +41,15 @@ QCtfServer::QCtfServer(QObject *parent) << "sessionName"_L1 << "sessionTracepoints"_L1 << "flags"_L1 - << "bufferSize"_L1; + << "bufferSize"_L1 + << "compressionScheme"_L1; +} + +QCtfServer::~QCtfServer() +{ +#if QT_CONFIG(zstd) + ZSTD_freeCCtx(m_zstdCCtx); +#endif } void QCtfServer::setHost(const QString &address) @@ -129,6 +140,9 @@ void QCtfServer::handleString(QCborStreamReader &cbor) case RequestSessionTracepoints: m_req.sessionTracepoints = readString(cbor); break; + case RequestCompressionScheme: + m_requestedCompressionScheme = readString(cbor); + break; default: // handle error break; @@ -201,6 +215,47 @@ void QCtfServer::readCbor(QCborStreamReader &cbor) } } +void QCtfServer::writePacket(TracePacket &packet, QCborStreamWriter &cbor) +{ + cbor.startMap(4); + cbor.append("magic"_L1); + cbor.append(packet.PacketMagicNumber); + cbor.append("name"_L1); + cbor.append(QString::fromUtf8(packet.stream_name)); + cbor.append("flags"_L1); + cbor.append(packet.flags); + + cbor.append("data"_L1); + if (m_compression > 0) { + QByteArray compressed; +#if QT_CONFIG(zstd) + if (m_requestedCompressionScheme == QStringLiteral("zstd")) + compressed = zstdCompress(m_zstdCCtx, packet.stream_data, m_compression); + else +#endif + compressed = qCompress(packet.stream_data, m_compression); + + cbor.append(compressed); + } else { + cbor.append(packet.stream_data); + } + + cbor.endMap(); +} + +bool QCtfServer::recognizedCompressionScheme() const +{ + if (m_requestedCompressionScheme.isEmpty()) + return true; +#if QT_CONFIG(zstd) + if (m_requestedCompressionScheme == QStringLiteral("zstd")) + return true; +#endif + if (m_requestedCompressionScheme == QStringLiteral("zlib")) + return true; + return false; +} + void QCtfServer::run() { m_server = new QTcpServer(); @@ -250,10 +305,21 @@ void QCtfServer::run() m_socket->close(); } else { m_compression = m_req.flags & CompressionMask; +#if QT_CONFIG(zstd) + m_compression = qMin(m_compression, ZSTD_maxCLevel()); +#else + m_compression = qMin(m_compression, 9); +#endif m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle); m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u); + if (!recognizedCompressionScheme()) { + qCWarning(lcCtfInfoTrace) << "Client requested unrecognized compression scheme: " << m_requestedCompressionScheme; + m_requestedCompressionScheme.clear(); + m_compression = 0; + } + qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints; m_cb->handleSessionChange(); @@ -273,7 +339,7 @@ void QCtfServer::run() cbor.append(resp.serverName); if (m_compression) { cbor.append("compressionScheme"_L1); - cbor.append("zlib"_L1); + cbor.append(m_requestedCompressionScheme); } cbor.endMap(); } @@ -292,7 +358,7 @@ void QCtfServer::run() { QCborStreamWriter cbor(m_socket); for (TracePacket &packet : packets) { - TracePacket::writePacket(packet, cbor, m_compression); + writePacket(packet, cbor); if (!waitSocket()) break; } diff --git a/src/plugins/tracing/qctfserver_p.h b/src/plugins/tracing/qctfserver_p.h index d754e6f8004..3660df7f091 100644 --- a/src/plugins/tracing/qctfserver_p.h +++ b/src/plugins/tracing/qctfserver_p.h @@ -28,8 +28,11 @@ #include <qcborstreamwriter.h> #include <qlist.h> +typedef struct ZSTD_CCtx_s ZSTD_CCtx; + QT_BEGIN_NAMESPACE +class QCtfServer; struct TracePacket { static constexpr quint32 PacketMagicNumber = 0x100924da; @@ -66,8 +69,6 @@ struct TracePacket flags = t.flags; return *this; } - - static TracePacket &writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression); }; auto constexpr operator""_MB(quint64 s) -> quint64 @@ -115,9 +116,9 @@ public: }; enum ServerFlags { - CompressionMask = 15, - DontBufferOnIdle = 16, // not set -> the server is buffering even without client connection - // set -> the server is buffering only when client is connected + CompressionMask = 255, + DontBufferOnIdle = 256, // not set -> the server is buffering even without client connection + // set -> the server is buffering only when client is connected }; enum RequestIds { @@ -127,6 +128,7 @@ public: RequestSessionTracepoints, RequestFlags, RequestBufferSize, + RequestCompressionScheme, }; struct ServerCallback @@ -135,6 +137,7 @@ public: virtual void handleStatusChange(ServerStatus status) = 0; }; QCtfServer(QObject *parent = nullptr); + ~QCtfServer(); void setCallback(ServerCallback *cb); void setHost(const QString &address); void setPort(int port); @@ -154,8 +157,9 @@ private: void readCbor(QCborStreamReader &cbor); void handleString(QCborStreamReader &cbor); void handleFixedWidth(QCborStreamReader &cbor); - + bool recognizedCompressionScheme() const; void setStatusAndNotify(ServerStatus status); + void writePacket(TracePacket &packet, QCborStreamWriter &cbor); QMutex m_mutex; QWaitCondition m_bufferHasData; @@ -176,6 +180,10 @@ private: QAtomicInt m_stopping; bool m_bufferOnIdle = true; QString m_currentKey; + QString m_requestedCompressionScheme; +#if QT_CONFIG(zstd) + ZSTD_CCtx *m_zstdCCtx = nullptr; +#endif static constexpr quint32 ServerId = 1; static constexpr quint32 DefaultMaxPackets = 256; // 1 MB |