summaryrefslogtreecommitdiff
path: root/ot_livesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_livesync.c')
-rw-r--r--ot_livesync.c172
1 files changed, 85 insertions, 87 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index 335cce5..246317b 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -4,126 +4,126 @@
4 $id$ */ 4 $id$ */
5 5
6/* System */ 6/* System */
7#include <pthread.h>
8#include <stdlib.h>
9#include <string.h>
7#include <sys/types.h> 10#include <sys/types.h>
8#include <sys/uio.h> 11#include <sys/uio.h>
9#include <string.h>
10#include <pthread.h>
11#include <unistd.h> 12#include <unistd.h>
12#include <stdlib.h>
13 13
14/* Libowfat */ 14/* Libowfat */
15#include "socket.h"
16#include "ndelay.h"
17#include "byte.h" 15#include "byte.h"
18#include "ip6.h" 16#include "ip6.h"
17#include "ndelay.h"
18#include "socket.h"
19 19
20/* Opentracker */ 20/* Opentracker */
21#include "trackerlogic.h"
22#include "ot_livesync.h"
23#include "ot_accesslist.h" 21#include "ot_accesslist.h"
24#include "ot_stats.h" 22#include "ot_livesync.h"
25#include "ot_mutex.h" 23#include "ot_mutex.h"
24#include "ot_stats.h"
25#include "trackerlogic.h"
26 26
27#ifdef WANT_SYNC_LIVE 27#ifdef WANT_SYNC_LIVE
28 28
29char groupip_1[4] = { 224,0,23,5 }; 29char groupip_1[4] = {224, 0, 23, 5};
30 30
31#define LIVESYNC_INCOMING_BUFFSIZE (256*256) 31#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256)
32 32
33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
35 35
36#define LIVESYNC_MAXDELAY 15 /* seconds */ 36#define LIVESYNC_MAXDELAY 15 /* seconds */
37 37
38enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; 38enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
39 39
40/* Forward declaration */ 40/* Forward declaration */
41static void * livesync_worker( void * args ); 41static void *livesync_worker(void *args);
42 42
43/* For outgoing packets */ 43/* For outgoing packets */
44static int64 g_socket_in = -1; 44static int64 g_socket_in = -1;
45 45
46/* For incoming packets */ 46/* For incoming packets */
47static int64 g_socket_out = -1; 47static int64 g_socket_out = -1;
48 48
49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; 49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
50typedef struct { 50typedef struct {
51 uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 51 uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
52 size_t fill; 52 size_t fill;
53 ot_time next_packet_time; 53 ot_time next_packet_time;
54} sync_buffer; 54} sync_buffer;
55 55
56static sync_buffer g_v6_buf; 56static sync_buffer g_v6_buf;
57static sync_buffer g_v4_buf; 57static sync_buffer g_v4_buf;
58 58
59static pthread_t thread_id; 59static pthread_t thread_id;
60void livesync_init( ) { 60void livesync_init() {
61 61
62 if( g_socket_in == -1 ) 62 if (g_socket_in == -1)
63 exerr( "No socket address for live sync specified." ); 63 exerr("No socket address for live sync specified.");
64 64
65 /* Prepare outgoing peers buffer */ 65 /* Prepare outgoing peers buffer */
66 memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); 66 memcpy(g_v6_buf.data, &g_tracker_id, sizeof(g_tracker_id));
67 memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); 67 memcpy(g_v4_buf.data, &g_tracker_id, sizeof(g_tracker_id));
68 68
69 uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6); 69 uint32_pack_big((char *)g_v6_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER6);
70 uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4); 70 uint32_pack_big((char *)g_v4_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER4);
71 71
72 g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); 72 g_v6_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t);
73 g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); 73 g_v4_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t);
74 74
75 g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 75 g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
76 g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 76 g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
77 77
78 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 78 pthread_create(&thread_id, NULL, livesync_worker, NULL);
79} 79}
80 80
81void livesync_deinit() { 81void livesync_deinit() {
82 if( g_socket_in != -1 ) 82 if (g_socket_in != -1)
83 close( g_socket_in ); 83 close(g_socket_in);
84 if( g_socket_out != -1 ) 84 if (g_socket_out != -1)
85 close( g_socket_out ); 85 close(g_socket_out);
86 86
87 pthread_cancel( thread_id ); 87 pthread_cancel(thread_id);
88} 88}
89 89
90void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { 90void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
91 char tmpip[4] = {0,0,0,0}; 91 char tmpip[4] = {0, 0, 0, 0};
92 char *v4ip; 92 char *v4ip;
93 93
94 if( !ip6_isv4mapped(ip)) 94 if (!ip6_isv4mapped(ip))
95 exerr("v6 mcast support not yet available."); 95 exerr("v6 mcast support not yet available.");
96 v4ip = ip+12; 96 v4ip = ip + 12;
97 97
98 if( g_socket_in != -1 ) 98 if (g_socket_in != -1)
99 exerr("Error: Livesync listen ip specified twice."); 99 exerr("Error: Livesync listen ip specified twice.");
100 100
101 if( ( g_socket_in = socket_udp4( )) < 0) 101 if ((g_socket_in = socket_udp4()) < 0)
102 exerr("Error: Cant create live sync incoming socket." ); 102 exerr("Error: Cant create live sync incoming socket.");
103 ndelay_off(g_socket_in); 103 ndelay_off(g_socket_in);
104 104
105 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) 105 if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
106 exerr("Error: Cant bind live sync incoming socket." ); 106 exerr("Error: Cant bind live sync incoming socket.");
107 107
108 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) 108 if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
109 exerr("Error: Cant make live sync incoming socket join mcast group."); 109 exerr("Error: Cant make live sync incoming socket join mcast group.");
110 110
111 if( ( g_socket_out = socket_udp4()) < 0) 111 if ((g_socket_out = socket_udp4()) < 0)
112 exerr("Error: Cant create live sync outgoing socket." ); 112 exerr("Error: Cant create live sync outgoing socket.");
113 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) 113 if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
114 exerr("Error: Cant bind live sync outgoing socket." ); 114 exerr("Error: Cant bind live sync outgoing socket.");
115 115
116 socket_mcttl4(g_socket_out, 1); 116 socket_mcttl4(g_socket_out, 1);
117 socket_mcloop4(g_socket_out, 0); 117 socket_mcloop4(g_socket_out, 0);
118} 118}
119 119
120/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ 120/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
121static void livesync_issue_peersync( sync_buffer *buf ) { 121static void livesync_issue_peersync(sync_buffer *buf) {
122 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 122 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
123 size_t fill = buf->fill; 123 size_t fill = buf->fill;
124 124
125 memcpy( mycopy, buf->data, fill ); 125 memcpy(mycopy, buf->data, fill);
126 buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); 126 buf->fill = sizeof(g_tracker_id) + sizeof(uint32_t);
127 buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 127 buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
128 128
129 /* From now this thread has a local copy of the buffer and 129 /* From now this thread has a local copy of the buffer and
@@ -133,101 +133,99 @@ static void livesync_issue_peersync( sync_buffer *buf ) {
133 socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT); 133 socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT);
134} 134}
135 135
136static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) { 136static void livesync_handle_peersync(struct ot_workstruct *ws, size_t peer_size) {
137 size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 137 size_t off = sizeof(g_tracker_id) + sizeof(uint32_t);
138 138
139 /* Now basic sanity checks have been done on the live sync packet 139 /* Now basic sanity checks have been done on the live sync packet
140 We might add more testing and logging. */ 140 We might add more testing and logging. */
141 while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) { 141 while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= ws->request_size) {
142 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size ); 142 memcpy(&ws->peer, ws->request + off + sizeof(ot_hash), peer_size);
143 ws->hash = (ot_hash*)(ws->request + off); 143 ws->hash = (ot_hash *)(ws->request + off);
144 144
145 if( !g_opentracker_running ) return; 145 if (!g_opentracker_running)
146 return;
146 147
147 if( OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED ) 148 if (OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED)
148 remove_peer_from_torrent( FLAG_MCA, ws ); 149 remove_peer_from_torrent(FLAG_MCA, ws);
149 else 150 else
150 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); 151 add_peer_to_torrent_and_return_peers(FLAG_MCA, ws, /* amount = */ 0);
151 152
152 off += sizeof( ot_hash ) + peer_size; 153 off += sizeof(ot_hash) + peer_size;
153 } 154 }
154 155
155 stats_issue_event(EVENT_SYNC, 0, 156 stats_issue_event(EVENT_SYNC, 0, (ws->request_size - sizeof(g_tracker_id) - sizeof(uint32_t)) / ((ssize_t)sizeof(ot_hash) + peer_size));
156 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
157 ((ssize_t)sizeof( ot_hash ) + peer_size));
158} 157}
159 158
160/* Tickle the live sync module from time to time, so no events get 159/* Tickle the live sync module from time to time, so no events get
161 stuck when there's not enough traffic to fill udp packets fast 160 stuck when there's not enough traffic to fill udp packets fast
162 enough */ 161 enough */
163void livesync_ticker( ) { 162void livesync_ticker() {
164 /* livesync_issue_peersync sets g_next_packet_time */ 163 /* livesync_issue_peersync sets g_next_packet_time */
165 pthread_mutex_lock(&g_outbuf_mutex); 164 pthread_mutex_lock(&g_outbuf_mutex);
166 if( g_now_seconds > g_v6_buf.next_packet_time && 165 if (g_now_seconds > g_v6_buf.next_packet_time && g_v6_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t))
167 g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
168 livesync_issue_peersync(&g_v6_buf); 166 livesync_issue_peersync(&g_v6_buf);
169 else 167 else
170 pthread_mutex_unlock(&g_outbuf_mutex); 168 pthread_mutex_unlock(&g_outbuf_mutex);
171 169
172 pthread_mutex_lock(&g_outbuf_mutex); 170 pthread_mutex_lock(&g_outbuf_mutex);
173 if( g_now_seconds > g_v4_buf.next_packet_time && 171 if (g_now_seconds > g_v4_buf.next_packet_time && g_v4_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t))
174 g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
175 livesync_issue_peersync(&g_v4_buf); 172 livesync_issue_peersync(&g_v4_buf);
176 else 173 else
177 pthread_mutex_unlock(&g_outbuf_mutex); 174 pthread_mutex_unlock(&g_outbuf_mutex);
178} 175}
179 176
180/* Inform live sync about whats going on. */ 177/* Inform live sync about whats going on. */
181void livesync_tell( struct ot_workstruct *ws ) { 178void livesync_tell(struct ot_workstruct *ws) {
182 size_t peer_size; /* initialized in next line */ 179 size_t peer_size; /* initialized in next line */
183 ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size); 180 ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size);
184 sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf; 181 sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf;
185 182
186 pthread_mutex_lock(&g_outbuf_mutex); 183 pthread_mutex_lock(&g_outbuf_mutex);
187 184
188 memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) ); 185 memcpy(dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash));
189 dest_buf->fill += sizeof(ot_hash); 186 dest_buf->fill += sizeof(ot_hash);
190 187
191 memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size ); 188 memcpy(dest_buf->data + dest_buf->fill, peer_src, peer_size);
192 dest_buf->fill += peer_size; 189 dest_buf->fill += peer_size;
193 190
194 if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) 191 if (dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS)
195 livesync_issue_peersync(dest_buf); 192 livesync_issue_peersync(dest_buf);
196 else 193 else
197 pthread_mutex_unlock(&g_outbuf_mutex); 194 pthread_mutex_unlock(&g_outbuf_mutex);
198} 195}
199 196
200static void * livesync_worker( void * args ) { 197static void *livesync_worker(void *args) {
201 struct ot_workstruct ws; 198 struct ot_workstruct ws;
202 ot_ip6 in_ip; uint16_t in_port; 199 ot_ip6 in_ip;
200 uint16_t in_port;
203 201
204 (void)args; 202 (void)args;
205 203
206 /* Initialize our "thread local storage" */ 204 /* Initialize our "thread local storage" */
207 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); 205 ws.inbuf = ws.request = malloc(LIVESYNC_INCOMING_BUFFSIZE);
208 ws.outbuf = ws.reply = 0; 206 ws.outbuf = ws.reply = 0;
209 207
210 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); 208 memcpy(in_ip, V4mappedprefix, sizeof(V4mappedprefix));
211 209
212 while( 1 ) { 210 while (1) {
213 ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 211 ws.request_size = socket_recv4(g_socket_in, (char *)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
214 212
215 /* Expect at least tracker id and packet type */ 213 /* Expect at least tracker id and packet type */
216 if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 214 if (ws.request_size <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
217 continue; 215 continue;
218 if( !accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) 216 if (!accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC))
219 continue; 217 continue;
220 if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 218 if (!memcmp(ws.inbuf, &g_tracker_id, sizeof(g_tracker_id))) {
221 /* TODO: log packet coming from ourselves */ 219 /* TODO: log packet coming from ourselves */
222 continue; 220 continue;
223 } 221 }
224 222
225 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { 223 switch (uint32_read_big(sizeof(g_tracker_id) + (char *)ws.inbuf)) {
226 case OT_SYNC_PEER6: 224 case OT_SYNC_PEER6:
227 livesync_handle_peersync( &ws, OT_PEER_SIZE6 ); 225 livesync_handle_peersync(&ws, OT_PEER_SIZE6);
228 break; 226 break;
229 case OT_SYNC_PEER4: 227 case OT_SYNC_PEER4:
230 livesync_handle_peersync( &ws, OT_PEER_SIZE4 ); 228 livesync_handle_peersync(&ws, OT_PEER_SIZE4);
231 break; 229 break;
232 default: 230 default:
233 break; 231 break;