diff options
Diffstat (limited to 'ot_livesync.c')
-rw-r--r-- | ot_livesync.c | 172 |
1 files changed, 85 insertions, 87 deletions
diff --git a/ot_livesync.c b/ot_livesync.c index 335cce5..246317b 100644 --- a/ot_livesync.c +++ b/ot_livesync.c | |||
@@ -4,126 +4,126 @@ | |||
4 | $id$ */ | 4 | $id$ */ |
5 | 5 | ||
6 | /* System */ | 6 | /* System */ |
7 | #include <pthread.h> | ||
8 | #include <stdlib.h> | ||
9 | #include <string.h> | ||
7 | #include <sys/types.h> | 10 | #include <sys/types.h> |
8 | #include <sys/uio.h> | 11 | #include <sys/uio.h> |
9 | #include <string.h> | ||
10 | #include <pthread.h> | ||
11 | #include <unistd.h> | 12 | #include <unistd.h> |
12 | #include <stdlib.h> | ||
13 | 13 | ||
14 | /* Libowfat */ | 14 | /* Libowfat */ |
15 | #include "socket.h" | ||
16 | #include "ndelay.h" | ||
17 | #include "byte.h" | 15 | #include "byte.h" |
18 | #include "ip6.h" | 16 | #include "ip6.h" |
17 | #include "ndelay.h" | ||
18 | #include "socket.h" | ||
19 | 19 | ||
20 | /* Opentracker */ | 20 | /* Opentracker */ |
21 | #include "trackerlogic.h" | ||
22 | #include "ot_livesync.h" | ||
23 | #include "ot_accesslist.h" | 21 | #include "ot_accesslist.h" |
24 | #include "ot_stats.h" | 22 | #include "ot_livesync.h" |
25 | #include "ot_mutex.h" | 23 | #include "ot_mutex.h" |
24 | #include "ot_stats.h" | ||
25 | #include "trackerlogic.h" | ||
26 | 26 | ||
27 | #ifdef WANT_SYNC_LIVE | 27 | #ifdef WANT_SYNC_LIVE |
28 | 28 | ||
29 | char groupip_1[4] = { 224,0,23,5 }; | 29 | char groupip_1[4] = {224, 0, 23, 5}; |
30 | 30 | ||
31 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 31 | #define LIVESYNC_INCOMING_BUFFSIZE (256 * 256) |
32 | 32 | ||
33 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 33 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
34 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 34 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash)) |
35 | 35 | ||
36 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | 36 | #define LIVESYNC_MAXDELAY 15 /* seconds */ |
37 | 37 | ||
38 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; | 38 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; |
39 | 39 | ||
40 | /* Forward declaration */ | 40 | /* Forward declaration */ |
41 | static void * livesync_worker( void * args ); | 41 | static void *livesync_worker(void *args); |
42 | 42 | ||
43 | /* For outgoing packets */ | 43 | /* For outgoing packets */ |
44 | static int64 g_socket_in = -1; | 44 | static int64 g_socket_in = -1; |
45 | 45 | ||
46 | /* For incoming packets */ | 46 | /* For incoming packets */ |
47 | static int64 g_socket_out = -1; | 47 | static int64 g_socket_out = -1; |
48 | 48 | ||
49 | static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; | 49 | static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; |
50 | typedef struct { | 50 | typedef struct { |
51 | uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | 51 | uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
52 | size_t fill; | 52 | size_t fill; |
53 | ot_time next_packet_time; | 53 | ot_time next_packet_time; |
54 | } sync_buffer; | 54 | } sync_buffer; |
55 | 55 | ||
56 | static sync_buffer g_v6_buf; | 56 | static sync_buffer g_v6_buf; |
57 | static sync_buffer g_v4_buf; | 57 | static sync_buffer g_v4_buf; |
58 | 58 | ||
59 | static pthread_t thread_id; | 59 | static pthread_t thread_id; |
60 | void livesync_init( ) { | 60 | void livesync_init() { |
61 | 61 | ||
62 | if( g_socket_in == -1 ) | 62 | if (g_socket_in == -1) |
63 | exerr( "No socket address for live sync specified." ); | 63 | exerr("No socket address for live sync specified."); |
64 | 64 | ||
65 | /* Prepare outgoing peers buffer */ | 65 | /* Prepare outgoing peers buffer */ |
66 | memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); | 66 | memcpy(g_v6_buf.data, &g_tracker_id, sizeof(g_tracker_id)); |
67 | memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); | 67 | memcpy(g_v4_buf.data, &g_tracker_id, sizeof(g_tracker_id)); |
68 | 68 | ||
69 | uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6); | 69 | uint32_pack_big((char *)g_v6_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER6); |
70 | uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4); | 70 | uint32_pack_big((char *)g_v4_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER4); |
71 | 71 | ||
72 | g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 72 | g_v6_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t); |
73 | g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 73 | g_v4_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t); |
74 | 74 | ||
75 | g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 75 | g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
76 | g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 76 | g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
77 | 77 | ||
78 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); | 78 | pthread_create(&thread_id, NULL, livesync_worker, NULL); |
79 | } | 79 | } |
80 | 80 | ||
81 | void livesync_deinit() { | 81 | void livesync_deinit() { |
82 | if( g_socket_in != -1 ) | 82 | if (g_socket_in != -1) |
83 | close( g_socket_in ); | 83 | close(g_socket_in); |
84 | if( g_socket_out != -1 ) | 84 | if (g_socket_out != -1) |
85 | close( g_socket_out ); | 85 | close(g_socket_out); |
86 | 86 | ||
87 | pthread_cancel( thread_id ); | 87 | pthread_cancel(thread_id); |
88 | } | 88 | } |
89 | 89 | ||
90 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | 90 | void livesync_bind_mcast(ot_ip6 ip, uint16_t port) { |
91 | char tmpip[4] = {0,0,0,0}; | 91 | char tmpip[4] = {0, 0, 0, 0}; |
92 | char *v4ip; | 92 | char *v4ip; |
93 | 93 | ||
94 | if( !ip6_isv4mapped(ip)) | 94 | if (!ip6_isv4mapped(ip)) |
95 | exerr("v6 mcast support not yet available."); | 95 | exerr("v6 mcast support not yet available."); |
96 | v4ip = ip+12; | 96 | v4ip = ip + 12; |
97 | 97 | ||
98 | if( g_socket_in != -1 ) | 98 | if (g_socket_in != -1) |
99 | exerr("Error: Livesync listen ip specified twice."); | 99 | exerr("Error: Livesync listen ip specified twice."); |
100 | 100 | ||
101 | if( ( g_socket_in = socket_udp4( )) < 0) | 101 | if ((g_socket_in = socket_udp4()) < 0) |
102 | exerr("Error: Cant create live sync incoming socket." ); | 102 | exerr("Error: Cant create live sync incoming socket."); |
103 | ndelay_off(g_socket_in); | 103 | ndelay_off(g_socket_in); |
104 | 104 | ||
105 | if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) | 105 | if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1) |
106 | exerr("Error: Cant bind live sync incoming socket." ); | 106 | exerr("Error: Cant bind live sync incoming socket."); |
107 | 107 | ||
108 | if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) | 108 | if (socket_mcjoin4(g_socket_in, groupip_1, v4ip)) |
109 | exerr("Error: Cant make live sync incoming socket join mcast group."); | 109 | exerr("Error: Cant make live sync incoming socket join mcast group."); |
110 | 110 | ||
111 | if( ( g_socket_out = socket_udp4()) < 0) | 111 | if ((g_socket_out = socket_udp4()) < 0) |
112 | exerr("Error: Cant create live sync outgoing socket." ); | 112 | exerr("Error: Cant create live sync outgoing socket."); |
113 | if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) | 113 | if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1) |
114 | exerr("Error: Cant bind live sync outgoing socket." ); | 114 | exerr("Error: Cant bind live sync outgoing socket."); |
115 | 115 | ||
116 | socket_mcttl4(g_socket_out, 1); | 116 | socket_mcttl4(g_socket_out, 1); |
117 | socket_mcloop4(g_socket_out, 0); | 117 | socket_mcloop4(g_socket_out, 0); |
118 | } | 118 | } |
119 | 119 | ||
120 | /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ | 120 | /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ |
121 | static void livesync_issue_peersync( sync_buffer *buf ) { | 121 | static void livesync_issue_peersync(sync_buffer *buf) { |
122 | char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | 122 | char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
123 | size_t fill = buf->fill; | 123 | size_t fill = buf->fill; |
124 | 124 | ||
125 | memcpy( mycopy, buf->data, fill ); | 125 | memcpy(mycopy, buf->data, fill); |
126 | buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 126 | buf->fill = sizeof(g_tracker_id) + sizeof(uint32_t); |
127 | buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 127 | buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
128 | 128 | ||
129 | /* From now this thread has a local copy of the buffer and | 129 | /* From now this thread has a local copy of the buffer and |
@@ -133,101 +133,99 @@ static void livesync_issue_peersync( sync_buffer *buf ) { | |||
133 | socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT); | 133 | socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT); |
134 | } | 134 | } |
135 | 135 | ||
136 | static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) { | 136 | static void livesync_handle_peersync(struct ot_workstruct *ws, size_t peer_size) { |
137 | size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 137 | size_t off = sizeof(g_tracker_id) + sizeof(uint32_t); |
138 | 138 | ||
139 | /* Now basic sanity checks have been done on the live sync packet | 139 | /* Now basic sanity checks have been done on the live sync packet |
140 | We might add more testing and logging. */ | 140 | We might add more testing and logging. */ |
141 | while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) { | 141 | while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= ws->request_size) { |
142 | memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size ); | 142 | memcpy(&ws->peer, ws->request + off + sizeof(ot_hash), peer_size); |
143 | ws->hash = (ot_hash*)(ws->request + off); | 143 | ws->hash = (ot_hash *)(ws->request + off); |
144 | 144 | ||
145 | if( !g_opentracker_running ) return; | 145 | if (!g_opentracker_running) |
146 | return; | ||
146 | 147 | ||
147 | if( OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED ) | 148 | if (OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED) |
148 | remove_peer_from_torrent( FLAG_MCA, ws ); | 149 | remove_peer_from_torrent(FLAG_MCA, ws); |
149 | else | 150 | else |
150 | add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); | 151 | add_peer_to_torrent_and_return_peers(FLAG_MCA, ws, /* amount = */ 0); |
151 | 152 | ||
152 | off += sizeof( ot_hash ) + peer_size; | 153 | off += sizeof(ot_hash) + peer_size; |
153 | } | 154 | } |
154 | 155 | ||
155 | stats_issue_event(EVENT_SYNC, 0, | 156 | stats_issue_event(EVENT_SYNC, 0, (ws->request_size - sizeof(g_tracker_id) - sizeof(uint32_t)) / ((ssize_t)sizeof(ot_hash) + peer_size)); |
156 | (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / | ||
157 | ((ssize_t)sizeof( ot_hash ) + peer_size)); | ||
158 | } | 157 | } |
159 | 158 | ||
160 | /* Tickle the live sync module from time to time, so no events get | 159 | /* Tickle the live sync module from time to time, so no events get |
161 | stuck when there's not enough traffic to fill udp packets fast | 160 | stuck when there's not enough traffic to fill udp packets fast |
162 | enough */ | 161 | enough */ |
163 | void livesync_ticker( ) { | 162 | void livesync_ticker() { |
164 | /* livesync_issue_peersync sets g_next_packet_time */ | 163 | /* livesync_issue_peersync sets g_next_packet_time */ |
165 | pthread_mutex_lock(&g_outbuf_mutex); | 164 | pthread_mutex_lock(&g_outbuf_mutex); |
166 | if( g_now_seconds > g_v6_buf.next_packet_time && | 165 | if (g_now_seconds > g_v6_buf.next_packet_time && g_v6_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t)) |
167 | g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) | ||
168 | livesync_issue_peersync(&g_v6_buf); | 166 | livesync_issue_peersync(&g_v6_buf); |
169 | else | 167 | else |
170 | pthread_mutex_unlock(&g_outbuf_mutex); | 168 | pthread_mutex_unlock(&g_outbuf_mutex); |
171 | 169 | ||
172 | pthread_mutex_lock(&g_outbuf_mutex); | 170 | pthread_mutex_lock(&g_outbuf_mutex); |
173 | if( g_now_seconds > g_v4_buf.next_packet_time && | 171 | if (g_now_seconds > g_v4_buf.next_packet_time && g_v4_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t)) |
174 | g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) | ||
175 | livesync_issue_peersync(&g_v4_buf); | 172 | livesync_issue_peersync(&g_v4_buf); |
176 | else | 173 | else |
177 | pthread_mutex_unlock(&g_outbuf_mutex); | 174 | pthread_mutex_unlock(&g_outbuf_mutex); |
178 | } | 175 | } |
179 | 176 | ||
180 | /* Inform live sync about whats going on. */ | 177 | /* Inform live sync about whats going on. */ |
181 | void livesync_tell( struct ot_workstruct *ws ) { | 178 | void livesync_tell(struct ot_workstruct *ws) { |
182 | size_t peer_size; /* initialized in next line */ | 179 | size_t peer_size; /* initialized in next line */ |
183 | ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size); | 180 | ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size); |
184 | sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf; | 181 | sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf; |
185 | 182 | ||
186 | pthread_mutex_lock(&g_outbuf_mutex); | 183 | pthread_mutex_lock(&g_outbuf_mutex); |
187 | 184 | ||
188 | memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) ); | 185 | memcpy(dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash)); |
189 | dest_buf->fill += sizeof(ot_hash); | 186 | dest_buf->fill += sizeof(ot_hash); |
190 | 187 | ||
191 | memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size ); | 188 | memcpy(dest_buf->data + dest_buf->fill, peer_src, peer_size); |
192 | dest_buf->fill += peer_size; | 189 | dest_buf->fill += peer_size; |
193 | 190 | ||
194 | if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) | 191 | if (dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS) |
195 | livesync_issue_peersync(dest_buf); | 192 | livesync_issue_peersync(dest_buf); |
196 | else | 193 | else |
197 | pthread_mutex_unlock(&g_outbuf_mutex); | 194 | pthread_mutex_unlock(&g_outbuf_mutex); |
198 | } | 195 | } |
199 | 196 | ||
200 | static void * livesync_worker( void * args ) { | 197 | static void *livesync_worker(void *args) { |
201 | struct ot_workstruct ws; | 198 | struct ot_workstruct ws; |
202 | ot_ip6 in_ip; uint16_t in_port; | 199 | ot_ip6 in_ip; |
200 | uint16_t in_port; | ||
203 | 201 | ||
204 | (void)args; | 202 | (void)args; |
205 | 203 | ||
206 | /* Initialize our "thread local storage" */ | 204 | /* Initialize our "thread local storage" */ |
207 | ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); | 205 | ws.inbuf = ws.request = malloc(LIVESYNC_INCOMING_BUFFSIZE); |
208 | ws.outbuf = ws.reply = 0; | 206 | ws.outbuf = ws.reply = 0; |
209 | 207 | ||
210 | memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); | 208 | memcpy(in_ip, V4mappedprefix, sizeof(V4mappedprefix)); |
211 | 209 | ||
212 | while( 1 ) { | 210 | while (1) { |
213 | ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 211 | ws.request_size = socket_recv4(g_socket_in, (char *)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); |
214 | 212 | ||
215 | /* Expect at least tracker id and packet type */ | 213 | /* Expect at least tracker id and packet type */ |
216 | if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) | 214 | if (ws.request_size <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) |
217 | continue; | 215 | continue; |
218 | if( !accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) | 216 | if (!accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) |
219 | continue; | 217 | continue; |
220 | if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | 218 | if (!memcmp(ws.inbuf, &g_tracker_id, sizeof(g_tracker_id))) { |
221 | /* TODO: log packet coming from ourselves */ | 219 | /* TODO: log packet coming from ourselves */ |
222 | continue; | 220 | continue; |
223 | } | 221 | } |
224 | 222 | ||
225 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { | 223 | switch (uint32_read_big(sizeof(g_tracker_id) + (char *)ws.inbuf)) { |
226 | case OT_SYNC_PEER6: | 224 | case OT_SYNC_PEER6: |
227 | livesync_handle_peersync( &ws, OT_PEER_SIZE6 ); | 225 | livesync_handle_peersync(&ws, OT_PEER_SIZE6); |
228 | break; | 226 | break; |
229 | case OT_SYNC_PEER4: | 227 | case OT_SYNC_PEER4: |
230 | livesync_handle_peersync( &ws, OT_PEER_SIZE4 ); | 228 | livesync_handle_peersync(&ws, OT_PEER_SIZE4); |
231 | break; | 229 | break; |
232 | default: | 230 | default: |
233 | break; | 231 | break; |