summaryrefslogtreecommitdiff
path: root/ot_fullscrape.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_fullscrape.c')
-rw-r--r--ot_fullscrape.c287
1 files changed, 152 insertions, 135 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index 86b9e26..8f8ee9d 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -6,11 +6,11 @@
6#ifdef WANT_FULLSCRAPE 6#ifdef WANT_FULLSCRAPE
7 7
8/* System */ 8/* System */
9#include <sys/param.h> 9#include <arpa/inet.h>
10#include <pthread.h>
10#include <stdio.h> 11#include <stdio.h>
11#include <string.h> 12#include <string.h>
12#include <pthread.h> 13#include <sys/param.h>
13#include <arpa/inet.h>
14#ifdef WANT_COMPRESSION_GZIP 14#ifdef WANT_COMPRESSION_GZIP
15#include <zlib.h> 15#include <zlib.h>
16#endif 16#endif
@@ -21,46 +21,56 @@
21#include "textcode.h" 21#include "textcode.h"
22 22
23/* Opentracker */ 23/* Opentracker */
24#include "trackerlogic.h"
25#include "ot_mutex.h"
26#include "ot_iovec.h"
27#include "ot_fullscrape.h" 24#include "ot_fullscrape.h"
25#include "ot_iovec.h"
26#include "ot_mutex.h"
27#include "trackerlogic.h"
28 28
29/* Fetch full scrape info for all torrents 29/* Fetch full scrape info for all torrents
30 Full scrapes usually are huge and one does not want to 30 Full scrapes usually are huge and one does not want to
31 allocate more memory. So lets get them in 512k units 31 allocate more memory. So lets get them in 512k units
32*/ 32*/
33#define OT_SCRAPE_CHUNK_SIZE (1024*1024) 33#define OT_SCRAPE_CHUNK_SIZE (1024 * 1024)
34 34
35/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ 35/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36#define OT_SCRAPE_MAXENTRYLEN 256 36#define OT_SCRAPE_MAXENTRYLEN 256
37 37
38/* Forward declaration */ 38/* Forward declaration */
39static void fullscrape_make( int taskid, ot_tasktype mode); 39static void fullscrape_make(int taskid, ot_tasktype mode);
40#ifdef WANT_COMPRESSION_GZIP 40#ifdef WANT_COMPRESSION_GZIP
41static void fullscrape_make_gzip( int taskid, ot_tasktype mode); 41static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
42#endif 42#endif
43 43
44/* Converter function from memory to human readable hex strings 44/* Converter function from memory to human readable hex strings
45 XXX - Duplicated from ot_stats. Needs fix. */ 45 XXX - Duplicated from ot_stats. Needs fix. */
46static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} 46static char *to_hex(char *d, uint8_t *s) {
47 char *m = "0123456789ABCDEF";
48 char *t = d;
49 char *e = d + 40;
50 while (d < e) {
51 *d++ = m[*s >> 4];
52 *d++ = m[*s++ & 15];
53 }
54 *d = 0;
55 return t;
56}
47 57
48/* This is the entry point into this worker thread 58/* This is the entry point into this worker thread
49 It grabs tasks from mutex_tasklist and delivers results back 59 It grabs tasks from mutex_tasklist and delivers results back
50*/ 60*/
51static void * fullscrape_worker( void * args ) { 61static void *fullscrape_worker(void *args) {
52 (void) args; 62 (void)args;
53 63
54 while( g_opentracker_running ) { 64 while (g_opentracker_running) {
55 ot_tasktype tasktype = TASK_FULLSCRAPE; 65 ot_tasktype tasktype = TASK_FULLSCRAPE;
56 ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); 66 ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
57#ifdef WANT_COMPRESSION_GZIP 67#ifdef WANT_COMPRESSION_GZIP
58 if (tasktype & TASK_FLAG_GZIP) 68 if (tasktype & TASK_FLAG_GZIP)
59 fullscrape_make_gzip( taskid, tasktype ); 69 fullscrape_make_gzip(taskid, tasktype);
60 else 70 else
61#endif 71#endif
62 fullscrape_make( taskid, tasktype ); 72 fullscrape_make(taskid, tasktype);
63 mutex_workqueue_pushchunked( taskid, NULL ); 73 mutex_workqueue_pushchunked(taskid, NULL);
64 } 74 }
65 return NULL; 75 return NULL;
66} 76}
@@ -83,80 +93,87 @@ static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torre
83 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count; 93 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count;
84 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count; 94 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count;
85 95
86 switch( mode & TASK_TASK_MASK ) { 96 switch (mode & TASK_TASK_MASK) {
87 case TASK_FULLSCRAPE: 97 case TASK_FULLSCRAPE:
88 default: 98 default:
89 /* push hash as bencoded string */ 99 /* push hash as bencoded string */
90 *r++='2'; *r++='0'; *r++=':'; 100 *r++ = '2';
91 memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 101 *r++ = '0';
92 /* push rest of the scrape string */ 102 *r++ = ':';
93 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count-seed_count ); 103 memcpy(r, hash, sizeof(ot_hash));
94 104 r += sizeof(ot_hash);
95 break; 105 /* push rest of the scrape string */
96 case TASK_FULLSCRAPE_TPB_ASCII: 106 r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count);
97 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 107
98 r += sprintf( r, ":%zd:%zd\n", seed_count, peer_count-seed_count ); 108 break;
99 break; 109 case TASK_FULLSCRAPE_TPB_ASCII:
100 case TASK_FULLSCRAPE_TPB_ASCII_PLUS: 110 to_hex(r, *hash);
101 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 111 r += 2 * sizeof(ot_hash);
102 r += sprintf( r, ":%zd:%zd:%zd\n", seed_count, peer_count-seed_count, down_count ); 112 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
103 break; 113 break;
104 case TASK_FULLSCRAPE_TPB_BINARY: 114 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
105 memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 115 to_hex(r, *hash);
106 *(uint32_t*)(r+0) = htonl( (uint32_t) seed_count ); 116 r += 2 * sizeof(ot_hash);
107 *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_count-seed_count) ); 117 r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count);
108 r+=8; 118 break;
109 break; 119 case TASK_FULLSCRAPE_TPB_BINARY:
110 case TASK_FULLSCRAPE_TPB_URLENCODED: 120 memcpy(r, *hash, sizeof(ot_hash));
111 r += fmt_urlencoded( r, (char *)*hash, 20 ); 121 r += sizeof(ot_hash);
112 r += sprintf( r, ":%zd:%zd\n", seed_count, peer_count-seed_count ); 122 *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count);
113 break; 123 *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count));
114 case TASK_FULLSCRAPE_TRACKERSTATE: 124 r += 8;
115 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 125 break;
116 r += sprintf( r, ":%zd:%zd\n", torrent->peer_list6->base, down_count ); 126 case TASK_FULLSCRAPE_TPB_URLENCODED:
117 break; 127 r += fmt_urlencoded(r, (char *)*hash, 20);
118 } 128 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
119 return r; 129 break;
130 case TASK_FULLSCRAPE_TRACKERSTATE:
131 to_hex(r, *hash);
132 r += 2 * sizeof(ot_hash);
133 r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count);
134 break;
135 }
136 return r;
120} 137}
121 138
122static void fullscrape_make( int taskid, ot_tasktype mode ) { 139static void fullscrape_make(int taskid, ot_tasktype mode) {
123 int bucket; 140 int bucket;
124 char *r, *re; 141 char *r, *re;
125 struct iovec iovector = { NULL, 0 }; 142 struct iovec iovector = {NULL, 0};
126 143
127 /* Setup return vector... */ 144 /* Setup return vector... */
128 r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); 145 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
129 if( !r ) 146 if (!r)
130 return; 147 return;
131 148
132 /* re points to low watermark */ 149 /* re points to low watermark */
133 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 150 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
134 151
135 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 152 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
136 r += sprintf( r, "d5:filesd" ); 153 r += sprintf(r, "d5:filesd");
137 154
138 /* For each bucket... */ 155 /* For each bucket... */
139 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 156 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
140 /* Get exclusive access to that bucket */ 157 /* Get exclusive access to that bucket */
141 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 158 ot_vector *torrents_list = mutex_bucket_lock(bucket);
142 ot_torrent *torrents = (ot_torrent*)(torrents_list->data); 159 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
143 size_t i; 160 size_t i;
144 161
145 /* For each torrent in this bucket.. */ 162 /* For each torrent in this bucket.. */
146 for( i=0; i<torrents_list->size; ++i ) { 163 for (i = 0; i < torrents_list->size; ++i) {
147 r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); 164 r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash);
148 165
149 if( r > re) { 166 if (r > re) {
150 iovector.iov_len = r - (char *)iovector.iov_base; 167 iovector.iov_len = r - (char *)iovector.iov_base;
151 168
152 if (mutex_workqueue_pushchunked(taskid, &iovector) ) { 169 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
153 free(iovector.iov_base); 170 free(iovector.iov_base);
154 return mutex_bucket_unlock( bucket, 0 ); 171 return mutex_bucket_unlock(bucket, 0);
155 } 172 }
156 /* Allocate a fresh output buffer */ 173 /* Allocate a fresh output buffer */
157 r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); 174 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
158 if( !r ) 175 if (!r)
159 return mutex_bucket_unlock( bucket, 0 ); 176 return mutex_bucket_unlock(bucket, 0);
160 177
161 /* re points to low watermark */ 178 /* re points to low watermark */
162 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 179 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
@@ -164,132 +181,132 @@ static void fullscrape_make( int taskid, ot_tasktype mode ) {
164 } 181 }
165 182
166 /* All torrents done: release lock on current bucket */ 183 /* All torrents done: release lock on current bucket */
167 mutex_bucket_unlock( bucket, 0 ); 184 mutex_bucket_unlock(bucket, 0);
168 185
169 /* Parent thread died? */ 186 /* Parent thread died? */
170 if( !g_opentracker_running ) 187 if (!g_opentracker_running)
171 return; 188 return;
172 } 189 }
173 190
174 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 191 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
175 r += sprintf( r, "ee" ); 192 r += sprintf(r, "ee");
176 193
177 /* Send rest of data */ 194 /* Send rest of data */
178 iovector.iov_len = r - (char *)iovector.iov_base; 195 iovector.iov_len = r - (char *)iovector.iov_base;
179 if( mutex_workqueue_pushchunked(taskid, &iovector) ) 196 if (mutex_workqueue_pushchunked(taskid, &iovector))
180 free(iovector.iov_base); 197 free(iovector.iov_base);
181} 198}
182 199
183#ifdef WANT_COMPRESSION_GZIP 200#ifdef WANT_COMPRESSION_GZIP
184 201
185static void fullscrape_make_gzip( int taskid, ot_tasktype mode) { 202static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
186 int bucket; 203 int bucket;
187 char *r; 204 char *r;
188 struct iovec iovector = { NULL, 0 }; 205 struct iovec iovector = {NULL, 0};
189 int zres; 206 int zres;
190 z_stream strm; 207 z_stream strm;
191fprintf(stderr, "GZIP path\n"); 208 fprintf(stderr, "GZIP path\n");
192 /* Setup return vector... */ 209 /* Setup return vector... */
193 iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); 210 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
194 if( !iovector.iov_base ) 211 if (!iovector.iov_base)
195 return; 212 return;
196 213
197 byte_zero( &strm, sizeof(strm) ); 214 byte_zero(&strm, sizeof(strm));
198 strm.next_out = (uint8_t*)iovector.iov_base; 215 strm.next_out = (uint8_t *)iovector.iov_base;
199 strm.avail_out = OT_SCRAPE_CHUNK_SIZE; 216 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
200 if( deflateInit2(&strm,7,Z_DEFLATED,31,9,Z_DEFAULT_STRATEGY) != Z_OK ) 217 if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK)
201 fprintf( stderr, "not ok.\n" ); 218 fprintf(stderr, "not ok.\n");
202 219
203 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { 220 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
204 strm.next_in = (uint8_t*)"d5:filesd"; 221 strm.next_in = (uint8_t *)"d5:filesd";
205 strm.avail_in = strlen("d5:filesd"); 222 strm.avail_in = strlen("d5:filesd");
206 zres = deflate( &strm, Z_NO_FLUSH ); 223 zres = deflate(&strm, Z_NO_FLUSH);
207 } 224 }
208 225
209 /* For each bucket... */ 226 /* For each bucket... */
210 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 227 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
211 /* Get exclusive access to that bucket */ 228 /* Get exclusive access to that bucket */
212 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 229 ot_vector *torrents_list = mutex_bucket_lock(bucket);
213 ot_torrent *torrents = (ot_torrent*)(torrents_list->data); 230 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
214 size_t i; 231 size_t i;
215 232
216 /* For each torrent in this bucket.. */ 233 /* For each torrent in this bucket.. */
217 for( i=0; i<torrents_list->size; ++i ) { 234 for (i = 0; i < torrents_list->size; ++i) {
218 char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; 235 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
219 r = fullscrape_write_one( mode, compress_buffer, torrents+i, &torrents[i].hash ); 236 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
220 strm.next_in = (uint8_t*)compress_buffer; 237 strm.next_in = (uint8_t *)compress_buffer;
221 strm.avail_in = r - compress_buffer; 238 strm.avail_in = r - compress_buffer;
222 zres = deflate( &strm, Z_NO_FLUSH ); 239 zres = deflate(&strm, Z_NO_FLUSH);
223 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 240 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
224 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); 241 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
225 242
226 /* Check if there still is enough buffer left */ 243 /* Check if there still is enough buffer left */
227 while( !strm.avail_out ) { 244 while (!strm.avail_out) {
228 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; 245 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
229 246
230 if (mutex_workqueue_pushchunked(taskid, &iovector) ) { 247 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
231 free(iovector.iov_base); 248 free(iovector.iov_base);
232 return mutex_bucket_unlock( bucket, 0 ); 249 return mutex_bucket_unlock(bucket, 0);
233 } 250 }
234 /* Allocate a fresh output buffer */ 251 /* Allocate a fresh output buffer */
235 iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); 252 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
236 if( !iovector.iov_base ) { 253 if (!iovector.iov_base) {
237 fprintf( stderr, "Out of memory trying to claim ouput buffer\n" ); 254 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
238 deflateEnd(&strm); 255 deflateEnd(&strm);
239 return mutex_bucket_unlock( bucket, 0 ); 256 return mutex_bucket_unlock(bucket, 0);
240 } 257 }
241 strm.next_out = (uint8_t*)iovector.iov_base; 258 strm.next_out = (uint8_t *)iovector.iov_base;
242 strm.avail_out = OT_SCRAPE_CHUNK_SIZE; 259 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
243 zres = deflate( &strm, Z_NO_FLUSH ); 260 zres = deflate(&strm, Z_NO_FLUSH);
244 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 261 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
245 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); 262 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
246 } 263 }
247 } 264 }
248 265
249 /* All torrents done: release lock on current bucket */ 266 /* All torrents done: release lock on current bucket */
250 mutex_bucket_unlock( bucket, 0 ); 267 mutex_bucket_unlock(bucket, 0);
251 268
252 /* Parent thread died? */ 269 /* Parent thread died? */
253 if( !g_opentracker_running ) 270 if (!g_opentracker_running)
254 return; 271 return;
255 } 272 }
256 273
257 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { 274 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
258 strm.next_in = (uint8_t*)"ee"; 275 strm.next_in = (uint8_t *)"ee";
259 strm.avail_in = strlen("ee"); 276 strm.avail_in = strlen("ee");
260 } 277 }
261 278
262 if( deflate( &strm, Z_FINISH ) < Z_OK ) 279 if (deflate(&strm, Z_FINISH) < Z_OK)
263 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); 280 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
264 281
265 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; 282 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
266 if (mutex_workqueue_pushchunked(taskid, &iovector) ) { 283 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
267 free(iovector.iov_base); 284 free(iovector.iov_base);
268 return mutex_bucket_unlock( bucket, 0 ); 285 return mutex_bucket_unlock(bucket, 0);
269 } 286 }
270 287
271 { 288 {
272 unsigned int pending; 289 unsigned int pending;
273 int bits; 290 int bits;
274 deflatePending( &strm, &pending, &bits); 291 deflatePending(&strm, &pending, &bits);
275 pending += ( bits ? 1 : 0 ); 292 pending += (bits ? 1 : 0);
276 293
277 if (pending) { 294 if (pending) {
278 /* Allocate a fresh output buffer */ 295 /* Allocate a fresh output buffer */
279 iovector.iov_base = malloc( pending ); 296 iovector.iov_base = malloc(pending);
280 iovector.iov_len = pending; 297 iovector.iov_len = pending;
281 298
282 if( !iovector.iov_base ) { 299 if (!iovector.iov_base) {
283 fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); 300 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
284 deflateEnd(&strm); 301 deflateEnd(&strm);
285 return mutex_bucket_unlock( bucket, 0 ); 302 return mutex_bucket_unlock(bucket, 0);
286 } 303 }
287 strm.next_out = iovector.iov_base; 304 strm.next_out = iovector.iov_base;
288 strm.avail_out = pending; 305 strm.avail_out = pending;
289 if( deflate( &strm, Z_FINISH ) < Z_OK ) 306 if (deflate(&strm, Z_FINISH) < Z_OK)
290 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); 307 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
291 308
292 if( mutex_workqueue_pushchunked(taskid, &iovector) ) 309 if (mutex_workqueue_pushchunked(taskid, &iovector))
293 free(iovector.iov_base); 310 free(iovector.iov_base);
294 } 311 }
295 } 312 }