diff options
| -rw-r--r-- | proxy.c | 181 |
1 files changed, 143 insertions, 38 deletions
| @@ -30,9 +30,11 @@ | |||
| 30 | #include "trackerlogic.h" | 30 | #include "trackerlogic.h" |
| 31 | #include "ot_vector.h" | 31 | #include "ot_vector.h" |
| 32 | #include "ot_mutex.h" | 32 | #include "ot_mutex.h" |
| 33 | #include "ot_livesync.h" | ||
| 34 | #include "ot_stats.h" | 33 | #include "ot_stats.h" |
| 35 | 34 | ||
| 35 | #define WANT_SYNC_LIVE | ||
| 36 | #include "ot_livesync.h" | ||
| 37 | |||
| 36 | ot_ip6 g_serverip; | 38 | ot_ip6 g_serverip; |
| 37 | uint16_t g_serverport = 9009; | 39 | uint16_t g_serverport = 9009; |
| 38 | uint32_t g_tracker_id; | 40 | uint32_t g_tracker_id; |
| @@ -49,6 +51,7 @@ int g_self_pipe[2]; | |||
| 49 | 51 | ||
| 50 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 52 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
| 51 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 53 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) |
| 54 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | ||
| 52 | 55 | ||
| 53 | /* The amount of time a complete sync cycle should take */ | 56 | /* The amount of time a complete sync cycle should take */ |
| 54 | #define OT_SYNC_INTERVAL_MINUTES 2 | 57 | #define OT_SYNC_INTERVAL_MINUTES 2 |
| @@ -65,10 +68,14 @@ static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | |||
| 65 | 68 | ||
| 66 | /* For outgoing packets */ | 69 | /* For outgoing packets */ |
| 67 | static int64 g_socket_out = -1; | 70 | static int64 g_socket_out = -1; |
| 68 | //static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; | 71 | static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
| 72 | static uint8_t *g_peerbuffer_pos; | ||
| 73 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | ||
| 74 | static ot_time g_next_packet_time; | ||
| 69 | 75 | ||
| 70 | static void * livesync_worker( void * args ); | 76 | static void * livesync_worker( void * args ); |
| 71 | static void * streamsync_worker( void * args ); | 77 | static void * streamsync_worker( void * args ); |
| 78 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); | ||
| 72 | 79 | ||
| 73 | void exerr( char * message ) { | 80 | void exerr( char * message ) { |
| 74 | fprintf( stderr, "%s\n", message ); | 81 | fprintf( stderr, "%s\n", message ); |
| @@ -226,15 +233,31 @@ enum { | |||
| 226 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | 233 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) |
| 227 | 234 | ||
| 228 | typedef struct { | 235 | typedef struct { |
| 229 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | 236 | int state; /* Whether we want to connect, how far our handshake is, etc. */ |
| 230 | ot_ip6 ip; /* The peer to connect to */ | 237 | ot_ip6 ip; /* The peer to connect to */ |
| 231 | uint16_t port; /* The peers port */ | 238 | uint16_t port; /* The peers port */ |
| 232 | uint8_t *indata; /* Any data not processed yet */ | 239 | uint8_t indata[8192*16]; /* Any data not processed yet */ |
| 233 | size_t indata_length; /* Length of unprocessed data */ | 240 | size_t indata_length; /* Length of unprocessed data */ |
| 234 | uint32_t tracker_id; /* How the other end greeted */ | 241 | uint32_t tracker_id; /* How the other end greeted */ |
| 235 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ | 242 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ |
| 236 | io_batch outdata; /* The iobatch containing our sync data */ | 243 | io_batch outdata; /* The iobatch containing our sync data */ |
| 244 | |||
| 245 | int packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ | ||
| 246 | char packet_tprefix; /* Prefix byte for all torrents in current packet */ | ||
| 247 | char packet_type; /* Type of current packet */ | ||
| 248 | uint32_t packet_tid; /* Tracker id for current packet */ | ||
| 249 | |||
| 237 | } proxy_peer; | 250 | } proxy_peer; |
| 251 | static void process_indata( proxy_peer * peer ); | ||
| 252 | |||
| 253 | void reset_info_block( proxy_peer * peer ) { | ||
| 254 | peer->indata_length = 0; | ||
| 255 | peer->tracker_id = 0; | ||
| 256 | peer->fd = -1; | ||
| 257 | peer->packet_tcount = 0; | ||
| 258 | iob_reset( &peer->outdata ); | ||
| 259 | PROXYPEER_SETDISCONNECTED( peer->state ); | ||
| 260 | } | ||
| 238 | 261 | ||
| 239 | /* Number of connections to peers | 262 | /* Number of connections to peers |
| 240 | * If a peer's IP is set, we try to reconnect, when the connection drops | 263 | * If a peer's IP is set, we try to reconnect, when the connection drops |
| @@ -266,12 +289,8 @@ static void handle_reconnects( void ) { | |||
| 266 | io_setcookie(newfd,g_connections+i); | 289 | io_setcookie(newfd,g_connections+i); |
| 267 | 290 | ||
| 268 | /* Prepare connection info block */ | 291 | /* Prepare connection info block */ |
| 269 | free( g_connections[i].indata ); | 292 | reset_info_block( g_connections+i ); |
| 270 | g_connections[i].indata = 0; | ||
| 271 | g_connections[i].indata_length = 0; | ||
| 272 | g_connections[i].fd = newfd; | 293 | g_connections[i].fd = newfd; |
| 273 | g_connections[i].tracker_id = 0; | ||
| 274 | iob_reset( &g_connections[i].outdata ); | ||
| 275 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 294 | PROXYPEER_SETCONNECTING( g_connections[i].state ); |
| 276 | } | 295 | } |
| 277 | g_connection_reconn = time(NULL) + 30; | 296 | g_connection_reconn = time(NULL) + 30; |
| @@ -305,16 +324,10 @@ static void handle_accept( int64 serversocket ) { | |||
| 305 | } | 324 | } |
| 306 | 325 | ||
| 307 | /* Prepare connection info block */ | 326 | /* Prepare connection info block */ |
| 308 | free( g_connections[i].indata ); | 327 | reset_info_block( g_connections+i ); |
| 309 | g_connections[i].indata = 0; | ||
| 310 | g_connections[i].indata_length = 0; | ||
| 311 | g_connections[i].port = port; | ||
| 312 | g_connections[i].fd = newfd; | ||
| 313 | g_connections[i].tracker_id = 0; | ||
| 314 | iob_reset( &g_connections[i].outdata ); | ||
| 315 | g_connections[i].tracker_id = 0; | ||
| 316 | |||
| 317 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 328 | PROXYPEER_SETCONNECTING( g_connections[i].state ); |
| 329 | g_connections[i].port = port; | ||
| 330 | g_connections[i].fd = newfd; | ||
| 318 | 331 | ||
| 319 | io_setcookie( newfd, g_connections + i ); | 332 | io_setcookie( newfd, g_connections + i ); |
| 320 | 333 | ||
| @@ -328,19 +341,25 @@ static void handle_accept( int64 serversocket ) { | |||
| 328 | 341 | ||
| 329 | /* New sync data on the stream */ | 342 | /* New sync data on the stream */ |
| 330 | static void handle_read( int64 peersocket ) { | 343 | static void handle_read( int64 peersocket ) { |
| 344 | int i; | ||
| 345 | int64 datalen; | ||
| 331 | uint32_t tracker_id; | 346 | uint32_t tracker_id; |
| 332 | proxy_peer *peer = io_getcookie( peersocket ); | 347 | proxy_peer *peer = io_getcookie( peersocket ); |
| 348 | |||
| 333 | if( !peer ) { | 349 | if( !peer ) { |
| 334 | /* Can't happen ;) */ | 350 | /* Can't happen ;) */ |
| 335 | close( peersocket ); | 351 | io_close( peersocket ); |
| 336 | return; | 352 | return; |
| 337 | } | 353 | } |
| 338 | switch( peer->state & FLAG_MASK ) { | 354 | switch( peer->state & FLAG_MASK ) { |
| 339 | case FLAG_DISCONNECTED: break; /* Shouldnt happen */ | 355 | case FLAG_DISCONNECTED: |
| 356 | io_close( peersocket ); | ||
| 357 | break; /* Shouldnt happen */ | ||
| 340 | case FLAG_CONNECTING: | 358 | case FLAG_CONNECTING: |
| 341 | case FLAG_WAITTRACKERID: | 359 | case FLAG_WAITTRACKERID: |
| 342 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */ | 360 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) |
| 343 | if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | 361 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ |
| 362 | if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | ||
| 344 | goto close_socket; | 363 | goto close_socket; |
| 345 | 364 | ||
| 346 | /* See, if we already have a connection to that peer */ | 365 | /* See, if we already have a connection to that peer */ |
| @@ -363,12 +382,20 @@ static void handle_read( int64 peersocket ) { | |||
| 363 | break; | 382 | break; |
| 364 | close_socket: | 383 | close_socket: |
| 365 | io_close( peersocket ); | 384 | io_close( peersocket ); |
| 366 | PROXYPEER_SETDISCONNECTED( peer->state ); | 385 | reset_info_block( peer ); |
| 367 | break; | 386 | break; |
| 368 | case FLAG_CONNECTED: | 387 | case FLAG_CONNECTED: |
| 369 | 388 | /* Here we acutally expect data from peer | |
| 389 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ | ||
| 390 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); | ||
| 391 | if( !datalen || datalen < -1 ) { | ||
| 392 | io_close( peersocket ); | ||
| 393 | reset_info_block( peer ); | ||
| 394 | } else if( datalen > 0 ) { | ||
| 395 | peer->indata_length += datalen; | ||
| 396 | process_indata( peer ); | ||
| 397 | } | ||
| 370 | break; | 398 | break; |
| 371 | |||
| 372 | } | 399 | } |
| 373 | } | 400 | } |
| 374 | 401 | ||
| @@ -377,13 +404,22 @@ static void handle_write( int64 peersocket ) { | |||
| 377 | proxy_peer *peer = io_getcookie( peersocket ); | 404 | proxy_peer *peer = io_getcookie( peersocket ); |
| 378 | if( !peer ) { | 405 | if( !peer ) { |
| 379 | /* Can't happen ;) */ | 406 | /* Can't happen ;) */ |
| 380 | close( peersocket ); | 407 | io_close( peersocket ); |
| 381 | return; | 408 | return; |
| 382 | } | 409 | } |
| 383 | 410 | ||
| 384 | switch( peer->state & FLAG_MASK ) { | 411 | switch( peer->state & FLAG_MASK ) { |
| 385 | case FLAG_DISCONNECTED: break; /* Shouldnt happen */ | 412 | case FLAG_DISCONNECTED: |
| 413 | default: /* Should not happen */ | ||
| 414 | io_close( peersocket ); | ||
| 415 | break; | ||
| 386 | case FLAG_CONNECTING: | 416 | case FLAG_CONNECTING: |
| 417 | /* Ensure that the connection is established and handle connection error */ | ||
| 418 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { | ||
| 419 | io_close( peersocket ); | ||
| 420 | break; | ||
| 421 | } | ||
| 422 | |||
| 387 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | 423 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); |
| 388 | PROXYPEER_SETWAITTRACKERID( peer->state ); | 424 | PROXYPEER_SETWAITTRACKERID( peer->state ); |
| 389 | io_dontwantwrite( peersocket ); | 425 | io_dontwantwrite( peersocket ); |
| @@ -396,15 +432,12 @@ static void handle_write( int64 peersocket ) { | |||
| 396 | break; | 432 | break; |
| 397 | case -3: /* an error occured */ | 433 | case -3: /* an error occured */ |
| 398 | io_close( peersocket ); | 434 | io_close( peersocket ); |
| 399 | PROXYPEER_SETDISCONNECTED( peer->state ); | 435 | reset_info_block( peer ); |
| 400 | iob_reset( &peer->outdata ); | 436 | break; |
| 401 | free( peer->indata ); | ||
| 402 | default: /* Normal operation or eagain */ | 437 | default: /* Normal operation or eagain */ |
| 403 | break; | 438 | break; |
| 404 | } | 439 | } |
| 405 | break; | 440 | break; |
| 406 | default: | ||
| 407 | break; | ||
| 408 | } | 441 | } |
| 409 | 442 | ||
| 410 | return; | 443 | return; |
| @@ -414,6 +447,14 @@ static void server_mainloop() { | |||
| 414 | int64 sock; | 447 | int64 sock; |
| 415 | tai6464 now; | 448 | tai6464 now; |
| 416 | 449 | ||
| 450 | /* inlined livesync_init() */ | ||
| 451 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); | ||
| 452 | g_peerbuffer_pos = g_peerbuffer_start; | ||
| 453 | memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 454 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | ||
| 455 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 456 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | ||
| 457 | |||
| 417 | while(1) { | 458 | while(1) { |
| 418 | /* See, if we need to connect to anyone */ | 459 | /* See, if we need to connect to anyone */ |
| 419 | if( time(NULL) > g_connection_reconn ) | 460 | if( time(NULL) > g_connection_reconn ) |
| @@ -436,6 +477,8 @@ static void server_mainloop() { | |||
| 436 | /* Loop over writable sockets */ | 477 | /* Loop over writable sockets */ |
| 437 | while( ( sock = io_canwrite( ) ) != -1 ) | 478 | while( ( sock = io_canwrite( ) ) != -1 ) |
| 438 | handle_write( sock ); | 479 | handle_write( sock ); |
| 480 | |||
| 481 | livesync_ticker( ); | ||
| 439 | } | 482 | } |
| 440 | } | 483 | } |
| 441 | 484 | ||
| @@ -600,6 +643,68 @@ unlock_continue: | |||
| 600 | return 0; | 643 | return 0; |
| 601 | } | 644 | } |
| 602 | 645 | ||
| 646 | static void livesync_issue_peersync( ) { | ||
| 647 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | ||
| 648 | groupip_1, LIVESYNC_PORT); | ||
| 649 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 650 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | ||
| 651 | } | ||
| 652 | |||
| 653 | void livesync_ticker( ) { | ||
| 654 | /* livesync_issue_peersync sets g_next_packet_time */ | ||
| 655 | if( time(NULL) > g_next_packet_time && | ||
| 656 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | ||
| 657 | livesync_issue_peersync(); | ||
| 658 | } | ||
| 659 | |||
| 660 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { | ||
| 661 | *g_peerbuffer_pos = prefix; | ||
| 662 | memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); | ||
| 663 | memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); | ||
| 664 | |||
| 665 | g_peerbuffer_pos += sizeof(ot_hash) + sizeof(ot_peer); | ||
| 666 | |||
| 667 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | ||
| 668 | livesync_issue_peersync(); | ||
| 669 | } | ||
| 670 | |||
| 671 | static void process_indata( proxy_peer * peer ) { | ||
| 672 | int ensuremem, consumed, peers; | ||
| 673 | uint8_t *data = peer->indata, *hash; | ||
| 674 | uint8_t *dataend = data + peer->indata_length; | ||
| 675 | |||
| 676 | while( 1 ) { | ||
| 677 | /* If we're not inside of a packet, make a new one */ | ||
| 678 | if( !peer->packet_tcount ) { | ||
| 679 | /* Ensure the header is complete or postpone processing */ | ||
| 680 | if( data + 8 > dataend ) break; | ||
| 681 | memcpy( &peer->packet_tid, data, sizeof(peer->packet_tid) ); | ||
| 682 | peer->packet_type = data[4]; | ||
| 683 | peer->packet_tprefix = data[5]; | ||
| 684 | peer->packet_tcount = data[6] * 256 + data[7]; | ||
| 685 | data += 8; | ||
| 686 | } | ||
| 687 | |||
| 688 | /* ensure size for the complete torrent block */ | ||
| 689 | if( data + 26 > dataend ) break; | ||
| 690 | peers = peer->packet_type ? peer->packet_type : data[19]; | ||
| 691 | ensuremem = 19 + ( peer->packet_type == 0 ) + 7 * peers; | ||
| 692 | if( data + ensuremem > dataend ) break; | ||
| 693 | |||
| 694 | hash = data; | ||
| 695 | data += 19 + ( peer->packet_type == 0 ); | ||
| 696 | |||
| 697 | while( peers-- ) { | ||
| 698 | livesync_proxytell( peer->packet_tprefix, hash, data ); | ||
| 699 | data += 7; | ||
| 700 | } | ||
| 701 | } | ||
| 702 | |||
| 703 | consumed = data - peer->indata; | ||
| 704 | memmove( peer->indata, data, peer->indata_length - consumed ); | ||
| 705 | peer->indata_length -= consumed; | ||
| 706 | } | ||
| 707 | |||
| 603 | static void * livesync_worker( void * args ) { | 708 | static void * livesync_worker( void * args ) { |
| 604 | (void)args; | 709 | (void)args; |
| 605 | while( 1 ) { | 710 | while( 1 ) { |
