| -rw-r--r-- | proxy.c | 468 |
1 file changed, 413 insertions, 55 deletions
| @@ -4,6 +4,7 @@ | |||
| 4 | $Id$ */ | 4 | $Id$ */ |
| 5 | 5 | ||
| 6 | /* System */ | 6 | /* System */ |
| 7 | #include <stdint.h> | ||
| 7 | #include <stdlib.h> | 8 | #include <stdlib.h> |
| 8 | #include <string.h> | 9 | #include <string.h> |
| 9 | #include <arpa/inet.h> | 10 | #include <arpa/inet.h> |
| @@ -14,6 +15,7 @@ | |||
| 14 | #include <stdio.h> | 15 | #include <stdio.h> |
| 15 | #include <pwd.h> | 16 | #include <pwd.h> |
| 16 | #include <ctype.h> | 17 | #include <ctype.h> |
| 18 | #include <pthread.h> | ||
| 17 | 19 | ||
| 18 | /* Libowfat */ | 20 | /* Libowfat */ |
| 19 | #include "socket.h" | 21 | #include "socket.h" |
| @@ -26,30 +28,59 @@ | |||
| 26 | 28 | ||
| 27 | /* Opentracker */ | 29 | /* Opentracker */ |
| 28 | #include "trackerlogic.h" | 30 | #include "trackerlogic.h" |
| 31 | #include "ot_vector.h" | ||
| 32 | #include "ot_mutex.h" | ||
| 29 | #include "ot_livesync.h" | 33 | #include "ot_livesync.h" |
| 34 | #include "ot_stats.h" | ||
| 30 | 35 | ||
| 36 | ot_ip6 g_serverip; | ||
| 37 | uint16_t g_serverport = 9009; | ||
| 31 | uint32_t g_tracker_id; | 38 | uint32_t g_tracker_id; |
| 32 | char groupip_1[4] = { 224,0,23,5 }; | 39 | char groupip_1[4] = { 224,0,23,5 }; |
| 40 | int g_self_pipe[2]; | ||
| 41 | |||
| 42 | /* If you have more than 10 peers, don't use this proxy. | ||
| 43 | We use 20 slots for 10 peers, leaving room for 10 incoming connections. | ||
| 44 | */ | ||
| 45 | #define MAX_PEERS 20 | ||
| 33 | 46 | ||
| 34 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 47 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) |
| 48 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) | ||
| 35 | 49 | ||
| 36 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 50 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
| 37 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 51 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) |
| 38 | 52 | ||
| 53 | /* The amount of time a complete sync cycle should take */ | ||
| 54 | #define OT_SYNC_INTERVAL_MINUTES 2 | ||
| 55 | |||
| 56 | /* So after each bucket, wait 1 / OT_BUCKET_COUNT of that interval */ | ||
| 57 | #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) | ||
| 58 | |||
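For intuition, here is a quick standalone check of what that sleep works out to. OT_BUCKET_COUNT == 1024 is an assumption taken from stock trackerlogic.h; it is not part of this diff:

    /* Sketch: verify the per-bucket sleep adds up to one full sync cycle,
       assuming OT_BUCKET_COUNT == 1024 as in stock trackerlogic.h. */
    #include <stdio.h>

    #define OT_BUCKET_COUNT 1024
    #define OT_SYNC_INTERVAL_MINUTES 2
    #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )

    int main( void ) {
      printf( "per bucket: %d usec\n", OT_SYNC_SLEEP );                   /* 117187, ~117 ms  */
      printf( "full sweep: %d usec\n", OT_SYNC_SLEEP * OT_BUCKET_COUNT ); /* 119999488, ~2 min */
      return 0;
    }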
| 39 | enum { OT_SYNC_PEER }; | 59 | enum { OT_SYNC_PEER }; |
| 60 | enum { FLAG_SERVERSOCKET = 1 }; | ||
| 40 | 61 | ||
| 41 | /* For outgoing packets */ | 62 | /* For incoming packets */ |
| 42 | static int64 g_socket_in = -1; | 63 | static int64 g_socket_in = -1; |
| 64 | static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | ||
| 43 | 65 | ||
| 44 | /* For incoming packets */ | 66 | /* For outgoing packets */ |
| 45 | static int64 g_socket_out = -1; | 67 | static int64 g_socket_out = -1; |
| 46 | static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | 68 | //static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; |
| 69 | |||
| 70 | static void * livesync_worker( void * args ); | ||
| 71 | static void * streamsync_worker( void * args ); | ||
| 47 | 72 | ||
| 48 | void exerr( char * message ) { | 73 | void exerr( char * message ) { |
| 49 | fprintf( stderr, "%s\n", message ); | 74 | fprintf( stderr, "%s\n", message ); |
| 50 | exit( 111 ); | 75 | exit( 111 ); |
| 51 | } | 76 | } |
| 52 | 77 | ||
| 78 | void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { | ||
| 79 | (void) event; | ||
| 80 | (void) proto; | ||
| 81 | (void) event_data; | ||
| 82 | } | ||
| 83 | |||
| 53 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | 84 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { |
| 54 | char tmpip[4] = {0,0,0,0}; | 85 | char tmpip[4] = {0,0,0,0}; |
| 55 | char *v4ip; | 86 | char *v4ip; |
| @@ -80,16 +111,6 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | |||
| 80 | socket_mcloop4(g_socket_out, 1); | 111 | socket_mcloop4(g_socket_out, 1); |
| 81 | } | 112 | } |
| 82 | 113 | ||
| 83 | static ot_vector all_torrents[OT_BUCKET_COUNT]; | ||
| 84 | ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) { | ||
| 85 | return all_torrents + ( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT ); | ||
| 86 | } | ||
| 87 | ot_vector *mutex_bucket_lock( int bucket ) { | ||
| 88 | return all_torrents + bucket; | ||
| 89 | } | ||
| 90 | #define mutex_bucket_unlock_by_hash(A,B) | ||
| 91 | #define mutex_bucket_unlock(A) | ||
| 92 | |||
| 93 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | 114 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { |
| 94 | int exactmatch; | 115 | int exactmatch; |
| 95 | ot_torrent *torrent; | 116 | ot_torrent *torrent; |
| @@ -106,6 +127,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
| 106 | 127 | ||
| 107 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { | 128 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { |
| 108 | vector_remove_torrent( torrents_list, torrent ); | 129 | vector_remove_torrent( torrents_list, torrent ); |
| 130 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
| 109 | return -1; | 131 | return -1; |
| 110 | } | 132 | } |
| 111 | 133 | ||
| @@ -114,8 +136,10 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
| 114 | 136 | ||
| 115 | /* Check for peer in torrent */ | 137 | /* Check for peer in torrent */ |
| 116 | peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); | 138 | peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); |
| 117 | if( !peer_dest ) return -1; | 139 | if( !peer_dest ) { |
| 118 | 140 | mutex_bucket_unlock_by_hash( hash, 0 ); | |
| 141 | return -1; | ||
| 142 | } | ||
| 119 | /* Tell peer that it's fresh */ | 143 | /* Tell peer that it's fresh */ |
| 120 | OT_PEERTIME( peer ) = 0; | 144 | OT_PEERTIME( peer ) = 0; |
| 121 | 145 | ||
| @@ -126,6 +150,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
| 126 | torrent->peer_list->seed_count++; | 150 | torrent->peer_list->seed_count++; |
| 127 | } | 151 | } |
| 128 | memcpy( peer_dest, peer, sizeof(ot_peer) ); | 152 | memcpy( peer_dest, peer, sizeof(ot_peer) ); |
| 153 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
| 129 | return 0; | 154 | return 0; |
| 130 | } | 155 | } |
| 131 | 156 | ||
| @@ -143,6 +168,7 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) { | |||
| 143 | } | 168 | } |
| 144 | } | 169 | } |
| 145 | 170 | ||
| 171 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
| 146 | return 0; | 172 | return 0; |
| 147 | } | 173 | } |
| 148 | 174 | ||
| @@ -182,16 +208,243 @@ int usage( char *self ) { | |||
| 182 | return 0; | 208 | return 0; |
| 183 | } | 209 | } |
| 184 | 210 | ||
| 185 | static uint32_t peer_counts[1024]; | 211 | enum { |
| 186 | #ifdef WANT_SCROOOOOOOLL | 212 | FLAG_OUTGOING = 0x80, |
| 187 | static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} | 213 | |
| 188 | #endif | 214 | FLAG_DISCONNECTED = 0x00, |
| 215 | FLAG_CONNECTING = 0x01, | ||
| 216 | FLAG_WAITTRACKERID = 0x02, | ||
| 217 | FLAG_CONNECTED = 0x03, | ||
| 218 | |||
| 219 | FLAG_MASK = 0x07 | ||
| 220 | }; | ||
| 221 | |||
| 222 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) | ||
| 223 | #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) | ||
| 224 | #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) | ||
| 225 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) | ||
| 226 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | ||
| 227 | |||
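The three low bits of a connection's state track handshake progress, while bit 0x80 records that we initiated the link; every SET* macro masks with FLAG_OUTGOING, so the direction bit survives state changes. A minimal sketch (definitions copied from above) of why that matters for the reconnect logic:

    /* Sketch: a dropped outgoing connection keeps its FLAG_OUTGOING bit,
       so handle_reconnects() will dial it again. */
    #include <assert.h>

    enum { FLAG_OUTGOING = 0x80, FLAG_DISCONNECTED = 0x00, FLAG_CONNECTED = 0x03 };
    #define PROXYPEER_NEEDSCONNECT(flag)    ((flag)==FLAG_OUTGOING)
    #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)

    int main( void ) {
      int state = FLAG_OUTGOING | FLAG_CONNECTED; /* established link we dialed  */
      PROXYPEER_SETDISCONNECTED( state );         /* peer went away              */
      assert( state == FLAG_OUTGOING );           /* handshake bits cleared...   */
      assert( PROXYPEER_NEEDSCONNECT( state ) );  /* ...so a redial is due       */
      return 0;
    }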
| 228 | typedef struct { | ||
| 229 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | ||
| 230 | ot_ip6 ip; /* The peer to connect to */ | ||
| 231 | uint16_t port; /* The peer's port */ | ||
| 232 | uint8_t *indata; /* Any data not processed yet */ | ||
| 233 | size_t indata_length; /* Length of unprocessed data */ | ||
| 234 | uint32_t tracker_id; /* How the other end greeted */ | ||
| 235 | int64 fd; /* A file handle if connected; <= 0 means disconnected (0 initially, -1 afterwards) */ | ||
| 236 | io_batch outdata; /* The iobatch containing our sync data */ | ||
| 237 | } proxy_peer; | ||
| 238 | |||
| 239 | /* Number of connections to peers | ||
| 240 | * If a peer's IP is set, we try to reconnect when the connection drops | ||
| 241 | * If an _incoming_ connection presents a tracker_id we already have on record as connected, drop it | ||
| 242 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match | ||
| 243 | * Reconnect attempts occur only twice a minute | ||
| 244 | */ | ||
| 245 | static int g_connection_count; | ||
| 246 | static ot_time g_connection_reconn; | ||
| 247 | static proxy_peer g_connections[MAX_PEERS]; | ||
| 248 | |||
| 249 | static void handle_reconnects( void ) { | ||
| 250 | int i; | ||
| 251 | for( i=0; i<g_connection_count; ++i ) | ||
| 252 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { | ||
| 253 | int64 newfd = socket_tcp6( ); | ||
| 254 | if( newfd < 0 ) continue; /* No socket for you */ | ||
| 255 | io_fd(newfd); | ||
| 256 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { | ||
| 257 | io_close( newfd ); | ||
| 258 | continue; | ||
| 259 | } | ||
| 260 | if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && | ||
| 261 | errno != EINPROGRESS && errno != EWOULDBLOCK ) { | ||
| 262 | io_close(newfd); | ||
| 263 | continue; | ||
| 264 | } | ||
| 265 | io_wantwrite(newfd); /* So we will be informed when it is connected */ | ||
| 266 | io_setcookie(newfd,g_connections+i); | ||
| 267 | |||
| 268 | /* Prepare connection info block */ | ||
| 269 | free( g_connections[i].indata ); | ||
| 270 | g_connections[i].indata = 0; | ||
| 271 | g_connections[i].indata_length = 0; | ||
| 272 | g_connections[i].fd = newfd; | ||
| 273 | g_connections[i].tracker_id = 0; | ||
| 274 | iob_reset( &g_connections[i].outdata ); | ||
| 275 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | ||
| 276 | } | ||
| 277 | g_connection_reconn = time(NULL) + 30; | ||
| 278 | } | ||
| 279 | |||
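Because the connect() above is non-blocking, io_wantwrite() only signals that the socket became writable, not that the connection succeeded. The conventional completion check — shown here as an illustrative sketch; the code instead proceeds straight to the handshake in handle_write() — is SO_ERROR:

    /* Sketch: query whether a non-blocking connect() completed successfully. */
    #include <sys/socket.h>

    static int connect_finished_ok( int fd ) {
      int err = 0;
      socklen_t len = sizeof( err );
      if( getsockopt( fd, SOL_SOCKET, SO_ERROR, &err, &len ) )
        return 0;      /* getsockopt itself failed */
      return err == 0; /* 0 means the connect went through */
    }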
| 280 | /* Handle incoming connection requests, check against whitelist */ | ||
| 281 | static void handle_accept( int64 serversocket ) { | ||
| 282 | int64 newfd; | ||
| 283 | ot_ip6 ip; | ||
| 284 | uint16 port; | ||
| 285 | |||
| 286 | while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { | ||
| 287 | |||
| 288 | /* XXX some access control */ | ||
| 289 | |||
| 290 | /* Put fd into a non-blocking mode */ | ||
| 291 | io_nonblock( newfd ); | ||
| 292 | |||
| 293 | if( !io_fd( newfd ) ) | ||
| 294 | io_close( newfd ); | ||
| 295 | else { | ||
| 296 | /* Find a new home for our incoming connection */ | ||
| 297 | int i; | ||
| 298 | for( i=0; i<MAX_PEERS; ++i ) | ||
| 299 | if( g_connections[i].state == FLAG_DISCONNECTED ) | ||
| 300 | break; | ||
| 301 | if( i == MAX_PEERS ) { | ||
| 302 | fprintf( stderr, "No room for incoming connection.\n" ); | ||
| 303 | io_close( newfd ); | ||
| 304 | continue; | ||
| 305 | } | ||
| 306 | |||
| 307 | /* Prepare connection info block */ | ||
| 308 | free( g_connections[i].indata ); | ||
| 309 | g_connections[i].indata = 0; | ||
| 310 | g_connections[i].indata_length = 0; | ||
| 311 | g_connections[i].port = port; | ||
| 312 | g_connections[i].fd = newfd; | ||
| 313 | g_connections[i].tracker_id = 0; | ||
| 314 | iob_reset( &g_connections[i].outdata ); | ||
| 316 | |||
| 317 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | ||
| 318 | |||
| 319 | io_setcookie( newfd, g_connections + i ); | ||
| 320 | |||
| 321 | /* We expect the connecting side to begin with its tracker_id */ | ||
| 322 | io_wantread( newfd ); | ||
| 323 | } | ||
| 324 | } | ||
| 325 | |||
| 326 | return; | ||
| 327 | } | ||
| 328 | |||
| 329 | /* New sync data on the stream */ | ||
| 330 | static void handle_read( int64 peersocket ) { | ||
| 331 | int i; uint32_t tracker_id; | ||
| 332 | proxy_peer *peer = io_getcookie( peersocket ); | ||
| 333 | if( !peer ) { | ||
| 334 | /* Can't happen ;) */ | ||
| 335 | close( peersocket ); | ||
| 336 | return; | ||
| 337 | } | ||
| 338 | switch( peer->state & FLAG_MASK ) { | ||
| 339 | case FLAG_DISCONNECTED: break; /* Shouldn't happen */ | ||
| 340 | case FLAG_CONNECTING: | ||
| 341 | case FLAG_WAITTRACKERID: | ||
| 342 | /* We expect the first four bytes (the tracker id) to arrive at once, to avoid keeping extra state (for now) */ | ||
| 343 | if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | ||
| 344 | goto close_socket; | ||
| 345 | |||
| 346 | /* See if we already have a connection to that peer */ | ||
| 347 | for( i=0; i<MAX_PEERS; ++i ) | ||
| 348 | if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && | ||
| 349 | g_connections[i].tracker_id == tracker_id ) | ||
| 350 | goto close_socket; | ||
| 351 | |||
| 352 | /* Also no need for soliloquy */ | ||
| 353 | if( tracker_id == g_tracker_id ) | ||
| 354 | goto close_socket; | ||
| 355 | |||
| 356 | /* The new connection is good, send our tracker_id on incoming connections */ | ||
| 357 | if( peer->state == FLAG_CONNECTING ) | ||
| 358 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 359 | |||
| 360 | peer->tracker_id = tracker_id; | ||
| 361 | PROXYPEER_SETCONNECTED( peer->state ); | ||
| 362 | |||
| 363 | break; | ||
| 364 | close_socket: | ||
| 365 | io_close( peersocket ); | ||
| 366 | PROXYPEER_SETDISCONNECTED( peer->state ); | ||
| 367 | break; | ||
| 368 | case FLAG_CONNECTED: | ||
| 369 | /* Incoming sync data is not processed yet */ | ||
| 370 | break; | ||
| 371 | |||
| 372 | } | ||
| 373 | } | ||
| 374 | |||
| 375 | /* Can write new sync data to the stream */ | ||
| 376 | static void handle_write( int64 peersocket ) { | ||
| 377 | proxy_peer *peer = io_getcookie( peersocket ); | ||
| 378 | if( !peer ) { | ||
| 379 | /* Can't happen ;) */ | ||
| 380 | close( peersocket ); | ||
| 381 | return; | ||
| 382 | } | ||
| 383 | |||
| 384 | switch( peer->state & FLAG_MASK ) { | ||
| 385 | case FLAG_DISCONNECTED: break; /* Shouldn't happen */ | ||
| 386 | case FLAG_CONNECTING: | ||
| 387 | io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 388 | PROXYPEER_SETWAITTRACKERID( peer->state ); | ||
| 389 | io_dontwantwrite( peersocket ); | ||
| 390 | io_wantread( peersocket ); | ||
| 391 | break; | ||
| 392 | case FLAG_CONNECTED: | ||
| 393 | switch( iob_send( peersocket, &peer->outdata ) ) { | ||
| 394 | case 0: /* all data sent */ | ||
| 395 | io_dontwantwrite( peersocket ); | ||
| 396 | break; | ||
| 397 | case -3: /* an error occurred */ | ||
| 398 | io_close( peersocket ); | ||
| 399 | PROXYPEER_SETDISCONNECTED( peer->state ); | ||
| 400 | iob_reset( &peer->outdata ); | ||
| 401 | free( peer->indata ); peer->indata = 0; /* avoid double free on slot reuse */ | ||
| 402 | default: /* Normal operation or EAGAIN */ | ||
| 403 | break; | ||
| 404 | } | ||
| 405 | break; | ||
| 406 | default: | ||
| 407 | break; | ||
| 408 | } | ||
| 409 | |||
| 410 | return; | ||
| 411 | } | ||
| 412 | |||
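Together, handle_write() and handle_read() implement a four-byte handshake: the dialing side sends its raw tracker id first, the accepting side answers with its own, and either side drops the link when the id is already on record or equals its own. A sketch of the dialing side as a standalone blocking client — host, port, and plain BSD sockets are illustrative assumptions; the proxy itself uses libowfat's non-blocking io_* calls:

    /* Hypothetical standalone client performing the same id exchange. */
    #include <stdint.h>
    #include <string.h>
    #include <unistd.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>

    /* Returns a connected fd ready for sync blocks, or -1. */
    int proxy_handshake( const char *host, uint16_t port,
                         uint32_t my_id, uint32_t *their_id ) {
      struct sockaddr_in sin;
      int fd = socket( AF_INET, SOCK_STREAM, 0 );
      if( fd < 0 ) return -1;
      memset( &sin, 0, sizeof( sin ) );
      sin.sin_family = AF_INET;
      sin.sin_port = htons( port );
      if( inet_pton( AF_INET, host, &sin.sin_addr ) != 1 ||
          connect( fd, (struct sockaddr *)&sin, sizeof( sin ) ) ) {
        close( fd ); return -1;
      }
      /* The dialing side talks first: our raw 4-byte tracker id... */
      if( write( fd, &my_id, sizeof( my_id ) ) != sizeof( my_id ) ||
          /* ...then the peer's id comes back... */
          read( fd, their_id, sizeof( *their_id ) ) != sizeof( *their_id ) ||
          *their_id == my_id ) { /* ...and no talking to ourselves. */
        close( fd ); return -1;
      }
      return fd;
    }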
| 413 | static void server_mainloop() { | ||
| 414 | int64 sock; | ||
| 415 | tai6464 now; | ||
| 416 | |||
| 417 | while(1) { | ||
| 418 | /* See if we need to connect to anyone */ | ||
| 419 | if( time(NULL) > g_connection_reconn ) | ||
| 420 | handle_reconnects( ); | ||
| 421 | |||
| 422 | /* Wait for io events until roughly the next reconnect check */ | ||
| 423 | taia_now( &now ); | ||
| 424 | taia_addsec( &now, &now, 30 ); | ||
| 425 | io_waituntil( now ); | ||
| 426 | |||
| 427 | /* Loop over readable sockets */ | ||
| 428 | while( ( sock = io_canread( ) ) != -1 ) { | ||
| 429 | const void *cookie = io_getcookie( sock ); | ||
| 430 | if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) | ||
| 431 | handle_accept( sock ); | ||
| 432 | else | ||
| 433 | handle_read( sock ); | ||
| 434 | } | ||
| 435 | |||
| 436 | /* Loop over writable sockets */ | ||
| 437 | while( ( sock = io_canwrite( ) ) != -1 ) | ||
| 438 | handle_write( sock ); | ||
| 439 | } | ||
| 440 | } | ||
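The loop tells the listening socket apart from peer sockets purely by its io cookie. The code creating that socket lies outside this hunk; presumably it registers it along these lines (a sketch with assumed error messages, reusing the file's libowfat includes):

    /* Assumed setup: tag the listening socket with the FLAG_SERVERSOCKET
       sentinel so server_mainloop() routes it to handle_accept(). */
    int64 serversocket = socket_tcp6( );
    if( serversocket < 0 || socket_bind6_reuse( serversocket, g_serverip, g_serverport, 0 ) )
      exerr( "Could not bind server socket." );
    if( socket_listen( serversocket, SOMAXCONN ) )
      exerr( "Could not listen on server socket." );
    io_fd( serversocket );
    io_setcookie( serversocket, (void*)(uintptr_t)FLAG_SERVERSOCKET );
    io_wantread( serversocket );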
| 189 | 441 | ||
| 190 | int main( int argc, char **argv ) { | 442 | int main( int argc, char **argv ) { |
| 443 | static pthread_t sync_in_thread_id; | ||
| 444 | static pthread_t sync_out_thread_id; | ||
| 191 | ot_ip6 serverip; | 445 | ot_ip6 serverip; |
| 192 | uint16_t tmpport; | 446 | uint16_t tmpport; |
| 193 | int scanon = 1, bound = 0; | 447 | int scanon = 1, bound = 0; |
| 194 | time_t next_dump = time(NULL)+1; | ||
| 195 | 448 | ||
| 196 | srandom( time(NULL) ); | 449 | srandom( time(NULL) ); |
| 197 | g_tracker_id = random(); | 450 | g_tracker_id = random(); |
| @@ -199,7 +452,7 @@ int main( int argc, char **argv ) { | |||
| 199 | while( scanon ) { | 452 | while( scanon ) { |
| 200 | switch( getopt( argc, argv, ":i:p:vh" ) ) { | 453 | switch( getopt( argc, argv, ":S:p:vh" ) ) { |
| 201 | case -1: scanon = 0; break; | 454 | case -1: scanon = 0; break; |
| 202 | case 'i': | 455 | case 'S': |
| 203 | if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } | 456 | if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } |
| 204 | break; | 457 | break; |
| 205 | case 'p': | 458 | case 'p': |
| @@ -211,7 +464,144 @@ int main( int argc, char **argv ) { | |||
| 211 | } | 464 | } |
| 212 | 465 | ||
| 213 | if( !bound ) exerr( "No port bound." ); | 466 | if( !bound ) exerr( "No port bound." ); |
| 467 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); | ||
| 468 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); | ||
| 469 | |||
| 470 | server_mainloop(); | ||
| 471 | return 0; | ||
| 472 | } | ||
| 473 | |||
| 474 | static void * streamsync_worker( void * args ) { | ||
| 475 | (void)args; | ||
| 476 | while( 1 ) { | ||
| 477 | int bucket; | ||
| 478 | /* For each bucket... */ | ||
| 479 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | ||
| 480 | /* Get exclusive access to that bucket */ | ||
| 481 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | ||
| 482 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; | ||
| 483 | size_t mem, mem_a = 0, mem_b = 0; | ||
| 484 | uint8_t *ptr, *ptr_a, *ptr_b, *ptr_c; | ||
| 485 | |||
| 486 | /* For each torrent in this bucket.. */ | ||
| 487 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | ||
| 488 | /* Address the torrent's members */ | ||
| 489 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | ||
| 490 | switch( peer_list->peer_count ) { | ||
| 491 | case 2: count_two++; break; | ||
| 492 | case 1: count_one++; break; | ||
| 493 | case 0: break; | ||
| 494 | default: | ||
| 495 | count_peers += peer_list->peer_count; | ||
| 496 | count_def += 1 + ( peer_list->peer_count >> 8 ); | ||
| 497 | } | ||
| 498 | } | ||
| 499 | |||
| 500 | /* Maximal memory requirement: at most 3 block headers, 20 bytes per torrent record and 7 bytes per peer */ | ||
| 501 | mem = 3 * ( 4 + 1 + 1 + 2 ) + ( count_one + count_two ) * 19 + count_def * 20 + | ||
| 502 | ( count_one + 2 * count_two + count_peers ) * 7; | ||
| 503 | |||
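The bound adds up as follows: each of the up-to-three blocks starts with an 8-byte header (4 id + 1 type + 1 prefix + 2 count); every record carries only 19 hash bytes, since the first byte is implied by the bucket prefix; records in the generic block spend one extra byte on a peer count, and torrents with more than 255 peers split into 1 + peer_count/256 records; each peer costs OT_IP_SIZE + 3 bytes, i.e. 7 with the 4-byte IPs the formula assumes. A worked instance with made-up bucket contents:

    /* Hypothetical bucket: 100 one-peer torrents, 50 two-peer torrents,
       10 torrents with 300 peers each. Mirrors the formula above. */
    #include <stdio.h>

    int main( void ) {
      size_t count_one = 100, count_two = 50;
      size_t count_peers = 10 * 300;                /* peers in generic records */
      size_t count_def = 10 * ( 1 + ( 300 >> 8 ) ); /* 300 peers -> 2 records   */
      size_t mem = 3 * ( 4 + 1 + 1 + 2 )                            /* headers  */
                 + ( count_one + count_two ) * 19                   /* hashes   */
                 + count_def * 20                                   /* hash+cnt */
                 + ( count_one + 2 * count_two + count_peers ) * 7; /* peers    */
      printf( "upper bound: %zu bytes\n", mem ); /* 24+2850+400+22400 == 25674 */
      return 0;
    }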
| 504 | ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); | ||
| 505 | if( !ptr ) goto unlock_continue; | ||
| 506 | |||
| 507 | if( count_one > 8 ) { | ||
| 508 | mem_a = 4 + 1 + 1 + 2 + count_one * ( 19 + 7 ); | ||
| 509 | ptr_b += mem_a; ptr_c += mem_a; | ||
| 510 | memcpy( ptr_a, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ | ||
| 511 | ptr_a[4] = 1; /* Offset 4: packet type 1 */ | ||
| 512 | ptr_a[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ | ||
| 513 | ptr_a[6] = count_one >> 8; | ||
| 514 | ptr_a[7] = count_one & 255; | ||
| 515 | ptr_a += 8; | ||
| 516 | } else { | ||
| 517 | count_def += count_one; | ||
| 518 | count_peers += count_one; | ||
| 519 | } | ||
| 520 | |||
| 521 | if( count_two > 8 ) { | ||
| 522 | mem_b = 4 + 1 + 1 + 2 + count_two * ( 19 + 14 ); | ||
| 523 | ptr_c += mem_b; | ||
| 524 | memcpy( ptr_b, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ | ||
| 525 | ptr_b[4] = 2; /* Offset 4: packet type 2 */ | ||
| 526 | ptr_b[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ | ||
| 527 | ptr_b[6] = count_two >> 8; | ||
| 528 | ptr_b[7] = count_two & 255; | ||
| 529 | ptr_b += 8; | ||
| 530 | } else { | ||
| 531 | count_def += count_two; | ||
| 532 | count_peers += 2 * count_two; | ||
| 533 | } | ||
| 534 | |||
| 535 | if( count_def ) { | ||
| 536 | memcpy( ptr_c, &g_tracker_id, sizeof(g_tracker_id)); /* Offset 0: the tracker ID */ | ||
| 537 | ptr_c[4] = 0; /* Offset 4: packet type 0 */ | ||
| 538 | ptr_c[5] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 5: the shared prefix */ | ||
| 539 | ptr_c[6] = count_def >> 8; | ||
| 540 | ptr_c[7] = count_def & 255; | ||
| 541 | ptr_c += 8; | ||
| 542 | } | ||
| 543 | |||
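All three branches hand-roll the same 8-byte block header. A hypothetical struct view of that layout (the tracker id is copied in the sender's native byte order, the record count is stored big-endian):

    /* Layout sketch only; the code above writes the fields by hand. */
    #include <stdint.h>

    typedef struct {
      uint32_t tracker_id; /* offset 0: sender's id, as memcpy()ed           */
      uint8_t  type;       /* offset 4: 0 generic, 1 one-peer, 2 two-peer    */
      uint8_t  prefix;     /* offset 5: hash byte shared by the whole bucket */
      uint8_t  count_hi;   /* offset 6: record count, high byte              */
      uint8_t  count_lo;   /* offset 7: record count, low byte               */
    } sync_block_header;   /* sizeof == 8, no padding at these alignments    */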
| 544 | /* For each torrent in this bucket.. */ | ||
| 545 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | ||
| 546 | /* Address the torrent's members */ | ||
| 547 | ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; | ||
| 548 | ot_peerlist *peer_list = torrent->peer_list; | ||
| 549 | ot_peer *peers = (ot_peer*)(peer_list->peers.data); | ||
| 550 | uint8_t **dst; | ||
| 551 | int multi = 0; | ||
| 552 | switch( peer_list->peer_count ) { | ||
| 553 | case 0: continue; | ||
| 554 | case 1: dst = mem_a ? &ptr_a : &ptr_c; break; | ||
| 555 | case 2: dst = mem_b ? &ptr_b : &ptr_c; break; | ||
| 556 | default: dst = &ptr_c; multi = 1; break; | ||
| 557 | } | ||
| 558 | |||
| 559 | do { | ||
| 560 | size_t i, pc = peer_list->peer_count; | ||
| 561 | if( pc > 255 ) pc = 255; | ||
| 562 | memcpy( *dst, torrent->hash + 1, sizeof( ot_hash ) - 1); | ||
| 563 | *dst += sizeof( ot_hash ) - 1; | ||
| 564 | if( multi ) *(*dst)++ = pc; | ||
| 565 | for( i=0; i < pc; ++i ) { | ||
| 566 | memcpy( *dst, peers++, OT_IP_SIZE + 3 ); | ||
| 567 | *dst += OT_IP_SIZE + 3; | ||
| 568 | } | ||
| 569 | peer_list->peer_count -= pc; | ||
| 570 | } while( peer_list->peer_count ); | ||
| 571 | free_peerlist(peer_list); | ||
| 572 | } | ||
| 573 | |||
| 574 | free( torrents_list->data ); | ||
| 575 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | ||
| 576 | unlock_continue: | ||
| 577 | mutex_bucket_unlock( bucket, 0 ); | ||
| 578 | |||
| 579 | if( ptr ) { | ||
| 580 | int i; | ||
| 581 | |||
| 582 | if( ptr_b > ptr_c ) ptr_c = ptr_b; | ||
| 583 | if( ptr_a > ptr_c ) ptr_c = ptr_a; | ||
| 584 | mem = ptr_c - ptr; | ||
| 585 | |||
| 586 | for( i=0; i<g_connection_count; ++i ) { | ||
| 587 | if( g_connections[i].fd > 0 ) { /* <= 0 means disconnected, see above */ | ||
| 588 | void *tmp = malloc( mem ); | ||
| 589 | if( tmp ) { | ||
| 590 | memcpy( tmp, ptr, mem ); /* each connection gets its own copy of the blocks */ | ||
| 591 | if( !iob_addbuf_free( &g_connections[i].outdata, tmp, mem ) ) free( tmp ); } | ||
| 592 | } | ||
| 593 | } | ||
| 594 | |||
| 595 | free( ptr ); | ||
| 596 | } | ||
| 597 | usleep( OT_SYNC_SLEEP ); | ||
| 598 | } | ||
| 599 | } | ||
| 600 | return 0; | ||
| 601 | } | ||
| 214 | 602 | ||
| 603 | static void * livesync_worker( void * args ) { | ||
| 604 | (void)args; | ||
| 215 | while( 1 ) { | 605 | while( 1 ) { |
| 216 | ot_ip6 in_ip; uint16_t in_port; | 606 | ot_ip6 in_ip; uint16_t in_port; |
| 217 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 607 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); |
| @@ -223,7 +613,6 @@ int main( int argc, char **argv ) { | |||
| 223 | /* drop packet coming from ourselves */ | 613 | /* drop packet coming from ourselves */ |
| 224 | continue; | 614 | continue; |
| 225 | } | 615 | } |
| 226 | |||
| 227 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { | 616 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { |
| 228 | case OT_SYNC_PEER: | 617 | case OT_SYNC_PEER: |
| 229 | livesync_handle_peersync( datalen ); | 618 | livesync_handle_peersync( datalen ); |
| @@ -232,37 +621,6 @@ int main( int argc, char **argv ) { | |||
| 232 | fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); | 621 | fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); |
| 233 | break; | 622 | break; |
| 234 | } | 623 | } |
| 235 | if( time(NULL) > next_dump ) { | ||
| 236 | int bucket, i; | ||
| 237 | /* For each bucket... */ | ||
| 238 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | ||
| 239 | /* Get exclusive access to that bucket */ | ||
| 240 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | ||
| 241 | size_t tor_offset; | ||
| 242 | |||
| 243 | /* For each torrent in this bucket.. */ | ||
| 244 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | ||
| 245 | /* Address torrents members */ | ||
| 246 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | ||
| 247 | #ifdef WANT_SCROOOOOOOLL | ||
| 248 | ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; | ||
| 249 | char hash_out[41]; | ||
| 250 | to_hex(hash_out,*hash); | ||
| 251 | printf( "%s %08zd\n", hash_out, peer_list->peer_count ); | ||
| 252 | #endif | ||
| 253 | if(peer_list->peer_count<1024) peer_counts[peer_list->peer_count]++; else peer_counts[1023]++; | ||
| 254 | free_peerlist(peer_list); | ||
| 255 | } | ||
| 256 | free( torrents_list->data ); | ||
| 257 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | ||
| 258 | } | ||
| 259 | for( i=1023; i>=0; --i ) | ||
| 260 | if( peer_counts[i] ) { | ||
| 261 | printf( "%d:%d ", i, peer_counts[i] ); | ||
| 262 | peer_counts[i] = 0; | ||
| 263 | } | ||
| 264 | printf( "\n" ); | ||
| 265 | next_dump = time(NULL) + 1; | ||
| 266 | } | ||
| 267 | } | 624 | } |
| 625 | return 0; | ||
| 268 | } | 626 | } |
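For reference, the UDP datagrams this worker consumes start with the sender's raw 4-byte tracker id, followed by a big-endian 4-byte packet type (OT_SYNC_PEER == 0) that the switch above reads with uint32_read_big(), then the payload. A sketch of the matching sender side — uint32_pack_big is libowfat's big-endian store; the payload format itself is not part of this diff:

    /* Sketch: assemble the 8-byte livesync header parsed above. */
    #include <stdint.h>
    #include <string.h>
    #include "uint32.h" /* libowfat: uint32_pack_big() */

    static size_t livesync_header( uint8_t *buf, uint32_t tracker_id ) {
      memcpy( buf, &tracker_id, sizeof( tracker_id ) ); /* offset 0: raw sender id */
      uint32_pack_big( (char *)buf + 4, 0 );            /* offset 4: OT_SYNC_PEER  */
      return 8;                                         /* payload follows         */
    }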
