summaryrefslogtreecommitdiff
path: root/proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.c')
-rw-r--r--proxy.c828
1 files changed, 434 insertions, 394 deletions
diff --git a/proxy.c b/proxy.c
index c25611b..9946240 100644
--- a/proxy.c
+++ b/proxy.c
@@ -4,33 +4,33 @@
4 $Id$ */ 4 $Id$ */
5 5
6/* System */ 6/* System */
7#include <arpa/inet.h>
8#include <ctype.h>
9#include <errno.h>
10#include <pthread.h>
11#include <pwd.h>
12#include <signal.h>
7#include <stdint.h> 13#include <stdint.h>
14#include <stdio.h>
8#include <stdlib.h> 15#include <stdlib.h>
9#include <string.h> 16#include <string.h>
10#include <arpa/inet.h>
11#include <sys/socket.h> 17#include <sys/socket.h>
12#include <unistd.h> 18#include <unistd.h>
13#include <errno.h>
14#include <signal.h>
15#include <stdio.h>
16#include <pwd.h>
17#include <ctype.h>
18#include <pthread.h>
19 19
20/* Libowfat */ 20/* Libowfat */
21#include "socket.h" 21#include "byte.h"
22#include "io.h" 22#include "io.h"
23#include "iob.h" 23#include "iob.h"
24#include "byte.h"
25#include "scan.h"
26#include "ip6.h" 24#include "ip6.h"
27#include "ndelay.h" 25#include "ndelay.h"
26#include "scan.h"
27#include "socket.h"
28 28
29/* Opentracker */ 29/* Opentracker */
30#include "trackerlogic.h"
31#include "ot_vector.h"
32#include "ot_mutex.h" 30#include "ot_mutex.h"
33#include "ot_stats.h" 31#include "ot_stats.h"
32#include "ot_vector.h"
33#include "trackerlogic.h"
34 34
35#ifndef WANT_SYNC_LIVE 35#ifndef WANT_SYNC_LIVE
36#define WANT_SYNC_LIVE 36#define WANT_SYNC_LIVE
@@ -40,26 +40,26 @@
40ot_ip6 g_serverip; 40ot_ip6 g_serverip;
41uint16_t g_serverport = 9009; 41uint16_t g_serverport = 9009;
42uint32_t g_tracker_id; 42uint32_t g_tracker_id;
43char groupip_1[4] = { 224,0,23,5 }; 43char groupip_1[4] = {224, 0, 23, 5};
44int g_self_pipe[2]; 44int g_self_pipe[2];
45 45
46/* If you have more than 10 peers, don't use this proxy 46/* If you have more than 10 peers, don't use this proxy
47 Use 20 slots for 10 peers to have room for 10 incoming connection slots 47 Use 20 slots for 10 peers to have room for 10 incoming connection slots
48 */ 48 */
49#define MAX_PEERS 20 49#define MAX_PEERS 20
50 50
51#define LIVESYNC_INCOMING_BUFFSIZE (256*256) 51#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256)
52#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) 52#define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256)
53 53
54#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 54#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
55#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 55#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
56#define LIVESYNC_MAXDELAY 15 /* seconds */ 56#define LIVESYNC_MAXDELAY 15 /* seconds */
57 57
58/* The amount of time a complete sync cycle should take */ 58/* The amount of time a complete sync cycle should take */
59#define OT_SYNC_INTERVAL_MINUTES 2 59#define OT_SYNC_INTERVAL_MINUTES 2
60 60
61/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ 61/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
62#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) 62#define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT))
63 63
64enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; 64enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
65enum { FLAG_SERVERSOCKET = 1 }; 65enum { FLAG_SERVERSOCKET = 1 };
@@ -75,151 +75,153 @@ static uint8_t *g_peerbuffer_pos;
75static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; 75static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
76static ot_time g_next_packet_time; 76static ot_time g_next_packet_time;
77 77
78static void * livesync_worker( void * args ); 78static void *livesync_worker(void *args);
79static void * streamsync_worker( void * args ); 79static void *streamsync_worker(void *args);
80static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); 80static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer);
81 81
82void exerr( char * message ) { 82void exerr(char *message) {
83 fprintf( stderr, "%s\n", message ); 83 fprintf(stderr, "%s\n", message);
84 exit( 111 ); 84 exit(111);
85} 85}
86 86
87void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { 87void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) {
88 (void) event; 88 (void)event;
89 (void) proto; 89 (void)proto;
90 (void) event_data; 90 (void)event_data;
91} 91}
92 92
93void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { 93void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
94 char tmpip[4] = {0,0,0,0}; 94 char tmpip[4] = {0, 0, 0, 0};
95 char *v4ip; 95 char *v4ip;
96 96
97 if( !ip6_isv4mapped(ip)) 97 if (!ip6_isv4mapped(ip))
98 exerr("v6 mcast support not yet available."); 98 exerr("v6 mcast support not yet available.");
99 v4ip = ip+12; 99 v4ip = ip + 12;
100 100
101 if( g_socket_in != -1 ) 101 if (g_socket_in != -1)
102 exerr("Error: Livesync listen ip specified twice."); 102 exerr("Error: Livesync listen ip specified twice.");
103 103
104 if( ( g_socket_in = socket_udp4( )) < 0) 104 if ((g_socket_in = socket_udp4()) < 0)
105 exerr("Error: Cant create live sync incoming socket." ); 105 exerr("Error: Cant create live sync incoming socket.");
106 ndelay_off(g_socket_in); 106 ndelay_off(g_socket_in);
107 107
108 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) 108 if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
109 exerr("Error: Cant bind live sync incoming socket." ); 109 exerr("Error: Cant bind live sync incoming socket.");
110 110
111 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) 111 if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
112 exerr("Error: Cant make live sync incoming socket join mcast group."); 112 exerr("Error: Cant make live sync incoming socket join mcast group.");
113 113
114 if( ( g_socket_out = socket_udp4()) < 0) 114 if ((g_socket_out = socket_udp4()) < 0)
115 exerr("Error: Cant create live sync outgoing socket." ); 115 exerr("Error: Cant create live sync outgoing socket.");
116 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) 116 if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
117 exerr("Error: Cant bind live sync outgoing socket." ); 117 exerr("Error: Cant bind live sync outgoing socket.");
118 118
119 socket_mcttl4(g_socket_out, 1); 119 socket_mcttl4(g_socket_out, 1);
120 socket_mcloop4(g_socket_out, 1); 120 socket_mcloop4(g_socket_out, 1);
121} 121}
122 122
123size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { 123size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
124 int exactmatch; 124 int exactmatch;
125 ot_torrent *torrent; 125 ot_torrent *torrent;
126 ot_peerlist *peer_list; 126 ot_peerlist *peer_list;
127 ot_peer *peer_dest; 127 ot_peer *peer_dest;
128 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 128 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
129 size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); 129 size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);
130 130
131 torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), compare_size, &exactmatch ); 131 torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch);
132 if( !torrent ) 132 if (!torrent)
133 return -1; 133 return -1;
134 134
135 if( !exactmatch ) { 135 if (!exactmatch) {
136 /* Create a new torrent entry, then */ 136 /* Create a new torrent entry, then */
137 memcpy( torrent->hash, hash, sizeof(ot_hash) ); 137 memcpy(torrent->hash, hash, sizeof(ot_hash));
138 138
139 if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) || 139 if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) {
140 !( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) { 140 vector_remove_torrent(torrents_list, torrent);
141 vector_remove_torrent( torrents_list, torrent ); 141 mutex_bucket_unlock_by_hash(hash, 0);
142 mutex_bucket_unlock_by_hash( hash, 0 );
143 return -1; 142 return -1;
144 } 143 }
145 144
146 byte_zero( torrent->peer_list6, sizeof( ot_peerlist ) ); 145 byte_zero(torrent->peer_list6, sizeof(ot_peerlist));
147 byte_zero( torrent->peer_list4, sizeof( ot_peerlist ) ); 146 byte_zero(torrent->peer_list4, sizeof(ot_peerlist));
148 } 147 }
149 148
150 peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; 149 peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
151 150
152 /* Check for peer in torrent */ 151 /* Check for peer in torrent */
153 peer_dest = vector_find_or_insert_peer( &(peer_list->peers), peer, peer_size, &exactmatch ); 152 peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch);
154 if( !peer_dest ) { 153 if (!peer_dest) {
155 mutex_bucket_unlock_by_hash( hash, 0 ); 154 mutex_bucket_unlock_by_hash(hash, 0);
156 return -1; 155 return -1;
157 } 156 }
158 /* Tell peer that it's fresh */ 157 /* Tell peer that it's fresh */
159 OT_PEERTIME( peer, peer_size ) = 0; 158 OT_PEERTIME(peer, peer_size) = 0;
160 159
161 /* If we hadn't had a match create peer there */ 160 /* If we hadn't had a match create peer there */
162 if( !exactmatch ) { 161 if (!exactmatch) {
163 peer_list->peer_count++; 162 peer_list->peer_count++;
164 if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING ) 163 if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING)
165 peer_list->seed_count++; 164 peer_list->seed_count++;
166 } 165 }
167 memcpy( peer_dest, peer, peer_size ); 166 memcpy(peer_dest, peer, peer_size);
168 mutex_bucket_unlock_by_hash( hash, 0 ); 167 mutex_bucket_unlock_by_hash(hash, 0);
169 return 0; 168 return 0;
170} 169}
171 170
172size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { 171size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
173 int exactmatch; 172 int exactmatch;
174 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 173 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
175 ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); 174 ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch);
176 175
177 if( exactmatch ) { 176 if (exactmatch) {
178 ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; 177 ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
179 switch( vector_remove_peer( &peer_list->peers, peer, peer_size ) ) { 178 switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) {
180 case 2: peer_list->seed_count--; /* Intentional fallthrough */ 179 case 2:
181 case 1: peer_list->peer_count--; /* Intentional fallthrough */ 180 peer_list->seed_count--; /* Intentional fallthrough */
182 default: break; 181 case 1:
182 peer_list->peer_count--; /* Intentional fallthrough */
183 default:
184 break;
183 } 185 }
184 } 186 }
185 187
186 mutex_bucket_unlock_by_hash( hash, 0 ); 188 mutex_bucket_unlock_by_hash(hash, 0);
187 return 0; 189 return 0;
188} 190}
189 191
190void free_peerlist( ot_peerlist *peer_list ) { 192void free_peerlist(ot_peerlist *peer_list) {
191 if( peer_list->peers.data ) { 193 if (peer_list->peers.data) {
192 if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { 194 if (OT_PEERLIST_HASBUCKETS(peer_list)) {
193 ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); 195 ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data);
194 196
195 while( peer_list->peers.size-- ) 197 while (peer_list->peers.size--)
196 free( bucket_list++->data ); 198 free(bucket_list++->data);
197 } 199 }
198 free( peer_list->peers.data ); 200 free(peer_list->peers.data);
199 } 201 }
200 free( peer_list ); 202 free(peer_list);
201} 203}
202 204
203static void livesync_handle_peersync( ssize_t datalen, size_t peer_size ) { 205static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) {
204 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 206 int off = sizeof(g_tracker_id) + sizeof(uint32_t);
205 207
206 fprintf( stderr, "." ); 208 fprintf(stderr, ".");
207 209
208 while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= datalen ) { 210 while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) {
209 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); 211 ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash));
210 ot_hash *hash = (ot_hash*)(g_inbuffer + off); 212 ot_hash *hash = (ot_hash *)(g_inbuffer + off);
211 213
212 if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED ) 214 if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED)
213 remove_peer_from_torrent_proxy( *hash, peer, peer_size ); 215 remove_peer_from_torrent_proxy(*hash, peer, peer_size);
214 else 216 else
215 add_peer_to_torrent_proxy( *hash, peer, peer_size ); 217 add_peer_to_torrent_proxy(*hash, peer, peer_size);
216 218
217 off += sizeof( ot_hash ) + peer_size; 219 off += sizeof(ot_hash) + peer_size;
218 } 220 }
219} 221}
220 222
221int usage( char *self ) { 223int usage(char *self) {
222 fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); 224 fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self);
223 return 0; 225 return 0;
224} 226}
225 227
@@ -234,115 +236,115 @@ enum {
234 FLAG_MASK = 0x07 236 FLAG_MASK = 0x07
235}; 237};
236 238
237#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) 239#define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING)
238#define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) 240#define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED)
239#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) 241#define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED)
240#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) 242#define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING)
241#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) 243#define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID)
242#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) 244#define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED)
243 245
244typedef struct { 246typedef struct {
245 int state; /* Whether we want to connect, how far our handshake is, etc. */ 247 int state; /* Whether we want to connect, how far our handshake is, etc. */
246 ot_ip6 ip; /* The peer to connect to */ 248 ot_ip6 ip; /* The peer to connect to */
247 uint16_t port; /* The peers port */ 249 uint16_t port; /* The peers port */
248 uint8_t indata[8192*16]; /* Any data not processed yet */ 250 uint8_t indata[8192 * 16]; /* Any data not processed yet */
249 size_t indata_length; /* Length of unprocessed data */ 251 size_t indata_length; /* Length of unprocessed data */
250 uint32_t tracker_id; /* How the other end greeted */ 252 uint32_t tracker_id; /* How the other end greeted */
251 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ 253 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
252 io_batch outdata; /* The iobatch containing our sync data */ 254 io_batch outdata; /* The iobatch containing our sync data */
253 255
254 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ 256 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
255 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ 257 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
256 uint8_t packet_type; /* Type of current packet */ 258 uint8_t packet_type; /* Type of current packet */
257 uint32_t packet_tid; /* Tracker id for current packet */ 259 uint32_t packet_tid; /* Tracker id for current packet */
258 260
259} proxy_peer; 261} proxy_peer;
260static void process_indata( proxy_peer * peer ); 262static void process_indata(proxy_peer *peer);
261 263
262void reset_info_block( proxy_peer * peer ) { 264void reset_info_block(proxy_peer *peer) {
263 peer->indata_length = 0; 265 peer->indata_length = 0;
264 peer->tracker_id = 0; 266 peer->tracker_id = 0;
265 peer->fd = -1; 267 peer->fd = -1;
266 peer->packet_tcount = 0; 268 peer->packet_tcount = 0;
267 iob_reset( &peer->outdata ); 269 iob_reset(&peer->outdata);
268 PROXYPEER_SETDISCONNECTED( peer->state ); 270 PROXYPEER_SETDISCONNECTED(peer->state);
269} 271}
270 272
271/* Number of connections to peers 273/* Number of connections to peers
272 * If a peer's IP is set, we try to reconnect, when the connection drops 274 * If a peer's IP is set, we try to reconnect, when the connection drops
273 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it 275 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
274 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match 276 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
275 * Reconnect attempts occur only twice a minute 277 * Reconnect attempts occur only twice a minute
276*/ 278 */
277static int g_connection_count; 279static int g_connection_count;
278static ot_time g_connection_reconn; 280static ot_time g_connection_reconn;
279static proxy_peer g_connections[MAX_PEERS]; 281static proxy_peer g_connections[MAX_PEERS];
280 282
281static void handle_reconnects( void ) { 283static void handle_reconnects(void) {
282 int i; 284 int i;
283 for( i=0; i<g_connection_count; ++i ) 285 for (i = 0; i < g_connection_count; ++i)
284 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { 286 if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) {
285 int64 newfd = socket_tcp6( ); 287 int64 newfd = socket_tcp6();
286 fprintf( stderr, "(Re)connecting to peer..." ); 288 fprintf(stderr, "(Re)connecting to peer...");
287 if( newfd < 0 ) continue; /* No socket for you */ 289 if (newfd < 0)
290 continue; /* No socket for you */
288 io_fd(newfd); 291 io_fd(newfd);
289 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { 292 if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) {
290 io_close( newfd ); 293 io_close(newfd);
291 continue; 294 continue;
292 } 295 }
293 if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && 296 if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) {
294 errno != EINPROGRESS && errno != EWOULDBLOCK ) {
295 close(newfd); 297 close(newfd);
296 continue; 298 continue;
297 } 299 }
298 io_wantwrite(newfd); /* So we will be informed when it is connected */ 300 io_wantwrite(newfd); /* So we will be informed when it is connected */
299 io_setcookie(newfd,g_connections+i); 301 io_setcookie(newfd, g_connections + i);
300 302
301 /* Prepare connection info block */ 303 /* Prepare connection info block */
302 reset_info_block( g_connections+i ); 304 reset_info_block(g_connections + i);
303 g_connections[i].fd = newfd; 305 g_connections[i].fd = newfd;
304 PROXYPEER_SETCONNECTING( g_connections[i].state ); 306 PROXYPEER_SETCONNECTING(g_connections[i].state);
305 } 307 }
306 g_connection_reconn = time(NULL) + 30; 308 g_connection_reconn = time(NULL) + 30;
307} 309}
308 310
309/* Handle incoming connection requests, check against whitelist */ 311/* Handle incoming connection requests, check against whitelist */
310static void handle_accept( int64 serversocket ) { 312static void handle_accept(int64 serversocket) {
311 int64 newfd; 313 int64 newfd;
312 ot_ip6 ip; 314 ot_ip6 ip;
313 uint16 port; 315 uint16 port;
314 316
315 while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { 317 while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) {
316 318
317 /* XXX some access control */ 319 /* XXX some access control */
318 320
319 /* Put fd into a non-blocking mode */ 321 /* Put fd into a non-blocking mode */
320 io_nonblock( newfd ); 322 io_nonblock(newfd);
321 323
322 if( !io_fd( newfd ) ) 324 if (!io_fd(newfd))
323 io_close( newfd ); 325 io_close(newfd);
324 else { 326 else {
325 /* Find a new home for our incoming connection */ 327 /* Find a new home for our incoming connection */
326 int i; 328 int i;
327 for( i=0; i<MAX_PEERS; ++i ) 329 for (i = 0; i < MAX_PEERS; ++i)
328 if( g_connections[i].state == FLAG_DISCONNECTED ) 330 if (g_connections[i].state == FLAG_DISCONNECTED)
329 break; 331 break;
330 if( i == MAX_PEERS ) { 332 if (i == MAX_PEERS) {
331 fprintf( stderr, "No room for incoming connection." ); 333 fprintf(stderr, "No room for incoming connection.");
332 close( newfd ); 334 close(newfd);
333 continue; 335 continue;
334 } 336 }
335 337
336 /* Prepare connection info block */ 338 /* Prepare connection info block */
337 reset_info_block( g_connections+i ); 339 reset_info_block(g_connections + i);
338 PROXYPEER_SETCONNECTING( g_connections[i].state ); 340 PROXYPEER_SETCONNECTING(g_connections[i].state);
339 g_connections[i].port = port; 341 g_connections[i].port = port;
340 g_connections[i].fd = newfd; 342 g_connections[i].fd = newfd;
341 343
342 io_setcookie( newfd, g_connections + i ); 344 io_setcookie(newfd, g_connections + i);
343 345
344 /* We expect the connecting side to begin with its tracker_id */ 346 /* We expect the connecting side to begin with its tracker_id */
345 io_wantread( newfd ); 347 io_wantread(newfd);
346 } 348 }
347 } 349 }
348 350
@@ -350,117 +352,116 @@ static void handle_accept( int64 serversocket ) {
350} 352}
351 353
352/* New sync data on the stream */ 354/* New sync data on the stream */
353static void handle_read( int64 peersocket ) { 355static void handle_read(int64 peersocket) {
354 int i; 356 int i;
355 int64 datalen; 357 int64 datalen;
356 uint32_t tracker_id; 358 uint32_t tracker_id;
357 proxy_peer *peer = io_getcookie( peersocket ); 359 proxy_peer *peer = io_getcookie(peersocket);
358 360
359 if( !peer ) { 361 if (!peer) {
360 /* Can't happen ;) */ 362 /* Can't happen ;) */
361 io_close( peersocket ); 363 io_close(peersocket);
362 return; 364 return;
363 } 365 }
364 switch( peer->state & FLAG_MASK ) { 366 switch (peer->state & FLAG_MASK) {
365 case FLAG_DISCONNECTED: 367 case FLAG_DISCONNECTED:
366 io_close( peersocket ); 368 io_close(peersocket);
367 break; /* Shouldnt happen */ 369 break; /* Shouldnt happen */
368 case FLAG_CONNECTING: 370 case FLAG_CONNECTING:
369 case FLAG_WAITTRACKERID: 371 case FLAG_WAITTRACKERID:
370 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) 372 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
371 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ 373 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
372 if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) 374 if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id))
373 goto close_socket; 375 goto close_socket;
374 376
375 /* See, if we already have a connection to that peer */ 377 /* See, if we already have a connection to that peer */
376 for( i=0; i<MAX_PEERS; ++i ) 378 for (i = 0; i < MAX_PEERS; ++i)
377 if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && 379 if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) {
378 g_connections[i].tracker_id == tracker_id ) { 380 fprintf(stderr, "Peer already connected. Closing connection.\n");
379 fprintf( stderr, "Peer already connected. Closing connection.\n" );
380 goto close_socket; 381 goto close_socket;
381 } 382 }
382 383
383 /* Also no need for soliloquy */ 384 /* Also no need for soliloquy */
384 if( tracker_id == g_tracker_id ) 385 if (tracker_id == g_tracker_id)
385 goto close_socket; 386 goto close_socket;
386 387
387 /* The new connection is good, send our tracker_id on incoming connections */ 388 /* The new connection is good, send our tracker_id on incoming connections */
388 if( peer->state == FLAG_CONNECTING ) 389 if (peer->state == FLAG_CONNECTING)
389 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) 390 if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id))
390 goto close_socket; 391 goto close_socket;
391 392
392 peer->tracker_id = tracker_id; 393 peer->tracker_id = tracker_id;
393 PROXYPEER_SETCONNECTED( peer->state ); 394 PROXYPEER_SETCONNECTED(peer->state);
394 395
395 if( peer->state & FLAG_OUTGOING ) 396 if (peer->state & FLAG_OUTGOING)
396 fprintf( stderr, "succeeded.\n" ); 397 fprintf(stderr, "succeeded.\n");
397 else 398 else
398 fprintf( stderr, "Incoming connection successful.\n" ); 399 fprintf(stderr, "Incoming connection successful.\n");
399 400
400 break; 401 break;
401close_socket: 402 close_socket:
402 fprintf( stderr, "Handshake incomplete, closing socket\n" ); 403 fprintf(stderr, "Handshake incomplete, closing socket\n");
403 io_close( peersocket ); 404 io_close(peersocket);
404 reset_info_block( peer ); 405 reset_info_block(peer);
405 break; 406 break;
406 case FLAG_CONNECTED: 407 case FLAG_CONNECTED:
407 /* Here we acutally expect data from peer 408 /* Here we acutally expect data from peer
408 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ 409 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
409 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); 410 datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length);
410 if( !datalen || datalen < -1 ) { 411 if (!datalen || datalen < -1) {
411 fprintf( stderr, "Connection closed by remote peer.\n" ); 412 fprintf(stderr, "Connection closed by remote peer.\n");
412 io_close( peersocket ); 413 io_close(peersocket);
413 reset_info_block( peer ); 414 reset_info_block(peer);
414 } else if( datalen > 0 ) { 415 } else if (datalen > 0) {
415 peer->indata_length += datalen; 416 peer->indata_length += datalen;
416 process_indata( peer ); 417 process_indata(peer);
417 } 418 }
418 break; 419 break;
419 } 420 }
420} 421}
421 422
422/* Can write new sync data to the stream */ 423/* Can write new sync data to the stream */
423static void handle_write( int64 peersocket ) { 424static void handle_write(int64 peersocket) {
424 proxy_peer *peer = io_getcookie( peersocket ); 425 proxy_peer *peer = io_getcookie(peersocket);
425 426
426 if( !peer ) { 427 if (!peer) {
427 /* Can't happen ;) */ 428 /* Can't happen ;) */
428 io_close( peersocket ); 429 io_close(peersocket);
429 return; 430 return;
430 } 431 }
431 432
432 switch( peer->state & FLAG_MASK ) { 433 switch (peer->state & FLAG_MASK) {
433 case FLAG_DISCONNECTED: 434 case FLAG_DISCONNECTED:
434 default: /* Should not happen */ 435 default: /* Should not happen */
435 io_close( peersocket ); 436 io_close(peersocket);
436 break; 437 break;
437 case FLAG_CONNECTING: 438 case FLAG_CONNECTING:
438 /* Ensure that the connection is established and handle connection error */ 439 /* Ensure that the connection is established and handle connection error */
439 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { 440 if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) {
440 fprintf( stderr, "failed\n" ); 441 fprintf(stderr, "failed\n");
441 reset_info_block( peer ); 442 reset_info_block(peer);
442 io_close( peersocket ); 443 io_close(peersocket);
443 break; 444 break;
444 } 445 }
445 446
446 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { 447 if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) {
447 PROXYPEER_SETWAITTRACKERID( peer->state ); 448 PROXYPEER_SETWAITTRACKERID(peer->state);
448 io_dontwantwrite( peersocket ); 449 io_dontwantwrite(peersocket);
449 io_wantread( peersocket ); 450 io_wantread(peersocket);
450 } else { 451 } else {
451 fprintf( stderr, "Handshake incomplete, closing socket\n" ); 452 fprintf(stderr, "Handshake incomplete, closing socket\n");
452 io_close( peersocket ); 453 io_close(peersocket);
453 reset_info_block( peer ); 454 reset_info_block(peer);
454 } 455 }
455 break; 456 break;
456 case FLAG_CONNECTED: 457 case FLAG_CONNECTED:
457 switch( iob_send( peersocket, &peer->outdata ) ) { 458 switch (iob_send(peersocket, &peer->outdata)) {
458 case 0: /* all data sent */ 459 case 0: /* all data sent */
459 io_dontwantwrite( peersocket ); 460 io_dontwantwrite(peersocket);
460 break; 461 break;
461 case -3: /* an error occured */ 462 case -3: /* an error occured */
462 io_close( peersocket ); 463 io_close(peersocket);
463 reset_info_block( peer ); 464 reset_info_block(peer);
464 break; 465 break;
465 default: /* Normal operation or eagain */ 466 default: /* Normal operation or eagain */
466 break; 467 break;
@@ -475,289 +476,324 @@ static void server_mainloop() {
475 int64 sock; 476 int64 sock;
476 477
477 /* inlined livesync_init() */ 478 /* inlined livesync_init() */
478 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); 479 memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start));
479 g_peerbuffer_pos = g_peerbuffer_start; 480 g_peerbuffer_pos = g_peerbuffer_start;
480 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 481 memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id));
481 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); 482 uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER);
482 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); 483 g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t);
483 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; 484 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
484 485
485 while(1) { 486 while (1) {
486 /* See if we need to connect to anyone */ 487 /* See if we need to connect to anyone */
487 if( time(NULL) > g_connection_reconn ) 488 if (time(NULL) > g_connection_reconn)
488 handle_reconnects( ); 489 handle_reconnects();
489 490
490 /* Wait for io events until next approx reconn check time */ 491 /* Wait for io events until next approx reconn check time */
491 io_waituntil2( 30*1000 ); 492 io_waituntil2(30 * 1000);
492 493
493 /* Loop over readable sockets */ 494 /* Loop over readable sockets */
494 while( ( sock = io_canread( ) ) != -1 ) { 495 while ((sock = io_canread()) != -1) {
495 const void *cookie = io_getcookie( sock ); 496 const void *cookie = io_getcookie(sock);
496 if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) 497 if ((uintptr_t)cookie == FLAG_SERVERSOCKET)
497 handle_accept( sock ); 498 handle_accept(sock);
498 else 499 else
499 handle_read( sock ); 500 handle_read(sock);
500 } 501 }
501 502
502 /* Loop over writable sockets */ 503 /* Loop over writable sockets */
503 while( ( sock = io_canwrite( ) ) != -1 ) 504 while ((sock = io_canwrite()) != -1)
504 handle_write( sock ); 505 handle_write(sock);
505 506
506 livesync_ticker( ); 507 livesync_ticker();
507 } 508 }
508} 509}
509 510
510static void panic( const char *routine ) { 511static void panic(const char *routine) {
511 fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); 512 fprintf(stderr, "%s: %s\n", routine, strerror(errno));
512 exit( 111 ); 513 exit(111);
513} 514}
514 515
515static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { 516static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) {
516 int64 sock = socket_tcp6( ); 517 int64 sock = socket_tcp6();
517 518
518 if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) 519 if (socket_bind6_reuse(sock, ip, port, 0) == -1)
519 panic( "socket_bind6_reuse" ); 520 panic("socket_bind6_reuse");
520 521
521 if( socket_listen( sock, SOMAXCONN) == -1 ) 522 if (socket_listen(sock, SOMAXCONN) == -1)
522 panic( "socket_listen" ); 523 panic("socket_listen");
523 524
524 if( !io_fd( sock ) ) 525 if (!io_fd(sock))
525 panic( "io_fd" ); 526 panic("io_fd");
526 527
527 io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); 528 io_setcookie(sock, (void *)FLAG_SERVERSOCKET);
528 io_wantread( sock ); 529 io_wantread(sock);
529 return sock; 530 return sock;
530} 531}
531 532
532 533static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) {
533static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
534 const char *s = src; 534 const char *s = src;
535 int off, bracket = 0; 535 int off, bracket = 0;
536 while( isspace(*s) ) ++s; 536 while (isspace(*s))
537 if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ 537 ++s;
538 if( !(off = scan_ip6( s, ip ) ) ) 538 if (*s == '[')
539 ++s, ++bracket; /* for v6 style notation */
540 if (!(off = scan_ip6(s, ip)))
539 return 0; 541 return 0;
540 s += off; 542 s += off;
541 if( *s == 0 || isspace(*s)) return s-src; 543 if (*s == 0 || isspace(*s))
542 if( *s == ']' && bracket ) ++s; 544 return s - src;
543 if( !ip6_isv4mapped(ip)){ 545 if (*s == ']' && bracket)
544 if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; 546 ++s;
547 if (!ip6_isv4mapped(ip)) {
548 if ((bracket && *(s) != ':') || (*(s) != '.'))
549 return 0;
545 s++; 550 s++;
546 } else { 551 } else {
547 if( *(s++) != ':' ) return 0; 552 if (*(s++) != ':')
553 return 0;
548 } 554 }
549 if( !(off = scan_ushort (s, port ) ) ) 555 if (!(off = scan_ushort(s, port)))
550 return 0; 556 return 0;
551 return off+s-src; 557 return off + s - src;
552} 558}
553 559
554int main( int argc, char **argv ) { 560int main(int argc, char **argv) {
555 static pthread_t sync_in_thread_id; 561 static pthread_t sync_in_thread_id;
556 static pthread_t sync_out_thread_id; 562 static pthread_t sync_out_thread_id;
557 ot_ip6 serverip; 563 ot_ip6 serverip;
558 uint16_t tmpport; 564 uint16_t tmpport;
559 int scanon = 1, lbound = 0, sbound = 0; 565 int scanon = 1, lbound = 0, sbound = 0;
560 566
561 srandom( time(NULL) ); 567 srandom(time(NULL));
562#ifdef WANT_ARC4RANDOM 568#ifdef WANT_ARC4RANDOM
563 g_tracker_id = arc4random(); 569 g_tracker_id = arc4random();
564#else 570#else
565 g_tracker_id = random(); 571 g_tracker_id = random();
566#endif 572#endif
567 573
568 while( scanon ) { 574 while (scanon) {
569 switch( getopt( argc, argv, ":l:c:L:h" ) ) { 575 switch (getopt(argc, argv, ":l:c:L:h")) {
570 case -1: scanon = 0; break; 576 case -1:
577 scanon = 0;
578 break;
571 case 'l': 579 case 'l':
572 tmpport = 0; 580 tmpport = 0;
573 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } 581 if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
574 ot_try_bind( serverip, tmpport ); 582 usage(argv[0]);
583 exit(1);
584 }
585 ot_try_bind(serverip, tmpport);
575 ++sbound; 586 ++sbound;
576 break; 587 break;
577 case 'c': 588 case 'c':
578 if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); 589 if (g_connection_count > MAX_PEERS / 2)
590 exerr("Connection limit exceeded.\n");
579 tmpport = 0; 591 tmpport = 0;
580 if( !scan_ip6_port( optarg, 592 if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) {
581 g_connections[g_connection_count].ip, 593 usage(argv[0]);
582 &g_connections[g_connection_count].port ) || 594 exit(1);
583 !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } 595 }
584 g_connections[g_connection_count++].state = FLAG_OUTGOING; 596 g_connections[g_connection_count++].state = FLAG_OUTGOING;
585 break; 597 break;
586 case 'L': 598 case 'L':
587 tmpport = 9696; 599 tmpport = 9696;
588 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } 600 if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
589 livesync_bind_mcast( serverip, tmpport); ++lbound; break; 601 usage(argv[0]);
602 exit(1);
603 }
604 livesync_bind_mcast(serverip, tmpport);
605 ++lbound;
606 break;
590 default: 607 default:
591 case '?': usage( argv[0] ); exit( 1 ); 608 case '?':
609 usage(argv[0]);
610 exit(1);
592 } 611 }
593 } 612 }
594 613
595 if( !lbound ) exerr( "No livesync port bound." ); 614 if (!lbound)
596 if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); 615 exerr("No livesync port bound.");
597 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); 616 if (!g_connection_count && !sbound)
598 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); 617 exerr("No streamsync port bound.");
618 pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL);
619 pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL);
599 620
600 server_mainloop(); 621 server_mainloop();
601 return 0; 622 return 0;
602} 623}
603 624
604static void * streamsync_worker( void * args ) { 625static void *streamsync_worker(void *args) {
605 (void)args; 626 (void)args;
606 while( 1 ) { 627 while (1) {
607 int bucket; 628 int bucket;
608 /* For each bucket... */ 629 /* For each bucket... */
609 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 630 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
610 /* Get exclusive access to that bucket */ 631 /* Get exclusive access to that bucket */
611 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 632 ot_vector *torrents_list = mutex_bucket_lock(bucket);
612 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; 633 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
613 size_t mem, mem_a = 0, mem_b = 0; 634 size_t mem, mem_a = 0, mem_b = 0;
614 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; 635 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
615 636
616 if( !torrents_list->size ) goto unlock_continue; 637 if (!torrents_list->size)
638 goto unlock_continue;
617 639
618 /* For each torrent in this bucket.. */ 640 /* For each torrent in this bucket.. */
619 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 641 for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
620 /* Address torrents members */ 642 /* Address torrents members */
621 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; 643 ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list;
622 switch( peer_list->peer_count ) { 644 switch (peer_list->peer_count) {
623 case 2: count_two++; break; 645 case 2:
624 case 1: count_one++; break; 646 count_two++;
625 case 0: break; 647 break;
626 default: count_def++; 648 case 1:
627 count_peers += peer_list->peer_count; 649 count_one++;
650 break;
651 case 0:
652 break;
653 default:
654 count_def++;
655 count_peers += peer_list->peer_count;
628 } 656 }
629 } 657 }
630 658
631 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ 659 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
632 mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + 660 mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7;
633 ( count_one + 2 * count_two + count_peers ) * 7; 661
634 662 fprintf(stderr, "Mem: %zd\n", mem);
635 fprintf( stderr, "Mem: %zd\n", mem ); 663
636 664 ptr = ptr_a = ptr_b = ptr_c = malloc(mem);
637 ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); 665 if (!ptr)
638 if( !ptr ) goto unlock_continue; 666 goto unlock_continue;
639 667
640 if( count_one > 4 || !count_def ) { 668 if (count_one > 4 || !count_def) {
641 mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); 669 mem_a = 1 + 1 + 2 + count_one * (19 + 7);
642 ptr_b += mem_a; ptr_c += mem_a; 670 ptr_b += mem_a;
643 ptr_a[0] = 1; /* Offset 0: packet type 1 */ 671 ptr_c += mem_a;
644 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 672 ptr_a[0] = 1; /* Offset 0: packet type 1 */
645 ptr_a[2] = count_one >> 8; 673 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
646 ptr_a[3] = count_one & 255; 674 ptr_a[2] = count_one >> 8;
647 ptr_a += 4; 675 ptr_a[3] = count_one & 255;
676 ptr_a += 4;
648 } else 677 } else
649 count_def += count_one; 678 count_def += count_one;
650 679
651 if( count_two > 4 || !count_def ) { 680 if (count_two > 4 || !count_def) {
652 mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); 681 mem_b = 1 + 1 + 2 + count_two * (19 + 14);
653 ptr_c += mem_b; 682 ptr_c += mem_b;
654 ptr_b[0] = 2; /* Offset 0: packet type 2 */ 683 ptr_b[0] = 2; /* Offset 0: packet type 2 */
655 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 684 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
656 ptr_b[2] = count_two >> 8; 685 ptr_b[2] = count_two >> 8;
657 ptr_b[3] = count_two & 255; 686 ptr_b[3] = count_two & 255;
658 ptr_b += 4; 687 ptr_b += 4;
659 } else 688 } else
660 count_def += count_two; 689 count_def += count_two;
661 690
662 if( count_def ) { 691 if (count_def) {
663 ptr_c[0] = 0; /* Offset 0: packet type 0 */ 692 ptr_c[0] = 0; /* Offset 0: packet type 0 */
664 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 693 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
665 ptr_c[2] = count_def >> 8; 694 ptr_c[2] = count_def >> 8;
666 ptr_c[3] = count_def & 255; 695 ptr_c[3] = count_def & 255;
667 ptr_c += 4; 696 ptr_c += 4;
668 } 697 }
669 698
670 /* For each torrent in this bucket.. */ 699 /* For each torrent in this bucket.. */
671 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 700 for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
672 /* Address torrents members */ 701 /* Address torrents members */
673 ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; 702 ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset;
674 ot_peerlist *peer_list = torrent->peer_list; 703 ot_peerlist *peer_list = torrent->peer_list;
675 ot_peer *peers = (ot_peer*)(peer_list->peers.data); 704 ot_peer *peers = (ot_peer *)(peer_list->peers.data);
676 uint8_t **dst; 705 uint8_t **dst;
677 706
678 /* Determine destination slot */ 707 /* Determine destination slot */
679 count_peers = peer_list->peer_count; 708 count_peers = peer_list->peer_count;
680 switch( count_peers ) { 709 switch (count_peers) {
681 case 0: continue; 710 case 0:
682 case 1: dst = mem_a ? &ptr_a : &ptr_c; break; 711 continue;
683 case 2: dst = mem_b ? &ptr_b : &ptr_c; break; 712 case 1:
684 default: dst = &ptr_c; break; 713 dst = mem_a ? &ptr_a : &ptr_c;
714 break;
715 case 2:
716 dst = mem_b ? &ptr_b : &ptr_c;
717 break;
718 default:
719 dst = &ptr_c;
720 break;
685 } 721 }
686 722
687 /* Copy tail of info_hash, advance pointer */ 723 /* Copy tail of info_hash, advance pointer */
688 memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); 724 memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1);
689 *dst += sizeof( ot_hash ) - 1; 725 *dst += sizeof(ot_hash) - 1;
690 726
691 /* Encode peer count */ 727 /* Encode peer count */
692 if( dst == &ptr_c ) 728 if (dst == &ptr_c)
693 while( count_peers ) { 729 while (count_peers) {
694 if( count_peers <= 0x7f ) 730 if (count_peers <= 0x7f)
695 *(*dst)++ = count_peers; 731 *(*dst)++ = count_peers;
696 else 732 else
697 *(*dst)++ = 0x80 | ( count_peers & 0x7f ); 733 *(*dst)++ = 0x80 | (count_peers & 0x7f);
698 count_peers >>= 7; 734 count_peers >>= 7;
699 } 735 }
700 736
701 /* Copy peers */ 737 /* Copy peers */
702 count_peers = peer_list->peer_count; 738 count_peers = peer_list->peer_count;
703 while( count_peers-- ) { 739 while (count_peers--) {
704 memcpy( *dst, peers++, OT_IP_SIZE + 3 ); 740 memcpy(*dst, peers++, OT_IP_SIZE + 3);
705 *dst += OT_IP_SIZE + 3; 741 *dst += OT_IP_SIZE + 3;
706 } 742 }
707 free_peerlist(peer_list); 743 free_peerlist(peer_list);
708 } 744 }
709 745
710 free( torrents_list->data ); 746 free(torrents_list->data);
711 memset( torrents_list, 0, sizeof(*torrents_list ) ); 747 memset(torrents_list, 0, sizeof(*torrents_list));
712unlock_continue: 748 unlock_continue:
713 mutex_bucket_unlock( bucket, 0 ); 749 mutex_bucket_unlock(bucket, 0);
714 750
715 if( ptr ) { 751 if (ptr) {
716 int i; 752 int i;
717 753
718 if( ptr_b > ptr_c ) ptr_c = ptr_b; 754 if (ptr_b > ptr_c)
719 if( ptr_a > ptr_c ) ptr_c = ptr_a; 755 ptr_c = ptr_b;
756 if (ptr_a > ptr_c)
757 ptr_c = ptr_a;
720 mem = ptr_c - ptr; 758 mem = ptr_c - ptr;
721 759
722 for( i=0; i < MAX_PEERS; ++i ) { 760 for (i = 0; i < MAX_PEERS; ++i) {
723 if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { 761 if (PROXYPEER_ISCONNECTED(g_connections[i].state)) {
724 void *tmp = malloc( mem ); 762 void *tmp = malloc(mem);
725 if( tmp ) { 763 if (tmp) {
726 memcpy( tmp, ptr, mem ); 764 memcpy(tmp, ptr, mem);
727 iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); 765 iob_addbuf_free(&g_connections[i].outdata, tmp, mem);
728 io_wantwrite( g_connections[i].fd ); 766 io_wantwrite(g_connections[i].fd);
729 } 767 }
730 } 768 }
731 } 769 }
732 770
733 free( ptr ); 771 free(ptr);
734 } 772 }
735 usleep( OT_SYNC_SLEEP ); 773 usleep(OT_SYNC_SLEEP);
736 } 774 }
737 } 775 }
738 return 0; 776 return 0;
739} 777}
740 778
741static void livesync_issue_peersync( ) { 779static void livesync_issue_peersync() {
742 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, 780 socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT);
743 groupip_1, LIVESYNC_PORT); 781 g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t);
744 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
745 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; 782 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
746} 783}
747 784
748void livesync_ticker( ) { 785void livesync_ticker() {
749 /* livesync_issue_peersync sets g_next_packet_time */ 786 /* livesync_issue_peersync sets g_next_packet_time */
750 if( time(NULL) > g_next_packet_time && 787 if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id))
751 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
752 livesync_issue_peersync(); 788 livesync_issue_peersync();
753} 789}
754 790
755static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { 791static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) {
756// unsigned int i; 792 // unsigned int i;
757 793
758 *g_peerbuffer_pos = prefix; 794 *g_peerbuffer_pos = prefix;
759 memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); 795 memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1);
760 memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); 796 memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1);
761 797
762#if 0 798#if 0
763 /* Dump info_hash */ 799 /* Dump info_hash */
@@ -772,80 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee
772#endif 808#endif
773 g_peerbuffer_pos += sizeof(ot_peer); 809 g_peerbuffer_pos += sizeof(ot_peer);
774 810
775 if( g_peerbuffer_pos >= g_peerbuffer_highwater ) 811 if (g_peerbuffer_pos >= g_peerbuffer_highwater)
776 livesync_issue_peersync(); 812 livesync_issue_peersync();
777} 813}
778 814
779static void process_indata( proxy_peer * peer ) { 815static void process_indata(proxy_peer *peer) {
780 size_t consumed, peers; 816 size_t consumed, peers;
781 uint8_t *data = peer->indata, *hash; 817 uint8_t *data = peer->indata, *hash;
782 uint8_t *dataend = data + peer->indata_length; 818 uint8_t *dataend = data + peer->indata_length;
783 819
784 while( 1 ) { 820 while (1) {
785 /* If we're not inside of a packet, make a new one */ 821 /* If we're not inside of a packet, make a new one */
786 if( !peer->packet_tcount ) { 822 if (!peer->packet_tcount) {
787 /* Ensure the header is complete or postpone processing */ 823 /* Ensure the header is complete or postpone processing */
788 if( data + 4 > dataend ) break; 824 if (data + 4 > dataend)
789 peer->packet_type = data[0]; 825 break;
790 peer->packet_tprefix = data[1]; 826 peer->packet_type = data[0];
791 peer->packet_tcount = data[2] * 256 + data[3]; 827 peer->packet_tprefix = data[1];
792 data += 4; 828 peer->packet_tcount = data[2] * 256 + data[3];
793printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); 829 data += 4;
830 printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount);
794 } 831 }
795 832
796 /* Ensure size for a minimal torrent block */ 833 /* Ensure size for a minimal torrent block */
797 if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; 834 if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend)
835 break;
798 836
799 /* Advance pointer to peer count or peers */ 837 /* Advance pointer to peer count or peers */
800 hash = data; 838 hash = data;
801 data += sizeof(ot_hash) - 1; 839 data += sizeof(ot_hash) - 1;
802 840
803 /* Type 0 has peer count encoded before each peers */ 841 /* Type 0 has peer count encoded before each peers */
804 peers = peer->packet_type; 842 peers = peer->packet_type;
805 if( !peers ) { 843 if (!peers) {
806 int shift = 0; 844 int shift = 0;
807 do peers |= ( 0x7f & *data ) << ( 7 * shift ); 845 do
808 while ( *(data++) & 0x80 && shift++ < 6 ); 846 peers |= (0x7f & *data) << (7 * shift);
847 while (*(data++) & 0x80 && shift++ < 6);
809 } 848 }
810#if 0 849#if 0
811printf( "peers: %zd\n", peers ); 850printf( "peers: %zd\n", peers );
812#endif 851#endif
813 /* Ensure enough data being read to hold all peers */ 852 /* Ensure enough data being read to hold all peers */
814 if( data + (OT_IP_SIZE + 3) * peers > dataend ) { 853 if (data + (OT_IP_SIZE + 3) * peers > dataend) {
815 data = hash; 854 data = hash;
816 break; 855 break;
817 } 856 }
818 while( peers-- ) { 857 while (peers--) {
819 livesync_proxytell( peer->packet_tprefix, hash, data ); 858 livesync_proxytell(peer->packet_tprefix, hash, data);
820 data += OT_IP_SIZE + 3; 859 data += OT_IP_SIZE + 3;
821 } 860 }
822 --peer->packet_tcount; 861 --peer->packet_tcount;
823 } 862 }
824 863
825 consumed = data - peer->indata; 864 consumed = data - peer->indata;
826 memmove( peer->indata, data, peer->indata_length - consumed ); 865 memmove(peer->indata, data, peer->indata_length - consumed);
827 peer->indata_length -= consumed; 866 peer->indata_length -= consumed;
828} 867}
829 868
830static void * livesync_worker( void * args ) { 869static void *livesync_worker(void *args) {
831 (void)args; 870 (void)args;
832 while( 1 ) { 871 while (1) {
833 ot_ip6 in_ip; uint16_t in_port; 872 ot_ip6 in_ip;
834 size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 873 uint16_t in_port;
874 size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
835 875
836 /* Expect at least tracker id and packet type */ 876 /* Expect at least tracker id and packet type */
837 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 877 if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
838 continue; 878 continue;
839 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 879 if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) {
840 /* drop packet coming from ourselves */ 880 /* drop packet coming from ourselves */
841 continue; 881 continue;
842 } 882 }
843 switch( uint32_read_big( (char*)g_inbuffer + sizeof( g_tracker_id ) ) ) { 883 switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) {
844 case OT_SYNC_PEER4: 884 case OT_SYNC_PEER4:
845 livesync_handle_peersync( datalen, OT_PEER_SIZE4 ); 885 livesync_handle_peersync(datalen, OT_PEER_SIZE4);
846 break; 886 break;
847 case OT_SYNC_PEER6: 887 case OT_SYNC_PEER6:
848 livesync_handle_peersync( datalen, OT_PEER_SIZE6 ); 888 livesync_handle_peersync(datalen, OT_PEER_SIZE6);
849 break; 889 break;
850 default: 890 default:
851 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); 891 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );