diff options
Diffstat (limited to 'proxy.c')
-rw-r--r-- | proxy.c | 181 |
1 files changed, 143 insertions, 38 deletions
@@ -30,9 +30,11 @@ | |||
30 | #include "trackerlogic.h" | 30 | #include "trackerlogic.h" |
31 | #include "ot_vector.h" | 31 | #include "ot_vector.h" |
32 | #include "ot_mutex.h" | 32 | #include "ot_mutex.h" |
33 | #include "ot_livesync.h" | ||
34 | #include "ot_stats.h" | 33 | #include "ot_stats.h" |
35 | 34 | ||
35 | #define WANT_SYNC_LIVE | ||
36 | #include "ot_livesync.h" | ||
37 | |||
36 | ot_ip6 g_serverip; | 38 | ot_ip6 g_serverip; |
37 | uint16_t g_serverport = 9009; | 39 | uint16_t g_serverport = 9009; |
38 | uint32_t g_tracker_id; | 40 | uint32_t g_tracker_id; |
@@ -49,6 +51,7 @@ int g_self_pipe[2]; | |||
49 | 51 | ||
50 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 52 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
51 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 53 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) |
54 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | ||
52 | 55 | ||
53 | /* The amount of time a complete sync cycle should take */ | 56 | /* The amount of time a complete sync cycle should take */ |
54 | #define OT_SYNC_INTERVAL_MINUTES 2 | 57 | #define OT_SYNC_INTERVAL_MINUTES 2 |
@@ -65,10 +68,14 @@ static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | |||
65 | 68 | ||
66 | /* For outgoing packets */ | 69 | /* For outgoing packets */ |
67 | static int64 g_socket_out = -1; | 70 | static int64 g_socket_out = -1; |
68 | //static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; | 71 | static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
72 | static uint8_t *g_peerbuffer_pos; | ||
73 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | ||
74 | static ot_time g_next_packet_time; | ||
69 | 75 | ||
70 | static void * livesync_worker( void * args ); | 76 | static void * livesync_worker( void * args ); |
71 | static void * streamsync_worker( void * args ); | 77 | static void * streamsync_worker( void * args ); |
78 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); | ||
72 | 79 | ||
73 | void exerr( char * message ) { | 80 | void exerr( char * message ) { |
74 | fprintf( stderr, "%s\n", message ); | 81 | fprintf( stderr, "%s\n", message ); |
@@ -226,15 +233,31 @@ enum { | |||
226 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | 233 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) |
227 | 234 | ||
228 | typedef struct { | 235 | typedef struct { |
229 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | 236 | int state; /* Whether we want to connect, how far our handshake is, etc. */ |
230 | ot_ip6 ip; /* The peer to connect to */ | 237 | ot_ip6 ip; /* The peer to connect to */ |
231 | uint16_t port; /* The peers port */ | 238 | uint16_t port; /* The peers port */ |
232 | uint8_t *indata; /* Any data not processed yet */ | 239 | uint8_t indata[8192*16]; /* Any data not processed yet */ |
233 | size_t indata_length; /* Length of unprocessed data */ | 240 | size_t indata_length; /* Length of unprocessed data */ |
234 | uint32_t tracker_id; /* How the other end greeted */ | 241 | uint32_t tracker_id; /* How the other end greeted */ |
235 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ | 242 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ |
236 | io_batch outdata; /* The iobatch containing our sync data */ | 243 | io_batch outdata; /* The iobatch containing our sync data */ |
244 | |||
245 | int packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ | ||
246 | char packet_tprefix; /* Prefix byte for all torrents in current packet */ | ||
247 | char packet_type; /* Type of current packet */ | ||
248 | uint32_t packet_tid; /* Tracker id for current packet */ | ||
249 | |||
237 | } proxy_peer; | 250 | } proxy_peer; |
251 | static void process_indata( proxy_peer * peer ); | ||
252 | |||
253 | void reset_info_block( proxy_peer * peer ) { | ||
254 | peer->indata_length = 0; | ||
255 | peer->tracker_id = 0; | ||
256 | peer->fd = -1; | ||
257 | peer->packet_tcount = 0; | ||
258 | iob_reset( &peer->outdata ); | ||
259 | PROXYPEER_SETDISCONNECTED( peer->state ); | ||
260 | } | ||
238 | 261 | ||
239 | /* Number of connections to peers | 262 | /* Number of connections to peers |
240 | * If a peer's IP is set, we try to reconnect, when the connection drops | 263 | * If a peer's IP is set, we try to reconnect, when the connection drops |
@@ -266,12 +289,8 @@ static void handle_reconnects( void ) { | |||
266 | io_setcookie(newfd,g_connections+i); | 289 | io_setcookie(newfd,g_connections+i); |
267 | 290 | ||
268 | /* Prepare connection info block */ | 291 | /* Prepare connection info block */ |
269 | free( g_connections[i].indata ); | 292 | reset_info_block( g_connections+i ); |
270 | g_connections[i].indata = 0; | ||
271 | g_connections[i].indata_length = 0; | ||
272 | g_connections[i].fd = newfd; | 293 | g_connections[i].fd = newfd; |
273 | g_connections[i].tracker_id = 0; | ||
274 | iob_reset( &g_connections[i].outdata ); | ||
275 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 294 | PROXYPEER_SETCONNECTING( g_connections[i].state ); |
276 | } | 295 | } |
277 | g_connection_reconn = time(NULL) + 30; | 296 | g_connection_reconn = time(NULL) + 30; |
@@ -305,16 +324,10 @@ static void handle_accept( int64 serversocket ) { | |||
305 | } | 324 | } |
306 | 325 | ||
307 | /* Prepare connection info block */ | 326 | /* Prepare connection info block */ |
308 | free( g_connections[i].indata ); | 327 | reset_info_block( g_connections+i ); |
309 | g_connections[i].indata = 0; | ||
310 | g_connections[i].indata_length = 0; | ||
311 | g_connections[i].port = port; | ||
312 | g_connections[i].fd = newfd; | ||
313 | g_connections[i].tracker_id = 0; | ||
314 | iob_reset( &g_connections[i].outdata ); | ||
315 | g_connections[i].tracker_id = 0; | ||
316 | |||
317 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 328 | PROXYPEER_SETCONNECTING( g_connections[i].state ); |
329 | g_connections[i].port = port; | ||
330 | g_connections[i].fd = newfd; | ||
318 | 331 | ||
319 | io_setcookie( newfd, g_connections + i ); | 332 | io_setcookie( newfd, g_connections + i ); |
320 | 333 | ||
@@ -328,19 +341,25 @@ static void handle_accept( int64 serversocket ) { | |||
328 | 341 | ||
329 | /* New sync data on the stream */ | 342 | /* New sync data on the stream */ |
330 | static void handle_read( int64 peersocket ) { | 343 | static void handle_read( int64 peersocket ) { |
344 | int i; | ||
345 | int64 datalen; | ||
331 | uint32_t tracker_id; | 346 | uint32_t tracker_id; |
332 | proxy_peer *peer = io_getcookie( peersocket ); | 347 | proxy_peer *peer = io_getcookie( peersocket ); |
348 | |||
333 | if( !peer ) { | 349 | if( !peer ) { |
334 | /* Can't happen ;) */ | 350 | /* Can't happen ;) */ |
335 | close( peersocket ); | 351 | io_close( peersocket ); |
336 | return; | 352 | return; |
337 | } | 353 | } |
338 | switch( peer->state & FLAG_MASK ) { | 354 | switch( peer->state & FLAG_MASK ) { |
339 | case FLAG_DISCONNECTED: break; /* Shouldnt happen */ | 355 | case FLAG_DISCONNECTED: |
356 | io_close( peersocket ); | ||
357 | break; /* Shouldnt happen */ | ||
340 | case FLAG_CONNECTING: | 358 | case FLAG_CONNECTING: |
341 | case FLAG_WAITTRACKERID: | 359 | case FLAG_WAITTRACKERID: |
342 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */ | 360 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) |
343 | if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | 361 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ |
362 | if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | ||
344 | goto close_socket; | 363 | goto close_socket; |
345 | 364 | ||
346 | /* See, if we already have a connection to that peer */ | 365 | /* See, if we already have a connection to that peer */ |
@@ -363,12 +382,20 @@ static void handle_read( int64 peersocket ) { | |||
363 | break; | 382 | break; |
364 | close_socket: | 383 | close_socket: |
365 | io_close( peersocket ); | 384 | io_close( peersocket ); |
366 | PROXYPEER_SETDISCONNECTED( peer->state ); | 385 | reset_info_block( peer ); |
367 | break; | 386 | break; |
368 | case FLAG_CONNECTED: | 387 | case FLAG_CONNECTED: |
369 | 388 | /* Here we acutally expect data from peer | |
389 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ | ||
390 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); | ||
391 | if( !datalen || datalen < -1 ) { | ||
392 | io_close( peersocket ); | ||
393 | reset_info_block( peer ); | ||
394 | } else if( datalen > 0 ) { | ||
395 | peer->indata_length += datalen; | ||
396 | process_indata( peer ); | ||
397 | } | ||
370 | break; | 398 | break; |
371 | |||
372 | } | 399 | } |
373 | } | 400 | } |
374 | 401 | ||
@@ -377,13 +404,22 @@ static void handle_write( int64 peersocket ) { | |||
377 | proxy_peer *peer = io_getcookie( peersocket ); | 404 | proxy_peer *peer = io_getcookie( peersocket ); |
378 | if( !peer ) { | 405 | if( !peer ) { |
379 | /* Can't happen ;) */ | 406 | /* Can't happen ;) */ |
380 | close( peersocket ); | 407 | io_close( peersocket ); |
381 | return; | 408 | return; |
382 | } | 409 | } |
383 | 410 | ||
384 | switch( peer->state & FLAG_MASK ) { | 411 | switch( peer->state & FLAG_MASK ) { |
385 | case FLAG_DISCONNECTED: break; /* Shouldnt happen */ | 412 | case FLAG_DISCONNECTED: |
413 | default: /* Should not happen */ | ||
414 | io_close( peersocket ); | ||
415 | break; | ||
386 | case FLAG_CONNECTING: | 416 | case FLAG_CONNECTING: |
417 | /* Ensure that the connection is established and handle connection error */ | ||
418 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { | ||
419 | io_close( peersocket ); | ||
420 | break; | ||
421 | } | ||
422 | |||
387 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | 423 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); |
388 | PROXYPEER_SETWAITTRACKERID( peer->state ); | 424 | PROXYPEER_SETWAITTRACKERID( peer->state ); |
389 | io_dontwantwrite( peersocket ); | 425 | io_dontwantwrite( peersocket ); |
@@ -396,15 +432,12 @@ static void handle_write( int64 peersocket ) { | |||
396 | break; | 432 | break; |
397 | case -3: /* an error occured */ | 433 | case -3: /* an error occured */ |
398 | io_close( peersocket ); | 434 | io_close( peersocket ); |
399 | PROXYPEER_SETDISCONNECTED( peer->state ); | 435 | reset_info_block( peer ); |
400 | iob_reset( &peer->outdata ); | 436 | break; |
401 | free( peer->indata ); | ||
402 | default: /* Normal operation or eagain */ | 437 | default: /* Normal operation or eagain */ |
403 | break; | 438 | break; |
404 | } | 439 | } |
405 | break; | 440 | break; |
406 | default: | ||
407 | break; | ||
408 | } | 441 | } |
409 | 442 | ||
410 | return; | 443 | return; |
@@ -414,6 +447,14 @@ static void server_mainloop() { | |||
414 | int64 sock; | 447 | int64 sock; |
415 | tai6464 now; | 448 | tai6464 now; |
416 | 449 | ||
450 | /* inlined livesync_init() */ | ||
451 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); | ||
452 | g_peerbuffer_pos = g_peerbuffer_start; | ||
453 | memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
454 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | ||
455 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
456 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | ||
457 | |||
417 | while(1) { | 458 | while(1) { |
418 | /* See, if we need to connect to anyone */ | 459 | /* See, if we need to connect to anyone */ |
419 | if( time(NULL) > g_connection_reconn ) | 460 | if( time(NULL) > g_connection_reconn ) |
@@ -436,6 +477,8 @@ static void server_mainloop() { | |||
436 | /* Loop over writable sockets */ | 477 | /* Loop over writable sockets */ |
437 | while( ( sock = io_canwrite( ) ) != -1 ) | 478 | while( ( sock = io_canwrite( ) ) != -1 ) |
438 | handle_write( sock ); | 479 | handle_write( sock ); |
480 | |||
481 | livesync_ticker( ); | ||
439 | } | 482 | } |
440 | } | 483 | } |
441 | 484 | ||
@@ -600,6 +643,68 @@ unlock_continue: | |||
600 | return 0; | 643 | return 0; |
601 | } | 644 | } |
602 | 645 | ||
646 | static void livesync_issue_peersync( ) { | ||
647 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | ||
648 | groupip_1, LIVESYNC_PORT); | ||
649 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
650 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | ||
651 | } | ||
652 | |||
653 | void livesync_ticker( ) { | ||
654 | /* livesync_issue_peersync sets g_next_packet_time */ | ||
655 | if( time(NULL) > g_next_packet_time && | ||
656 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | ||
657 | livesync_issue_peersync(); | ||
658 | } | ||
659 | |||
660 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { | ||
661 | *g_peerbuffer_pos = prefix; | ||
662 | memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); | ||
663 | memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); | ||
664 | |||
665 | g_peerbuffer_pos += sizeof(ot_hash) + sizeof(ot_peer); | ||
666 | |||
667 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | ||
668 | livesync_issue_peersync(); | ||
669 | } | ||
670 | |||
671 | static void process_indata( proxy_peer * peer ) { | ||
672 | int ensuremem, consumed, peers; | ||
673 | uint8_t *data = peer->indata, *hash; | ||
674 | uint8_t *dataend = data + peer->indata_length; | ||
675 | |||
676 | while( 1 ) { | ||
677 | /* If we're not inside of a packet, make a new one */ | ||
678 | if( !peer->packet_tcount ) { | ||
679 | /* Ensure the header is complete or postpone processing */ | ||
680 | if( data + 8 > dataend ) break; | ||
681 | memcpy( &peer->packet_tid, data, sizeof(peer->packet_tid) ); | ||
682 | peer->packet_type = data[4]; | ||
683 | peer->packet_tprefix = data[5]; | ||
684 | peer->packet_tcount = data[6] * 256 + data[7]; | ||
685 | data += 8; | ||
686 | } | ||
687 | |||
688 | /* ensure size for the complete torrent block */ | ||
689 | if( data + 26 > dataend ) break; | ||
690 | peers = peer->packet_type ? peer->packet_type : data[19]; | ||
691 | ensuremem = 19 + ( peer->packet_type == 0 ) + 7 * peers; | ||
692 | if( data + ensuremem > dataend ) break; | ||
693 | |||
694 | hash = data; | ||
695 | data += 19 + ( peer->packet_type == 0 ); | ||
696 | |||
697 | while( peers-- ) { | ||
698 | livesync_proxytell( peer->packet_tprefix, hash, data ); | ||
699 | data += 7; | ||
700 | } | ||
701 | } | ||
702 | |||
703 | consumed = data - peer->indata; | ||
704 | memmove( peer->indata, data, peer->indata_length - consumed ); | ||
705 | peer->indata_length -= consumed; | ||
706 | } | ||
707 | |||
603 | static void * livesync_worker( void * args ) { | 708 | static void * livesync_worker( void * args ) { |
604 | (void)args; | 709 | (void)args; |
605 | while( 1 ) { | 710 | while( 1 ) { |