summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorerdgeist <>2008-10-06 02:05:53 +0000
committererdgeist <>2008-10-06 02:05:53 +0000
commit465cc2ecdf14909144debe70a4833642e11697da (patch)
tree0c386a01cccd0f8974d5fb74a895eb96b398c414
parent17724dde29c488f08338653ac6a98fb7a9fd6d22 (diff)
Live sync is now handled in its own thread. Therefore it now creates and handles its own sockets.
-rw-r--r--ot_livesync.c134
-rw-r--r--ot_livesync.h18
2 files changed, 90 insertions, 62 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index 577bb5f..92c947c 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -7,9 +7,11 @@
7#include <sys/types.h> 7#include <sys/types.h>
8#include <sys/uio.h> 8#include <sys/uio.h>
9#include <string.h> 9#include <string.h>
10#include <pthread.h>
10 11
11/* Libowfat */ 12/* Libowfat */
12#include "socket.h" 13#include "socket.h"
14#include "ndelay.h"
13 15
14/* Opentracker */ 16/* Opentracker */
15#include "trackerlogic.h" 17#include "trackerlogic.h"
@@ -17,10 +19,23 @@
17#include "ot_accesslist.h" 19#include "ot_accesslist.h"
18 20
19#ifdef WANT_SYNC_LIVE 21#ifdef WANT_SYNC_LIVE
20char groupip_1[4] = { LIVESYNC_MCASTDOMAIN_1 }; 22
23char groupip_1[4] = { 224,0,23,42 };
24
25#define LIVESYNC_BUFFINSIZE (256*256)
26#define LIVESYNC_BUFFSIZE 1504
27#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
28
29#define LIVESYNC_MAXDELAY 15
30
31/* Forward declaration */
32static void * livesync_worker( void * args );
21 33
22/* For outgoing packets */ 34/* For outgoing packets */
23int64 g_livesync_socket = -1; 35static int64 g_livesync_socket_in = -1;
36
37/* For incoming packets */
38static int64 g_livesync_socket_out = -1;
24 39
25static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE]; 40static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE];
26static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ]; 41static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ];
@@ -28,34 +43,49 @@ static uint8_t *livesync_outbuffer_pos;
28static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER; 43static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER;
29static ot_time livesync_lastpacket_time; 44static ot_time livesync_lastpacket_time;
30 45
46static pthread_t thread_id;
31void livesync_init( ) { 47void livesync_init( ) {
32 if( g_livesync_socket == -1 ) 48 if( g_livesync_socket_in == -1 )
33 exerr( "No socket address for live sync specified." ); 49 exerr( "No socket address for live sync specified." );
34 livesync_outbuffer_pos = livesync_outbuffer_start; 50 livesync_outbuffer_pos = livesync_outbuffer_start;
35 memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 51 memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
36 livesync_outbuffer_pos += sizeof( g_tracker_id ); 52 livesync_outbuffer_pos += sizeof( g_tracker_id );
37 livesync_lastpacket_time = g_now; 53 livesync_lastpacket_time = g_now;
54
55 pthread_create( &thread_id, NULL, livesync_worker, NULL );
38} 56}
39 57
40void livesync_deinit() { 58void livesync_deinit() {
41 59 pthread_cancel( thread_id );
42} 60}
43 61
44void livesync_bind_mcast( char *ip, uint16_t port) { 62void livesync_bind_mcast( char *ip, uint16_t port) {
45 char tmpip[4] = {0,0,0,0}; 63 char tmpip[4] = {0,0,0,0};
46 if( g_livesync_socket != -1 ) 64
47 exerr("Livesync listen ip specified twice."); 65 if( g_livesync_socket_in != -1 )
48 if( socket_mcjoin4( ot_try_bind(tmpip, port, FLAG_MCA ), groupip_1, ip ) ) 66 exerr("Error: Livesync listen ip specified twice.");
49 exerr("Cant join mcast group."); 67
50 g_livesync_socket = ot_try_bind( ip, port, FLAG_UDP ); 68 if( ( g_livesync_socket_in = socket_udp4( )) < 0)
51 io_dontwantread(g_livesync_socket); 69 exerr("Error: Cant create live sync incoming socket." );
52 70 ndelay_off(g_livesync_socket_in);
53 socket_mcttl4(g_livesync_socket, 1); 71
54 socket_mcloop4(g_livesync_socket, 0); 72 if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 )
73 exerr("Error: Cant bind live sync incoming socket." );
74
75 if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) )
76 exerr("Error: Cant make live sync incoming socket join mcast group.");
77
78 if( ( g_livesync_socket_out = socket_udp4()) < 0)
79 exerr("Error: Cant create live sync outgoing socket." );
80 if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 )
81 exerr("Error: Cant bind live sync outgoing socket." );
82
83 socket_mcttl4(g_livesync_socket_out, 1);
84 socket_mcloop4(g_livesync_socket_out, 0);
55} 85}
56 86
57static void livesync_issuepacket( ) { 87static void livesync_issuepacket( ) {
58 socket_send4(g_livesync_socket, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start, 88 socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
59 groupip_1, LIVESYNC_PORT); 89 groupip_1, LIVESYNC_PORT);
60 livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id ); 90 livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id );
61 livesync_lastpacket_time = g_now; 91 livesync_lastpacket_time = g_now;
@@ -81,41 +111,51 @@ void livesync_ticker( ) {
81 livesync_issuepacket(); 111 livesync_issuepacket();
82} 112}
83 113
84/* Handle an incoming live sync packet */ 114static void * livesync_worker( void * args ) {
85void handle_livesync( int64 serversocket ) {
86 uint8_t in_ip[4]; uint16_t in_port; 115 uint8_t in_ip[4]; uint16_t in_port;
87 ssize_t datalen = socket_recv4(serversocket, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port); 116 ssize_t datalen;
88 int off = 4; 117 int off;
89
90 if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
91 // TODO: log invalid sync packet
92 return;
93 }
94
95 if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
96 // TODO: log invalid sync packet
97 return;
98 }
99
100 if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
101 // TODO: log packet coming from ourselves
102 return;
103 }
104
105 // Now basic sanity checks have been done on the live sync packet
106 // We might add more testing and logging.
107 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
108 ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
109 ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
110
111 if( OT_FLAG(peer) & PEER_FLAG_STOPPED )
112 remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
113 else
114 add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
115
116 off += sizeof( ot_hash ) + sizeof( ot_peer );
117 }
118 118
119 args = args;
120
121 while( 1 ) {
122 datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port);
123 off = 4;
124
125 if( datalen <= 0 )
126 continue;
127
128 if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
129 // TODO: log invalid sync packet
130 continue;
131 }
132
133 if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
134 // TODO: log invalid sync packet
135 continue;
136 }
137
138 if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
139 // TODO: log packet coming from ourselves
140 continue;
141 }
142
143 // Now basic sanity checks have been done on the live sync packet
144 // We might add more testing and logging.
145 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
146 ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
147 ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
148
149 if( OT_FLAG(peer) & PEER_FLAG_STOPPED )
150 remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
151 else
152 add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
153
154 off += sizeof( ot_hash ) + sizeof( ot_peer );
155 }
156 }
157 /* Never returns. */
158 return NULL;
119} 159}
120 160
121#endif 161#endif
diff --git a/ot_livesync.h b/ot_livesync.h
index 4dc6b60..27070d6 100644
--- a/ot_livesync.h
+++ b/ot_livesync.h
@@ -10,14 +10,14 @@
10#include "trackerlogic.h" 10#include "trackerlogic.h"
11 11
12/* 12/*
13 Syncing is done as udp packets in the multicast domain 224.23.42.N port 9696 13 Syncing is done as udp packets in the multicast domain 224.0.42.N port 9696
14 14
15 Each tracker should join the multicast group and send its live sync packets 15 Each tracker should join the multicast group and send its live sync packets
16 to that group, using a ttl of 1 16 to that group, using a ttl of 1
17 17
18 Format of a live sync packet is straight forward and depends on N: 18 Format of a live sync packet is straight forward and depends on N:
19 19
20 For N == 1: (simple tracker2tracker sync) 20 For N == 23: (simple tracker2tracker sync)
21 0x0000 0x04 id of tracker instance 21 0x0000 0x04 id of tracker instance
22 [ 0x0004 0x14 info_hash 22 [ 0x0004 0x14 info_hash
23 0x0018 0x04 peer's ipv4 address 23 0x0018 0x04 peer's ipv4 address
@@ -25,7 +25,7 @@
25 0x0020 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) 25 0x0020 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
26 ]* 26 ]*
27 27
28 For N == 2: (aggregator syncs) 28 For N == 24: (aggregator syncs)
29 0x0000 0x04 id of tracker instance 29 0x0000 0x04 id of tracker instance
30 [ 0x0004 0x14 info_hash 30 [ 0x0004 0x14 info_hash
31 0x0018 0x01 number of peers 31 0x0018 0x01 number of peers
@@ -41,18 +41,6 @@
41#ifdef WANT_SYNC_LIVE 41#ifdef WANT_SYNC_LIVE
42 42
43#define LIVESYNC_PORT 9696 43#define LIVESYNC_PORT 9696
44#define LIVESYNC_MCASTDOMAIN_1 224,23,42,1
45#define LIVESYNC_MCASTDOMAIN_2 224,23,42,2
46extern char groupip_1[4];
47extern char groupip_2[4];
48
49extern int64 g_livesync_socket;
50
51#define LIVESYNC_BUFFINSIZE (256*256)
52#define LIVESYNC_BUFFSIZE 1504
53#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
54
55#define LIVESYNC_MAXDELAY 15
56 44
57void livesync_init(); 45void livesync_init();
58void livesync_deinit(); 46void livesync_deinit();