diff options
-rw-r--r-- | proxy.c | 828 |
1 files changed, 434 insertions, 394 deletions
@@ -4,33 +4,33 @@ | |||
4 | $Id$ */ | 4 | $Id$ */ |
5 | 5 | ||
6 | /* System */ | 6 | /* System */ |
7 | #include <arpa/inet.h> | ||
8 | #include <ctype.h> | ||
9 | #include <errno.h> | ||
10 | #include <pthread.h> | ||
11 | #include <pwd.h> | ||
12 | #include <signal.h> | ||
7 | #include <stdint.h> | 13 | #include <stdint.h> |
14 | #include <stdio.h> | ||
8 | #include <stdlib.h> | 15 | #include <stdlib.h> |
9 | #include <string.h> | 16 | #include <string.h> |
10 | #include <arpa/inet.h> | ||
11 | #include <sys/socket.h> | 17 | #include <sys/socket.h> |
12 | #include <unistd.h> | 18 | #include <unistd.h> |
13 | #include <errno.h> | ||
14 | #include <signal.h> | ||
15 | #include <stdio.h> | ||
16 | #include <pwd.h> | ||
17 | #include <ctype.h> | ||
18 | #include <pthread.h> | ||
19 | 19 | ||
20 | /* Libowfat */ | 20 | /* Libowfat */ |
21 | #include "socket.h" | 21 | #include "byte.h" |
22 | #include "io.h" | 22 | #include "io.h" |
23 | #include "iob.h" | 23 | #include "iob.h" |
24 | #include "byte.h" | ||
25 | #include "scan.h" | ||
26 | #include "ip6.h" | 24 | #include "ip6.h" |
27 | #include "ndelay.h" | 25 | #include "ndelay.h" |
26 | #include "scan.h" | ||
27 | #include "socket.h" | ||
28 | 28 | ||
29 | /* Opentracker */ | 29 | /* Opentracker */ |
30 | #include "trackerlogic.h" | ||
31 | #include "ot_vector.h" | ||
32 | #include "ot_mutex.h" | 30 | #include "ot_mutex.h" |
33 | #include "ot_stats.h" | 31 | #include "ot_stats.h" |
32 | #include "ot_vector.h" | ||
33 | #include "trackerlogic.h" | ||
34 | 34 | ||
35 | #ifndef WANT_SYNC_LIVE | 35 | #ifndef WANT_SYNC_LIVE |
36 | #define WANT_SYNC_LIVE | 36 | #define WANT_SYNC_LIVE |
@@ -40,26 +40,26 @@ | |||
40 | ot_ip6 g_serverip; | 40 | ot_ip6 g_serverip; |
41 | uint16_t g_serverport = 9009; | 41 | uint16_t g_serverport = 9009; |
42 | uint32_t g_tracker_id; | 42 | uint32_t g_tracker_id; |
43 | char groupip_1[4] = { 224,0,23,5 }; | 43 | char groupip_1[4] = {224, 0, 23, 5}; |
44 | int g_self_pipe[2]; | 44 | int g_self_pipe[2]; |
45 | 45 | ||
46 | /* If you have more than 10 peers, don't use this proxy | 46 | /* If you have more than 10 peers, don't use this proxy |
47 | Use 20 slots for 10 peers to have room for 10 incoming connection slots | 47 | Use 20 slots for 10 peers to have room for 10 incoming connection slots |
48 | */ | 48 | */ |
49 | #define MAX_PEERS 20 | 49 | #define MAX_PEERS 20 |
50 | 50 | ||
51 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 51 | #define LIVESYNC_INCOMING_BUFFSIZE (256 * 256) |
52 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) | 52 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256) |
53 | 53 | ||
54 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 54 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
55 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 55 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash)) |
56 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | 56 | #define LIVESYNC_MAXDELAY 15 /* seconds */ |
57 | 57 | ||
58 | /* The amount of time a complete sync cycle should take */ | 58 | /* The amount of time a complete sync cycle should take */ |
59 | #define OT_SYNC_INTERVAL_MINUTES 2 | 59 | #define OT_SYNC_INTERVAL_MINUTES 2 |
60 | 60 | ||
61 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ | 61 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ |
62 | #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) | 62 | #define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT)) |
63 | 63 | ||
64 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; | 64 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; |
65 | enum { FLAG_SERVERSOCKET = 1 }; | 65 | enum { FLAG_SERVERSOCKET = 1 }; |
@@ -75,151 +75,153 @@ static uint8_t *g_peerbuffer_pos; | |||
75 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | 75 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; |
76 | static ot_time g_next_packet_time; | 76 | static ot_time g_next_packet_time; |
77 | 77 | ||
78 | static void * livesync_worker( void * args ); | 78 | static void *livesync_worker(void *args); |
79 | static void * streamsync_worker( void * args ); | 79 | static void *streamsync_worker(void *args); |
80 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); | 80 | static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer); |
81 | 81 | ||
82 | void exerr( char * message ) { | 82 | void exerr(char *message) { |
83 | fprintf( stderr, "%s\n", message ); | 83 | fprintf(stderr, "%s\n", message); |
84 | exit( 111 ); | 84 | exit(111); |
85 | } | 85 | } |
86 | 86 | ||
87 | void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { | 87 | void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) { |
88 | (void) event; | 88 | (void)event; |
89 | (void) proto; | 89 | (void)proto; |
90 | (void) event_data; | 90 | (void)event_data; |
91 | } | 91 | } |
92 | 92 | ||
93 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | 93 | void livesync_bind_mcast(ot_ip6 ip, uint16_t port) { |
94 | char tmpip[4] = {0,0,0,0}; | 94 | char tmpip[4] = {0, 0, 0, 0}; |
95 | char *v4ip; | 95 | char *v4ip; |
96 | 96 | ||
97 | if( !ip6_isv4mapped(ip)) | 97 | if (!ip6_isv4mapped(ip)) |
98 | exerr("v6 mcast support not yet available."); | 98 | exerr("v6 mcast support not yet available."); |
99 | v4ip = ip+12; | 99 | v4ip = ip + 12; |
100 | 100 | ||
101 | if( g_socket_in != -1 ) | 101 | if (g_socket_in != -1) |
102 | exerr("Error: Livesync listen ip specified twice."); | 102 | exerr("Error: Livesync listen ip specified twice."); |
103 | 103 | ||
104 | if( ( g_socket_in = socket_udp4( )) < 0) | 104 | if ((g_socket_in = socket_udp4()) < 0) |
105 | exerr("Error: Cant create live sync incoming socket." ); | 105 | exerr("Error: Cant create live sync incoming socket."); |
106 | ndelay_off(g_socket_in); | 106 | ndelay_off(g_socket_in); |
107 | 107 | ||
108 | if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) | 108 | if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1) |
109 | exerr("Error: Cant bind live sync incoming socket." ); | 109 | exerr("Error: Cant bind live sync incoming socket."); |
110 | 110 | ||
111 | if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) | 111 | if (socket_mcjoin4(g_socket_in, groupip_1, v4ip)) |
112 | exerr("Error: Cant make live sync incoming socket join mcast group."); | 112 | exerr("Error: Cant make live sync incoming socket join mcast group."); |
113 | 113 | ||
114 | if( ( g_socket_out = socket_udp4()) < 0) | 114 | if ((g_socket_out = socket_udp4()) < 0) |
115 | exerr("Error: Cant create live sync outgoing socket." ); | 115 | exerr("Error: Cant create live sync outgoing socket."); |
116 | if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) | 116 | if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1) |
117 | exerr("Error: Cant bind live sync outgoing socket." ); | 117 | exerr("Error: Cant bind live sync outgoing socket."); |
118 | 118 | ||
119 | socket_mcttl4(g_socket_out, 1); | 119 | socket_mcttl4(g_socket_out, 1); |
120 | socket_mcloop4(g_socket_out, 1); | 120 | socket_mcloop4(g_socket_out, 1); |
121 | } | 121 | } |
122 | 122 | ||
123 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { | 123 | size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { |
124 | int exactmatch; | 124 | int exactmatch; |
125 | ot_torrent *torrent; | 125 | ot_torrent *torrent; |
126 | ot_peerlist *peer_list; | 126 | ot_peerlist *peer_list; |
127 | ot_peer *peer_dest; | 127 | ot_peer *peer_dest; |
128 | ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); | 128 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); |
129 | size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); | 129 | size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); |
130 | 130 | ||
131 | torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), compare_size, &exactmatch ); | 131 | torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch); |
132 | if( !torrent ) | 132 | if (!torrent) |
133 | return -1; | 133 | return -1; |
134 | 134 | ||
135 | if( !exactmatch ) { | 135 | if (!exactmatch) { |
136 | /* Create a new torrent entry, then */ | 136 | /* Create a new torrent entry, then */ |
137 | memcpy( torrent->hash, hash, sizeof(ot_hash) ); | 137 | memcpy(torrent->hash, hash, sizeof(ot_hash)); |
138 | 138 | ||
139 | if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) || | 139 | if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) { |
140 | !( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) { | 140 | vector_remove_torrent(torrents_list, torrent); |
141 | vector_remove_torrent( torrents_list, torrent ); | 141 | mutex_bucket_unlock_by_hash(hash, 0); |
142 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
143 | return -1; | 142 | return -1; |
144 | } | 143 | } |
145 | 144 | ||
146 | byte_zero( torrent->peer_list6, sizeof( ot_peerlist ) ); | 145 | byte_zero(torrent->peer_list6, sizeof(ot_peerlist)); |
147 | byte_zero( torrent->peer_list4, sizeof( ot_peerlist ) ); | 146 | byte_zero(torrent->peer_list4, sizeof(ot_peerlist)); |
148 | } | 147 | } |
149 | 148 | ||
150 | peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; | 149 | peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; |
151 | 150 | ||
152 | /* Check for peer in torrent */ | 151 | /* Check for peer in torrent */ |
153 | peer_dest = vector_find_or_insert_peer( &(peer_list->peers), peer, peer_size, &exactmatch ); | 152 | peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch); |
154 | if( !peer_dest ) { | 153 | if (!peer_dest) { |
155 | mutex_bucket_unlock_by_hash( hash, 0 ); | 154 | mutex_bucket_unlock_by_hash(hash, 0); |
156 | return -1; | 155 | return -1; |
157 | } | 156 | } |
158 | /* Tell peer that it's fresh */ | 157 | /* Tell peer that it's fresh */ |
159 | OT_PEERTIME( peer, peer_size ) = 0; | 158 | OT_PEERTIME(peer, peer_size) = 0; |
160 | 159 | ||
161 | /* If we hadn't had a match create peer there */ | 160 | /* If we hadn't had a match create peer there */ |
162 | if( !exactmatch ) { | 161 | if (!exactmatch) { |
163 | peer_list->peer_count++; | 162 | peer_list->peer_count++; |
164 | if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING ) | 163 | if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING) |
165 | peer_list->seed_count++; | 164 | peer_list->seed_count++; |
166 | } | 165 | } |
167 | memcpy( peer_dest, peer, peer_size ); | 166 | memcpy(peer_dest, peer, peer_size); |
168 | mutex_bucket_unlock_by_hash( hash, 0 ); | 167 | mutex_bucket_unlock_by_hash(hash, 0); |
169 | return 0; | 168 | return 0; |
170 | } | 169 | } |
171 | 170 | ||
172 | size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { | 171 | size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { |
173 | int exactmatch; | 172 | int exactmatch; |
174 | ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); | 173 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); |
175 | ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); | 174 | ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch); |
176 | 175 | ||
177 | if( exactmatch ) { | 176 | if (exactmatch) { |
178 | ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; | 177 | ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; |
179 | switch( vector_remove_peer( &peer_list->peers, peer, peer_size ) ) { | 178 | switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) { |
180 | case 2: peer_list->seed_count--; /* Intentional fallthrough */ | 179 | case 2: |
181 | case 1: peer_list->peer_count--; /* Intentional fallthrough */ | 180 | peer_list->seed_count--; /* Intentional fallthrough */ |
182 | default: break; | 181 | case 1: |
182 | peer_list->peer_count--; /* Intentional fallthrough */ | ||
183 | default: | ||
184 | break; | ||
183 | } | 185 | } |
184 | } | 186 | } |
185 | 187 | ||
186 | mutex_bucket_unlock_by_hash( hash, 0 ); | 188 | mutex_bucket_unlock_by_hash(hash, 0); |
187 | return 0; | 189 | return 0; |
188 | } | 190 | } |
189 | 191 | ||
190 | void free_peerlist( ot_peerlist *peer_list ) { | 192 | void free_peerlist(ot_peerlist *peer_list) { |
191 | if( peer_list->peers.data ) { | 193 | if (peer_list->peers.data) { |
192 | if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { | 194 | if (OT_PEERLIST_HASBUCKETS(peer_list)) { |
193 | ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); | 195 | ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data); |
194 | 196 | ||
195 | while( peer_list->peers.size-- ) | 197 | while (peer_list->peers.size--) |
196 | free( bucket_list++->data ); | 198 | free(bucket_list++->data); |
197 | } | 199 | } |
198 | free( peer_list->peers.data ); | 200 | free(peer_list->peers.data); |
199 | } | 201 | } |
200 | free( peer_list ); | 202 | free(peer_list); |
201 | } | 203 | } |
202 | 204 | ||
203 | static void livesync_handle_peersync( ssize_t datalen, size_t peer_size ) { | 205 | static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) { |
204 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 206 | int off = sizeof(g_tracker_id) + sizeof(uint32_t); |
205 | 207 | ||
206 | fprintf( stderr, "." ); | 208 | fprintf(stderr, "."); |
207 | 209 | ||
208 | while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= datalen ) { | 210 | while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) { |
209 | ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); | 211 | ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash)); |
210 | ot_hash *hash = (ot_hash*)(g_inbuffer + off); | 212 | ot_hash *hash = (ot_hash *)(g_inbuffer + off); |
211 | 213 | ||
212 | if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED ) | 214 | if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED) |
213 | remove_peer_from_torrent_proxy( *hash, peer, peer_size ); | 215 | remove_peer_from_torrent_proxy(*hash, peer, peer_size); |
214 | else | 216 | else |
215 | add_peer_to_torrent_proxy( *hash, peer, peer_size ); | 217 | add_peer_to_torrent_proxy(*hash, peer, peer_size); |
216 | 218 | ||
217 | off += sizeof( ot_hash ) + peer_size; | 219 | off += sizeof(ot_hash) + peer_size; |
218 | } | 220 | } |
219 | } | 221 | } |
220 | 222 | ||
221 | int usage( char *self ) { | 223 | int usage(char *self) { |
222 | fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); | 224 | fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self); |
223 | return 0; | 225 | return 0; |
224 | } | 226 | } |
225 | 227 | ||
@@ -234,115 +236,115 @@ enum { | |||
234 | FLAG_MASK = 0x07 | 236 | FLAG_MASK = 0x07 |
235 | }; | 237 | }; |
236 | 238 | ||
237 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) | 239 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING) |
238 | #define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) | 240 | #define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED) |
239 | #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) | 241 | #define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED) |
240 | #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) | 242 | #define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING) |
241 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) | 243 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID) |
242 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | 244 | #define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED) |
243 | 245 | ||
244 | typedef struct { | 246 | typedef struct { |
245 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | 247 | int state; /* Whether we want to connect, how far our handshake is, etc. */ |
246 | ot_ip6 ip; /* The peer to connect to */ | 248 | ot_ip6 ip; /* The peer to connect to */ |
247 | uint16_t port; /* The peers port */ | 249 | uint16_t port; /* The peers port */ |
248 | uint8_t indata[8192*16]; /* Any data not processed yet */ | 250 | uint8_t indata[8192 * 16]; /* Any data not processed yet */ |
249 | size_t indata_length; /* Length of unprocessed data */ | 251 | size_t indata_length; /* Length of unprocessed data */ |
250 | uint32_t tracker_id; /* How the other end greeted */ | 252 | uint32_t tracker_id; /* How the other end greeted */ |
251 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ | 253 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ |
252 | io_batch outdata; /* The iobatch containing our sync data */ | 254 | io_batch outdata; /* The iobatch containing our sync data */ |
253 | 255 | ||
254 | size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ | 256 | size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ |
255 | uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ | 257 | uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ |
256 | uint8_t packet_type; /* Type of current packet */ | 258 | uint8_t packet_type; /* Type of current packet */ |
257 | uint32_t packet_tid; /* Tracker id for current packet */ | 259 | uint32_t packet_tid; /* Tracker id for current packet */ |
258 | 260 | ||
259 | } proxy_peer; | 261 | } proxy_peer; |
260 | static void process_indata( proxy_peer * peer ); | 262 | static void process_indata(proxy_peer *peer); |
261 | 263 | ||
262 | void reset_info_block( proxy_peer * peer ) { | 264 | void reset_info_block(proxy_peer *peer) { |
263 | peer->indata_length = 0; | 265 | peer->indata_length = 0; |
264 | peer->tracker_id = 0; | 266 | peer->tracker_id = 0; |
265 | peer->fd = -1; | 267 | peer->fd = -1; |
266 | peer->packet_tcount = 0; | 268 | peer->packet_tcount = 0; |
267 | iob_reset( &peer->outdata ); | 269 | iob_reset(&peer->outdata); |
268 | PROXYPEER_SETDISCONNECTED( peer->state ); | 270 | PROXYPEER_SETDISCONNECTED(peer->state); |
269 | } | 271 | } |
270 | 272 | ||
271 | /* Number of connections to peers | 273 | /* Number of connections to peers |
272 | * If a peer's IP is set, we try to reconnect, when the connection drops | 274 | * If a peer's IP is set, we try to reconnect, when the connection drops |
273 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it | 275 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it |
274 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match | 276 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match |
275 | * Reconnect attempts occur only twice a minute | 277 | * Reconnect attempts occur only twice a minute |
276 | */ | 278 | */ |
277 | static int g_connection_count; | 279 | static int g_connection_count; |
278 | static ot_time g_connection_reconn; | 280 | static ot_time g_connection_reconn; |
279 | static proxy_peer g_connections[MAX_PEERS]; | 281 | static proxy_peer g_connections[MAX_PEERS]; |
280 | 282 | ||
281 | static void handle_reconnects( void ) { | 283 | static void handle_reconnects(void) { |
282 | int i; | 284 | int i; |
283 | for( i=0; i<g_connection_count; ++i ) | 285 | for (i = 0; i < g_connection_count; ++i) |
284 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { | 286 | if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) { |
285 | int64 newfd = socket_tcp6( ); | 287 | int64 newfd = socket_tcp6(); |
286 | fprintf( stderr, "(Re)connecting to peer..." ); | 288 | fprintf(stderr, "(Re)connecting to peer..."); |
287 | if( newfd < 0 ) continue; /* No socket for you */ | 289 | if (newfd < 0) |
290 | continue; /* No socket for you */ | ||
288 | io_fd(newfd); | 291 | io_fd(newfd); |
289 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { | 292 | if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) { |
290 | io_close( newfd ); | 293 | io_close(newfd); |
291 | continue; | 294 | continue; |
292 | } | 295 | } |
293 | if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && | 296 | if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) { |
294 | errno != EINPROGRESS && errno != EWOULDBLOCK ) { | ||
295 | close(newfd); | 297 | close(newfd); |
296 | continue; | 298 | continue; |
297 | } | 299 | } |
298 | io_wantwrite(newfd); /* So we will be informed when it is connected */ | 300 | io_wantwrite(newfd); /* So we will be informed when it is connected */ |
299 | io_setcookie(newfd,g_connections+i); | 301 | io_setcookie(newfd, g_connections + i); |
300 | 302 | ||
301 | /* Prepare connection info block */ | 303 | /* Prepare connection info block */ |
302 | reset_info_block( g_connections+i ); | 304 | reset_info_block(g_connections + i); |
303 | g_connections[i].fd = newfd; | 305 | g_connections[i].fd = newfd; |
304 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 306 | PROXYPEER_SETCONNECTING(g_connections[i].state); |
305 | } | 307 | } |
306 | g_connection_reconn = time(NULL) + 30; | 308 | g_connection_reconn = time(NULL) + 30; |
307 | } | 309 | } |
308 | 310 | ||
309 | /* Handle incoming connection requests, check against whitelist */ | 311 | /* Handle incoming connection requests, check against whitelist */ |
310 | static void handle_accept( int64 serversocket ) { | 312 | static void handle_accept(int64 serversocket) { |
311 | int64 newfd; | 313 | int64 newfd; |
312 | ot_ip6 ip; | 314 | ot_ip6 ip; |
313 | uint16 port; | 315 | uint16 port; |
314 | 316 | ||
315 | while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { | 317 | while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) { |
316 | 318 | ||
317 | /* XXX some access control */ | 319 | /* XXX some access control */ |
318 | 320 | ||
319 | /* Put fd into a non-blocking mode */ | 321 | /* Put fd into a non-blocking mode */ |
320 | io_nonblock( newfd ); | 322 | io_nonblock(newfd); |
321 | 323 | ||
322 | if( !io_fd( newfd ) ) | 324 | if (!io_fd(newfd)) |
323 | io_close( newfd ); | 325 | io_close(newfd); |
324 | else { | 326 | else { |
325 | /* Find a new home for our incoming connection */ | 327 | /* Find a new home for our incoming connection */ |
326 | int i; | 328 | int i; |
327 | for( i=0; i<MAX_PEERS; ++i ) | 329 | for (i = 0; i < MAX_PEERS; ++i) |
328 | if( g_connections[i].state == FLAG_DISCONNECTED ) | 330 | if (g_connections[i].state == FLAG_DISCONNECTED) |
329 | break; | 331 | break; |
330 | if( i == MAX_PEERS ) { | 332 | if (i == MAX_PEERS) { |
331 | fprintf( stderr, "No room for incoming connection." ); | 333 | fprintf(stderr, "No room for incoming connection."); |
332 | close( newfd ); | 334 | close(newfd); |
333 | continue; | 335 | continue; |
334 | } | 336 | } |
335 | 337 | ||
336 | /* Prepare connection info block */ | 338 | /* Prepare connection info block */ |
337 | reset_info_block( g_connections+i ); | 339 | reset_info_block(g_connections + i); |
338 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 340 | PROXYPEER_SETCONNECTING(g_connections[i].state); |
339 | g_connections[i].port = port; | 341 | g_connections[i].port = port; |
340 | g_connections[i].fd = newfd; | 342 | g_connections[i].fd = newfd; |
341 | 343 | ||
342 | io_setcookie( newfd, g_connections + i ); | 344 | io_setcookie(newfd, g_connections + i); |
343 | 345 | ||
344 | /* We expect the connecting side to begin with its tracker_id */ | 346 | /* We expect the connecting side to begin with its tracker_id */ |
345 | io_wantread( newfd ); | 347 | io_wantread(newfd); |
346 | } | 348 | } |
347 | } | 349 | } |
348 | 350 | ||
@@ -350,117 +352,116 @@ static void handle_accept( int64 serversocket ) { | |||
350 | } | 352 | } |
351 | 353 | ||
352 | /* New sync data on the stream */ | 354 | /* New sync data on the stream */ |
353 | static void handle_read( int64 peersocket ) { | 355 | static void handle_read(int64 peersocket) { |
354 | int i; | 356 | int i; |
355 | int64 datalen; | 357 | int64 datalen; |
356 | uint32_t tracker_id; | 358 | uint32_t tracker_id; |
357 | proxy_peer *peer = io_getcookie( peersocket ); | 359 | proxy_peer *peer = io_getcookie(peersocket); |
358 | 360 | ||
359 | if( !peer ) { | 361 | if (!peer) { |
360 | /* Can't happen ;) */ | 362 | /* Can't happen ;) */ |
361 | io_close( peersocket ); | 363 | io_close(peersocket); |
362 | return; | 364 | return; |
363 | } | 365 | } |
364 | switch( peer->state & FLAG_MASK ) { | 366 | switch (peer->state & FLAG_MASK) { |
365 | case FLAG_DISCONNECTED: | 367 | case FLAG_DISCONNECTED: |
366 | io_close( peersocket ); | 368 | io_close(peersocket); |
367 | break; /* Shouldnt happen */ | 369 | break; /* Shouldnt happen */ |
368 | case FLAG_CONNECTING: | 370 | case FLAG_CONNECTING: |
369 | case FLAG_WAITTRACKERID: | 371 | case FLAG_WAITTRACKERID: |
370 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) | 372 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) |
371 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ | 373 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ |
372 | if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | 374 | if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id)) |
373 | goto close_socket; | 375 | goto close_socket; |
374 | 376 | ||
375 | /* See, if we already have a connection to that peer */ | 377 | /* See, if we already have a connection to that peer */ |
376 | for( i=0; i<MAX_PEERS; ++i ) | 378 | for (i = 0; i < MAX_PEERS; ++i) |
377 | if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && | 379 | if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) { |
378 | g_connections[i].tracker_id == tracker_id ) { | 380 | fprintf(stderr, "Peer already connected. Closing connection.\n"); |
379 | fprintf( stderr, "Peer already connected. Closing connection.\n" ); | ||
380 | goto close_socket; | 381 | goto close_socket; |
381 | } | 382 | } |
382 | 383 | ||
383 | /* Also no need for soliloquy */ | 384 | /* Also no need for soliloquy */ |
384 | if( tracker_id == g_tracker_id ) | 385 | if (tracker_id == g_tracker_id) |
385 | goto close_socket; | 386 | goto close_socket; |
386 | 387 | ||
387 | /* The new connection is good, send our tracker_id on incoming connections */ | 388 | /* The new connection is good, send our tracker_id on incoming connections */ |
388 | if( peer->state == FLAG_CONNECTING ) | 389 | if (peer->state == FLAG_CONNECTING) |
389 | if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) | 390 | if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id)) |
390 | goto close_socket; | 391 | goto close_socket; |
391 | 392 | ||
392 | peer->tracker_id = tracker_id; | 393 | peer->tracker_id = tracker_id; |
393 | PROXYPEER_SETCONNECTED( peer->state ); | 394 | PROXYPEER_SETCONNECTED(peer->state); |
394 | 395 | ||
395 | if( peer->state & FLAG_OUTGOING ) | 396 | if (peer->state & FLAG_OUTGOING) |
396 | fprintf( stderr, "succeeded.\n" ); | 397 | fprintf(stderr, "succeeded.\n"); |
397 | else | 398 | else |
398 | fprintf( stderr, "Incoming connection successful.\n" ); | 399 | fprintf(stderr, "Incoming connection successful.\n"); |
399 | 400 | ||
400 | break; | 401 | break; |
401 | close_socket: | 402 | close_socket: |
402 | fprintf( stderr, "Handshake incomplete, closing socket\n" ); | 403 | fprintf(stderr, "Handshake incomplete, closing socket\n"); |
403 | io_close( peersocket ); | 404 | io_close(peersocket); |
404 | reset_info_block( peer ); | 405 | reset_info_block(peer); |
405 | break; | 406 | break; |
406 | case FLAG_CONNECTED: | 407 | case FLAG_CONNECTED: |
407 | /* Here we acutally expect data from peer | 408 | /* Here we acutally expect data from peer |
408 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ | 409 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ |
409 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); | 410 | datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length); |
410 | if( !datalen || datalen < -1 ) { | 411 | if (!datalen || datalen < -1) { |
411 | fprintf( stderr, "Connection closed by remote peer.\n" ); | 412 | fprintf(stderr, "Connection closed by remote peer.\n"); |
412 | io_close( peersocket ); | 413 | io_close(peersocket); |
413 | reset_info_block( peer ); | 414 | reset_info_block(peer); |
414 | } else if( datalen > 0 ) { | 415 | } else if (datalen > 0) { |
415 | peer->indata_length += datalen; | 416 | peer->indata_length += datalen; |
416 | process_indata( peer ); | 417 | process_indata(peer); |
417 | } | 418 | } |
418 | break; | 419 | break; |
419 | } | 420 | } |
420 | } | 421 | } |
421 | 422 | ||
422 | /* Can write new sync data to the stream */ | 423 | /* Can write new sync data to the stream */ |
423 | static void handle_write( int64 peersocket ) { | 424 | static void handle_write(int64 peersocket) { |
424 | proxy_peer *peer = io_getcookie( peersocket ); | 425 | proxy_peer *peer = io_getcookie(peersocket); |
425 | 426 | ||
426 | if( !peer ) { | 427 | if (!peer) { |
427 | /* Can't happen ;) */ | 428 | /* Can't happen ;) */ |
428 | io_close( peersocket ); | 429 | io_close(peersocket); |
429 | return; | 430 | return; |
430 | } | 431 | } |
431 | 432 | ||
432 | switch( peer->state & FLAG_MASK ) { | 433 | switch (peer->state & FLAG_MASK) { |
433 | case FLAG_DISCONNECTED: | 434 | case FLAG_DISCONNECTED: |
434 | default: /* Should not happen */ | 435 | default: /* Should not happen */ |
435 | io_close( peersocket ); | 436 | io_close(peersocket); |
436 | break; | 437 | break; |
437 | case FLAG_CONNECTING: | 438 | case FLAG_CONNECTING: |
438 | /* Ensure that the connection is established and handle connection error */ | 439 | /* Ensure that the connection is established and handle connection error */ |
439 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { | 440 | if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) { |
440 | fprintf( stderr, "failed\n" ); | 441 | fprintf(stderr, "failed\n"); |
441 | reset_info_block( peer ); | 442 | reset_info_block(peer); |
442 | io_close( peersocket ); | 443 | io_close(peersocket); |
443 | break; | 444 | break; |
444 | } | 445 | } |
445 | 446 | ||
446 | if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { | 447 | if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) { |
447 | PROXYPEER_SETWAITTRACKERID( peer->state ); | 448 | PROXYPEER_SETWAITTRACKERID(peer->state); |
448 | io_dontwantwrite( peersocket ); | 449 | io_dontwantwrite(peersocket); |
449 | io_wantread( peersocket ); | 450 | io_wantread(peersocket); |
450 | } else { | 451 | } else { |
451 | fprintf( stderr, "Handshake incomplete, closing socket\n" ); | 452 | fprintf(stderr, "Handshake incomplete, closing socket\n"); |
452 | io_close( peersocket ); | 453 | io_close(peersocket); |
453 | reset_info_block( peer ); | 454 | reset_info_block(peer); |
454 | } | 455 | } |
455 | break; | 456 | break; |
456 | case FLAG_CONNECTED: | 457 | case FLAG_CONNECTED: |
457 | switch( iob_send( peersocket, &peer->outdata ) ) { | 458 | switch (iob_send(peersocket, &peer->outdata)) { |
458 | case 0: /* all data sent */ | 459 | case 0: /* all data sent */ |
459 | io_dontwantwrite( peersocket ); | 460 | io_dontwantwrite(peersocket); |
460 | break; | 461 | break; |
461 | case -3: /* an error occured */ | 462 | case -3: /* an error occured */ |
462 | io_close( peersocket ); | 463 | io_close(peersocket); |
463 | reset_info_block( peer ); | 464 | reset_info_block(peer); |
464 | break; | 465 | break; |
465 | default: /* Normal operation or eagain */ | 466 | default: /* Normal operation or eagain */ |
466 | break; | 467 | break; |
@@ -475,289 +476,324 @@ static void server_mainloop() { | |||
475 | int64 sock; | 476 | int64 sock; |
476 | 477 | ||
477 | /* inlined livesync_init() */ | 478 | /* inlined livesync_init() */ |
478 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); | 479 | memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start)); |
479 | g_peerbuffer_pos = g_peerbuffer_start; | 480 | g_peerbuffer_pos = g_peerbuffer_start; |
480 | memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | 481 | memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id)); |
481 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | 482 | uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER); |
482 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | 483 | g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t); |
483 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | 484 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; |
484 | 485 | ||
485 | while(1) { | 486 | while (1) { |
486 | /* See if we need to connect to anyone */ | 487 | /* See if we need to connect to anyone */ |
487 | if( time(NULL) > g_connection_reconn ) | 488 | if (time(NULL) > g_connection_reconn) |
488 | handle_reconnects( ); | 489 | handle_reconnects(); |
489 | 490 | ||
490 | /* Wait for io events until next approx reconn check time */ | 491 | /* Wait for io events until next approx reconn check time */ |
491 | io_waituntil2( 30*1000 ); | 492 | io_waituntil2(30 * 1000); |
492 | 493 | ||
493 | /* Loop over readable sockets */ | 494 | /* Loop over readable sockets */ |
494 | while( ( sock = io_canread( ) ) != -1 ) { | 495 | while ((sock = io_canread()) != -1) { |
495 | const void *cookie = io_getcookie( sock ); | 496 | const void *cookie = io_getcookie(sock); |
496 | if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) | 497 | if ((uintptr_t)cookie == FLAG_SERVERSOCKET) |
497 | handle_accept( sock ); | 498 | handle_accept(sock); |
498 | else | 499 | else |
499 | handle_read( sock ); | 500 | handle_read(sock); |
500 | } | 501 | } |
501 | 502 | ||
502 | /* Loop over writable sockets */ | 503 | /* Loop over writable sockets */ |
503 | while( ( sock = io_canwrite( ) ) != -1 ) | 504 | while ((sock = io_canwrite()) != -1) |
504 | handle_write( sock ); | 505 | handle_write(sock); |
505 | 506 | ||
506 | livesync_ticker( ); | 507 | livesync_ticker(); |
507 | } | 508 | } |
508 | } | 509 | } |
509 | 510 | ||
510 | static void panic( const char *routine ) { | 511 | static void panic(const char *routine) { |
511 | fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); | 512 | fprintf(stderr, "%s: %s\n", routine, strerror(errno)); |
512 | exit( 111 ); | 513 | exit(111); |
513 | } | 514 | } |
514 | 515 | ||
515 | static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { | 516 | static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) { |
516 | int64 sock = socket_tcp6( ); | 517 | int64 sock = socket_tcp6(); |
517 | 518 | ||
518 | if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) | 519 | if (socket_bind6_reuse(sock, ip, port, 0) == -1) |
519 | panic( "socket_bind6_reuse" ); | 520 | panic("socket_bind6_reuse"); |
520 | 521 | ||
521 | if( socket_listen( sock, SOMAXCONN) == -1 ) | 522 | if (socket_listen(sock, SOMAXCONN) == -1) |
522 | panic( "socket_listen" ); | 523 | panic("socket_listen"); |
523 | 524 | ||
524 | if( !io_fd( sock ) ) | 525 | if (!io_fd(sock)) |
525 | panic( "io_fd" ); | 526 | panic("io_fd"); |
526 | 527 | ||
527 | io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); | 528 | io_setcookie(sock, (void *)FLAG_SERVERSOCKET); |
528 | io_wantread( sock ); | 529 | io_wantread(sock); |
529 | return sock; | 530 | return sock; |
530 | } | 531 | } |
531 | 532 | ||
532 | 533 | static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) { | |
533 | static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) { | ||
534 | const char *s = src; | 534 | const char *s = src; |
535 | int off, bracket = 0; | 535 | int off, bracket = 0; |
536 | while( isspace(*s) ) ++s; | 536 | while (isspace(*s)) |
537 | if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ | 537 | ++s; |
538 | if( !(off = scan_ip6( s, ip ) ) ) | 538 | if (*s == '[') |
539 | ++s, ++bracket; /* for v6 style notation */ | ||
540 | if (!(off = scan_ip6(s, ip))) | ||
539 | return 0; | 541 | return 0; |
540 | s += off; | 542 | s += off; |
541 | if( *s == 0 || isspace(*s)) return s-src; | 543 | if (*s == 0 || isspace(*s)) |
542 | if( *s == ']' && bracket ) ++s; | 544 | return s - src; |
543 | if( !ip6_isv4mapped(ip)){ | 545 | if (*s == ']' && bracket) |
544 | if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; | 546 | ++s; |
547 | if (!ip6_isv4mapped(ip)) { | ||
548 | if ((bracket && *(s) != ':') || (*(s) != '.')) | ||
549 | return 0; | ||
545 | s++; | 550 | s++; |
546 | } else { | 551 | } else { |
547 | if( *(s++) != ':' ) return 0; | 552 | if (*(s++) != ':') |
553 | return 0; | ||
548 | } | 554 | } |
549 | if( !(off = scan_ushort (s, port ) ) ) | 555 | if (!(off = scan_ushort(s, port))) |
550 | return 0; | 556 | return 0; |
551 | return off+s-src; | 557 | return off + s - src; |
552 | } | 558 | } |
553 | 559 | ||
554 | int main( int argc, char **argv ) { | 560 | int main(int argc, char **argv) { |
555 | static pthread_t sync_in_thread_id; | 561 | static pthread_t sync_in_thread_id; |
556 | static pthread_t sync_out_thread_id; | 562 | static pthread_t sync_out_thread_id; |
557 | ot_ip6 serverip; | 563 | ot_ip6 serverip; |
558 | uint16_t tmpport; | 564 | uint16_t tmpport; |
559 | int scanon = 1, lbound = 0, sbound = 0; | 565 | int scanon = 1, lbound = 0, sbound = 0; |
560 | 566 | ||
561 | srandom( time(NULL) ); | 567 | srandom(time(NULL)); |
562 | #ifdef WANT_ARC4RANDOM | 568 | #ifdef WANT_ARC4RANDOM |
563 | g_tracker_id = arc4random(); | 569 | g_tracker_id = arc4random(); |
564 | #else | 570 | #else |
565 | g_tracker_id = random(); | 571 | g_tracker_id = random(); |
566 | #endif | 572 | #endif |
567 | 573 | ||
568 | while( scanon ) { | 574 | while (scanon) { |
569 | switch( getopt( argc, argv, ":l:c:L:h" ) ) { | 575 | switch (getopt(argc, argv, ":l:c:L:h")) { |
570 | case -1: scanon = 0; break; | 576 | case -1: |
577 | scanon = 0; | ||
578 | break; | ||
571 | case 'l': | 579 | case 'l': |
572 | tmpport = 0; | 580 | tmpport = 0; |
573 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | 581 | if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { |
574 | ot_try_bind( serverip, tmpport ); | 582 | usage(argv[0]); |
583 | exit(1); | ||
584 | } | ||
585 | ot_try_bind(serverip, tmpport); | ||
575 | ++sbound; | 586 | ++sbound; |
576 | break; | 587 | break; |
577 | case 'c': | 588 | case 'c': |
578 | if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); | 589 | if (g_connection_count > MAX_PEERS / 2) |
590 | exerr("Connection limit exceeded.\n"); | ||
579 | tmpport = 0; | 591 | tmpport = 0; |
580 | if( !scan_ip6_port( optarg, | 592 | if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) { |
581 | g_connections[g_connection_count].ip, | 593 | usage(argv[0]); |
582 | &g_connections[g_connection_count].port ) || | 594 | exit(1); |
583 | !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } | 595 | } |
584 | g_connections[g_connection_count++].state = FLAG_OUTGOING; | 596 | g_connections[g_connection_count++].state = FLAG_OUTGOING; |
585 | break; | 597 | break; |
586 | case 'L': | 598 | case 'L': |
587 | tmpport = 9696; | 599 | tmpport = 9696; |
588 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | 600 | if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { |
589 | livesync_bind_mcast( serverip, tmpport); ++lbound; break; | 601 | usage(argv[0]); |
602 | exit(1); | ||
603 | } | ||
604 | livesync_bind_mcast(serverip, tmpport); | ||
605 | ++lbound; | ||
606 | break; | ||
590 | default: | 607 | default: |
591 | case '?': usage( argv[0] ); exit( 1 ); | 608 | case '?': |
609 | usage(argv[0]); | ||
610 | exit(1); | ||
592 | } | 611 | } |
593 | } | 612 | } |
594 | 613 | ||
595 | if( !lbound ) exerr( "No livesync port bound." ); | 614 | if (!lbound) |
596 | if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); | 615 | exerr("No livesync port bound."); |
597 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); | 616 | if (!g_connection_count && !sbound) |
598 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); | 617 | exerr("No streamsync port bound."); |
618 | pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL); | ||
619 | pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL); | ||
599 | 620 | ||
600 | server_mainloop(); | 621 | server_mainloop(); |
601 | return 0; | 622 | return 0; |
602 | } | 623 | } |
603 | 624 | ||
604 | static void * streamsync_worker( void * args ) { | 625 | static void *streamsync_worker(void *args) { |
605 | (void)args; | 626 | (void)args; |
606 | while( 1 ) { | 627 | while (1) { |
607 | int bucket; | 628 | int bucket; |
608 | /* For each bucket... */ | 629 | /* For each bucket... */ |
609 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 630 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
610 | /* Get exclusive access to that bucket */ | 631 | /* Get exclusive access to that bucket */ |
611 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 632 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
612 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; | 633 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; |
613 | size_t mem, mem_a = 0, mem_b = 0; | 634 | size_t mem, mem_a = 0, mem_b = 0; |
614 | uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; | 635 | uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; |
615 | 636 | ||
616 | if( !torrents_list->size ) goto unlock_continue; | 637 | if (!torrents_list->size) |
638 | goto unlock_continue; | ||
617 | 639 | ||
618 | /* For each torrent in this bucket.. */ | 640 | /* For each torrent in this bucket.. */ |
619 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 641 | for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { |
620 | /* Address torrents members */ | 642 | /* Address torrents members */ |
621 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | 643 | ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list; |
622 | switch( peer_list->peer_count ) { | 644 | switch (peer_list->peer_count) { |
623 | case 2: count_two++; break; | 645 | case 2: |
624 | case 1: count_one++; break; | 646 | count_two++; |
625 | case 0: break; | 647 | break; |
626 | default: count_def++; | 648 | case 1: |
627 | count_peers += peer_list->peer_count; | 649 | count_one++; |
650 | break; | ||
651 | case 0: | ||
652 | break; | ||
653 | default: | ||
654 | count_def++; | ||
655 | count_peers += peer_list->peer_count; | ||
628 | } | 656 | } |
629 | } | 657 | } |
630 | 658 | ||
631 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ | 659 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ |
632 | mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + | 660 | mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7; |
633 | ( count_one + 2 * count_two + count_peers ) * 7; | 661 | |
634 | 662 | fprintf(stderr, "Mem: %zd\n", mem); | |
635 | fprintf( stderr, "Mem: %zd\n", mem ); | 663 | |
636 | 664 | ptr = ptr_a = ptr_b = ptr_c = malloc(mem); | |
637 | ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); | 665 | if (!ptr) |
638 | if( !ptr ) goto unlock_continue; | 666 | goto unlock_continue; |
639 | 667 | ||
640 | if( count_one > 4 || !count_def ) { | 668 | if (count_one > 4 || !count_def) { |
641 | mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); | 669 | mem_a = 1 + 1 + 2 + count_one * (19 + 7); |
642 | ptr_b += mem_a; ptr_c += mem_a; | 670 | ptr_b += mem_a; |
643 | ptr_a[0] = 1; /* Offset 0: packet type 1 */ | 671 | ptr_c += mem_a; |
644 | ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 672 | ptr_a[0] = 1; /* Offset 0: packet type 1 */ |
645 | ptr_a[2] = count_one >> 8; | 673 | ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
646 | ptr_a[3] = count_one & 255; | 674 | ptr_a[2] = count_one >> 8; |
647 | ptr_a += 4; | 675 | ptr_a[3] = count_one & 255; |
676 | ptr_a += 4; | ||
648 | } else | 677 | } else |
649 | count_def += count_one; | 678 | count_def += count_one; |
650 | 679 | ||
651 | if( count_two > 4 || !count_def ) { | 680 | if (count_two > 4 || !count_def) { |
652 | mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); | 681 | mem_b = 1 + 1 + 2 + count_two * (19 + 14); |
653 | ptr_c += mem_b; | 682 | ptr_c += mem_b; |
654 | ptr_b[0] = 2; /* Offset 0: packet type 2 */ | 683 | ptr_b[0] = 2; /* Offset 0: packet type 2 */ |
655 | ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 684 | ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
656 | ptr_b[2] = count_two >> 8; | 685 | ptr_b[2] = count_two >> 8; |
657 | ptr_b[3] = count_two & 255; | 686 | ptr_b[3] = count_two & 255; |
658 | ptr_b += 4; | 687 | ptr_b += 4; |
659 | } else | 688 | } else |
660 | count_def += count_two; | 689 | count_def += count_two; |
661 | 690 | ||
662 | if( count_def ) { | 691 | if (count_def) { |
663 | ptr_c[0] = 0; /* Offset 0: packet type 0 */ | 692 | ptr_c[0] = 0; /* Offset 0: packet type 0 */ |
664 | ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 693 | ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
665 | ptr_c[2] = count_def >> 8; | 694 | ptr_c[2] = count_def >> 8; |
666 | ptr_c[3] = count_def & 255; | 695 | ptr_c[3] = count_def & 255; |
667 | ptr_c += 4; | 696 | ptr_c += 4; |
668 | } | 697 | } |
669 | 698 | ||
670 | /* For each torrent in this bucket.. */ | 699 | /* For each torrent in this bucket.. */ |
671 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 700 | for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { |
672 | /* Address torrents members */ | 701 | /* Address torrents members */ |
673 | ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; | 702 | ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset; |
674 | ot_peerlist *peer_list = torrent->peer_list; | 703 | ot_peerlist *peer_list = torrent->peer_list; |
675 | ot_peer *peers = (ot_peer*)(peer_list->peers.data); | 704 | ot_peer *peers = (ot_peer *)(peer_list->peers.data); |
676 | uint8_t **dst; | 705 | uint8_t **dst; |
677 | 706 | ||
678 | /* Determine destination slot */ | 707 | /* Determine destination slot */ |
679 | count_peers = peer_list->peer_count; | 708 | count_peers = peer_list->peer_count; |
680 | switch( count_peers ) { | 709 | switch (count_peers) { |
681 | case 0: continue; | 710 | case 0: |
682 | case 1: dst = mem_a ? &ptr_a : &ptr_c; break; | 711 | continue; |
683 | case 2: dst = mem_b ? &ptr_b : &ptr_c; break; | 712 | case 1: |
684 | default: dst = &ptr_c; break; | 713 | dst = mem_a ? &ptr_a : &ptr_c; |
714 | break; | ||
715 | case 2: | ||
716 | dst = mem_b ? &ptr_b : &ptr_c; | ||
717 | break; | ||
718 | default: | ||
719 | dst = &ptr_c; | ||
720 | break; | ||
685 | } | 721 | } |
686 | 722 | ||
687 | /* Copy tail of info_hash, advance pointer */ | 723 | /* Copy tail of info_hash, advance pointer */ |
688 | memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); | 724 | memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1); |
689 | *dst += sizeof( ot_hash ) - 1; | 725 | *dst += sizeof(ot_hash) - 1; |
690 | 726 | ||
691 | /* Encode peer count */ | 727 | /* Encode peer count */ |
692 | if( dst == &ptr_c ) | 728 | if (dst == &ptr_c) |
693 | while( count_peers ) { | 729 | while (count_peers) { |
694 | if( count_peers <= 0x7f ) | 730 | if (count_peers <= 0x7f) |
695 | *(*dst)++ = count_peers; | 731 | *(*dst)++ = count_peers; |
696 | else | 732 | else |
697 | *(*dst)++ = 0x80 | ( count_peers & 0x7f ); | 733 | *(*dst)++ = 0x80 | (count_peers & 0x7f); |
698 | count_peers >>= 7; | 734 | count_peers >>= 7; |
699 | } | 735 | } |
700 | 736 | ||
701 | /* Copy peers */ | 737 | /* Copy peers */ |
702 | count_peers = peer_list->peer_count; | 738 | count_peers = peer_list->peer_count; |
703 | while( count_peers-- ) { | 739 | while (count_peers--) { |
704 | memcpy( *dst, peers++, OT_IP_SIZE + 3 ); | 740 | memcpy(*dst, peers++, OT_IP_SIZE + 3); |
705 | *dst += OT_IP_SIZE + 3; | 741 | *dst += OT_IP_SIZE + 3; |
706 | } | 742 | } |
707 | free_peerlist(peer_list); | 743 | free_peerlist(peer_list); |
708 | } | 744 | } |
709 | 745 | ||
710 | free( torrents_list->data ); | 746 | free(torrents_list->data); |
711 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | 747 | memset(torrents_list, 0, sizeof(*torrents_list)); |
712 | unlock_continue: | 748 | unlock_continue: |
713 | mutex_bucket_unlock( bucket, 0 ); | 749 | mutex_bucket_unlock(bucket, 0); |
714 | 750 | ||
715 | if( ptr ) { | 751 | if (ptr) { |
716 | int i; | 752 | int i; |
717 | 753 | ||
718 | if( ptr_b > ptr_c ) ptr_c = ptr_b; | 754 | if (ptr_b > ptr_c) |
719 | if( ptr_a > ptr_c ) ptr_c = ptr_a; | 755 | ptr_c = ptr_b; |
756 | if (ptr_a > ptr_c) | ||
757 | ptr_c = ptr_a; | ||
720 | mem = ptr_c - ptr; | 758 | mem = ptr_c - ptr; |
721 | 759 | ||
722 | for( i=0; i < MAX_PEERS; ++i ) { | 760 | for (i = 0; i < MAX_PEERS; ++i) { |
723 | if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { | 761 | if (PROXYPEER_ISCONNECTED(g_connections[i].state)) { |
724 | void *tmp = malloc( mem ); | 762 | void *tmp = malloc(mem); |
725 | if( tmp ) { | 763 | if (tmp) { |
726 | memcpy( tmp, ptr, mem ); | 764 | memcpy(tmp, ptr, mem); |
727 | iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); | 765 | iob_addbuf_free(&g_connections[i].outdata, tmp, mem); |
728 | io_wantwrite( g_connections[i].fd ); | 766 | io_wantwrite(g_connections[i].fd); |
729 | } | 767 | } |
730 | } | 768 | } |
731 | } | 769 | } |
732 | 770 | ||
733 | free( ptr ); | 771 | free(ptr); |
734 | } | 772 | } |
735 | usleep( OT_SYNC_SLEEP ); | 773 | usleep(OT_SYNC_SLEEP); |
736 | } | 774 | } |
737 | } | 775 | } |
738 | return 0; | 776 | return 0; |
739 | } | 777 | } |
740 | 778 | ||
741 | static void livesync_issue_peersync( ) { | 779 | static void livesync_issue_peersync() { |
742 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | 780 | socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT); |
743 | groupip_1, LIVESYNC_PORT); | 781 | g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t); |
744 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
745 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | 782 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; |
746 | } | 783 | } |
747 | 784 | ||
748 | void livesync_ticker( ) { | 785 | void livesync_ticker() { |
749 | /* livesync_issue_peersync sets g_next_packet_time */ | 786 | /* livesync_issue_peersync sets g_next_packet_time */ |
750 | if( time(NULL) > g_next_packet_time && | 787 | if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id)) |
751 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | ||
752 | livesync_issue_peersync(); | 788 | livesync_issue_peersync(); |
753 | } | 789 | } |
754 | 790 | ||
755 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { | 791 | static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) { |
756 | // unsigned int i; | 792 | // unsigned int i; |
757 | 793 | ||
758 | *g_peerbuffer_pos = prefix; | 794 | *g_peerbuffer_pos = prefix; |
759 | memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); | 795 | memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1); |
760 | memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); | 796 | memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1); |
761 | 797 | ||
762 | #if 0 | 798 | #if 0 |
763 | /* Dump info_hash */ | 799 | /* Dump info_hash */ |
@@ -772,80 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee | |||
772 | #endif | 808 | #endif |
773 | g_peerbuffer_pos += sizeof(ot_peer); | 809 | g_peerbuffer_pos += sizeof(ot_peer); |
774 | 810 | ||
775 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | 811 | if (g_peerbuffer_pos >= g_peerbuffer_highwater) |
776 | livesync_issue_peersync(); | 812 | livesync_issue_peersync(); |
777 | } | 813 | } |
778 | 814 | ||
779 | static void process_indata( proxy_peer * peer ) { | 815 | static void process_indata(proxy_peer *peer) { |
780 | size_t consumed, peers; | 816 | size_t consumed, peers; |
781 | uint8_t *data = peer->indata, *hash; | 817 | uint8_t *data = peer->indata, *hash; |
782 | uint8_t *dataend = data + peer->indata_length; | 818 | uint8_t *dataend = data + peer->indata_length; |
783 | 819 | ||
784 | while( 1 ) { | 820 | while (1) { |
785 | /* If we're not inside of a packet, make a new one */ | 821 | /* If we're not inside of a packet, make a new one */ |
786 | if( !peer->packet_tcount ) { | 822 | if (!peer->packet_tcount) { |
787 | /* Ensure the header is complete or postpone processing */ | 823 | /* Ensure the header is complete or postpone processing */ |
788 | if( data + 4 > dataend ) break; | 824 | if (data + 4 > dataend) |
789 | peer->packet_type = data[0]; | 825 | break; |
790 | peer->packet_tprefix = data[1]; | 826 | peer->packet_type = data[0]; |
791 | peer->packet_tcount = data[2] * 256 + data[3]; | 827 | peer->packet_tprefix = data[1]; |
792 | data += 4; | 828 | peer->packet_tcount = data[2] * 256 + data[3]; |
793 | printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); | 829 | data += 4; |
830 | printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount); | ||
794 | } | 831 | } |
795 | 832 | ||
796 | /* Ensure size for a minimal torrent block */ | 833 | /* Ensure size for a minimal torrent block */ |
797 | if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; | 834 | if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend) |
835 | break; | ||
798 | 836 | ||
799 | /* Advance pointer to peer count or peers */ | 837 | /* Advance pointer to peer count or peers */ |
800 | hash = data; | 838 | hash = data; |
801 | data += sizeof(ot_hash) - 1; | 839 | data += sizeof(ot_hash) - 1; |
802 | 840 | ||
803 | /* Type 0 has peer count encoded before each peers */ | 841 | /* Type 0 has peer count encoded before each peers */ |
804 | peers = peer->packet_type; | 842 | peers = peer->packet_type; |
805 | if( !peers ) { | 843 | if (!peers) { |
806 | int shift = 0; | 844 | int shift = 0; |
807 | do peers |= ( 0x7f & *data ) << ( 7 * shift ); | 845 | do |
808 | while ( *(data++) & 0x80 && shift++ < 6 ); | 846 | peers |= (0x7f & *data) << (7 * shift); |
847 | while (*(data++) & 0x80 && shift++ < 6); | ||
809 | } | 848 | } |
810 | #if 0 | 849 | #if 0 |
811 | printf( "peers: %zd\n", peers ); | 850 | printf( "peers: %zd\n", peers ); |
812 | #endif | 851 | #endif |
813 | /* Ensure enough data being read to hold all peers */ | 852 | /* Ensure enough data being read to hold all peers */ |
814 | if( data + (OT_IP_SIZE + 3) * peers > dataend ) { | 853 | if (data + (OT_IP_SIZE + 3) * peers > dataend) { |
815 | data = hash; | 854 | data = hash; |
816 | break; | 855 | break; |
817 | } | 856 | } |
818 | while( peers-- ) { | 857 | while (peers--) { |
819 | livesync_proxytell( peer->packet_tprefix, hash, data ); | 858 | livesync_proxytell(peer->packet_tprefix, hash, data); |
820 | data += OT_IP_SIZE + 3; | 859 | data += OT_IP_SIZE + 3; |
821 | } | 860 | } |
822 | --peer->packet_tcount; | 861 | --peer->packet_tcount; |
823 | } | 862 | } |
824 | 863 | ||
825 | consumed = data - peer->indata; | 864 | consumed = data - peer->indata; |
826 | memmove( peer->indata, data, peer->indata_length - consumed ); | 865 | memmove(peer->indata, data, peer->indata_length - consumed); |
827 | peer->indata_length -= consumed; | 866 | peer->indata_length -= consumed; |
828 | } | 867 | } |
829 | 868 | ||
830 | static void * livesync_worker( void * args ) { | 869 | static void *livesync_worker(void *args) { |
831 | (void)args; | 870 | (void)args; |
832 | while( 1 ) { | 871 | while (1) { |
833 | ot_ip6 in_ip; uint16_t in_port; | 872 | ot_ip6 in_ip; |
834 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 873 | uint16_t in_port; |
874 | size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); | ||
835 | 875 | ||
836 | /* Expect at least tracker id and packet type */ | 876 | /* Expect at least tracker id and packet type */ |
837 | if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) | 877 | if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) |
838 | continue; | 878 | continue; |
839 | if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | 879 | if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) { |
840 | /* drop packet coming from ourselves */ | 880 | /* drop packet coming from ourselves */ |
841 | continue; | 881 | continue; |
842 | } | 882 | } |
843 | switch( uint32_read_big( (char*)g_inbuffer + sizeof( g_tracker_id ) ) ) { | 883 | switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) { |
844 | case OT_SYNC_PEER4: | 884 | case OT_SYNC_PEER4: |
845 | livesync_handle_peersync( datalen, OT_PEER_SIZE4 ); | 885 | livesync_handle_peersync(datalen, OT_PEER_SIZE4); |
846 | break; | 886 | break; |
847 | case OT_SYNC_PEER6: | 887 | case OT_SYNC_PEER6: |
848 | livesync_handle_peersync( datalen, OT_PEER_SIZE6 ); | 888 | livesync_handle_peersync(datalen, OT_PEER_SIZE6); |
849 | break; | 889 | break; |
850 | default: | 890 | default: |
851 | // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); | 891 | // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); |