diff options
| author | erdgeist <> | 2007-11-23 18:15:38 +0000 |
|---|---|---|
| committer | erdgeist <> | 2007-11-23 18:15:38 +0000 |
| commit | 8f7ef4b2eaadef95de2ea2734abd39109e141ff7 (patch) | |
| tree | ed3debf8db1e3196dbac115a49da1458976c7d96 | |
| parent | 54560fdcd37b0fb47311aa53074137e5e85341c4 (diff) | |
Completely rewritten fullscrape code. All assumptions on how deflate() works were a little naive. Needs more error checking and testing.
| -rw-r--r-- | ot_fullscrape.c | 133 |
1 files changed, 67 insertions, 66 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index d9c872e..36249fb 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
| @@ -25,10 +25,18 @@ | |||
| 25 | Full scrapes usually are huge and one does not want to | 25 | Full scrapes usually are huge and one does not want to |
| 26 | allocate more memory. So lets get them in 512k units | 26 | allocate more memory. So lets get them in 512k units |
| 27 | */ | 27 | */ |
| 28 | #define OT_SCRAPE_CHUNK_SIZE (512*1024) | 28 | #define OT_SCRAPE_CHUNK_SIZE (1024) |
| 29 | 29 | ||
| 30 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ | 30 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ |
| 31 | #define OT_FULLSCRAPE_MAXENTRYLEN 256 | 31 | #define OT_SCRAPE_MAXENTRYLEN 256 |
| 32 | |||
| 33 | #ifdef WANT_COMPRESSION_GZIP | ||
| 34 | #define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK | ||
| 35 | #define WANT_COMPRESSION_GZIP_PARAM( param1, param2 ) , param1, param2 | ||
| 36 | #else | ||
| 37 | #define IF_COMPRESSION( TASK ) | ||
| 38 | #define WANT_COMPRESSION_GZIP_PARAM( param1, param2 ) | ||
| 39 | #endif | ||
| 32 | 40 | ||
| 33 | /* Forward declaration */ | 41 | /* Forward declaration */ |
| 34 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 42 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); |
| @@ -69,11 +77,38 @@ void fullscrape_deliver( int64 socket, ot_tasktype tasktype ) { | |||
| 69 | mutex_workqueue_pushtask( socket, tasktype ); | 77 | mutex_workqueue_pushtask( socket, tasktype ); |
| 70 | } | 78 | } |
| 71 | 79 | ||
| 80 | static int fullscrape_increase( int *iovec_entries, struct iovec **iovector, | ||
| 81 | char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode ) ) { | ||
| 82 | /* Allocate a fresh output buffer at the end of our buffers list */ | ||
| 83 | if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) { | ||
| 84 | |||
| 85 | /* Deallocate gzip buffers */ | ||
| 86 | IF_COMPRESSION( deflateEnd(strm); ) | ||
| 87 | |||
| 88 | /* Release lock on current bucket and return */ | ||
| 89 | return -1; | ||
| 90 | } | ||
| 91 | |||
| 92 | /* Adjust new end of output buffer */ | ||
| 93 | *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | ||
| 94 | |||
| 95 | /* When compressing, we have all the bytes in output buffer */ | ||
| 96 | IF_COMPRESSION( { \ | ||
| 97 | *re -= OT_SCRAPE_MAXENTRYLEN; \ | ||
| 98 | strm->next_out = (ot_byte*)*r; \ | ||
| 99 | strm->avail_out = OT_SCRAPE_CHUNK_SIZE; \ | ||
| 100 | deflate( strm, Z_NO_FLUSH ); \ | ||
| 101 | *r = (char*)strm->next_out; \ | ||
| 102 | } ) | ||
| 103 | |||
| 104 | return 0; | ||
| 105 | } | ||
| 106 | |||
| 72 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | 107 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { |
| 73 | int bucket; | 108 | int bucket; |
| 74 | char *r, *re; | 109 | char *r, *re; |
| 75 | #ifdef WANT_COMPRESSION_GZIP | 110 | #ifdef WANT_COMPRESSION_GZIP |
| 76 | char compress_buffer[OT_FULLSCRAPE_MAXENTRYLEN]; | 111 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; |
| 77 | z_stream strm; | 112 | z_stream strm; |
| 78 | #endif | 113 | #endif |
| 79 | 114 | ||
| @@ -83,28 +118,24 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
| 83 | if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) | 118 | if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) |
| 84 | return; | 119 | return; |
| 85 | 120 | ||
| 86 | /* ... and pointer to end of current output buffer. | 121 | /* re points to low watermark */ |
| 87 | This works as a low watermark */ | 122 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; |
| 88 | re = r + OT_SCRAPE_CHUNK_SIZE; | ||
| 89 | 123 | ||
| 90 | #ifdef WANT_COMPRESSION_GZIP | 124 | #ifdef WANT_COMPRESSION_GZIP |
| 91 | if( mode & TASK_FLAG_GZIP ) { | 125 | if( mode & TASK_FLAG_GZIP ) { |
| 126 | re += OT_SCRAPE_MAXENTRYLEN; | ||
| 92 | byte_zero( &strm, sizeof(strm) ); | 127 | byte_zero( &strm, sizeof(strm) ); |
| 93 | strm.next_in = (ot_byte*)r; | 128 | strm.next_in = (ot_byte*)compress_buffer; |
| 129 | strm.next_out = (ot_byte*)r; | ||
| 130 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
| 94 | if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) | 131 | if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) |
| 95 | fprintf( stderr, "not ok.\n" ); | 132 | fprintf( stderr, "not ok.\n" ); |
| 96 | |||
| 97 | strm.next_out = (unsigned char*)r; | ||
| 98 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
| 99 | r = compress_buffer; | 133 | r = compress_buffer; |
| 100 | } | 134 | } |
| 101 | #endif | 135 | #endif |
| 102 | 136 | ||
| 103 | /* Reply dictionary only needed for bencoded fullscrape */ | 137 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) |
| 104 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { | 138 | r += sprintf( r, "d5:filesd" ); |
| 105 | memmove( r, "d5:filesd", 9 ); | ||
| 106 | r += 9; | ||
| 107 | } | ||
| 108 | 139 | ||
| 109 | /* For each bucket... */ | 140 | /* For each bucket... */ |
| 110 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 141 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { |
| @@ -121,13 +152,15 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
| 121 | switch( mode & TASK_TASK_MASK ) { | 152 | switch( mode & TASK_TASK_MASK ) { |
| 122 | case TASK_FULLSCRAPE: | 153 | case TASK_FULLSCRAPE: |
| 123 | default: | 154 | default: |
| 155 | |||
| 124 | /* push hash as bencoded string */ | 156 | /* push hash as bencoded string */ |
| 125 | *r++='2'; *r++='0'; *r++=':'; | 157 | *r++='2'; *r++='0'; *r++=':'; |
| 126 | memmove( r, hash, 20 ); r+=20; | 158 | memmove( r, hash, 20 ); r+=20; |
| 127 | 159 | ||
| 128 | /* push rest of the scrape string */ | 160 | /* push rest of the scrape string */ |
| 129 | r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); | 161 | r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); |
| 130 | break; | 162 | |
| 163 | break; | ||
| 131 | case TASK_FULLSCRAPE_TPB_ASCII: | 164 | case TASK_FULLSCRAPE_TPB_ASCII: |
| 132 | to_hex( r, *hash ); r+=40; | 165 | to_hex( r, *hash ); r+=40; |
| 133 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | 166 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); |
| @@ -144,73 +177,41 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
| 144 | } | 177 | } |
| 145 | 178 | ||
| 146 | #ifdef WANT_COMPRESSION_GZIP | 179 | #ifdef WANT_COMPRESSION_GZIP |
| 147 | if( mode & TASK_FLAG_GZIP ) { | 180 | if( mode & TASK_FLAG_GZIP ) { |
| 148 | strm.next_in = (ot_byte*)compress_buffer; | 181 | strm.next_in = (ot_byte*)compress_buffer; |
| 149 | strm.avail_in = r - compress_buffer; | 182 | strm.avail_in = r - compress_buffer; |
| 150 | if( deflate( &strm, Z_NO_FLUSH ) != Z_OK ) | 183 | deflate( &strm, Z_NO_FLUSH ); |
| 151 | fprintf( stderr, "Not ok.\n" ); | ||
| 152 | r = (char*)strm.next_out; | 184 | r = (char*)strm.next_out; |
| 153 | } | 185 | } |
| 154 | #endif | 186 | #endif |
| 155 | 187 | ||
| 156 | /* If we reached our low watermark in buffer... */ | 188 | /* Check if there still is enough buffer left */ |
| 157 | if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) { | 189 | while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) ) |
| 158 | 190 | return mutex_bucket_unlock( bucket ); | |
| 159 | /* crop current output buffer to the amount really used */ | ||
| 160 | iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); | ||
| 161 | |||
| 162 | /* And allocate a fresh output buffer at the end of our buffers list */ | ||
| 163 | if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) { | ||
| 164 | |||
| 165 | /* If this fails: free buffers */ | ||
| 166 | iovec_free( iovec_entries, iovector ); | ||
| 167 | |||
| 168 | #ifdef WANT_COMPRESSION_GZIP | ||
| 169 | deflateEnd(&strm); | ||
| 170 | #endif | ||
| 171 | |||
| 172 | /* Release lock on current bucket and return */ | ||
| 173 | mutex_bucket_unlock( bucket ); | ||
| 174 | return; | ||
| 175 | } | ||
| 176 | |||
| 177 | /* Adjust new end of output buffer */ | ||
| 178 | re = r + OT_SCRAPE_CHUNK_SIZE; | ||
| 179 | 191 | ||
| 180 | #ifdef WANT_COMPRESSION_GZIP | 192 | IF_COMPRESSION( r = compress_buffer; ) |
| 181 | if( mode & TASK_FLAG_GZIP ) { | ||
| 182 | strm.next_out = (ot_byte*)r; | ||
| 183 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
| 184 | } | ||
| 185 | #endif | ||
| 186 | } | ||
| 187 | #ifdef WANT_COMPRESSION_GZIP | ||
| 188 | if( mode & TASK_FLAG_GZIP ) { | ||
| 189 | r = compress_buffer; | ||
| 190 | } | ||
| 191 | #endif | ||
| 192 | } | 193 | } |
| 193 | 194 | ||
| 194 | /* All torrents done: release lock on currenct bucket */ | 195 | /* All torrents done: release lock on currenct bucket */ |
| 195 | mutex_bucket_unlock( bucket ); | 196 | mutex_bucket_unlock( bucket ); |
| 196 | } | 197 | } |
| 197 | 198 | ||
| 198 | /* Close bencoded scrape dictionary if necessary */ | 199 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) |
| 199 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { | 200 | r += sprintf( r, "ee" ); |
| 200 | *r++='e'; *r++='e'; | ||
| 201 | } | ||
| 202 | 201 | ||
| 203 | #ifdef WANT_COMPRESSION_GZIP | 202 | #ifdef WANT_COMPRESSION_GZIP |
| 204 | if( mode & TASK_FLAG_GZIP ) { | 203 | if( mode & TASK_FLAG_GZIP ) { |
| 205 | strm.next_in = (ot_byte*) compress_buffer; | 204 | strm.next_in = (ot_byte*)compress_buffer; |
| 206 | strm.avail_in = r - compress_buffer; | 205 | strm.avail_in = r - compress_buffer; |
| 207 | if( deflate( &strm, Z_FINISH ) != Z_STREAM_END ) | 206 | deflate( &strm, Z_FINISH ); |
| 208 | fprintf( stderr, "Not ok.\n" ); | ||
| 209 | r = (char*)strm.next_out; | 207 | r = (char*)strm.next_out; |
| 210 | deflateEnd(&strm); | 208 | |
| 209 | while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) ) | ||
| 210 | return mutex_bucket_unlock( bucket ); | ||
| 211 | deflateEnd(&strm); | ||
| 211 | } | 212 | } |
| 212 | #endif | 213 | #endif |
| 213 | 214 | ||
| 214 | /* Release unused memory in current output buffer */ | 215 | /* Release unused memory in current output buffer */ |
| 215 | iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); | 216 | iovec_fixlast( iovec_entries, iovector, r ); |
| 216 | } | 217 | } |
