diff options
author | erdgeist <> | 2009-09-29 06:03:39 +0000 |
---|---|---|
committer | erdgeist <> | 2009-09-29 06:03:39 +0000 |
commit | 5168a3314c822a74389011d44277e70c329590b0 (patch) | |
tree | d04481f4046a8988e73f3835eb08ce795713d256 | |
parent | 21b5baf0c12b890f0f473a7726cac08bcd9e334e (diff) |
Reaching completion soon
-rw-r--r-- | proxy.c | 468 |
1 files changed, 413 insertions, 55 deletions
@@ -4,6 +4,7 @@ | |||
4 | $Id$ */ | 4 | $Id$ */ |
5 | 5 | ||
6 | /* System */ | 6 | /* System */ |
7 | #include <stdint.h> | ||
7 | #include <stdlib.h> | 8 | #include <stdlib.h> |
8 | #include <string.h> | 9 | #include <string.h> |
9 | #include <arpa/inet.h> | 10 | #include <arpa/inet.h> |
@@ -14,6 +15,7 @@ | |||
14 | #include <stdio.h> | 15 | #include <stdio.h> |
15 | #include <pwd.h> | 16 | #include <pwd.h> |
16 | #include <ctype.h> | 17 | #include <ctype.h> |
18 | #include <pthread.h> | ||
17 | 19 | ||
18 | /* Libowfat */ | 20 | /* Libowfat */ |
19 | #include "socket.h" | 21 | #include "socket.h" |
@@ -26,30 +28,59 @@ | |||
26 | 28 | ||
27 | /* Opentracker */ | 29 | /* Opentracker */ |
28 | #include "trackerlogic.h" | 30 | #include "trackerlogic.h" |
31 | #include "ot_vector.h" | ||
32 | #include "ot_mutex.h" | ||
29 | #include "ot_livesync.h" | 33 | #include "ot_livesync.h" |
34 | #include "ot_stats.h" | ||
30 | 35 | ||
36 | ot_ip6 g_serverip; | ||
37 | uint16_t g_serverport = 9009; | ||
31 | uint32_t g_tracker_id; | 38 | uint32_t g_tracker_id; |
32 | char groupip_1[4] = { 224,0,23,5 }; | 39 | char groupip_1[4] = { 224,0,23,5 }; |
40 | int g_self_pipe[2]; | ||
41 | |||
42 | /* If you have more than 10 peers, don't use this proxy | ||
43 | Use 20 slots for 10 peers to have room for 10 incoming connection slots | ||
44 | */ | ||
45 | #define MAX_PEERS 20 | ||
33 | 46 | ||
34 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 47 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) |
48 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) | ||
35 | 49 | ||
36 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 50 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
37 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 51 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) |
38 | 52 | ||
53 | /* The amount of time a complete sync cycle should take */ | ||
54 | #define OT_SYNC_INTERVAL_MINUTES 2 | ||
55 | |||
56 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ | ||
57 | #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) | ||
58 | |||
39 | enum { OT_SYNC_PEER }; | 59 | enum { OT_SYNC_PEER }; |
60 | enum { FLAG_SERVERSOCKET = 1 }; | ||
40 | 61 | ||
41 | /* For outgoing packets */ | 62 | /* For incoming packets */ |
42 | static int64 g_socket_in = -1; | 63 | static int64 g_socket_in = -1; |
64 | static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | ||
43 | 65 | ||
44 | /* For incoming packets */ | 66 | /* For outgoing packets */ |
45 | static int64 g_socket_out = -1; | 67 | static int64 g_socket_out = -1; |
46 | static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | 68 | //static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; |
69 | |||
70 | static void * livesync_worker( void * args ); | ||
71 | static void * streamsync_worker( void * args ); | ||
47 | 72 | ||
48 | void exerr( char * message ) { | 73 | void exerr( char * message ) { |
49 | fprintf( stderr, "%s\n", message ); | 74 | fprintf( stderr, "%s\n", message ); |
50 | exit( 111 ); | 75 | exit( 111 ); |
51 | } | 76 | } |
52 | 77 | ||
78 | void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { | ||
79 | (void) event; | ||
80 | (void) proto; | ||
81 | (void) event_data; | ||
82 | } | ||
83 | |||
53 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | 84 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { |
54 | char tmpip[4] = {0,0,0,0}; | 85 | char tmpip[4] = {0,0,0,0}; |
55 | char *v4ip; | 86 | char *v4ip; |
@@ -80,16 +111,6 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | |||
80 | socket_mcloop4(g_socket_out, 1); | 111 | socket_mcloop4(g_socket_out, 1); |
81 | } | 112 | } |
82 | 113 | ||
83 | static ot_vector all_torrents[OT_BUCKET_COUNT]; | ||
84 | ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) { | ||
85 | return all_torrents + ( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT ); | ||
86 | } | ||
87 | ot_vector *mutex_bucket_lock( int bucket ) { | ||
88 | return all_torrents + bucket; | ||
89 | } | ||
90 | #define mutex_bucket_unlock_by_hash(A,B) | ||
91 | #define mutex_bucket_unlock(A) | ||
92 | |||
93 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | 114 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { |
94 | int exactmatch; | 115 | int exactmatch; |
95 | ot_torrent *torrent; | 116 | ot_torrent *torrent; |
@@ -106,6 +127,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
106 | 127 | ||
107 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { | 128 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { |
108 | vector_remove_torrent( torrents_list, torrent ); | 129 | vector_remove_torrent( torrents_list, torrent ); |
130 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
109 | return -1; | 131 | return -1; |
110 | } | 132 | } |
111 | 133 | ||
@@ -114,8 +136,10 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
114 | 136 | ||
115 | /* Check for peer in torrent */ | 137 | /* Check for peer in torrent */ |
116 | peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); | 138 | peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); |
117 | if( !peer_dest ) return -1; | 139 | if( !peer_dest ) { |
118 | 140 | mutex_bucket_unlock_by_hash( hash, 0 ); | |
141 | return -1; | ||
142 | } | ||
119 | /* Tell peer that it's fresh */ | 143 | /* Tell peer that it's fresh */ |
120 | OT_PEERTIME( peer ) = 0; | 144 | OT_PEERTIME( peer ) = 0; |
121 | 145 | ||
@@ -126,6 +150,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
126 | torrent->peer_list->seed_count++; | 150 | torrent->peer_list->seed_count++; |
127 | } | 151 | } |
128 | memcpy( peer_dest, peer, sizeof(ot_peer) ); | 152 | memcpy( peer_dest, peer, sizeof(ot_peer) ); |
153 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
129 | return 0; | 154 | return 0; |
130 | } | 155 | } |
131 | 156 | ||
@@ -143,6 +168,7 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
143 | } | 168 | } |
144 | } | 169 | } |
145 | 170 | ||
171 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
146 | return 0; | 172 | return 0; |
147 | } | 173 | } |
148 | 174 | ||
@@ -182,16 +208,243 @@ int usage( char *self ) { | |||
182 | return 0; | 208 | return 0; |
183 | } | 209 | } |
184 | 210 | ||
185 | static uint32_t peer_counts[1024]; | 211 | enum { |
186 | #ifdef WANT_SCROOOOOOOLL | 212 | FLAG_OUTGOING = 0x80, |
187 | static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} | 213 | |
188 | #endif | 214 | FLAG_DISCONNECTED = 0x00, |
215 | FLAG_CONNECTING = 0x01, | ||
216 | FLAG_WAITTRACKERID = 0x02, | ||
217 | FLAG_CONNECTED = 0x03, | ||
218 | |||
219 | FLAG_MASK = 0x07 | ||
220 | }; | ||
221 | |||
222 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) | ||
223 | #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) | ||
224 | #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) | ||
225 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) | ||
226 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | ||
227 | |||
228 | typedef struct { | ||
229 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | ||
230 | ot_ip6 ip; /* The peer to connect to */ | ||
231 | uint16_t port; /* The peers port */ | ||
232 | uint8_t *indata; /* Any data not processed yet */ | ||
233 | size_t indata_length; /* Length of unprocessed data */ | ||
234 | uint32_t tracker_id; /* How the other end greeted */ | ||
235 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ | ||
236 | io_batch outdata; /* The iobatch containing our sync data */ | ||
237 | } proxy_peer; | ||
238 | |||
239 | /* Number of connections to peers | ||
240 | * If a peer's IP is set, we try to reconnect, when the connection drops | ||
241 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it | ||
242 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match | ||
243 | * Reconnect attempts occur only twice a minute | ||
244 | */ | ||
245 | static int g_connection_count; | ||
246 | static ot_time g_connection_reconn; | ||
247 | static proxy_peer g_connections[MAX_PEERS]; | ||
248 | |||
249 | static void handle_reconnects( void ) { | ||
250 | int i; | ||
251 | for( i=0; i<g_connection_count; ++i ) | ||
252 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { | ||
253 | int64 newfd = socket_tcp6( ); | ||
254 | if( newfd < 0 ) continue; /* No socket for you */ | ||
255 | io_fd(newfd); | ||
256 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { | ||
257 | io_close( newfd ); | ||
258 | continue; | ||
259 | } | ||
260 | if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && | ||
261 | errno != EINPROGRESS && errno != EWOULDBLOCK ) { | ||
262 | close(newfd); | ||
263 | continue; | ||
264 | } | ||
265 | io_wantwrite(newfd); /* So we will be informed when it is connected */ | ||
266 | io_setcookie(newfd,g_connections+i); | ||
267 | |||
268 | /* Prepare connection info block */ | ||
269 | free( g_connections[i].indata ); | ||
270 | g_connections[i].indata = 0; | ||
271 | g_connections[i].indata_length = 0; | ||
272 | g_connections[i].fd = newfd; | ||
273 | g_connections[i].tracker_id = 0; | ||
274 | iob_reset( &g_connections[i].outdata ); | ||
275 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | ||
276 | } | ||
277 | g_connection_reconn = time(NULL) + 30; | ||
278 | } | ||
279 | |||
280 | /* Handle incoming connection requests, check against whitelist */ | ||
281 | static void handle_accept( int64 serversocket ) { | ||
282 | int64 newfd; | ||
283 | ot_ip6 ip; | ||
284 | uint16 port; | ||
285 | |||
286 | while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { | ||
287 | |||
288 | /* XXX some access control */ | ||
289 | |||
290 | /* Put fd into a non-blocking mode */ | ||
291 | io_nonblock( newfd ); | ||
292 | |||
293 | if( !io_fd( newfd ) ) | ||
294 | io_close( newfd ); | ||
295 | else { | ||
296 | /* Find a new home for our incoming connection */ | ||
297 | int i; | ||
298 | for( i=0; i<MAX_PEERS; ++i ) | ||
299 | if( g_connections[i].state == FLAG_DISCONNECTED ) | ||
300 | break; | ||
301 | if( i == MAX_PEERS ) { | ||
302 | fprintf( stderr, "No room for incoming connection." ); | ||
303 | close( newfd ); | ||
304 | continue; | ||
305 | } | ||
306 | |||
307 | /* Prepare connection info block */ | ||
308 | free( g_connections[i].indata ); | ||
309 | g_connections[i].indata = 0; | ||
310 | g_connections[i].indata_length = 0; | ||
311 | g_connections[i].port = port; | ||
312 | g_connections[i].fd = newfd; | ||
313 | g_connections[i].tracker_id = 0; | ||
314 | iob_reset( &g_connections[i].outdata ); | ||
315 | g_connections[i].tracker_id = 0; | ||
316 | |||
317 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | ||
318 | |||
319 | io_setcookie( newfd, g_connections + i ); | ||
320 | |||
321 | /* We expect the connecting side to begin with its tracker_id */ | ||
322 | io_wantread( newfd ); | ||
323 | } | ||
324 | } | ||
325 | |||
326 | return; | ||
327 | } | ||
328 | |||
329 | /* New sync data on the stream */ | ||
330 | static void handle_read( int64 peersocket ) { | ||
331 | uint32_t tracker_id; | ||
332 | proxy_peer *peer = io_getcookie( peersocket ); | ||
333 | if( !peer ) { | ||
334 | /* Can't happen ;) */ | ||
335 | close( peersocket ); | ||
336 | return; | ||
337 | } | ||
338 | switch( peer->state & FLAG_MASK ) { | ||
339 | case FLAG_DISCONNECTED: break; /* Shouldnt happen */ | ||
340 | case FLAG_CONNECTING: | ||
341 | case FLAG_WAITTRACKERID: | ||
342 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */ | ||
343 | if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | ||
344 | goto close_socket; | ||
345 | |||
346 | /* See, if we already have a connection to that peer */ | ||
347 | for( i=0; i<MAX_PEERS; ++i ) | ||
348 | if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && | ||
349 | g_connections[i].tracker_id == tracker_id ) | ||
350 | goto close_socket; | ||
351 | |||
352 | /* Also no need for soliloquy */ | ||
353 | if( tracker_id == g_tracker_id ) | ||
354 | goto close_socket; | ||
355 | |||
356 | /* The new connection is good, send our tracker_id on incoming connections */ | ||
357 | if( peer->state == FLAG_CONNECTING ) | ||
358 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | ||
359 | |||
360 | peer->tracker_id = tracker_id; | ||
361 | PROXYPEER_SETCONNECTED( peer->state ); | ||
362 | |||
363 | break; | ||
364 | close_socket: | ||
365 | io_close( peersocket ); | ||
366 | PROXYPEER_SETDISCONNECTED( peer->state ); | ||
367 | break; | ||
368 | case FLAG_CONNECTED: | ||
369 | |||
370 | break; | ||
371 | |||
372 | } | ||
373 | } | ||
374 | |||
375 | /* Can write new sync data to the stream */ | ||
376 | static void handle_write( int64 peersocket ) { | ||
377 | proxy_peer *peer = io_getcookie( peersocket ); | ||
378 | if( !peer ) { | ||
379 | /* Can't happen ;) */ | ||
380 | close( peersocket ); | ||
381 | return; | ||
382 | } | ||
383 | |||
384 | switch( peer->state & FLAG_MASK ) { | ||
385 | case FLAG_DISCONNECTED: break; /* Shouldnt happen */ | ||
386 | case FLAG_CONNECTING: | ||
387 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | ||
388 | PROXYPEER_SETWAITTRACKERID( peer->state ); | ||
389 | io_dontwantwrite( peersocket ); | ||
390 | io_wantread( peersocket ); | ||
391 | break; | ||
392 | case FLAG_CONNECTED: | ||
393 | switch( iob_send( peersocket, &peer->outdata ) ) { | ||
394 | case 0: /* all data sent */ | ||
395 | io_dontwantwrite( peersocket ); | ||
396 | break; | ||
397 | case -3: /* an error occured */ | ||
398 | io_close( peersocket ); | ||
399 | PROXYPEER_SETDISCONNECTED( peer->state ); | ||
400 | iob_reset( &peer->outdata ); | ||
401 | free( peer->indata ); | ||
402 | default: /* Normal operation or eagain */ | ||
403 | break; | ||
404 | } | ||
405 | break; | ||
406 | default: | ||
407 | break; | ||
408 | } | ||
409 | |||
410 | return; | ||
411 | } | ||
412 | |||
413 | static void server_mainloop() { | ||
414 | int64 sock; | ||
415 | tai6464 now; | ||
416 | |||
417 | while(1) { | ||
418 | /* See, if we need to connect to anyone */ | ||
419 | if( time(NULL) > g_connection_reconn ) | ||
420 | handle_reconnects( ); | ||
421 | |||
422 | /* Wait for io events until next approx reconn check time */ | ||
423 | taia_now( &now ); | ||
424 | taia_addsec( &now, &now, 30 ); | ||
425 | io_waituntil( now ); | ||
426 | |||
427 | /* Loop over readable sockets */ | ||
428 | while( ( sock = io_canread( ) ) != -1 ) { | ||
429 | const void *cookie = io_getcookie( sock ); | ||
430 | if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) | ||
431 | handle_accept( sock ); | ||
432 | else | ||
433 | handle_read( sock ); | ||
434 | } | ||
435 | |||
436 | /* Loop over writable sockets */ | ||
437 | while( ( sock = io_canwrite( ) ) != -1 ) | ||
438 | handle_write( sock ); | ||
439 | } | ||
440 | } | ||
189 | 441 | ||
190 | int main( int argc, char **argv ) { | 442 | int main( int argc, char **argv ) { |
443 | static pthread_t sync_in_thread_id; | ||
444 | static pthread_t sync_out_thread_id; | ||
191 | ot_ip6 serverip; | 445 | ot_ip6 serverip; |
192 | uint16_t tmpport; | 446 | uint16_t tmpport; |
193 | int scanon = 1, bound = 0; | 447 | int scanon = 1, bound = 0; |
194 | time_t next_dump = time(NULL)+1; | ||
195 | 448 | ||
196 | srandom( time(NULL) ); | 449 | srandom( time(NULL) ); |
197 | g_tracker_id = random(); | 450 | g_tracker_id = random(); |
@@ -199,7 +452,7 @@ int main( int argc, char **argv ) { | |||
199 | while( scanon ) { | 452 | while( scanon ) { |
200 | switch( getopt( argc, argv, ":i:p:vh" ) ) { | 453 | switch( getopt( argc, argv, ":i:p:vh" ) ) { |
201 | case -1: scanon = 0; break; | 454 | case -1: scanon = 0; break; |
202 | case 'i': | 455 | case 'S': |
203 | if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } | 456 | if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } |
204 | break; | 457 | break; |
205 | case 'p': | 458 | case 'p': |
@@ -211,7 +464,144 @@ int main( int argc, char **argv ) { | |||
211 | } | 464 | } |
212 | 465 | ||
213 | if( !bound ) exerr( "No port bound." ); | 466 | if( !bound ) exerr( "No port bound." ); |
467 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); | ||
468 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); | ||
469 | |||
470 | server_mainloop(); | ||
471 | return 0; | ||
472 | } | ||
473 | |||
474 | static void * streamsync_worker( void * args ) { | ||
475 | (void)args; | ||
476 | while( 1 ) { | ||
477 | int bucket; | ||
478 | /* For each bucket... */ | ||
479 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | ||
480 | /* Get exclusive access to that bucket */ | ||
481 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | ||
482 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; | ||
483 | size_t mem, mem_a = 0, mem_b = 0; | ||
484 | uint8_t *ptr, *ptr_a, *ptr_b, *ptr_c; | ||
485 | |||
486 | /* For each torrent in this bucket.. */ | ||
487 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | ||
488 | /* Address torrents members */ | ||
489 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | ||
490 | switch( peer_list->peer_count ) { | ||
491 | case 2: count_two++; break; | ||
492 | case 1: count_one++; break; | ||
493 | case 0: break; | ||
494 | default: | ||
495 | count_peers += peer_list->peer_count; | ||
496 | count_def += 1 + ( peer_list->peer_count >> 8 ); | ||
497 | } | ||
498 | } | ||
499 | |||
500 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ | ||
501 | mem = 3 * ( 4 + 1 + 1 + 2 ) + ( count_one + count_two ) * 19 + count_def * 20 + | ||
502 | ( count_one + 2 * count_two + count_peers ) * 7; | ||
503 | |||
504 | ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); | ||
505 | if( !ptr ) goto unlock_continue; | ||
506 | |||
507 | if( count_one > 8 ) { | ||
508 | mem_a = 4 + 1 + 1 + 2 + count_one * ( 19 + 7 ); | ||
509 | ptr_b += mem_a; ptr_c += mem_a; | ||
510 | memcpy( ptr_a, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ | ||
511 | ptr_a[4] = 1; /* Offset 4: packet type 1 */ | ||
512 | ptr_a[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ | ||
513 | ptr_a[6] = count_one >> 8; | ||
514 | ptr_a[7] = count_one & 255; | ||
515 | ptr_a += 8; | ||
516 | } else { | ||
517 | count_def += count_one; | ||
518 | count_peers += count_one; | ||
519 | } | ||
520 | |||
521 | if( count_two > 8 ) { | ||
522 | mem_b = 4 + 1 + 1 + 2 + count_two * ( 19 + 14 ); | ||
523 | ptr_c += mem_b; | ||
524 | memcpy( ptr_b, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ | ||
525 | ptr_b[4] = 2; /* Offset 4: packet type 2 */ | ||
526 | ptr_b[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ | ||
527 | ptr_b[6] = count_two >> 8; | ||
528 | ptr_b[7] = count_two & 255; | ||
529 | ptr_b += 8; | ||
530 | } else { | ||
531 | count_def += count_two; | ||
532 | count_peers += 2 * count_two; | ||
533 | } | ||
534 | |||
535 | if( count_def ) { | ||
536 | memcpy( ptr_c, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ | ||
537 | ptr_c[4] = 0; /* Offset 4: packet type 0 */ | ||
538 | ptr_c[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ | ||
539 | ptr_c[6] = count_def >> 8; | ||
540 | ptr_c[7] = count_def & 255; | ||
541 | ptr_c += 8; | ||
542 | } | ||
543 | |||
544 | /* For each torrent in this bucket.. */ | ||
545 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | ||
546 | /* Address torrents members */ | ||
547 | ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; | ||
548 | ot_peerlist *peer_list = torrent->peer_list; | ||
549 | ot_peer *peers = (ot_peer*)(peer_list->peers.data); | ||
550 | uint8_t **dst; | ||
551 | int multi = 0; | ||
552 | switch( peer_list->peer_count ) { | ||
553 | case 0: continue; | ||
554 | case 1: dst = mem_a ? &ptr_a : &ptr_c; break; | ||
555 | case 2: dst = mem_b ? &ptr_b : &ptr_c; break; | ||
556 | default: dst = &ptr_c; multi = 1; break; | ||
557 | } | ||
558 | |||
559 | do { | ||
560 | size_t i, pc = peer_list->peer_count; | ||
561 | if( pc > 255 ) pc = 255; | ||
562 | memcpy( *dst, torrent->hash + 1, sizeof( ot_hash ) - 1); | ||
563 | *dst += sizeof( ot_hash ) - 1; | ||
564 | if( multi ) *(*dst)++ = pc; | ||
565 | for( i=0; i < pc; ++i ) { | ||
566 | memcpy( *dst, peers++, OT_IP_SIZE + 3 ); | ||
567 | *dst += OT_IP_SIZE + 3; | ||
568 | } | ||
569 | peer_list->peer_count -= pc; | ||
570 | } while( peer_list->peer_count ); | ||
571 | free_peerlist(peer_list); | ||
572 | } | ||
573 | |||
574 | free( torrents_list->data ); | ||
575 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | ||
576 | unlock_continue: | ||
577 | mutex_bucket_unlock( bucket, 0 ); | ||
578 | |||
579 | if( ptr ) { | ||
580 | int i; | ||
581 | |||
582 | if( ptr_b > ptr_c ) ptr_c = ptr_b; | ||
583 | if( ptr_a > ptr_c ) ptr_c = ptr_a; | ||
584 | mem = ptr_c - ptr; | ||
585 | |||
586 | for( i=0; i<g_connection_count; ++i ) { | ||
587 | if( g_connections[i].fd != -1 ) { | ||
588 | void *tmp = malloc( mem ); | ||
589 | if( tmp ) | ||
590 | if( !iob_addbuf_free( &g_connections[i].outdata, tmp, mem ) ) | ||
591 | free( tmp ); | ||
592 | } | ||
593 | } | ||
594 | |||
595 | free( ptr ); | ||
596 | } | ||
597 | usleep( OT_SYNC_SLEEP ); | ||
598 | } | ||
599 | } | ||
600 | return 0; | ||
601 | } | ||
214 | 602 | ||
603 | static void * livesync_worker( void * args ) { | ||
604 | (void)args; | ||
215 | while( 1 ) { | 605 | while( 1 ) { |
216 | ot_ip6 in_ip; uint16_t in_port; | 606 | ot_ip6 in_ip; uint16_t in_port; |
217 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 607 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); |
@@ -223,7 +613,6 @@ int main( int argc, char **argv ) { | |||
223 | /* drop packet coming from ourselves */ | 613 | /* drop packet coming from ourselves */ |
224 | continue; | 614 | continue; |
225 | } | 615 | } |
226 | |||
227 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { | 616 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { |
228 | case OT_SYNC_PEER: | 617 | case OT_SYNC_PEER: |
229 | livesync_handle_peersync( datalen ); | 618 | livesync_handle_peersync( datalen ); |
@@ -232,37 +621,6 @@ int main( int argc, char **argv ) { | |||
232 | fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); | 621 | fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); |
233 | break; | 622 | break; |
234 | } | 623 | } |
235 | if( time(NULL) > next_dump ) { | ||
236 | int bucket, i; | ||
237 | /* For each bucket... */ | ||
238 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | ||
239 | /* Get exclusive access to that bucket */ | ||
240 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | ||
241 | size_t tor_offset; | ||
242 | |||
243 | /* For each torrent in this bucket.. */ | ||
244 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | ||
245 | /* Address torrents members */ | ||
246 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | ||
247 | #ifdef WANT_SCROOOOOOOLL | ||
248 | ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; | ||
249 | char hash_out[41]; | ||
250 | to_hex(hash_out,*hash); | ||
251 | printf( "%s %08zd\n", hash_out, peer_list->peer_count ); | ||
252 | #endif | ||
253 | if(peer_list->peer_count<1024) peer_counts[peer_list->peer_count]++; else peer_counts[1023]++; | ||
254 | free_peerlist(peer_list); | ||
255 | } | ||
256 | free( torrents_list->data ); | ||
257 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | ||
258 | } | ||
259 | for( i=1023; i>=0; --i ) | ||
260 | if( peer_counts[i] ) { | ||
261 | printf( "%d:%d ", i, peer_counts[i] ); | ||
262 | peer_counts[i] = 0; | ||
263 | } | ||
264 | printf( "\n" ); | ||
265 | next_dump = time(NULL) + 1; | ||
266 | } | ||
267 | } | 624 | } |
625 | return 0; | ||
268 | } | 626 | } |