summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--proxy.c181
1 files changed, 143 insertions, 38 deletions
diff --git a/proxy.c b/proxy.c
index c27599b..cdbdec5 100644
--- a/proxy.c
+++ b/proxy.c
@@ -30,9 +30,11 @@
30#include "trackerlogic.h" 30#include "trackerlogic.h"
31#include "ot_vector.h" 31#include "ot_vector.h"
32#include "ot_mutex.h" 32#include "ot_mutex.h"
33#include "ot_livesync.h"
34#include "ot_stats.h" 33#include "ot_stats.h"
35 34
35#define WANT_SYNC_LIVE
36#include "ot_livesync.h"
37
36ot_ip6 g_serverip; 38ot_ip6 g_serverip;
37uint16_t g_serverport = 9009; 39uint16_t g_serverport = 9009;
38uint32_t g_tracker_id; 40uint32_t g_tracker_id;
@@ -49,6 +51,7 @@ int g_self_pipe[2];
49 51
50#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 52#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
51#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 53#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
54#define LIVESYNC_MAXDELAY 15 /* seconds */
52 55
53/* The amount of time a complete sync cycle should take */ 56/* The amount of time a complete sync cycle should take */
54#define OT_SYNC_INTERVAL_MINUTES 2 57#define OT_SYNC_INTERVAL_MINUTES 2
@@ -65,10 +68,14 @@ static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
65 68
66/* For outgoing packets */ 69/* For outgoing packets */
67static int64 g_socket_out = -1; 70static int64 g_socket_out = -1;
68//static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; 71static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
72static uint8_t *g_peerbuffer_pos;
73static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
74static ot_time g_next_packet_time;
69 75
70static void * livesync_worker( void * args ); 76static void * livesync_worker( void * args );
71static void * streamsync_worker( void * args ); 77static void * streamsync_worker( void * args );
78static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer );
72 79
73void exerr( char * message ) { 80void exerr( char * message ) {
74 fprintf( stderr, "%s\n", message ); 81 fprintf( stderr, "%s\n", message );
@@ -226,15 +233,31 @@ enum {
226#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) 233#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
227 234
228typedef struct { 235typedef struct {
229 int state; /* Whether we want to connect, how far our handshake is, etc. */ 236 int state; /* Whether we want to connect, how far our handshake is, etc. */
230 ot_ip6 ip; /* The peer to connect to */ 237 ot_ip6 ip; /* The peer to connect to */
231 uint16_t port; /* The peers port */ 238 uint16_t port; /* The peers port */
232 uint8_t *indata; /* Any data not processed yet */ 239 uint8_t indata[8192*16]; /* Any data not processed yet */
233 size_t indata_length; /* Length of unprocessed data */ 240 size_t indata_length; /* Length of unprocessed data */
234 uint32_t tracker_id; /* How the other end greeted */ 241 uint32_t tracker_id; /* How the other end greeted */
235 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ 242 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
236 io_batch outdata; /* The iobatch containing our sync data */ 243 io_batch outdata; /* The iobatch containing our sync data */
244
245 int packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
246 char packet_tprefix; /* Prefix byte for all torrents in current packet */
247 char packet_type; /* Type of current packet */
248 uint32_t packet_tid; /* Tracker id for current packet */
249
237} proxy_peer; 250} proxy_peer;
251static void process_indata( proxy_peer * peer );
252
253void reset_info_block( proxy_peer * peer ) {
254 peer->indata_length = 0;
255 peer->tracker_id = 0;
256 peer->fd = -1;
257 peer->packet_tcount = 0;
258 iob_reset( &peer->outdata );
259 PROXYPEER_SETDISCONNECTED( peer->state );
260}
238 261
239/* Number of connections to peers 262/* Number of connections to peers
240 * If a peer's IP is set, we try to reconnect, when the connection drops 263 * If a peer's IP is set, we try to reconnect, when the connection drops
@@ -266,12 +289,8 @@ static void handle_reconnects( void ) {
266 io_setcookie(newfd,g_connections+i); 289 io_setcookie(newfd,g_connections+i);
267 290
268 /* Prepare connection info block */ 291 /* Prepare connection info block */
269 free( g_connections[i].indata ); 292 reset_info_block( g_connections+i );
270 g_connections[i].indata = 0;
271 g_connections[i].indata_length = 0;
272 g_connections[i].fd = newfd; 293 g_connections[i].fd = newfd;
273 g_connections[i].tracker_id = 0;
274 iob_reset( &g_connections[i].outdata );
275 PROXYPEER_SETCONNECTING( g_connections[i].state ); 294 PROXYPEER_SETCONNECTING( g_connections[i].state );
276 } 295 }
277 g_connection_reconn = time(NULL) + 30; 296 g_connection_reconn = time(NULL) + 30;
@@ -305,16 +324,10 @@ static void handle_accept( int64 serversocket ) {
305 } 324 }
306 325
307 /* Prepare connection info block */ 326 /* Prepare connection info block */
308 free( g_connections[i].indata ); 327 reset_info_block( g_connections+i );
309 g_connections[i].indata = 0;
310 g_connections[i].indata_length = 0;
311 g_connections[i].port = port;
312 g_connections[i].fd = newfd;
313 g_connections[i].tracker_id = 0;
314 iob_reset( &g_connections[i].outdata );
315 g_connections[i].tracker_id = 0;
316
317 PROXYPEER_SETCONNECTING( g_connections[i].state ); 328 PROXYPEER_SETCONNECTING( g_connections[i].state );
329 g_connections[i].port = port;
330 g_connections[i].fd = newfd;
318 331
319 io_setcookie( newfd, g_connections + i ); 332 io_setcookie( newfd, g_connections + i );
320 333
@@ -328,19 +341,25 @@ static void handle_accept( int64 serversocket ) {
328 341
329/* New sync data on the stream */ 342/* New sync data on the stream */
330static void handle_read( int64 peersocket ) { 343static void handle_read( int64 peersocket ) {
344 int i;
345 int64 datalen;
331 uint32_t tracker_id; 346 uint32_t tracker_id;
332 proxy_peer *peer = io_getcookie( peersocket ); 347 proxy_peer *peer = io_getcookie( peersocket );
348
333 if( !peer ) { 349 if( !peer ) {
334 /* Can't happen ;) */ 350 /* Can't happen ;) */
335 close( peersocket ); 351 io_close( peersocket );
336 return; 352 return;
337 } 353 }
338 switch( peer->state & FLAG_MASK ) { 354 switch( peer->state & FLAG_MASK ) {
339 case FLAG_DISCONNECTED: break; /* Shouldnt happen */ 355 case FLAG_DISCONNECTED:
356 io_close( peersocket );
357 break; /* Shouldnt happen */
340 case FLAG_CONNECTING: 358 case FLAG_CONNECTING:
341 case FLAG_WAITTRACKERID: 359 case FLAG_WAITTRACKERID:
342 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */ 360 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
343 if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) 361 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
362 if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
344 goto close_socket; 363 goto close_socket;
345 364
346 /* See, if we already have a connection to that peer */ 365 /* See, if we already have a connection to that peer */
@@ -363,12 +382,20 @@ static void handle_read( int64 peersocket ) {
363 break; 382 break;
364close_socket: 383close_socket:
365 io_close( peersocket ); 384 io_close( peersocket );
366 PROXYPEER_SETDISCONNECTED( peer->state ); 385 reset_info_block( peer );
367 break; 386 break;
368 case FLAG_CONNECTED: 387 case FLAG_CONNECTED:
369 388 /* Here we acutally expect data from peer
389 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
390 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
391 if( !datalen || datalen < -1 ) {
392 io_close( peersocket );
393 reset_info_block( peer );
394 } else if( datalen > 0 ) {
395 peer->indata_length += datalen;
396 process_indata( peer );
397 }
370 break; 398 break;
371
372 } 399 }
373} 400}
374 401
@@ -377,13 +404,22 @@ static void handle_write( int64 peersocket ) {
377 proxy_peer *peer = io_getcookie( peersocket ); 404 proxy_peer *peer = io_getcookie( peersocket );
378 if( !peer ) { 405 if( !peer ) {
379 /* Can't happen ;) */ 406 /* Can't happen ;) */
380 close( peersocket ); 407 io_close( peersocket );
381 return; 408 return;
382 } 409 }
383 410
384 switch( peer->state & FLAG_MASK ) { 411 switch( peer->state & FLAG_MASK ) {
385 case FLAG_DISCONNECTED: break; /* Shouldnt happen */ 412 case FLAG_DISCONNECTED:
413 default: /* Should not happen */
414 io_close( peersocket );
415 break;
386 case FLAG_CONNECTING: 416 case FLAG_CONNECTING:
417 /* Ensure that the connection is established and handle connection error */
418 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
419 io_close( peersocket );
420 break;
421 }
422
387 io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); 423 io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
388 PROXYPEER_SETWAITTRACKERID( peer->state ); 424 PROXYPEER_SETWAITTRACKERID( peer->state );
389 io_dontwantwrite( peersocket ); 425 io_dontwantwrite( peersocket );
@@ -396,15 +432,12 @@ static void handle_write( int64 peersocket ) {
396 break; 432 break;
397 case -3: /* an error occured */ 433 case -3: /* an error occured */
398 io_close( peersocket ); 434 io_close( peersocket );
399 PROXYPEER_SETDISCONNECTED( peer->state ); 435 reset_info_block( peer );
400 iob_reset( &peer->outdata ); 436 break;
401 free( peer->indata );
402 default: /* Normal operation or eagain */ 437 default: /* Normal operation or eagain */
403 break; 438 break;
404 } 439 }
405 break; 440 break;
406 default:
407 break;
408 } 441 }
409 442
410 return; 443 return;
@@ -414,6 +447,14 @@ static void server_mainloop() {
414 int64 sock; 447 int64 sock;
415 tai6464 now; 448 tai6464 now;
416 449
450 /* inlined livesync_init() */
451 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
452 g_peerbuffer_pos = g_peerbuffer_start;
453 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
454 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
455 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
456 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
457
417 while(1) { 458 while(1) {
418 /* See, if we need to connect to anyone */ 459 /* See, if we need to connect to anyone */
419 if( time(NULL) > g_connection_reconn ) 460 if( time(NULL) > g_connection_reconn )
@@ -436,6 +477,8 @@ static void server_mainloop() {
436 /* Loop over writable sockets */ 477 /* Loop over writable sockets */
437 while( ( sock = io_canwrite( ) ) != -1 ) 478 while( ( sock = io_canwrite( ) ) != -1 )
438 handle_write( sock ); 479 handle_write( sock );
480
481 livesync_ticker( );
439 } 482 }
440} 483}
441 484
@@ -600,6 +643,68 @@ unlock_continue:
600 return 0; 643 return 0;
601} 644}
602 645
646static void livesync_issue_peersync( ) {
647 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
648 groupip_1, LIVESYNC_PORT);
649 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
650 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
651}
652
653void livesync_ticker( ) {
654 /* livesync_issue_peersync sets g_next_packet_time */
655 if( time(NULL) > g_next_packet_time &&
656 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
657 livesync_issue_peersync();
658}
659
660static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) {
661 *g_peerbuffer_pos = prefix;
662 memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 );
663 memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 );
664
665 g_peerbuffer_pos += sizeof(ot_hash) + sizeof(ot_peer);
666
667 if( g_peerbuffer_pos >= g_peerbuffer_highwater )
668 livesync_issue_peersync();
669}
670
671static void process_indata( proxy_peer * peer ) {
672 int ensuremem, consumed, peers;
673 uint8_t *data = peer->indata, *hash;
674 uint8_t *dataend = data + peer->indata_length;
675
676 while( 1 ) {
677 /* If we're not inside of a packet, make a new one */
678 if( !peer->packet_tcount ) {
679 /* Ensure the header is complete or postpone processing */
680 if( data + 8 > dataend ) break;
681 memcpy( &peer->packet_tid, data, sizeof(peer->packet_tid) );
682 peer->packet_type = data[4];
683 peer->packet_tprefix = data[5];
684 peer->packet_tcount = data[6] * 256 + data[7];
685 data += 8;
686 }
687
688 /* ensure size for the complete torrent block */
689 if( data + 26 > dataend ) break;
690 peers = peer->packet_type ? peer->packet_type : data[19];
691 ensuremem = 19 + ( peer->packet_type == 0 ) + 7 * peers;
692 if( data + ensuremem > dataend ) break;
693
694 hash = data;
695 data += 19 + ( peer->packet_type == 0 );
696
697 while( peers-- ) {
698 livesync_proxytell( peer->packet_tprefix, hash, data );
699 data += 7;
700 }
701 }
702
703 consumed = data - peer->indata;
704 memmove( peer->indata, data, peer->indata_length - consumed );
705 peer->indata_length -= consumed;
706}
707
603static void * livesync_worker( void * args ) { 708static void * livesync_worker( void * args ) {
604 (void)args; 709 (void)args;
605 while( 1 ) { 710 while( 1 ) {