summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--proxy.c468
1 files changed, 413 insertions, 55 deletions
diff --git a/proxy.c b/proxy.c
index 3577f44..c27599b 100644
--- a/proxy.c
+++ b/proxy.c
@@ -4,6 +4,7 @@
4 $Id$ */ 4 $Id$ */
5 5
6/* System */ 6/* System */
7#include <stdint.h>
7#include <stdlib.h> 8#include <stdlib.h>
8#include <string.h> 9#include <string.h>
9#include <arpa/inet.h> 10#include <arpa/inet.h>
@@ -14,6 +15,7 @@
14#include <stdio.h> 15#include <stdio.h>
15#include <pwd.h> 16#include <pwd.h>
16#include <ctype.h> 17#include <ctype.h>
18#include <pthread.h>
17 19
18/* Libowfat */ 20/* Libowfat */
19#include "socket.h" 21#include "socket.h"
@@ -26,30 +28,59 @@
26 28
27/* Opentracker */ 29/* Opentracker */
28#include "trackerlogic.h" 30#include "trackerlogic.h"
31#include "ot_vector.h"
32#include "ot_mutex.h"
29#include "ot_livesync.h" 33#include "ot_livesync.h"
34#include "ot_stats.h"
30 35
36ot_ip6 g_serverip;
37uint16_t g_serverport = 9009;
31uint32_t g_tracker_id; 38uint32_t g_tracker_id;
32char groupip_1[4] = { 224,0,23,5 }; 39char groupip_1[4] = { 224,0,23,5 };
40int g_self_pipe[2];
41
42/* If you have more than 10 peers, don't use this proxy
43 Use 20 slots for 10 peers to have room for 10 incoming connection slots
44 */
45#define MAX_PEERS 20
33 46
34#define LIVESYNC_INCOMING_BUFFSIZE (256*256) 47#define LIVESYNC_INCOMING_BUFFSIZE (256*256)
48#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256)
35 49
36#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 50#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
37#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 51#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
38 52
53/* The amount of time a complete sync cycle should take */
54#define OT_SYNC_INTERVAL_MINUTES 2
55
56/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
57#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
58
39enum { OT_SYNC_PEER }; 59enum { OT_SYNC_PEER };
60enum { FLAG_SERVERSOCKET = 1 };
40 61
41/* For outgoing packets */ 62/* For incoming packets */
42static int64 g_socket_in = -1; 63static int64 g_socket_in = -1;
64static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
43 65
44/* For incoming packets */ 66/* For outgoing packets */
45static int64 g_socket_out = -1; 67static int64 g_socket_out = -1;
46static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; 68//static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE];
69
70static void * livesync_worker( void * args );
71static void * streamsync_worker( void * args );
47 72
48void exerr( char * message ) { 73void exerr( char * message ) {
49 fprintf( stderr, "%s\n", message ); 74 fprintf( stderr, "%s\n", message );
50 exit( 111 ); 75 exit( 111 );
51} 76}
52 77
78void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) {
79 (void) event;
80 (void) proto;
81 (void) event_data;
82}
83
53void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { 84void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
54 char tmpip[4] = {0,0,0,0}; 85 char tmpip[4] = {0,0,0,0};
55 char *v4ip; 86 char *v4ip;
@@ -80,16 +111,6 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
80 socket_mcloop4(g_socket_out, 1); 111 socket_mcloop4(g_socket_out, 1);
81} 112}
82 113
83static ot_vector all_torrents[OT_BUCKET_COUNT];
84ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) {
85 return all_torrents + ( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT );
86}
87ot_vector *mutex_bucket_lock( int bucket ) {
88 return all_torrents + bucket;
89}
90#define mutex_bucket_unlock_by_hash(A,B)
91#define mutex_bucket_unlock(A)
92
93size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { 114size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
94 int exactmatch; 115 int exactmatch;
95 ot_torrent *torrent; 116 ot_torrent *torrent;
@@ -106,6 +127,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
106 127
107 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { 128 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
108 vector_remove_torrent( torrents_list, torrent ); 129 vector_remove_torrent( torrents_list, torrent );
130 mutex_bucket_unlock_by_hash( hash, 0 );
109 return -1; 131 return -1;
110 } 132 }
111 133
@@ -114,8 +136,10 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
114 136
115 /* Check for peer in torrent */ 137 /* Check for peer in torrent */
116 peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); 138 peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch );
117 if( !peer_dest ) return -1; 139 if( !peer_dest ) {
118 140 mutex_bucket_unlock_by_hash( hash, 0 );
141 return -1;
142 }
119 /* Tell peer that it's fresh */ 143 /* Tell peer that it's fresh */
120 OT_PEERTIME( peer ) = 0; 144 OT_PEERTIME( peer ) = 0;
121 145
@@ -126,6 +150,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
126 torrent->peer_list->seed_count++; 150 torrent->peer_list->seed_count++;
127 } 151 }
128 memcpy( peer_dest, peer, sizeof(ot_peer) ); 152 memcpy( peer_dest, peer, sizeof(ot_peer) );
153 mutex_bucket_unlock_by_hash( hash, 0 );
129 return 0; 154 return 0;
130} 155}
131 156
@@ -143,6 +168,7 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) {
143 } 168 }
144 } 169 }
145 170
171 mutex_bucket_unlock_by_hash( hash, 0 );
146 return 0; 172 return 0;
147} 173}
148 174
@@ -182,16 +208,243 @@ int usage( char *self ) {
182 return 0; 208 return 0;
183} 209}
184 210
185static uint32_t peer_counts[1024]; 211enum {
186#ifdef WANT_SCROOOOOOOLL 212 FLAG_OUTGOING = 0x80,
187static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} 213
188#endif 214 FLAG_DISCONNECTED = 0x00,
215 FLAG_CONNECTING = 0x01,
216 FLAG_WAITTRACKERID = 0x02,
217 FLAG_CONNECTED = 0x03,
218
219 FLAG_MASK = 0x07
220};
221
222#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING)
223#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)
224#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING)
225#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID)
226#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
227
228typedef struct {
229 int state; /* Whether we want to connect, how far our handshake is, etc. */
230 ot_ip6 ip; /* The peer to connect to */
231 uint16_t port; /* The peers port */
232 uint8_t *indata; /* Any data not processed yet */
233 size_t indata_length; /* Length of unprocessed data */
234 uint32_t tracker_id; /* How the other end greeted */
235 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
236 io_batch outdata; /* The iobatch containing our sync data */
237} proxy_peer;
238
239/* Number of connections to peers
240 * If a peer's IP is set, we try to reconnect, when the connection drops
241 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
242 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
243 * Reconnect attempts occur only twice a minute
244*/
245static int g_connection_count;
246static ot_time g_connection_reconn;
247static proxy_peer g_connections[MAX_PEERS];
248
249static void handle_reconnects( void ) {
250 int i;
251 for( i=0; i<g_connection_count; ++i )
252 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
253 int64 newfd = socket_tcp6( );
254 if( newfd < 0 ) continue; /* No socket for you */
255 io_fd(newfd);
256 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
257 io_close( newfd );
258 continue;
259 }
260 if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
261 errno != EINPROGRESS && errno != EWOULDBLOCK ) {
262 close(newfd);
263 continue;
264 }
265 io_wantwrite(newfd); /* So we will be informed when it is connected */
266 io_setcookie(newfd,g_connections+i);
267
268 /* Prepare connection info block */
269 free( g_connections[i].indata );
270 g_connections[i].indata = 0;
271 g_connections[i].indata_length = 0;
272 g_connections[i].fd = newfd;
273 g_connections[i].tracker_id = 0;
274 iob_reset( &g_connections[i].outdata );
275 PROXYPEER_SETCONNECTING( g_connections[i].state );
276 }
277 g_connection_reconn = time(NULL) + 30;
278}
279
280/* Handle incoming connection requests, check against whitelist */
281static void handle_accept( int64 serversocket ) {
282 int64 newfd;
283 ot_ip6 ip;
284 uint16 port;
285
286 while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) {
287
288 /* XXX some access control */
289
290 /* Put fd into a non-blocking mode */
291 io_nonblock( newfd );
292
293 if( !io_fd( newfd ) )
294 io_close( newfd );
295 else {
296 /* Find a new home for our incoming connection */
297 int i;
298 for( i=0; i<MAX_PEERS; ++i )
299 if( g_connections[i].state == FLAG_DISCONNECTED )
300 break;
301 if( i == MAX_PEERS ) {
302 fprintf( stderr, "No room for incoming connection." );
303 close( newfd );
304 continue;
305 }
306
307 /* Prepare connection info block */
308 free( g_connections[i].indata );
309 g_connections[i].indata = 0;
310 g_connections[i].indata_length = 0;
311 g_connections[i].port = port;
312 g_connections[i].fd = newfd;
313 g_connections[i].tracker_id = 0;
314 iob_reset( &g_connections[i].outdata );
315 g_connections[i].tracker_id = 0;
316
317 PROXYPEER_SETCONNECTING( g_connections[i].state );
318
319 io_setcookie( newfd, g_connections + i );
320
321 /* We expect the connecting side to begin with its tracker_id */
322 io_wantread( newfd );
323 }
324 }
325
326 return;
327}
328
329/* New sync data on the stream */
330static void handle_read( int64 peersocket ) {
331 uint32_t tracker_id;
332 proxy_peer *peer = io_getcookie( peersocket );
333 if( !peer ) {
334 /* Can't happen ;) */
335 close( peersocket );
336 return;
337 }
338 switch( peer->state & FLAG_MASK ) {
339 case FLAG_DISCONNECTED: break; /* Shouldnt happen */
340 case FLAG_CONNECTING:
341 case FLAG_WAITTRACKERID:
342 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */
343 if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
344 goto close_socket;
345
346 /* See, if we already have a connection to that peer */
347 for( i=0; i<MAX_PEERS; ++i )
348 if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED &&
349 g_connections[i].tracker_id == tracker_id )
350 goto close_socket;
351
352 /* Also no need for soliloquy */
353 if( tracker_id == g_tracker_id )
354 goto close_socket;
355
356 /* The new connection is good, send our tracker_id on incoming connections */
357 if( peer->state == FLAG_CONNECTING )
358 io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
359
360 peer->tracker_id = tracker_id;
361 PROXYPEER_SETCONNECTED( peer->state );
362
363 break;
364close_socket:
365 io_close( peersocket );
366 PROXYPEER_SETDISCONNECTED( peer->state );
367 break;
368 case FLAG_CONNECTED:
369
370 break;
371
372 }
373}
374
375/* Can write new sync data to the stream */
376static void handle_write( int64 peersocket ) {
377 proxy_peer *peer = io_getcookie( peersocket );
378 if( !peer ) {
379 /* Can't happen ;) */
380 close( peersocket );
381 return;
382 }
383
384 switch( peer->state & FLAG_MASK ) {
385 case FLAG_DISCONNECTED: break; /* Shouldnt happen */
386 case FLAG_CONNECTING:
387 io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
388 PROXYPEER_SETWAITTRACKERID( peer->state );
389 io_dontwantwrite( peersocket );
390 io_wantread( peersocket );
391 break;
392 case FLAG_CONNECTED:
393 switch( iob_send( peersocket, &peer->outdata ) ) {
394 case 0: /* all data sent */
395 io_dontwantwrite( peersocket );
396 break;
397 case -3: /* an error occured */
398 io_close( peersocket );
399 PROXYPEER_SETDISCONNECTED( peer->state );
400 iob_reset( &peer->outdata );
401 free( peer->indata );
402 default: /* Normal operation or eagain */
403 break;
404 }
405 break;
406 default:
407 break;
408 }
409
410 return;
411}
412
413static void server_mainloop() {
414 int64 sock;
415 tai6464 now;
416
417 while(1) {
418 /* See, if we need to connect to anyone */
419 if( time(NULL) > g_connection_reconn )
420 handle_reconnects( );
421
422 /* Wait for io events until next approx reconn check time */
423 taia_now( &now );
424 taia_addsec( &now, &now, 30 );
425 io_waituntil( now );
426
427 /* Loop over readable sockets */
428 while( ( sock = io_canread( ) ) != -1 ) {
429 const void *cookie = io_getcookie( sock );
430 if( (uintptr_t)cookie == FLAG_SERVERSOCKET )
431 handle_accept( sock );
432 else
433 handle_read( sock );
434 }
435
436 /* Loop over writable sockets */
437 while( ( sock = io_canwrite( ) ) != -1 )
438 handle_write( sock );
439 }
440}
189 441
190int main( int argc, char **argv ) { 442int main( int argc, char **argv ) {
443 static pthread_t sync_in_thread_id;
444 static pthread_t sync_out_thread_id;
191 ot_ip6 serverip; 445 ot_ip6 serverip;
192 uint16_t tmpport; 446 uint16_t tmpport;
193 int scanon = 1, bound = 0; 447 int scanon = 1, bound = 0;
194 time_t next_dump = time(NULL)+1;
195 448
196 srandom( time(NULL) ); 449 srandom( time(NULL) );
197 g_tracker_id = random(); 450 g_tracker_id = random();
@@ -199,7 +452,7 @@ int main( int argc, char **argv ) {
199 while( scanon ) { 452 while( scanon ) {
200 switch( getopt( argc, argv, ":i:p:vh" ) ) { 453 switch( getopt( argc, argv, ":i:p:vh" ) ) {
201 case -1: scanon = 0; break; 454 case -1: scanon = 0; break;
202 case 'i': 455 case 'S':
203 if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } 456 if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); }
204 break; 457 break;
205 case 'p': 458 case 'p':
@@ -211,7 +464,144 @@ int main( int argc, char **argv ) {
211 } 464 }
212 465
213 if( !bound ) exerr( "No port bound." ); 466 if( !bound ) exerr( "No port bound." );
467 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
468 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
469
470 server_mainloop();
471 return 0;
472}
473
474static void * streamsync_worker( void * args ) {
475 (void)args;
476 while( 1 ) {
477 int bucket;
478 /* For each bucket... */
479 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
480 /* Get exclusive access to that bucket */
481 ot_vector *torrents_list = mutex_bucket_lock( bucket );
482 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
483 size_t mem, mem_a = 0, mem_b = 0;
484 uint8_t *ptr, *ptr_a, *ptr_b, *ptr_c;
485
486 /* For each torrent in this bucket.. */
487 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
488 /* Address torrents members */
489 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
490 switch( peer_list->peer_count ) {
491 case 2: count_two++; break;
492 case 1: count_one++; break;
493 case 0: break;
494 default:
495 count_peers += peer_list->peer_count;
496 count_def += 1 + ( peer_list->peer_count >> 8 );
497 }
498 }
499
500 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
501 mem = 3 * ( 4 + 1 + 1 + 2 ) + ( count_one + count_two ) * 19 + count_def * 20 +
502 ( count_one + 2 * count_two + count_peers ) * 7;
503
504 ptr = ptr_a = ptr_b = ptr_c = malloc( mem );
505 if( !ptr ) goto unlock_continue;
506
507 if( count_one > 8 ) {
508 mem_a = 4 + 1 + 1 + 2 + count_one * ( 19 + 7 );
509 ptr_b += mem_a; ptr_c += mem_a;
510 memcpy( ptr_a, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */
511 ptr_a[4] = 1; /* Offset 4: packet type 1 */
512 ptr_a[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */
513 ptr_a[6] = count_one >> 8;
514 ptr_a[7] = count_one & 255;
515 ptr_a += 8;
516 } else {
517 count_def += count_one;
518 count_peers += count_one;
519 }
520
521 if( count_two > 8 ) {
522 mem_b = 4 + 1 + 1 + 2 + count_two * ( 19 + 14 );
523 ptr_c += mem_b;
524 memcpy( ptr_b, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */
525 ptr_b[4] = 2; /* Offset 4: packet type 2 */
526 ptr_b[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */
527 ptr_b[6] = count_two >> 8;
528 ptr_b[7] = count_two & 255;
529 ptr_b += 8;
530 } else {
531 count_def += count_two;
532 count_peers += 2 * count_two;
533 }
534
535 if( count_def ) {
536 memcpy( ptr_c, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */
537 ptr_c[4] = 0; /* Offset 4: packet type 0 */
538 ptr_c[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */
539 ptr_c[6] = count_def >> 8;
540 ptr_c[7] = count_def & 255;
541 ptr_c += 8;
542 }
543
544 /* For each torrent in this bucket.. */
545 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
546 /* Address torrents members */
547 ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset;
548 ot_peerlist *peer_list = torrent->peer_list;
549 ot_peer *peers = (ot_peer*)(peer_list->peers.data);
550 uint8_t **dst;
551 int multi = 0;
552 switch( peer_list->peer_count ) {
553 case 0: continue;
554 case 1: dst = mem_a ? &ptr_a : &ptr_c; break;
555 case 2: dst = mem_b ? &ptr_b : &ptr_c; break;
556 default: dst = &ptr_c; multi = 1; break;
557 }
558
559 do {
560 size_t i, pc = peer_list->peer_count;
561 if( pc > 255 ) pc = 255;
562 memcpy( *dst, torrent->hash + 1, sizeof( ot_hash ) - 1);
563 *dst += sizeof( ot_hash ) - 1;
564 if( multi ) *(*dst)++ = pc;
565 for( i=0; i < pc; ++i ) {
566 memcpy( *dst, peers++, OT_IP_SIZE + 3 );
567 *dst += OT_IP_SIZE + 3;
568 }
569 peer_list->peer_count -= pc;
570 } while( peer_list->peer_count );
571 free_peerlist(peer_list);
572 }
573
574 free( torrents_list->data );
575 memset( torrents_list, 0, sizeof(*torrents_list ) );
576unlock_continue:
577 mutex_bucket_unlock( bucket, 0 );
578
579 if( ptr ) {
580 int i;
581
582 if( ptr_b > ptr_c ) ptr_c = ptr_b;
583 if( ptr_a > ptr_c ) ptr_c = ptr_a;
584 mem = ptr_c - ptr;
585
586 for( i=0; i<g_connection_count; ++i ) {
587 if( g_connections[i].fd != -1 ) {
588 void *tmp = malloc( mem );
589 if( tmp )
590 if( !iob_addbuf_free( &g_connections[i].outdata, tmp, mem ) )
591 free( tmp );
592 }
593 }
594
595 free( ptr );
596 }
597 usleep( OT_SYNC_SLEEP );
598 }
599 }
600 return 0;
601}
214 602
603static void * livesync_worker( void * args ) {
604 (void)args;
215 while( 1 ) { 605 while( 1 ) {
216 ot_ip6 in_ip; uint16_t in_port; 606 ot_ip6 in_ip; uint16_t in_port;
217 size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 607 size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
@@ -223,7 +613,6 @@ int main( int argc, char **argv ) {
223 /* drop packet coming from ourselves */ 613 /* drop packet coming from ourselves */
224 continue; 614 continue;
225 } 615 }
226
227 switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { 616 switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) {
228 case OT_SYNC_PEER: 617 case OT_SYNC_PEER:
229 livesync_handle_peersync( datalen ); 618 livesync_handle_peersync( datalen );
@@ -232,37 +621,6 @@ int main( int argc, char **argv ) {
232 fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); 621 fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );
233 break; 622 break;
234 } 623 }
235 if( time(NULL) > next_dump ) {
236 int bucket, i;
237 /* For each bucket... */
238 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
239 /* Get exclusive access to that bucket */
240 ot_vector *torrents_list = mutex_bucket_lock( bucket );
241 size_t tor_offset;
242
243 /* For each torrent in this bucket.. */
244 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
245 /* Address torrents members */
246 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
247#ifdef WANT_SCROOOOOOOLL
248 ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
249 char hash_out[41];
250 to_hex(hash_out,*hash);
251 printf( "%s %08zd\n", hash_out, peer_list->peer_count );
252#endif
253 if(peer_list->peer_count<1024) peer_counts[peer_list->peer_count]++; else peer_counts[1023]++;
254 free_peerlist(peer_list);
255 }
256 free( torrents_list->data );
257 memset( torrents_list, 0, sizeof(*torrents_list ) );
258 }
259 for( i=1023; i>=0; --i )
260 if( peer_counts[i] ) {
261 printf( "%d:%d ", i, peer_counts[i] );
262 peer_counts[i] = 0;
263 }
264 printf( "\n" );
265 next_dump = time(NULL) + 1;
266 }
267 } 624 }
625 return 0;
268} 626}