diff options
-rw-r--r-- | proxy.c | 92 |
1 files changed, 79 insertions, 13 deletions
@@ -211,7 +211,7 @@ static void livesync_handle_peersync( ssize_t datalen ) { | |||
211 | } | 211 | } |
212 | 212 | ||
213 | int usage( char *self ) { | 213 | int usage( char *self ) { |
214 | fprintf( stderr, "Usage: %s -i ip -p port\n", self ); | 214 | fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); |
215 | return 0; | 215 | return 0; |
216 | } | 216 | } |
217 | 217 | ||
@@ -274,6 +274,7 @@ static void handle_reconnects( void ) { | |||
274 | for( i=0; i<g_connection_count; ++i ) | 274 | for( i=0; i<g_connection_count; ++i ) |
275 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { | 275 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { |
276 | int64 newfd = socket_tcp6( ); | 276 | int64 newfd = socket_tcp6( ); |
277 | fprintf( stderr, "(Re)connecting to peer..." ); | ||
277 | if( newfd < 0 ) continue; /* No socket for you */ | 278 | if( newfd < 0 ) continue; /* No socket for you */ |
278 | io_fd(newfd); | 279 | io_fd(newfd); |
279 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { | 280 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { |
@@ -389,6 +390,7 @@ close_socket: | |||
389 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ | 390 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ |
390 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); | 391 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); |
391 | if( !datalen || datalen < -1 ) { | 392 | if( !datalen || datalen < -1 ) { |
393 | fprintf( stderr, "Connection closed by remote peer.\n" ); | ||
392 | io_close( peersocket ); | 394 | io_close( peersocket ); |
393 | reset_info_block( peer ); | 395 | reset_info_block( peer ); |
394 | } else if( datalen > 0 ) { | 396 | } else if( datalen > 0 ) { |
@@ -402,6 +404,7 @@ close_socket: | |||
402 | /* Can write new sync data to the stream */ | 404 | /* Can write new sync data to the stream */ |
403 | static void handle_write( int64 peersocket ) { | 405 | static void handle_write( int64 peersocket ) { |
404 | proxy_peer *peer = io_getcookie( peersocket ); | 406 | proxy_peer *peer = io_getcookie( peersocket ); |
407 | |||
405 | if( !peer ) { | 408 | if( !peer ) { |
406 | /* Can't happen ;) */ | 409 | /* Can't happen ;) */ |
407 | io_close( peersocket ); | 410 | io_close( peersocket ); |
@@ -416,25 +419,32 @@ static void handle_write( int64 peersocket ) { | |||
416 | case FLAG_CONNECTING: | 419 | case FLAG_CONNECTING: |
417 | /* Ensure that the connection is established and handle connection error */ | 420 | /* Ensure that the connection is established and handle connection error */ |
418 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { | 421 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { |
422 | fprintf( stderr, "failed\n" ); | ||
423 | reset_info_block( peer ); | ||
419 | io_close( peersocket ); | 424 | io_close( peersocket ); |
420 | break; | 425 | break; |
421 | } | 426 | } |
422 | 427 | ||
423 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | 428 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); |
424 | PROXYPEER_SETWAITTRACKERID( peer->state ); | 429 | PROXYPEER_SETWAITTRACKERID( peer->state ); |
430 | fprintf( stderr, " succeeded.\n" ); | ||
431 | |||
425 | io_dontwantwrite( peersocket ); | 432 | io_dontwantwrite( peersocket ); |
426 | io_wantread( peersocket ); | 433 | io_wantread( peersocket ); |
427 | break; | 434 | break; |
428 | case FLAG_CONNECTED: | 435 | case FLAG_CONNECTED: |
429 | switch( iob_send( peersocket, &peer->outdata ) ) { | 436 | switch( iob_send( peersocket, &peer->outdata ) ) { |
430 | case 0: /* all data sent */ | 437 | case 0: /* all data sent */ |
438 | fprintf( stderr, "EMPTY\n" ); | ||
431 | io_dontwantwrite( peersocket ); | 439 | io_dontwantwrite( peersocket ); |
432 | break; | 440 | break; |
433 | case -3: /* an error occured */ | 441 | case -3: /* an error occured */ |
442 | fprintf( stderr, "ERROR\n" ); | ||
434 | io_close( peersocket ); | 443 | io_close( peersocket ); |
435 | reset_info_block( peer ); | 444 | reset_info_block( peer ); |
436 | break; | 445 | break; |
437 | default: /* Normal operation or eagain */ | 446 | default: /* Normal operation or eagain */ |
447 | fprintf( stderr, "EGAIN\n" ); | ||
438 | break; | 448 | break; |
439 | } | 449 | } |
440 | break; | 450 | break; |
@@ -445,7 +455,6 @@ static void handle_write( int64 peersocket ) { | |||
445 | 455 | ||
446 | static void server_mainloop() { | 456 | static void server_mainloop() { |
447 | int64 sock; | 457 | int64 sock; |
448 | tai6464 now; | ||
449 | 458 | ||
450 | /* inlined livesync_init() */ | 459 | /* inlined livesync_init() */ |
451 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); | 460 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); |
@@ -461,9 +470,7 @@ static void server_mainloop() { | |||
461 | handle_reconnects( ); | 470 | handle_reconnects( ); |
462 | 471 | ||
463 | /* Wait for io events until next approx reconn check time */ | 472 | /* Wait for io events until next approx reconn check time */ |
464 | taia_now( &now ); | 473 | io_waituntil2( 30*1000 ); |
465 | taia_addsec( &now, &now, 30 ); | ||
466 | io_waituntil( now ); | ||
467 | 474 | ||
468 | /* Loop over readable sockets */ | 475 | /* Loop over readable sockets */ |
469 | while( ( sock = io_canread( ) ) != -1 ) { | 476 | while( ( sock = io_canread( ) ) != -1 ) { |
@@ -482,31 +489,90 @@ static void server_mainloop() { | |||
482 | } | 489 | } |
483 | } | 490 | } |
484 | 491 | ||
492 | static void panic( const char *routine ) { | ||
493 | fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); | ||
494 | exit( 111 ); | ||
495 | } | ||
496 | |||
497 | static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { | ||
498 | int64 sock = socket_tcp6( ); | ||
499 | |||
500 | if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) | ||
501 | panic( "socket_bind6_reuse" ); | ||
502 | |||
503 | if( socket_listen( sock, SOMAXCONN) == -1 ) | ||
504 | panic( "socket_listen" ); | ||
505 | |||
506 | if( !io_fd( sock ) ) | ||
507 | panic( "io_fd" ); | ||
508 | |||
509 | io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); | ||
510 | io_wantread( sock ); | ||
511 | return sock; | ||
512 | } | ||
513 | |||
514 | |||
515 | static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) { | ||
516 | const char *s = src; | ||
517 | int off, bracket = 0; | ||
518 | while( isspace(*s) ) ++s; | ||
519 | if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ | ||
520 | if( !(off = scan_ip6( s, ip ) ) ) | ||
521 | return 0; | ||
522 | s += off; | ||
523 | if( *s == 0 || isspace(*s)) return s-src; | ||
524 | if( *s == ']' && bracket ) ++s; | ||
525 | if( !ip6_isv4mapped(ip)){ | ||
526 | if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; | ||
527 | s++; | ||
528 | } else { | ||
529 | if( *(s++) != ':' ) return 0; | ||
530 | } | ||
531 | if( !(off = scan_ushort (s, port ) ) ) | ||
532 | return 0; | ||
533 | return off+s-src; | ||
534 | } | ||
535 | |||
485 | int main( int argc, char **argv ) { | 536 | int main( int argc, char **argv ) { |
486 | static pthread_t sync_in_thread_id; | 537 | static pthread_t sync_in_thread_id; |
487 | static pthread_t sync_out_thread_id; | 538 | static pthread_t sync_out_thread_id; |
488 | ot_ip6 serverip; | 539 | ot_ip6 serverip; |
489 | uint16_t tmpport; | 540 | uint16_t tmpport; |
490 | int scanon = 1, bound = 0; | 541 | int scanon = 1, lbound = 0, sbound = 0; |
491 | 542 | ||
492 | srandom( time(NULL) ); | 543 | srandom( time(NULL) ); |
493 | g_tracker_id = random(); | 544 | g_tracker_id = random(); |
545 | noipv6=1; | ||
494 | 546 | ||
495 | while( scanon ) { | 547 | while( scanon ) { |
496 | switch( getopt( argc, argv, ":i:p:vh" ) ) { | 548 | switch( getopt( argc, argv, ":l:c:L:h" ) ) { |
497 | case -1: scanon = 0; break; | 549 | case -1: scanon = 0; break; |
498 | case 'S': | 550 | case 'l': |
499 | if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } | 551 | tmpport = 0; |
552 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | ||
553 | ot_try_bind( serverip, tmpport ); | ||
554 | ++sbound; | ||
555 | break; | ||
556 | case 'c': | ||
557 | if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); | ||
558 | tmpport = 0; | ||
559 | if( !scan_ip6_port( optarg, | ||
560 | g_connections[g_connection_count].ip, | ||
561 | &g_connections[g_connection_count].port ) || | ||
562 | !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } | ||
563 | g_connections[g_connection_count++].state = FLAG_OUTGOING; | ||
500 | break; | 564 | break; |
501 | case 'p': | 565 | case 'L': |
502 | if( !scan_ushort( optarg, &tmpport)) { usage( argv[0] ); exit( 1 ); } | 566 | tmpport = 9696; |
503 | livesync_bind_mcast( serverip, tmpport); bound++; break; | 567 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } |
568 | livesync_bind_mcast( serverip, tmpport); ++lbound; break; | ||
504 | default: | 569 | default: |
505 | case '?': usage( argv[0] ); exit( 1 ); | 570 | case '?': usage( argv[0] ); exit( 1 ); |
506 | } | 571 | } |
507 | } | 572 | } |
508 | 573 | ||
509 | if( !bound ) exerr( "No port bound." ); | 574 | if( !lbound ) exerr( "No livesync port bound." ); |
575 | if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); | ||
510 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); | 576 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); |
511 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); | 577 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); |
512 | 578 | ||