From 33bd2c9094e7f90a62cb59cdf5cf670ac58d5308 Mon Sep 17 00:00:00 2001 From: Dirk Engling Date: Thu, 18 Apr 2024 14:54:34 +0200 Subject: Add support for zstd --- Makefile | 5 ++ ot_fullscrape.c | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- ot_http.c | 35 +++++++++---- ot_http.h | 5 +- ot_mutex.h | 3 +- ot_stats.h | 1 + 6 files changed, 183 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index a224845..e5ca6e4 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,11 @@ STRIP?=strip #FEATURES+=-DWANT_IP_FROM_QUERY_STRING FEATURES+=-DWANT_COMPRESSION_GZIP FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS + +#FEATURES+=-DWANT_COMPRESSION_ZSTD +#FEATURES+=-DWANT_COMPRESSION_ZSTD_ALWAYS +#LDFLAGS+=-lzstd + #FEATURES+=-DWANT_LOG_NETWORKS #FEATURES+=-DWANT_RESTRICT_STATS #FEATURES+=-DWANT_IP_FROM_PROXY diff --git a/ot_fullscrape.c b/ot_fullscrape.c index aed2ad9..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c @@ -14,6 +14,10 @@ #ifdef WANT_COMPRESSION_GZIP #include <zlib.h> #endif +#ifdef WANT_COMPRESSION_ZSTD +#include <zstd.h> +#endif + /* Libowfat */ #include "byte.h" @@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode); #ifdef WANT_COMPRESSION_GZIP static void fullscrape_make_gzip(int taskid, ot_tasktype mode); #endif +#ifdef WANT_COMPRESSION_ZSTD +static void fullscrape_make_zstd(int taskid, ot_tasktype mode); +#endif /* Converter function from memory to human readable hex strings XXX - Duplicated from ot_stats. Needs fix. 
*/ @@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) { while (g_opentracker_running) { ot_tasktype tasktype = TASK_FULLSCRAPE; ot_taskid taskid = mutex_workqueue_poptask(&tasktype); +#ifdef WANT_COMPRESSION_ZSTD + if (tasktype & TASK_FLAG_ZSTD) + fullscrape_make_zstd(taskid, tasktype); + else +#endif #ifdef WANT_COMPRESSION_GZIP if (tasktype & TASK_FLAG_GZIP) fullscrape_make_gzip(taskid, tasktype); @@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { struct iovec iovector = {NULL, 0}; int zres; z_stream strm; - fprintf(stderr, "GZIP path\n"); /* Setup return vector... */ iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); if (!iovector.iov_base) @@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { mutex_bucket_unlock(bucket, 0); /* Parent thread died? */ - if (!g_opentracker_running) + if (!g_opentracker_running) { + deflateEnd(&strm); return; + } } if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { @@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; if (mutex_workqueue_pushchunked(taskid, &iovector)) { free(iovector.iov_base); - return mutex_bucket_unlock(bucket, 0); + deflateEnd(&strm); + return; } /* Check if there's a last batch of data in the zlib buffer */ @@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { if (!iovector.iov_base) { fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); deflateEnd(&strm); - return mutex_bucket_unlock(bucket, 0); + return; } strm.next_out = iovector.iov_base; strm.avail_out = OT_SCRAPE_CHUNK_SIZE; @@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { /* WANT_COMPRESSION_GZIP */ #endif +#ifdef WANT_COMPRESSION_ZSTD + +static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { + int bucket; + char *r; + struct iovec iovector = {NULL, 0}; + ZSTD_CCtx *zstream = 
ZSTD_createCCtx(); + ZSTD_inBuffer inbuf; + ZSTD_outBuffer outbuf; + size_t more_bytes; + + if (!zstream) + return; + + /* Setup return vector... */ + iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); + if (!iovector.iov_base) { + ZSTD_freeCCtx(zstream); + return; + } + + /* Working with a compression level 6 is half as fast as level 3, but + seems to be the last reasonable bump that's worth extra cpu */ + ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); + + outbuf.dst = iovector.iov_base; + outbuf.size = OT_SCRAPE_CHUNK_SIZE; + outbuf.pos = 0; + + if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { + inbuf.src = (const void *)"d5:filesd"; + inbuf.size = strlen("d5:filesd"); + inbuf.pos = 0; + ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); + } + + /* For each bucket... */ + for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { + /* Get exclusive access to that bucket */ + ot_vector *torrents_list = mutex_bucket_lock(bucket); + ot_torrent *torrents = (ot_torrent *)(torrents_list->data); + size_t i; + + /* For each torrent in this bucket.. 
*/ + for (i = 0; i < torrents_list->size; ++i) { + char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; + r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); + inbuf.src = compress_buffer; + inbuf.size = r - compress_buffer; + inbuf.pos = 0; + ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); + + /* Check if there still is enough buffer left */ + while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { + iovector.iov_len = outbuf.size; + + if (mutex_workqueue_pushchunked(taskid, &iovector)) { + free(iovector.iov_base); + ZSTD_freeCCtx(zstream); + return mutex_bucket_unlock(bucket, 0); + } + /* Allocate a fresh output buffer */ + iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); + if (!iovector.iov_base) { + fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); + ZSTD_freeCCtx(zstream); + return mutex_bucket_unlock(bucket, 0); + } + + outbuf.dst = iovector.iov_base; + outbuf.size = OT_SCRAPE_CHUNK_SIZE; + outbuf.pos = 0; + + ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); + } + } + + /* All torrents done: release lock on current bucket */ + mutex_bucket_unlock(bucket, 0); + + /* Parent thread died? 
*/ + if (!g_opentracker_running) + return; + } + + if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { + inbuf.src = (const void *)"ee"; + inbuf.size = strlen("ee"); + inbuf.pos = 0; + } + + more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); + + iovector.iov_len = outbuf.pos; + if (mutex_workqueue_pushchunked(taskid, &iovector)) { + free(iovector.iov_base); + ZSTD_freeCCtx(zstream); + return; + } + + /* Check if there's a last batch of data in the zstd buffer */ + if (more_bytes) { + /* Allocate a fresh output buffer */ + iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); + + if (!iovector.iov_base) { + fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); + ZSTD_freeCCtx(zstream); + return; + } + + outbuf.dst = iovector.iov_base; + outbuf.size = OT_SCRAPE_CHUNK_SIZE; + outbuf.pos = 0; + + ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); + + /* Only pass the new buffer if there actually was some data left in the buffer */ + iovector.iov_len = outbuf.pos; + if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) + free(iovector.iov_base); + } + + ZSTD_freeCCtx(zstream); +} +/* WANT_COMPRESSION_ZSTD */ +#endif + /* WANT_FULLSCRAPE */ #endif diff --git a/ot_http.c b/ot_http.c index cd2dfc1..af3f210 100644 --- a/ot_http.c +++ b/ot_http.c @@ -159,7 +159,9 @@ ssize_t http_sendiovecdata(const int64 sock, struct ot_workstruct *ws, int iovec if (iovec_entries) { - if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) + if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD) + encoding = "Content-Encoding: zstd\r\n"; + else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) encoding = "Content-Encoding: gzip\r\n"; else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) encoding = "Content-Encoding: bzip2\r\n"; @@ -369,19 +371,34 @@ static ssize_t http_handle_fullscrape(const int64 sock, struct ot_workstruct *ws } #endif -#ifdef WANT_COMPRESSION_GZIP + +#if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD) ws->request[ws->request_size - 1] = 0; 
-#ifndef WANT_COMPRESSION_GZIP_ALWAYS +#ifdef WANT_COMPRESSION_GZIP if (strstr(ws->request, "gzip")) { -#endif cookie->flag |= STRUCT_HTTP_FLAG_GZIP; - format = TASK_FLAG_GZIP; - stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip); -#ifndef WANT_COMPRESSION_GZIP_ALWAYS - } else + format |= TASK_FLAG_GZIP; + } #endif +#ifdef WANT_COMPRESSION_ZSTD + if (strstr(ws->request, "zstd")) { + cookie->flag |= STRUCT_HTTP_FLAG_ZSTD; + format |= TASK_FLAG_ZSTD; + } +#endif + +#if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS) + cookie->flag |= STRUCT_HTTP_FLAG_ZSTD; + format |= TASK_FLAG_ZSTD; #endif - stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); + +#if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS) + cookie->flag |= STRUCT_HTTP_FLAG_GZIP; + format |= TASK_FLAG_GZIP; +#endif +#endif + + stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); #ifdef _DEBUG_HTTPERROR fprintf(stderr, "%s", ws->debugbuf); diff --git a/ot_http.h b/ot_http.h index fecb4eb..b5ae9ff 100644 --- a/ot_http.h +++ b/ot_http.h @@ -10,8 +10,9 @@ typedef enum { STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, STRUCT_HTTP_FLAG_GZIP = 2, STRUCT_HTTP_FLAG_BZIP2 = 4, - STRUCT_HTTP_FLAG_CHUNKED = 8, - STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 + STRUCT_HTTP_FLAG_ZSTD = 8, + STRUCT_HTTP_FLAG_CHUNKED = 16, + STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 32 } STRUCT_HTTP_FLAG; struct http_data { diff --git a/ot_mutex.h b/ot_mutex.h index 66b627f..cdfabc9 100644 --- a/ot_mutex.h +++ b/ot_mutex.h @@ -59,7 +59,8 @@ typedef enum { TASK_FLAG_GZIP = 0x1000, TASK_FLAG_BZIP2 = 0x2000, - TASK_FLAG_CHUNKED = 0x4000, + TASK_FLAG_ZSTD = 0x4000, + TASK_FLAG_CHUNKED = 0x8000, TASK_TASK_MASK = 0x0fff, TASK_CLASS_MASK = 0x0f00, diff --git a/ot_stats.h b/ot_stats.h index 4f75049..8ed2b1e 100644 --- a/ot_stats.h +++ b/ot_stats.h @@ -19,6 +19,7 @@ typedef enum { EVENT_SCRAPE, EVENT_FULLSCRAPE_REQUEST, EVENT_FULLSCRAPE_REQUEST_GZIP, + 
EVENT_FULLSCRAPE_REQUEST_ZSTD, EVENT_FULLSCRAPE, /* TCP only */ EVENT_FAILED, EVENT_BUCKET_LOCKED, -- cgit v1.2.3