summaryrefslogtreecommitdiff
path: root/sync_daemon.pl
diff options
context:
space:
mode:
Diffstat (limited to 'sync_daemon.pl')
-rw-r--r--sync_daemon.pl291
1 files changed, 291 insertions, 0 deletions
diff --git a/sync_daemon.pl b/sync_daemon.pl
new file mode 100644
index 0000000..5f49422
--- /dev/null
+++ b/sync_daemon.pl
@@ -0,0 +1,291 @@
1#!/usr/bin/perl
2
3use strict;
4
5#use Convert::Bencode_XS qw(:all);
6use Convert::Bencode qw(:all);
7use Data::Dumper;
8use LWP::UserAgent;
9use URI::Escape;
10
11# enable verbose output
12my $debug = 0;
13
14# tracker from where we get our sync data
15my @trackers = ('127.0.0.1:8989');
16# tracker to upload merged data
17my @client_tracker = ('127.0.0.1:8989');
18
19# time to wait between syncs
20my $sleeptime = '300';
21
22# SSL cert and key
23my $ssl_cert = 'cert.pem';
24my $ssl_key = 'key.pem';
25
26foreach(@trackers) {
27 print "Syncing from: $_\n";
28}
29foreach(@client_tracker) {
30 print "Syncing to: $_\n";
31}
32
33my $file = shift;
34
35
36# global hash for storing the merged syncs
37my %merged_syncs;
38
39while(1) {
40 %merged_syncs;
41 my @bencoded_sync_data;
42
43 # fetch the sync from every tracker and put it into an array in bencoded form
44 foreach my $tracker (@trackers) {
45 my $bencoded_sync = fetch_sync($tracker);
46# my $bencoded_sync = fetch_sync_from_file($file);
47 if($bencoded_sync ne 0 && $bencoded_sync =~ /^d4\:sync/) {
48 push(@bencoded_sync_data,$bencoded_sync);
49 }
50 }
51
52 # bdecode every sync and throw it into the merged-sync
53 foreach my $bencoded_sync (@bencoded_sync_data) {
54 print "Doing merge...\n";
55 merge_sync(bdecode($bencoded_sync));
56 my $num_torrents = keys(%merged_syncs);
57
58 print "number of torrents: $num_torrents\n";
59 }
60
61 # number of max. peers in one changeset
62 my $peer_limit = 500;
63 # max number of changesets per commit
64 my $max_changesets = 10;
65 my $hash_count = 0;
66 my $peer_count = 0;
67 my $changeset;
68 my @escaped_changesets;
69
70 # run until all hashes are put into changesets and commited to the trackers
71 while(keys(%merged_syncs) != 0) {
72
73 foreach my $hash (keys(%merged_syncs)) {
74 print "Starting new changeset\n" if($peer_count == 0 && $debug);
75 my $num_peers = keys(%{$merged_syncs{$hash}});
76
77 print "\t$peer_count peers for $hash_count hashes in changeset\n" if($debug);
78
79 my $pack_hash = pack('H*',$hash);
80
81 # as long as the peer_limit is not reached, add new hashes with peers to the changeset hash-table
82 if($peer_count < $peer_limit) {
83 print "\t\tAdd $num_peers peers for $hash changeset\n" if($debug);
84 $peer_count = $peer_count + $num_peers;
85 foreach my $peer_socket (keys(%{$merged_syncs{$hash}})) {
86
87 my $flags = $merged_syncs{$hash}{$peer_socket};
88
89 print "\t\t\tAdd $peer_socket $flags\n" if($debug);
90
91 my $pack_peer = packme($peer_socket,$flags);
92
93 $changeset->{'sync'}->{$pack_hash} = $changeset->{'sync'}->{$pack_hash}.$pack_peer;
94 }
95 $hash_count++;
96 # hash is stored in the changeset, delete it from the hash-table
97 delete $merged_syncs{$hash};
98 }
99
100 # the peer_limit is reached or we are out of torrents, so start preparing a changeset
101 if($peer_count >= $peer_limit || keys(%merged_syncs) == 0) {
102
103 print "Commit changeset for $hash_count hashes with $peer_count peers total\n" if($debug);
104
105 # bencode the changeset
106 my $enc_changeset = bencode($changeset);
107
108 # URL-escape the changeset and put into an array of changesets
109 my $foobar = uri_escape($enc_changeset);
110 push(@escaped_changesets,$foobar);
111
112 # the changeset is ready and stored, so delete it from the changeset hash-table
113 delete $changeset->{'sync'};
114
115 $hash_count = 0;
116 $peer_count = 0;
117 print "\n\n\n" if($debug);
118 }
119
120 # if enought changesets are prepared or we are out of torrents for more changesets,
121 # sync the changesets to the trackers
122 if($#escaped_changesets == $max_changesets || keys(%merged_syncs) == 0) {
123 print "\tSync...\n";
124 sync_to_tracker(\@escaped_changesets);
125 undef @escaped_changesets;
126 }
127
128 }
129 }
130
131 print "Sleeping for $sleeptime seconds\n";
132 sleep $sleeptime;
133}
134
135sub connect_tracker {
136 # connect a tracker via HTTPS, returns the body of the response
137 my $url = shift;
138
139 $ENV{HTTPS_DEBUG} = 0;
140 $ENV{HTTPS_CERT_FILE} = $ssl_cert;
141 $ENV{HTTPS_KEY_FILE} = $ssl_key;
142
143 my $ua = new LWP::UserAgent;
144 my $req = new HTTP::Request('GET', $url);
145 my $res = $ua->request($req);
146
147 my $content = $res->content;
148
149 if($res->is_success()) {
150 return $content;
151 } else {
152 print $res->code."|".$res->status_line."\n";
153 return 0;
154 }
155}
156
157sub sync_to_tracker {
158 # commit changesets to a tracker
159 my @changesets = @{(shift)};
160
161 # prepare the URI with URL-encoded changesets concatenated by a &
162 my $uri = 'sync?';
163 foreach my $set (@changesets) {
164 $uri .= 'changeset='.$set.'&';
165 }
166 my $uri_length = length($uri);
167
168 # commit the collection of changesets to the tracker via HTTPS
169 foreach my $tracker (@client_tracker) {
170 print "\t\tTracker: $tracker (URI: $uri_length)\n";
171 my $url = "https://$tracker/".$uri;
172 connect_tracker($url);
173 }
174}
175
176sub packme {
177 # pack data
178 # returns ipaddr, port and flags in packed format
179 my $peer_socket = shift;
180 my $flags = shift;
181
182 my($a,$b,$c,$d,$port) = split(/[\.,\:]/,$peer_socket);
183 my $pack_peer = pack('C4 n1 b16',$a,$b,$c,$d,$port,$flags);
184
185 return $pack_peer;
186}
187
188sub unpackme {
189 # unpack packed data
190 # returns ipaddr. in quad-form with port (a.b.c.d:port) and flags as bitfield
191 # data is packed as:
192 # - 4 byte ipaddr. (unsigned char value)
193 # - 2 byte port (unsigned short in "network" (big-endian) order)
194 # - 2 byte flags (bit string (ascending bit order inside each byte))
195 my $data = shift;
196
197 my($a, $b, $c, $d, $port, $flags) = unpack('C4 n1 b16',$data);
198 my $peer_socket = "$a\.$b\.$c\.$d\:$port";
199
200 return($peer_socket,$flags);
201}
202
203sub fetch_sync {
204 # fetch sync from a tracker
205 my $tracker = shift;
206 my $url = "https://$tracker/sync";
207
208 print "Fetching from $url\n";
209
210 my $body = connect_tracker($url);
211
212 if($body && $body =~ /^d4\:sync/) {
213 return $body;
214 } else {
215 return 0;
216 }
217}
218
219sub fetch_sync_from_file {
220 # fetch sync from a file
221 my $file = shift;
222 my $body;
223 print "Fetching from file $file\n";
224 open(FILE,"<$file");
225 while(<FILE>) {
226 $body .= $_;
227 }
228 close(FILE);
229 return $body;
230}
231
232sub merge_sync {
233 # This builds a hash table with the torrenthash as keys. The value is a hash table again with the peer-socket as keys
234 # and flags in the value
235 # Example:
236 # 60dd2beb4197f71677c0f5ba92b956f7d04651e5 =>
237 # 192.168.23.23:2323 => 0000000000000000
238 # 23.23.23.23:2342 => 0000000100000000
239 # b220b4d7136e84a88abc090db88bec8604a808f3 =>
240 # 42.23.42.23:55555 => 0000000000000000
241
242 my $hashref = shift;
243
244 my $nonuniq_hash_counter = 0;
245 my $nonuniq_peer_counter = 0;
246 my $hash_counter = 0;
247 my $peer_counter = 0;
248
249 foreach my $key (keys(%{$hashref->{'sync'}})) {
250 # start merge for every sha1-hash in the sync
251 my $hash = unpack('H*',$key);
252
253 $hash_counter++;
254 $nonuniq_hash_counter++ if exists $merged_syncs{$hash};
255
256 while(${$hashref->{'sync'}}{$key} ne "")
257 {
258 # split the value into 8-byte and unpack it for getting peer-socket and flags
259 my($peer_socket,$flags) = unpackme(substr(${$hashref->{'sync'}}{$key},0,8,''));
260
261 $peer_counter++;
262 $nonuniq_peer_counter++ if exists $merged_syncs{$hash}{$peer_socket};
263
264 # Create a hash table with sha1-hash as key and a hash table as value.
265 # The hash table in the value has the peer-socket as key and flags as value
266 # If the entry already exists, the flags are ORed together, if not it is ORed with 0
267 $merged_syncs{$hash}{$peer_socket} = $flags | $merged_syncs{$hash}{$peer_socket};
268 }
269 }
270 print "$hash_counter hashes $nonuniq_hash_counter non-uniq, $peer_counter peers $nonuniq_peer_counter non-uniq.\n";
271
272}
273
274sub test_decode {
275 my $hashref = shift;
276
277 print "CHANGESET DEBUG OUTPUT\n";
278
279 print Dumper $hashref;
280 foreach my $key (keys(%{$hashref->{'sync'}})) {
281 my $hash = unpack('H*',$key);
282
283 print "Changeset for $hash\n";
284 while(${$hashref->{'sync'}}{$key} ne "")
285 {
286
287 my($peer_socket,$flags) = unpackme(substr(${$hashref->{'sync'}}{$key},0,8,''));
288 print "\tSocket: $peer_socket Flags: $flags\n";
289 }
290 }
291}