| author | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-13 00:47:29 +0200 |
|---|---|---|
| committer | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-13 00:47:29 +0200 |
| commit | 1a70d9f9ef81ac1b5e843ac71f3538f7845e03ae | |
| tree | 20a20077503c01dc024e88a6a8d82bf89faf22fd | |
| parent | 301faeb10c5994a6fd31adc5f0b4f8f2b5c23502 | |
First shot on chunked transfers
| -rw-r--r-- | opentracker.c | 39 |
| -rw-r--r-- | ot_fullscrape.c | 33 |
| -rw-r--r-- | ot_http.c | 93 |
| -rw-r--r-- | ot_http.h | 10 |
| -rw-r--r-- | ot_iovec.c | 21 |
| -rw-r--r-- | ot_iovec.h | 1 |
| -rw-r--r-- | ot_mutex.c | 54 |
| -rw-r--r-- | ot_mutex.h | 5 |
| -rw-r--r-- | trackerlogic.c | 23 |
| -rw-r--r-- | trackerlogic.h | 1 |
10 files changed, 205 insertions, 75 deletions
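The patch switches fullscrape replies from a single Content-Length response to chunked framing: the first reply carries the HTTP header plus a Transfer-Encoding: chunked line, every subsequent buffer is prefixed with its size in hex followed by CRLF, and the stream ends with a zero-length chunk. The sketch below is not part of the patch; it only restates that wire format as a self-contained helper (the name frame_chunk and the single output buffer are illustrative assumptions).

```c
#include <stdio.h>
#include <string.h>

/* Illustrative only: frame one payload the way http_sendiovecdata() does in
 * this patch: "<size-in-hex>\r\n", the payload, "\r\n", and once the last
 * chunk has been queued a terminating "0\r\n\r\n". */
static size_t frame_chunk(char *out, const char *payload, size_t payload_len, int last) {
  size_t off = (size_t)sprintf(out, "%zx\r\n", payload_len); /* chunk size line */
  memcpy(out + off, payload, payload_len);                   /* chunk data      */
  off += payload_len;
  out[off++] = '\r';
  out[off++] = '\n';
  if (last)
    off += (size_t)sprintf(out + off, "0\r\n\r\n");          /* end of stream   */
  return off;
}
```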
diff --git a/opentracker.c b/opentracker.c
index 7c67f26..73a3ff3 100644
--- a/opentracker.c
+++ b/opentracker.c
| @@ -79,6 +79,7 @@ static void defaul_signal_handlers( void ) { | |||
| 79 | sigaddset (&signal_mask, SIGPIPE); | 79 | sigaddset (&signal_mask, SIGPIPE); |
| 80 | sigaddset (&signal_mask, SIGHUP); | 80 | sigaddset (&signal_mask, SIGHUP); |
| 81 | sigaddset (&signal_mask, SIGINT); | 81 | sigaddset (&signal_mask, SIGINT); |
| 82 | sigaddset (&signal_mask, SIGALRM); | ||
| 82 | pthread_sigmask (SIG_BLOCK, &signal_mask, NULL); | 83 | pthread_sigmask (SIG_BLOCK, &signal_mask, NULL); |
| 83 | } | 84 | } |
| 84 | 85 | ||
| @@ -90,7 +91,7 @@ static void install_signal_handlers( void ) { | |||
| 90 | sa.sa_handler = signal_handler; | 91 | sa.sa_handler = signal_handler; |
| 91 | sigemptyset(&sa.sa_mask); | 92 | sigemptyset(&sa.sa_mask); |
| 92 | sa.sa_flags = SA_RESTART; | 93 | sa.sa_flags = SA_RESTART; |
| 93 | if ((sigaction(SIGINT, &sa, NULL) == -1)) | 94 | if ((sigaction(SIGINT, &sa, NULL) == -1) || (sigaction(SIGALRM, &sa, NULL) == -1) ) |
| 94 | panic( "install_signal_handlers" ); | 95 | panic( "install_signal_handlers" ); |
| 95 | 96 | ||
| 96 | sigaddset (&signal_mask, SIGINT); | 97 | sigaddset (&signal_mask, SIGINT); |
| @@ -208,15 +209,23 @@ static void handle_read( const int64 sock, struct ot_workstruct *ws ) { | |||
| 208 | static void handle_write( const int64 sock ) { | 209 | static void handle_write( const int64 sock ) { |
| 209 | struct http_data* cookie=io_getcookie( sock ); | 210 | struct http_data* cookie=io_getcookie( sock ); |
| 210 | size_t i; | 211 | size_t i; |
| 212 | int chunked = 0; | ||
| 211 | 213 | ||
| 212 | /* Look for the first io_batch still containing bytes to write */ | 214 | /* Look for the first io_batch still containing bytes to write */ |
| 213 | if( cookie ) | 215 | if( cookie ) { |
| 214 | for( i = 0; i < cookie->batches; ++i ) | 216 | if( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER ) |
| 217 | chunked = 1; | ||
| 218 | |||
| 219 | for( i = 0; i < cookie->batches; ++i ) { | ||
| 220 | fprintf(stderr, "handle_write inspects batch %d of %d (bytes left: %d)\n", i, cookie->batches, cookie->batch[i].bytesleft); | ||
| 215 | if( cookie->batch[i].bytesleft ) { | 221 | if( cookie->batch[i].bytesleft ) { |
| 216 | int64 res = iob_send( sock, cookie->batch + i ); | 222 | int64 res = iob_send( sock, cookie->batch + i ); |
| 217 | 223 | ||
| 218 | if( res == -3 ) | 224 | fprintf(stderr, "handle_write yields res %lld when trying to iob_send\n", res); |
| 219 | break; | 225 | if( res == -3 ) { |
| 226 | handle_dead( sock ); | ||
| 227 | return; | ||
| 228 | } | ||
| 220 | 229 | ||
| 221 | if( !cookie->batch[i].bytesleft ) | 230 | if( !cookie->batch[i].bytesleft ) |
| 222 | continue; | 231 | continue; |
| @@ -224,8 +233,17 @@ static void handle_write( const int64 sock ) { | |||
| 224 | if( res == -1 || res > 0 || i < cookie->batches - 1 ) | 233 | if( res == -1 || res > 0 || i < cookie->batches - 1 ) |
| 225 | return; | 234 | return; |
| 226 | } | 235 | } |
| 236 | } | ||
| 237 | } | ||
| 227 | 238 | ||
| 228 | handle_dead( sock ); | 239 | /* In a chunked transfer after all batches accumulated have been sent, wait for the next one */ |
| 240 | if( chunked ) { | ||
| 241 | //fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => dont want write on sock %lld\n", sock); | ||
| 242 | //io_dontwantwrite( sock ); | ||
| 243 | } else { | ||
| 244 | fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => handle dead on sock %lld\n", sock); | ||
| 245 | handle_dead( sock ); | ||
| 246 | } | ||
| 229 | } | 247 | } |
| 230 | 248 | ||
| 231 | static void handle_accept( const int64 serversocket ) { | 249 | static void handle_accept( const int64 serversocket ) { |
| @@ -266,7 +284,7 @@ static void * server_mainloop( void * args ) { | |||
| 266 | struct ot_workstruct ws; | 284 | struct ot_workstruct ws; |
| 267 | time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL; | 285 | time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL; |
| 268 | struct iovec *iovector; | 286 | struct iovec *iovector; |
| 269 | int iovec_entries; | 287 | int iovec_entries, is_partial; |
| 270 | 288 | ||
| 271 | (void)args; | 289 | (void)args; |
| 272 | 290 | ||
| @@ -305,8 +323,8 @@ static void * server_mainloop( void * args ) { | |||
| 305 | handle_read( sock, &ws ); | 323 | handle_read( sock, &ws ); |
| 306 | } | 324 | } |
| 307 | 325 | ||
| 308 | while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) | 326 | while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector, &is_partial ) ) != -1 ) |
| 309 | http_sendiovecdata( sock, &ws, iovec_entries, iovector ); | 327 | http_sendiovecdata( sock, &ws, iovec_entries, iovector, is_partial ); |
| 310 | 328 | ||
| 311 | while( ( sock = io_canwrite( ) ) != -1 ) | 329 | while( ( sock = io_canwrite( ) ) != -1 ) |
| 312 | handle_write( sock ); | 330 | handle_write( sock ); |
| @@ -318,9 +336,6 @@ static void * server_mainloop( void * args ) { | |||
| 318 | } | 336 | } |
| 319 | 337 | ||
| 320 | livesync_ticker(); | 338 | livesync_ticker(); |
| 321 | |||
| 322 | /* Enforce setting the clock */ | ||
| 323 | signal_handler( SIGALRM ); | ||
| 324 | } | 339 | } |
| 325 | return 0; | 340 | return 0; |
| 326 | } | 341 | } |
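With chunked transfers a socket may drain all of its queued io_batches long before the fullscrape worker has produced the final chunk, so handle_write() above no longer calls handle_dead() for chunked connections once everything queued so far is flushed. The io_dontwantwrite() call is still commented out in this first shot; the snippet below is a guess at the intended steady state, not the patch itself.

```c
/* Sketch (assumed follow-up, not in the patch): after flushing all queued
 * batches of a chunked reply, stop polling for writability and wait for the
 * worker to push the next chunk; non-chunked replies are complete here. */
if (chunked)
  io_dontwantwrite(sock);   /* keep the connection, more chunks will follow */
else
  handle_dead(sock);        /* reply fully sent, tear the connection down   */
```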
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index d7d3518..fafd83c 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
| @@ -36,9 +36,9 @@ | |||
| 36 | #define OT_SCRAPE_MAXENTRYLEN 256 | 36 | #define OT_SCRAPE_MAXENTRYLEN 256 |
| 37 | 37 | ||
| 38 | /* Forward declaration */ | 38 | /* Forward declaration */ |
| 39 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 39 | static void fullscrape_make( int taskid, ot_tasktype mode); |
| 40 | #ifdef WANT_COMPRESSION_GZIP | 40 | #ifdef WANT_COMPRESSION_GZIP |
| 41 | static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 41 | static void fullscrape_make_gzip( int taskid, ot_tasktype mode); |
| 42 | #endif | 42 | #endif |
| 43 | 43 | ||
| 44 | /* Converter function from memory to human readable hex strings | 44 | /* Converter function from memory to human readable hex strings |
| @@ -49,9 +49,6 @@ static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e= | |||
| 49 | It grabs tasks from mutex_tasklist and delivers results back | 49 | It grabs tasks from mutex_tasklist and delivers results back |
| 50 | */ | 50 | */ |
| 51 | static void * fullscrape_worker( void * args ) { | 51 | static void * fullscrape_worker( void * args ) { |
| 52 | int iovec_entries; | ||
| 53 | struct iovec *iovector; | ||
| 54 | |||
| 55 | (void) args; | 52 | (void) args; |
| 56 | 53 | ||
| 57 | while( g_opentracker_running ) { | 54 | while( g_opentracker_running ) { |
| @@ -59,12 +56,11 @@ static void * fullscrape_worker( void * args ) { | |||
| 59 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); | 56 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); |
| 60 | #ifdef WANT_COMPRESSION_GZIP | 57 | #ifdef WANT_COMPRESSION_GZIP |
| 61 | if (tasktype & TASK_FLAG_GZIP) | 58 | if (tasktype & TASK_FLAG_GZIP) |
| 62 | fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); | 59 | fullscrape_make_gzip( taskid, tasktype ); |
| 63 | else | 60 | else |
| 64 | #endif | 61 | #endif |
| 65 | fullscrape_make( &iovec_entries, &iovector, tasktype ); | 62 | fullscrape_make( taskid, tasktype ); |
| 66 | if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) | 63 | mutex_workqueue_pushchunked( taskid, NULL ); |
| 67 | iovec_free( &iovec_entries, &iovector ); | ||
| 68 | } | 64 | } |
| 69 | return NULL; | 65 | return NULL; |
| 70 | } | 66 | } |
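The worker thread no longer collects a complete iovec array and hands it over with one mutex_workqueue_pushresult() call; fullscrape_make() now pushes each OT_SCRAPE_CHUNK_SIZE buffer as soon as it runs full, and the worker finishes the task by pushing a NULL iovec. A condensed restatement of that producer pattern, with error paths and bucket locking elided:

```c
/* Condensed from fullscrape_make() in this patch: hand every filled buffer
 * to the main loop as a partial result, then signal completion. */
struct iovec iovector = { NULL, 0 };
char *r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);

/* ... write scrape entries at r; whenever the buffer runs full ... */
iovector.iov_len = r - (char *)iovector.iov_base;
if (mutex_workqueue_pushchunked(taskid, &iovector))      /* queue this chunk      */
  free(iovector.iov_base);                               /* task is gone, drop it */
else
  r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);  /* start a fresh buffer  */

/* ... after the last torrent has been written: */
mutex_workqueue_pushchunked(taskid, NULL);               /* marks the task done   */
```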
| @@ -123,14 +119,13 @@ static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torre | |||
| 123 | return r; | 119 | return r; |
| 124 | } | 120 | } |
| 125 | 121 | ||
| 126 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | 122 | static void fullscrape_make( int taskid, ot_tasktype mode ) { |
| 127 | int bucket; | 123 | int bucket; |
| 128 | char *r, *re; | 124 | char *r, *re; |
| 125 | struct iovec iovector = { NULL, 0 }; | ||
| 129 | 126 | ||
| 130 | /* Setup return vector... */ | 127 | /* Setup return vector... */ |
| 131 | *iovec_entries = 0; | 128 | r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); |
| 132 | *iovector = NULL; | ||
| 133 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); | ||
| 134 | if( !r ) | 129 | if( !r ) |
| 135 | return; | 130 | return; |
| 136 | 131 | ||
| @@ -152,8 +147,14 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
| 152 | r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); | 147 | r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); |
| 153 | 148 | ||
| 154 | if( r > re) { | 149 | if( r > re) { |
| 150 | iovector.iov_len = r - (char *)iovector.iov_base; | ||
| 151 | |||
| 152 | if (mutex_workqueue_pushchunked(taskid, &iovector) ) { | ||
| 153 | free(iovector.iov_base); | ||
| 154 | return mutex_bucket_unlock( bucket, 0 ); | ||
| 155 | } | ||
| 155 | /* Allocate a fresh output buffer at the end of our buffers list */ | 156 | /* Allocate a fresh output buffer at the end of our buffers list */ |
| 156 | r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); | 157 | r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); |
| 157 | if( !r ) | 158 | if( !r ) |
| 158 | return mutex_bucket_unlock( bucket, 0 ); | 159 | return mutex_bucket_unlock( bucket, 0 ); |
| 159 | 160 | ||
| @@ -174,7 +175,9 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
| 174 | r += sprintf( r, "ee" ); | 175 | r += sprintf( r, "ee" ); |
| 175 | 176 | ||
| 176 | /* Release unused memory in current output buffer */ | 177 | /* Release unused memory in current output buffer */ |
| 177 | iovec_fixlast( iovec_entries, iovector, r ); | 178 | iovector.iov_len = r - (char *)iovector.iov_base; |
| 179 | if( mutex_workqueue_pushchunked(taskid, &iovector) ) | ||
| 180 | free(iovector.iov_base); | ||
| 178 | } | 181 | } |
| 179 | 182 | ||
| 180 | #ifdef WANT_COMPRESSION_GZIP | 183 | #ifdef WANT_COMPRESSION_GZIP |
diff --git a/ot_http.c b/ot_http.c
| @@ -121,9 +121,10 @@ ssize_t http_issue_error( const int64 sock, struct ot_workstruct *ws, int code ) | |||
| 121 | return ws->reply_size = -2; | 121 | return ws->reply_size = -2; |
| 122 | } | 122 | } |
| 123 | 123 | ||
| 124 | ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ) { | 124 | ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial ) { |
| 125 | struct http_data *cookie = io_getcookie( sock ); | 125 | struct http_data *cookie = io_getcookie( sock ); |
| 126 | char *header; | 126 | char *header; |
| 127 | const char *encoding = ""; | ||
| 127 | int i; | 128 | int i; |
| 128 | size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector ); | 129 | size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector ); |
| 129 | tai6464 t; | 130 | tai6464 t; |
| @@ -140,54 +141,72 @@ ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iove | |||
| 140 | /* If we came here, wait for the answer is over */ | 141 | /* If we came here, wait for the answer is over */ |
| 141 | cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; | 142 | cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; |
| 142 | 143 | ||
| 143 | /* Our answers never are 0 vectors. Return an error. */ | 144 | fprintf(stderr, "http_sendiovecdata sending %d iovec entries found cookie->batch == %p\n", iovec_entries, cookie->batch); |
| 144 | if( !iovec_entries ) { | ||
| 145 | HTTPERROR_500; | ||
| 146 | } | ||
| 147 | 145 | ||
| 148 | /* Prepare space for http header */ | 146 | if( iovec_entries ) { |
| 149 | header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING ); | ||
| 150 | if( !header ) { | ||
| 151 | iovec_free( &iovec_entries, &iovector ); | ||
| 152 | HTTPERROR_500; | ||
| 153 | } | ||
| 154 | 147 | ||
| 155 | if( cookie->flag & STRUCT_HTTP_FLAG_GZIP ) | 148 | /* Prepare space for http header */ |
| 156 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: gzip\r\nContent-Length: %zd\r\n\r\n", size ); | 149 | header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING ); |
| 157 | else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 ) | 150 | if( !header ) { |
| 158 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: bzip2\r\nContent-Length: %zd\r\n\r\n", size ); | 151 | iovec_free( &iovec_entries, &iovector ); |
| 159 | else | 152 | HTTPERROR_500; |
| 160 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size ); | 153 | } |
| 161 | 154 | ||
| 162 | if (!cookie->batch ) { | 155 | if( cookie->flag & STRUCT_HTTP_FLAG_GZIP ) |
| 163 | cookie->batch = malloc( sizeof(io_batch) ); | 156 | encoding = "Content-Encoding: gzip\r\n"; |
| 164 | memset( cookie->batch, 0, sizeof(io_batch) ); | 157 | else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 ) |
| 165 | cookie->batches = 1; | 158 | encoding = "Content-Encoding: bzip2\r\n"; |
| 166 | } | 159 | |
| 167 | iob_addbuf_free( cookie->batch, header, header_size ); | 160 | if( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED) ) |
| 161 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sContent-Length: %zd\r\n\r\n", encoding, size ); | ||
| 162 | else { | ||
| 163 | if ( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )) { | ||
| 164 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sTransfer-Encoding: chunked\r\n\r\n%zx\r\n", encoding, size ); | ||
| 165 | cookie->flag |= STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER; | ||
| 166 | } else | ||
| 167 | header_size = sprintf( header, "%zx\r\n", size ); | ||
| 168 | } | ||
| 168 | 169 | ||
| 169 | /* Split huge iovectors into separate io_batches */ | 170 | if (!cookie->batch ) { |
| 170 | for( i=0; i<iovec_entries; ++i ) { | 171 | cookie->batch = malloc( sizeof(io_batch) ); |
| 171 | io_batch *current = cookie->batch + cookie->batches - 1; | 172 | memset( cookie->batch, 0, sizeof(io_batch) ); |
| 173 | cookie->batches = 1; | ||
| 174 | } | ||
| 175 | iob_addbuf_free( cookie->batch, header, header_size ); | ||
| 172 | 176 | ||
| 173 | /* If the current batch's limit is reached, try to reallocate a new batch to work on */ | 177 | /* Split huge iovectors into separate io_batches */ |
| 174 | if( current->bytesleft > OT_BATCH_LIMIT ) { | 178 | for( i=0; i<iovec_entries; ++i ) { |
| 175 | io_batch * new_batch = realloc( current, (cookie->batches + 1) * sizeof(io_batch) ); | 179 | io_batch *current = cookie->batch + cookie->batches - 1; |
| 180 | |||
| 181 | /* If the current batch's limit is reached, try to reallocate a new batch to work on */ | ||
| 182 | if( current->bytesleft > OT_BATCH_LIMIT ) { | ||
| 183 | fprintf(stderr, "http_sendiovecdata found batch above limit: %zd\n", current->bytesleft); | ||
| 184 | io_batch * new_batch = realloc( cookie->batch, (cookie->batches + 1) * sizeof(io_batch) ); | ||
| 176 | if( new_batch ) { | 185 | if( new_batch ) { |
| 177 | cookie->batches++; | 186 | cookie->batch = new_batch; |
| 178 | current = cookie->batch = new_batch; | 187 | current = cookie->batch + cookie->batches++; |
| 179 | memset( current, 0, sizeof(io_batch) ); | 188 | memset( current, 0, sizeof(io_batch) ); |
| 180 | } | 189 | } |
| 190 | } | ||
| 191 | fprintf(stderr, "http_sendiovecdata calling iob_addbuf_free with %zd\n", iovector[i].iov_len); | ||
| 192 | iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len ); | ||
| 181 | } | 193 | } |
| 194 | free( iovector ); | ||
| 195 | if ( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER ) | ||
| 196 | iob_addbuf(cookie->batch + cookie->batches - 1, "\r\n", 2); | ||
| 197 | } | ||
| 182 | 198 | ||
| 183 | iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len ); | 199 | if ((cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER) && cookie->batch && !is_partial) { |
| 200 | fprintf(stderr, "http_sendiovecdata adds a terminating 0 size buffer to batch\n"); | ||
| 201 | iob_addbuf(cookie->batch + cookie->batches - 1, "0\r\n\r\n", 5); | ||
| 202 | cookie->flag &= ~STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER; | ||
| 184 | } | 203 | } |
| 185 | free( iovector ); | ||
| 186 | 204 | ||
| 187 | /* writeable sockets timeout after 10 minutes */ | 205 | /* writeable sockets timeout after 10 minutes */ |
| 188 | taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND ); | 206 | taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND ); |
| 189 | io_timeout( sock, t ); | 207 | io_timeout( sock, t ); |
| 190 | io_dontwantread( sock ); | 208 | io_dontwantread( sock ); |
| 209 | fprintf (stderr, "http_sendiovecdata marks socket %lld as wantwrite\n", sock); | ||
| 191 | io_wantwrite( sock ); | 210 | io_wantwrite( sock ); |
| 192 | return 0; | 211 | return 0; |
| 193 | } | 212 | } |
| @@ -254,7 +273,7 @@ static const ot_keywords keywords_format[] = | |||
| 254 | #endif | 273 | #endif |
| 255 | #endif | 274 | #endif |
| 256 | /* Pass this task to the worker thread */ | 275 | /* Pass this task to the worker thread */ |
| 257 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; | 276 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED; |
| 258 | 277 | ||
| 259 | /* Clients waiting for us should not easily timeout */ | 278 | /* Clients waiting for us should not easily timeout */ |
| 260 | taia_uint( &t, 0 ); io_timeout( sock, t ); | 279 | taia_uint( &t, 0 ); io_timeout( sock, t ); |
| @@ -278,7 +297,7 @@ static const ot_keywords keywords_format[] = | |||
| 278 | } | 297 | } |
| 279 | 298 | ||
| 280 | #ifdef WANT_MODEST_FULLSCRAPES | 299 | #ifdef WANT_MODEST_FULLSCRAPES |
| 281 | static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER; | 300 | static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER; |
| 282 | static ot_vector g_modest_fullscrape_timeouts; | 301 | static ot_vector g_modest_fullscrape_timeouts; |
| 283 | typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log; | 302 | typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log; |
| 284 | #endif | 303 | #endif |
| @@ -325,7 +344,7 @@ static ssize_t http_handle_fullscrape( const int64 sock, struct ot_workstruct *w | |||
| 325 | #endif | 344 | #endif |
| 326 | 345 | ||
| 327 | /* Pass this task to the worker thread */ | 346 | /* Pass this task to the worker thread */ |
| 328 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; | 347 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED; |
| 329 | /* Clients waiting for us should not easily timeout */ | 348 | /* Clients waiting for us should not easily timeout */ |
| 330 | taia_uint( &t, 0 ); io_timeout( sock, t ); | 349 | taia_uint( &t, 0 ); io_timeout( sock, t ); |
| 331 | fullscrape_deliver( sock, TASK_FULLSCRAPE | format ); | 350 | fullscrape_deliver( sock, TASK_FULLSCRAPE | format ); |
diff --git a/ot_http.h b/ot_http.h
| @@ -7,9 +7,11 @@ | |||
| 7 | #define OT_HTTP_H__ | 7 | #define OT_HTTP_H__ |
| 8 | 8 | ||
| 9 | typedef enum { | 9 | typedef enum { |
| 10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, | 10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, |
| 11 | STRUCT_HTTP_FLAG_GZIP = 2, | 11 | STRUCT_HTTP_FLAG_GZIP = 2, |
| 12 | STRUCT_HTTP_FLAG_BZIP2 = 4 | 12 | STRUCT_HTTP_FLAG_BZIP2 = 4, |
| 13 | STRUCT_HTTP_FLAG_CHUNKED = 8, | ||
| 14 | STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 | ||
| 13 | } STRUCT_HTTP_FLAG; | 15 | } STRUCT_HTTP_FLAG; |
| 14 | 16 | ||
| 15 | struct http_data { | 17 | struct http_data { |
| @@ -21,7 +23,7 @@ struct http_data { | |||
| 21 | }; | 23 | }; |
| 22 | 24 | ||
| 23 | ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws ); | 25 | ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws ); |
| 24 | ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ); | 26 | ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial ); |
| 25 | ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code ); | 27 | ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code ); |
| 26 | 28 | ||
| 27 | extern char *g_stats_path; | 29 | extern char *g_stats_path; |
diff --git a/ot_iovec.c b/ot_iovec.c
| @@ -35,6 +35,26 @@ void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_al | |||
| 35 | return new_data; | 35 | return new_data; |
| 36 | } | 36 | } |
| 37 | 37 | ||
| 38 | void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector) { | ||
| 39 | int new_entries = *iovec_entries + 1; | ||
| 40 | struct iovec *new_vec = realloc( *iovector, new_entries * sizeof( struct iovec ) ); | ||
| 41 | if( !new_vec ) | ||
| 42 | return NULL; | ||
| 43 | |||
| 44 | /* Take over data from appended iovec */ | ||
| 45 | new_vec[*iovec_entries].iov_base = append_iovector->iov_base; | ||
| 46 | new_vec[*iovec_entries].iov_len = append_iovector->iov_len; | ||
| 47 | |||
| 48 | append_iovector->iov_base = NULL; | ||
| 49 | append_iovector->iov_len = 0; | ||
| 50 | |||
| 51 | *iovector = new_vec; | ||
| 52 | *iovec_entries = new_entries; | ||
| 53 | |||
| 54 | return new_vec; | ||
| 55 | } | ||
| 56 | |||
| 57 | |||
| 38 | void iovec_free( int *iovec_entries, struct iovec **iovector ) { | 58 | void iovec_free( int *iovec_entries, struct iovec **iovector ) { |
| 39 | int i; | 59 | int i; |
| 40 | for( i=0; i<*iovec_entries; ++i ) | 60 | for( i=0; i<*iovec_entries; ++i ) |
| @@ -64,7 +84,6 @@ void *iovec_fix_increase_or_free( int *iovec_entries, struct iovec **iovector, v | |||
| 64 | return new_data; | 84 | return new_data; |
| 65 | } | 85 | } |
| 66 | 86 | ||
| 67 | |||
| 68 | size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) { | 87 | size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) { |
| 69 | size_t length = 0; | 88 | size_t length = 0; |
| 70 | int i; | 89 | int i; |
diff --git a/ot_iovec.h b/ot_iovec.h
| @@ -9,6 +9,7 @@ | |||
| 9 | #include <sys/uio.h> | 9 | #include <sys/uio.h> |
| 10 | 10 | ||
| 11 | void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc ); | 11 | void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc ); |
| 12 | void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector ); | ||
| 12 | void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr ); | 13 | void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr ); |
| 13 | void iovec_free( int *iovec_entries, struct iovec **iovector ); | 14 | void iovec_free( int *iovec_entries, struct iovec **iovector ); |
| 14 | 15 | ||
| @@ -17,6 +17,7 @@ | |||
| 17 | 17 | ||
| 18 | /* Opentracker */ | 18 | /* Opentracker */ |
| 19 | #include "trackerlogic.h" | 19 | #include "trackerlogic.h" |
| 20 | #include "ot_iovec.h" | ||
| 20 | #include "ot_mutex.h" | 21 | #include "ot_mutex.h" |
| 21 | #include "ot_stats.h" | 22 | #include "ot_stats.h" |
| 22 | 23 | ||
| @@ -194,23 +195,66 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove | |||
| 194 | return task ? 0 : -1; | 195 | return task ? 0 : -1; |
| 195 | } | 196 | } |
| 196 | 197 | ||
| 197 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) { | 198 | int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) { |
| 199 | struct ot_task * task; | ||
| 200 | const char byte = 'o'; | ||
| 201 | |||
| 202 | /* Want exclusive access to tasklist */ | ||
| 203 | pthread_mutex_lock( &tasklist_mutex ); | ||
| 204 | |||
| 205 | for (task = tasklist; task; task = task->next) | ||
| 206 | if (task->taskid == taskid) { | ||
| 207 | if( iovec ) { | ||
| 208 | fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid); | ||
| 209 | if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) ) | ||
| 210 | return -1; | ||
| 211 | task->tasktype = TASK_DONE_PARTIAL; | ||
| 212 | } else { | ||
| 213 | fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid); | ||
| 214 | task->tasktype = TASK_DONE; | ||
| 215 | } | ||
| 216 | break; | ||
| 217 | } | ||
| 218 | |||
| 219 | /* Release lock */ | ||
| 220 | pthread_mutex_unlock( &tasklist_mutex ); | ||
| 221 | |||
| 222 | io_trywrite( g_self_pipe[1], &byte, 1 ); | ||
| 223 | if(!task) | ||
| 224 | fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid); | ||
| 225 | |||
| 226 | /* Indicate whether the worker has to throw away results */ | ||
| 227 | return task ? 0 : -1; | ||
| 228 | } | ||
| 229 | |||
| 230 | |||
| 231 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) { | ||
| 198 | struct ot_task ** task; | 232 | struct ot_task ** task; |
| 199 | int64 sock = -1; | 233 | int64 sock = -1; |
| 200 | 234 | ||
| 235 | *is_partial = 0; | ||
| 236 | |||
| 201 | /* Want exclusive access to tasklist */ | 237 | /* Want exclusive access to tasklist */ |
| 202 | pthread_mutex_lock( &tasklist_mutex ); | 238 | pthread_mutex_lock( &tasklist_mutex ); |
| 203 | 239 | ||
| 204 | for (task = &tasklist; *task; task = &((*task)->next)) | 240 | for (task = &tasklist; *task; task = &((*task)->next)) |
| 205 | if ((*task)->tasktype == TASK_DONE) { | 241 | if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) { |
| 206 | struct ot_task *ptask = *task; | 242 | struct ot_task *ptask = *task; |
| 207 | 243 | fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries); | |
| 208 | *iovec_entries = ptask->iovec_entries; | 244 | *iovec_entries = ptask->iovec_entries; |
| 209 | *iovec = ptask->iovec; | 245 | *iovec = ptask->iovec; |
| 210 | sock = ptask->sock; | 246 | sock = ptask->sock; |
| 211 | 247 | ||
| 212 | *task = ptask->next; | 248 | if ((*task)->tasktype == TASK_DONE) { |
| 213 | free( ptask ); | 249 | *task = ptask->next; |
| 250 | free( ptask ); | ||
| 251 | } else { | ||
| 252 | ptask->iovec_entries = 0; | ||
| 253 | ptask->iovec = NULL; | ||
| 254 | *is_partial = 1; | ||
| 255 | /* Prevent task from showing up immediately again unless new data was added */ | ||
| 256 | (*task)->tasktype = TASK_FULLSCRAPE; | ||
| 257 | } | ||
| 214 | break; | 258 | break; |
| 215 | } | 259 | } |
| 216 | 260 | ||
diff --git a/ot_mutex.h b/ot_mutex.h
| @@ -54,9 +54,11 @@ typedef enum { | |||
| 54 | TASK_DMEM = 0x0300, | 54 | TASK_DMEM = 0x0300, |
| 55 | 55 | ||
| 56 | TASK_DONE = 0x0f00, | 56 | TASK_DONE = 0x0f00, |
| 57 | TASK_DONE_PARTIAL = 0x0f01, | ||
| 57 | 58 | ||
| 58 | TASK_FLAG_GZIP = 0x1000, | 59 | TASK_FLAG_GZIP = 0x1000, |
| 59 | TASK_FLAG_BZIP2 = 0x2000, | 60 | TASK_FLAG_BZIP2 = 0x2000, |
| 61 | TASK_FLAG_CHUNKED = 0x4000, | ||
| 60 | 62 | ||
| 61 | TASK_TASK_MASK = 0x0fff, | 63 | TASK_TASK_MASK = 0x0fff, |
| 62 | TASK_CLASS_MASK = 0x0f00, | 64 | TASK_CLASS_MASK = 0x0f00, |
| @@ -70,6 +72,7 @@ void mutex_workqueue_canceltask( int64 sock ); | |||
| 70 | void mutex_workqueue_pushsuccess( ot_taskid taskid ); | 72 | void mutex_workqueue_pushsuccess( ot_taskid taskid ); |
| 71 | ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); | 73 | ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); |
| 72 | int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); | 74 | int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); |
| 73 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector ); | 75 | int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec); |
| 76 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector, int *is_partial ); | ||
| 74 | 77 | ||
| 75 | #endif | 78 | #endif |
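On the consumer side, mutex_workqueue_popresult() now reports whether a result is partial: a TASK_DONE_PARTIAL task yields whatever iovecs it has accumulated, is flagged via is_partial and reset to TASK_FULLSCRAPE so it only reappears after the worker pushes more data, while a TASK_DONE task is unlinked and freed. The main loop consumes this as shown in the opentracker.c hunk above:

```c
/* Consumer pattern from server_mainloop(): every complete or partial result
 * is appended to the socket's io_batch list as one more chunk. */
int iovec_entries, is_partial;
struct iovec *iovector;
int64 sock;

while ((sock = mutex_workqueue_popresult(&iovec_entries, &iovector, &is_partial)) != -1)
  http_sendiovecdata(sock, &ws, iovec_entries, iovector, is_partial);
```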
diff --git a/trackerlogic.c b/trackerlogic.c
index 6fd2724..13d2741 100644
--- a/trackerlogic.c
+++ b/trackerlogic.c
| @@ -517,6 +517,29 @@ size_t peer_size_from_peer6(ot_peer6 *peer) { | |||
| 517 | return OT_PEER_SIZE4; | 517 | return OT_PEER_SIZE4; |
| 518 | } | 518 | } |
| 519 | 519 | ||
| 520 | void trackerlogic_add_random_torrents(size_t amount) { | ||
| 521 | struct ot_workstruct ws; | ||
| 522 | memset( &ws, 0, sizeof(ws) ); | ||
| 523 | |||
| 524 | ws.inbuf=malloc(G_INBUF_SIZE); | ||
| 525 | ws.outbuf=malloc(G_OUTBUF_SIZE); | ||
| 526 | ws.reply=ws.outbuf; | ||
| 527 | ws.hash=ws.inbuf; | ||
| 528 | |||
| 529 | while( amount-- ) { | ||
| 530 | arc4random_buf(ws.hash, sizeof(ot_hash)); | ||
| 531 | arc4random_buf(&ws.peer, sizeof(ws.peer)); | ||
| 532 | |||
| 533 | OT_PEERFLAG(ws.peer) &= PEER_FLAG_SEEDING | PEER_FLAG_COMPLETED | PEER_FLAG_STOPPED; | ||
| 534 | |||
| 535 | add_peer_to_torrent_and_return_peers( FLAG_TCP, &ws, 1 ); | ||
| 536 | } | ||
| 537 | |||
| 538 | free(ws.inbuf); | ||
| 539 | free(ws.outbuf); | ||
| 540 | } | ||
| 541 | |||
| 542 | |||
| 520 | void exerr( char * message ) { | 543 | void exerr( char * message ) { |
| 521 | fprintf( stderr, "%s\n", message ); | 544 | fprintf( stderr, "%s\n", message ); |
| 522 | exit( 111 ); | 545 | exit( 111 ); |
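The new trackerlogic_add_random_torrents() helper fills the tracker with randomly generated torrents and peers. Nothing in this patch calls it yet, so it presumably serves as a load-generation aid for exercising fullscrapes that are large enough to span many chunks; the call below is a hypothetical use, not code from the patch.

```c
/* Hypothetical: populate the tracker with a million synthetic torrents so a
 * fullscrape produces many OT_SCRAPE_CHUNK_SIZE buffers. */
trackerlogic_add_random_torrents(1000000);
```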
diff --git a/trackerlogic.h b/trackerlogic.h
index 9f5886d..bc488c9 100644
--- a/trackerlogic.h
+++ b/trackerlogic.h
| @@ -190,6 +190,7 @@ size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws ); | |||
| 190 | size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply ); | 190 | size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply ); |
| 191 | size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply ); | 191 | size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply ); |
| 192 | void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count ); | 192 | void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count ); |
| 193 | void trackerlogic_add_random_torrents(size_t amount); | ||
| 193 | 194 | ||
| 194 | /* torrent iterator */ | 195 | /* torrent iterator */ |
| 195 | void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ); | 196 | void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ); |
