diff options
Diffstat (limited to 'ot_livesync.c')
-rw-r--r-- | ot_livesync.c | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/ot_livesync.c b/ot_livesync.c index 87fe5cf..cded0f7 100644 --- a/ot_livesync.c +++ b/ot_livesync.c | |||
@@ -46,13 +46,14 @@ static int64 g_socket_in = -1; | |||
46 | /* For incoming packets */ | 46 | /* For incoming packets */ |
47 | static int64 g_socket_out = -1; | 47 | static int64 g_socket_out = -1; |
48 | 48 | ||
49 | static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; | ||
49 | char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | 50 | char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
50 | static size_t g_outbuf_data; | 51 | static size_t g_outbuf_data; |
51 | static ot_time g_next_packet_time; | 52 | static ot_time g_next_packet_time; |
52 | 53 | ||
53 | static pthread_t thread_id; | 54 | static pthread_t thread_id; |
54 | void livesync_init( ) { | 55 | void livesync_init( ) { |
55 | 56 | ||
56 | if( g_socket_in == -1 ) | 57 | if( g_socket_in == -1 ) |
57 | exerr( "No socket address for live sync specified." ); | 58 | exerr( "No socket address for live sync specified." ); |
58 | 59 | ||
@@ -62,7 +63,7 @@ void livesync_init( ) { | |||
62 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 63 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
63 | 64 | ||
64 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 65 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
65 | 66 | ||
66 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); | 67 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); |
67 | } | 68 | } |
68 | 69 | ||
@@ -105,10 +106,20 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | |||
105 | socket_mcloop4(g_socket_out, 0); | 106 | socket_mcloop4(g_socket_out, 0); |
106 | } | 107 | } |
107 | 108 | ||
109 | /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ | ||
108 | static void livesync_issue_peersync( ) { | 110 | static void livesync_issue_peersync( ) { |
109 | socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT); | 111 | char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
112 | size_t data = g_outbuf_data; | ||
113 | |||
114 | memcpy( mycopy, g_outbuf, data ); | ||
110 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 115 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
111 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 116 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
117 | |||
118 | /* From now this thread has a local copy of the buffer and | ||
119 | has modified the protected element */ | ||
120 | pthread_mutex_unlock(&g_outbuf_mutex); | ||
121 | |||
122 | socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT); | ||
112 | } | 123 | } |
113 | 124 | ||
114 | static void livesync_handle_peersync( struct ot_workstruct *ws ) { | 125 | static void livesync_handle_peersync( struct ot_workstruct *ws ) { |
@@ -140,13 +151,17 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) { | |||
140 | enough */ | 151 | enough */ |
141 | void livesync_ticker( ) { | 152 | void livesync_ticker( ) { |
142 | /* livesync_issue_peersync sets g_next_packet_time */ | 153 | /* livesync_issue_peersync sets g_next_packet_time */ |
154 | pthread_mutex_lock(&g_outbuf_mutex); | ||
143 | if( g_now_seconds > g_next_packet_time && | 155 | if( g_now_seconds > g_next_packet_time && |
144 | g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) | 156 | g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) |
145 | livesync_issue_peersync(); | 157 | livesync_issue_peersync(); |
158 | else | ||
159 | pthread_mutex_unlock(&g_outbuf_mutex); | ||
146 | } | 160 | } |
147 | 161 | ||
148 | /* Inform live sync about whats going on. */ | 162 | /* Inform live sync about whats going on. */ |
149 | void livesync_tell( struct ot_workstruct *ws ) { | 163 | void livesync_tell( struct ot_workstruct *ws ) { |
164 | pthread_mutex_lock(&g_outbuf_mutex); | ||
150 | 165 | ||
151 | memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); | 166 | memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); |
152 | memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); | 167 | memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); |
@@ -155,6 +170,8 @@ void livesync_tell( struct ot_workstruct *ws ) { | |||
155 | 170 | ||
156 | if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) | 171 | if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) |
157 | livesync_issue_peersync(); | 172 | livesync_issue_peersync(); |
173 | else | ||
174 | pthread_mutex_unlock(&g_outbuf_mutex); | ||
158 | } | 175 | } |
159 | 176 | ||
160 | static void * livesync_worker( void * args ) { | 177 | static void * livesync_worker( void * args ) { |
@@ -162,11 +179,11 @@ static void * livesync_worker( void * args ) { | |||
162 | ot_ip6 in_ip; uint16_t in_port; | 179 | ot_ip6 in_ip; uint16_t in_port; |
163 | 180 | ||
164 | (void)args; | 181 | (void)args; |
165 | 182 | ||
166 | /* Initialize our "thread local storage" */ | 183 | /* Initialize our "thread local storage" */ |
167 | ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); | 184 | ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); |
168 | ws.outbuf = ws.reply = 0; | 185 | ws.outbuf = ws.reply = 0; |
169 | 186 | ||
170 | memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); | 187 | memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); |
171 | 188 | ||
172 | while( 1 ) { | 189 | while( 1 ) { |