diff options
-rw-r--r-- | sync_daemon.pl | 291 |
1 files changed, 291 insertions, 0 deletions
diff --git a/sync_daemon.pl b/sync_daemon.pl new file mode 100644 index 0000000..5f49422 --- /dev/null +++ b/sync_daemon.pl | |||
@@ -0,0 +1,291 @@ | |||
1 | #!/usr/bin/perl | ||
2 | |||
3 | use strict; | ||
4 | |||
5 | #use Convert::Bencode_XS qw(:all); | ||
6 | use Convert::Bencode qw(:all); | ||
7 | use Data::Dumper; | ||
8 | use LWP::UserAgent; | ||
9 | use URI::Escape; | ||
10 | |||
11 | # enable verbose output | ||
12 | my $debug = 0; | ||
13 | |||
14 | # tracker from where we get our sync data | ||
15 | my @trackers = ('127.0.0.1:8989'); | ||
16 | # tracker to upload merged data | ||
17 | my @client_tracker = ('127.0.0.1:8989'); | ||
18 | |||
19 | # time to wait between syncs | ||
20 | my $sleeptime = '300'; | ||
21 | |||
22 | # SSL cert and key | ||
23 | my $ssl_cert = 'cert.pem'; | ||
24 | my $ssl_key = 'key.pem'; | ||
25 | |||
26 | foreach(@trackers) { | ||
27 | print "Syncing from: $_\n"; | ||
28 | } | ||
29 | foreach(@client_tracker) { | ||
30 | print "Syncing to: $_\n"; | ||
31 | } | ||
32 | |||
33 | my $file = shift; | ||
34 | |||
35 | |||
36 | # global hash for storing the merged syncs | ||
37 | my %merged_syncs; | ||
38 | |||
39 | while(1) { | ||
40 | %merged_syncs; | ||
41 | my @bencoded_sync_data; | ||
42 | |||
43 | # fetch the sync from every tracker and put it into an array in bencoded form | ||
44 | foreach my $tracker (@trackers) { | ||
45 | my $bencoded_sync = fetch_sync($tracker); | ||
46 | # my $bencoded_sync = fetch_sync_from_file($file); | ||
47 | if($bencoded_sync ne 0 && $bencoded_sync =~ /^d4\:sync/) { | ||
48 | push(@bencoded_sync_data,$bencoded_sync); | ||
49 | } | ||
50 | } | ||
51 | |||
52 | # bdecode every sync and throw it into the merged-sync | ||
53 | foreach my $bencoded_sync (@bencoded_sync_data) { | ||
54 | print "Doing merge...\n"; | ||
55 | merge_sync(bdecode($bencoded_sync)); | ||
56 | my $num_torrents = keys(%merged_syncs); | ||
57 | |||
58 | print "number of torrents: $num_torrents\n"; | ||
59 | } | ||
60 | |||
61 | # number of max. peers in one changeset | ||
62 | my $peer_limit = 500; | ||
63 | # max number of changesets per commit | ||
64 | my $max_changesets = 10; | ||
65 | my $hash_count = 0; | ||
66 | my $peer_count = 0; | ||
67 | my $changeset; | ||
68 | my @escaped_changesets; | ||
69 | |||
70 | # run until all hashes are put into changesets and commited to the trackers | ||
71 | while(keys(%merged_syncs) != 0) { | ||
72 | |||
73 | foreach my $hash (keys(%merged_syncs)) { | ||
74 | print "Starting new changeset\n" if($peer_count == 0 && $debug); | ||
75 | my $num_peers = keys(%{$merged_syncs{$hash}}); | ||
76 | |||
77 | print "\t$peer_count peers for $hash_count hashes in changeset\n" if($debug); | ||
78 | |||
79 | my $pack_hash = pack('H*',$hash); | ||
80 | |||
81 | # as long as the peer_limit is not reached, add new hashes with peers to the changeset hash-table | ||
82 | if($peer_count < $peer_limit) { | ||
83 | print "\t\tAdd $num_peers peers for $hash changeset\n" if($debug); | ||
84 | $peer_count = $peer_count + $num_peers; | ||
85 | foreach my $peer_socket (keys(%{$merged_syncs{$hash}})) { | ||
86 | |||
87 | my $flags = $merged_syncs{$hash}{$peer_socket}; | ||
88 | |||
89 | print "\t\t\tAdd $peer_socket $flags\n" if($debug); | ||
90 | |||
91 | my $pack_peer = packme($peer_socket,$flags); | ||
92 | |||
93 | $changeset->{'sync'}->{$pack_hash} = $changeset->{'sync'}->{$pack_hash}.$pack_peer; | ||
94 | } | ||
95 | $hash_count++; | ||
96 | # hash is stored in the changeset, delete it from the hash-table | ||
97 | delete $merged_syncs{$hash}; | ||
98 | } | ||
99 | |||
100 | # the peer_limit is reached or we are out of torrents, so start preparing a changeset | ||
101 | if($peer_count >= $peer_limit || keys(%merged_syncs) == 0) { | ||
102 | |||
103 | print "Commit changeset for $hash_count hashes with $peer_count peers total\n" if($debug); | ||
104 | |||
105 | # bencode the changeset | ||
106 | my $enc_changeset = bencode($changeset); | ||
107 | |||
108 | # URL-escape the changeset and put into an array of changesets | ||
109 | my $foobar = uri_escape($enc_changeset); | ||
110 | push(@escaped_changesets,$foobar); | ||
111 | |||
112 | # the changeset is ready and stored, so delete it from the changeset hash-table | ||
113 | delete $changeset->{'sync'}; | ||
114 | |||
115 | $hash_count = 0; | ||
116 | $peer_count = 0; | ||
117 | print "\n\n\n" if($debug); | ||
118 | } | ||
119 | |||
120 | # if enought changesets are prepared or we are out of torrents for more changesets, | ||
121 | # sync the changesets to the trackers | ||
122 | if($#escaped_changesets == $max_changesets || keys(%merged_syncs) == 0) { | ||
123 | print "\tSync...\n"; | ||
124 | sync_to_tracker(\@escaped_changesets); | ||
125 | undef @escaped_changesets; | ||
126 | } | ||
127 | |||
128 | } | ||
129 | } | ||
130 | |||
131 | print "Sleeping for $sleeptime seconds\n"; | ||
132 | sleep $sleeptime; | ||
133 | } | ||
134 | |||
135 | sub connect_tracker { | ||
136 | # connect a tracker via HTTPS, returns the body of the response | ||
137 | my $url = shift; | ||
138 | |||
139 | $ENV{HTTPS_DEBUG} = 0; | ||
140 | $ENV{HTTPS_CERT_FILE} = $ssl_cert; | ||
141 | $ENV{HTTPS_KEY_FILE} = $ssl_key; | ||
142 | |||
143 | my $ua = new LWP::UserAgent; | ||
144 | my $req = new HTTP::Request('GET', $url); | ||
145 | my $res = $ua->request($req); | ||
146 | |||
147 | my $content = $res->content; | ||
148 | |||
149 | if($res->is_success()) { | ||
150 | return $content; | ||
151 | } else { | ||
152 | print $res->code."|".$res->status_line."\n"; | ||
153 | return 0; | ||
154 | } | ||
155 | } | ||
156 | |||
157 | sub sync_to_tracker { | ||
158 | # commit changesets to a tracker | ||
159 | my @changesets = @{(shift)}; | ||
160 | |||
161 | # prepare the URI with URL-encoded changesets concatenated by a & | ||
162 | my $uri = 'sync?'; | ||
163 | foreach my $set (@changesets) { | ||
164 | $uri .= 'changeset='.$set.'&'; | ||
165 | } | ||
166 | my $uri_length = length($uri); | ||
167 | |||
168 | # commit the collection of changesets to the tracker via HTTPS | ||
169 | foreach my $tracker (@client_tracker) { | ||
170 | print "\t\tTracker: $tracker (URI: $uri_length)\n"; | ||
171 | my $url = "https://$tracker/".$uri; | ||
172 | connect_tracker($url); | ||
173 | } | ||
174 | } | ||
175 | |||
176 | sub packme { | ||
177 | # pack data | ||
178 | # returns ipaddr, port and flags in packed format | ||
179 | my $peer_socket = shift; | ||
180 | my $flags = shift; | ||
181 | |||
182 | my($a,$b,$c,$d,$port) = split(/[\.,\:]/,$peer_socket); | ||
183 | my $pack_peer = pack('C4 n1 b16',$a,$b,$c,$d,$port,$flags); | ||
184 | |||
185 | return $pack_peer; | ||
186 | } | ||
187 | |||
188 | sub unpackme { | ||
189 | # unpack packed data | ||
190 | # returns ipaddr. in quad-form with port (a.b.c.d:port) and flags as bitfield | ||
191 | # data is packed as: | ||
192 | # - 4 byte ipaddr. (unsigned char value) | ||
193 | # - 2 byte port (unsigned short in "network" (big-endian) order) | ||
194 | # - 2 byte flags (bit string (ascending bit order inside each byte)) | ||
195 | my $data = shift; | ||
196 | |||
197 | my($a, $b, $c, $d, $port, $flags) = unpack('C4 n1 b16',$data); | ||
198 | my $peer_socket = "$a\.$b\.$c\.$d\:$port"; | ||
199 | |||
200 | return($peer_socket,$flags); | ||
201 | } | ||
202 | |||
203 | sub fetch_sync { | ||
204 | # fetch sync from a tracker | ||
205 | my $tracker = shift; | ||
206 | my $url = "https://$tracker/sync"; | ||
207 | |||
208 | print "Fetching from $url\n"; | ||
209 | |||
210 | my $body = connect_tracker($url); | ||
211 | |||
212 | if($body && $body =~ /^d4\:sync/) { | ||
213 | return $body; | ||
214 | } else { | ||
215 | return 0; | ||
216 | } | ||
217 | } | ||
218 | |||
219 | sub fetch_sync_from_file { | ||
220 | # fetch sync from a file | ||
221 | my $file = shift; | ||
222 | my $body; | ||
223 | print "Fetching from file $file\n"; | ||
224 | open(FILE,"<$file"); | ||
225 | while(<FILE>) { | ||
226 | $body .= $_; | ||
227 | } | ||
228 | close(FILE); | ||
229 | return $body; | ||
230 | } | ||
231 | |||
232 | sub merge_sync { | ||
233 | # This builds a hash table with the torrenthash as keys. The value is a hash table again with the peer-socket as keys | ||
234 | # and flags in the value | ||
235 | # Example: | ||
236 | # 60dd2beb4197f71677c0f5ba92b956f7d04651e5 => | ||
237 | # 192.168.23.23:2323 => 0000000000000000 | ||
238 | # 23.23.23.23:2342 => 0000000100000000 | ||
239 | # b220b4d7136e84a88abc090db88bec8604a808f3 => | ||
240 | # 42.23.42.23:55555 => 0000000000000000 | ||
241 | |||
242 | my $hashref = shift; | ||
243 | |||
244 | my $nonuniq_hash_counter = 0; | ||
245 | my $nonuniq_peer_counter = 0; | ||
246 | my $hash_counter = 0; | ||
247 | my $peer_counter = 0; | ||
248 | |||
249 | foreach my $key (keys(%{$hashref->{'sync'}})) { | ||
250 | # start merge for every sha1-hash in the sync | ||
251 | my $hash = unpack('H*',$key); | ||
252 | |||
253 | $hash_counter++; | ||
254 | $nonuniq_hash_counter++ if exists $merged_syncs{$hash}; | ||
255 | |||
256 | while(${$hashref->{'sync'}}{$key} ne "") | ||
257 | { | ||
258 | # split the value into 8-byte and unpack it for getting peer-socket and flags | ||
259 | my($peer_socket,$flags) = unpackme(substr(${$hashref->{'sync'}}{$key},0,8,'')); | ||
260 | |||
261 | $peer_counter++; | ||
262 | $nonuniq_peer_counter++ if exists $merged_syncs{$hash}{$peer_socket}; | ||
263 | |||
264 | # Create a hash table with sha1-hash as key and a hash table as value. | ||
265 | # The hash table in the value has the peer-socket as key and flags as value | ||
266 | # If the entry already exists, the flags are ORed together, if not it is ORed with 0 | ||
267 | $merged_syncs{$hash}{$peer_socket} = $flags | $merged_syncs{$hash}{$peer_socket}; | ||
268 | } | ||
269 | } | ||
270 | print "$hash_counter hashes $nonuniq_hash_counter non-uniq, $peer_counter peers $nonuniq_peer_counter non-uniq.\n"; | ||
271 | |||
272 | } | ||
273 | |||
274 | sub test_decode { | ||
275 | my $hashref = shift; | ||
276 | |||
277 | print "CHANGESET DEBUG OUTPUT\n"; | ||
278 | |||
279 | print Dumper $hashref; | ||
280 | foreach my $key (keys(%{$hashref->{'sync'}})) { | ||
281 | my $hash = unpack('H*',$key); | ||
282 | |||
283 | print "Changeset for $hash\n"; | ||
284 | while(${$hashref->{'sync'}}{$key} ne "") | ||
285 | { | ||
286 | |||
287 | my($peer_socket,$flags) = unpackme(substr(${$hashref->{'sync'}}{$key},0,8,'')); | ||
288 | print "\tSocket: $peer_socket Flags: $flags\n"; | ||
289 | } | ||
290 | } | ||
291 | } | ||