summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ot_livesync.c86
-rw-r--r--ot_livesync.h8
2 files changed, 63 insertions, 31 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index b87fa6d..335cce5 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -35,7 +35,7 @@ char groupip_1[4] = { 224,0,23,5 };
35 35
36#define LIVESYNC_MAXDELAY 15 /* seconds */ 36#define LIVESYNC_MAXDELAY 15 /* seconds */
37 37
38enum { OT_SYNC_PEER }; 38enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
39 39
40/* Forward declaration */ 40/* Forward declaration */
41static void * livesync_worker( void * args ); 41static void * livesync_worker( void * args );
@@ -47,9 +47,14 @@ static int64 g_socket_in = -1;
47static int64 g_socket_out = -1; 47static int64 g_socket_out = -1;
48 48
49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; 49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
50char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 50typedef struct {
51static size_t g_outbuf_data; 51 uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
52static ot_time g_next_packet_time; 52 size_t fill;
53 ot_time next_packet_time;
54} sync_buffer;
55
56static sync_buffer g_v6_buf;
57static sync_buffer g_v4_buf;
53 58
54static pthread_t thread_id; 59static pthread_t thread_id;
55void livesync_init( ) { 60void livesync_init( ) {
@@ -58,11 +63,17 @@ void livesync_init( ) {
58 exerr( "No socket address for live sync specified." ); 63 exerr( "No socket address for live sync specified." );
59 64
60 /* Prepare outgoing peers buffer */ 65 /* Prepare outgoing peers buffer */
61 memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); 66 memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) );
62 uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); 67 memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) );
63 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); 68
69 uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6);
70 uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4);
71
72 g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
73 g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
64 74
65 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 75 g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
76 g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
66 77
67 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 78 pthread_create( &thread_id, NULL, livesync_worker, NULL );
68} 79}
@@ -107,28 +118,28 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
107} 118}
108 119
109/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ 120/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
110static void livesync_issue_peersync( ) { 121static void livesync_issue_peersync( sync_buffer *buf ) {
111 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 122 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
112 size_t data = g_outbuf_data; 123 size_t fill = buf->fill;
113 124
114 memcpy( mycopy, g_outbuf, data ); 125 memcpy( mycopy, buf->data, fill );
115 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); 126 buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
116 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 127 buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
117 128
118 /* From now this thread has a local copy of the buffer and 129 /* From now this thread has a local copy of the buffer and
119 has modified the protected element */ 130 has modified the protected element */
120 pthread_mutex_unlock(&g_outbuf_mutex); 131 pthread_mutex_unlock(&g_outbuf_mutex);
121 132
122 socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT); 133 socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT);
123} 134}
124 135
125static void livesync_handle_peersync( struct ot_workstruct *ws ) { 136static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) {
126 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 137 size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t );
127 138
128 /* Now basic sanity checks have been done on the live sync packet 139 /* Now basic sanity checks have been done on the live sync packet
129 We might add more testing and logging. */ 140 We might add more testing and logging. */
130 while( off + (ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4 <= ws->request_size ) { 141 while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) {
131 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), OT_PEER_SIZE4 ); 142 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size );
132 ws->hash = (ot_hash*)(ws->request + off); 143 ws->hash = (ot_hash*)(ws->request + off);
133 144
134 if( !g_opentracker_running ) return; 145 if( !g_opentracker_running ) return;
@@ -138,12 +149,12 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) {
138 else 149 else
139 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); 150 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
140 151
141 off += sizeof( ot_hash ) + sizeof( ot_peer ); 152 off += sizeof( ot_hash ) + peer_size;
142 } 153 }
143 154
144 stats_issue_event(EVENT_SYNC, 0, 155 stats_issue_event(EVENT_SYNC, 0,
145 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / 156 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
146 ((ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4)); 157 ((ssize_t)sizeof( ot_hash ) + peer_size));
147} 158}
148 159
149/* Tickle the live sync module from time to time, so no events get 160/* Tickle the live sync module from time to time, so no events get
@@ -152,24 +163,36 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) {
152void livesync_ticker( ) { 163void livesync_ticker( ) {
153 /* livesync_issue_peersync sets g_next_packet_time */ 164 /* livesync_issue_peersync sets g_next_packet_time */
154 pthread_mutex_lock(&g_outbuf_mutex); 165 pthread_mutex_lock(&g_outbuf_mutex);
155 if( g_now_seconds > g_next_packet_time && 166 if( g_now_seconds > g_v6_buf.next_packet_time &&
156 g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) 167 g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
157 livesync_issue_peersync(); 168 livesync_issue_peersync(&g_v6_buf);
169 else
170 pthread_mutex_unlock(&g_outbuf_mutex);
171
172 pthread_mutex_lock(&g_outbuf_mutex);
173 if( g_now_seconds > g_v4_buf.next_packet_time &&
174 g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
175 livesync_issue_peersync(&g_v4_buf);
158 else 176 else
159 pthread_mutex_unlock(&g_outbuf_mutex); 177 pthread_mutex_unlock(&g_outbuf_mutex);
160} 178}
161 179
162/* Inform live sync about whats going on. */ 180/* Inform live sync about whats going on. */
163void livesync_tell( struct ot_workstruct *ws ) { 181void livesync_tell( struct ot_workstruct *ws ) {
182 size_t peer_size; /* initialized in next line */
183 ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size);
184 sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf;
185
164 pthread_mutex_lock(&g_outbuf_mutex); 186 pthread_mutex_lock(&g_outbuf_mutex);
165 187
166 memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); 188 memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) );
167 memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, OT_PEER_SIZE4 ); 189 dest_buf->fill += sizeof(ot_hash);
168 190
169 g_outbuf_data += sizeof(ot_hash) + OT_PEER_SIZE4; 191 memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size );
192 dest_buf->fill += peer_size;
170 193
171 if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) 194 if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
172 livesync_issue_peersync(); 195 livesync_issue_peersync(dest_buf);
173 else 196 else
174 pthread_mutex_unlock(&g_outbuf_mutex); 197 pthread_mutex_unlock(&g_outbuf_mutex);
175} 198}
@@ -200,8 +223,11 @@ static void * livesync_worker( void * args ) {
200 } 223 }
201 224
202 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { 225 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
203 case OT_SYNC_PEER: 226 case OT_SYNC_PEER6:
204 livesync_handle_peersync( &ws ); 227 livesync_handle_peersync( &ws, OT_PEER_SIZE6 );
228 break;
229 case OT_SYNC_PEER4:
230 livesync_handle_peersync( &ws, OT_PEER_SIZE4 );
205 break; 231 break;
206 default: 232 default:
207 break; 233 break;
diff --git a/ot_livesync.h b/ot_livesync.h
index d7490e5..41bfc2e 100644
--- a/ot_livesync.h
+++ b/ot_livesync.h
@@ -28,13 +28,19 @@
28 Each tracker instance accumulates announce requests until its buffer is 28 Each tracker instance accumulates announce requests until its buffer is
29 full or a timeout is reached. Then it broadcasts its live sync packer: 29 full or a timeout is reached. Then it broadcasts its live sync packer:
30 30
31 packet type SYNC_LIVE 31 packet type SYNC_LIVE4
32 [ 0x0008 0x14 info_hash 32 [ 0x0008 0x14 info_hash
33 0x001c 0x04 peer's ipv4 address 33 0x001c 0x04 peer's ipv4 address
34 0x0020 0x02 peer's port 34 0x0020 0x02 peer's port
35 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) 35 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
36 ]* 36 ]*
37 37
38 packet type SYNC_LIVE6
39 [ 0x0008 0x14 info_hash
40 0x001c 0x10 peer's ipv6 address
41 0x002c 0x02 peer's port
42 0x002e 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
43 ]*
38*/ 44*/
39 45
40#ifdef WANT_SYNC_LIVE 46#ifdef WANT_SYNC_LIVE