Commits

Matt Joiner  committed 323b31d

Move and start rewriting modules.

  • Participants
  • Parent commits 03b3b88

Comments (0)

Files changed (6)

File lib/torrent.py

import socket

from .trackers import AnnounceError, Tracker
from .util import multiset, pretty_bitfield
+
+
+class Torrent:
+
+    import logging
+    logger = logging.getLogger('torrent')
+    del logging
+
+    @property
+    def total_length(self):
+        return self.data.total_bytes
+
+    def piece_length(self, index):
+        pl = self.metainfo.raw['info']['piece length']
+        return min(self.total_length - index * pl, pl)
+
+    def piece_blocks(self, index):
+        piece_length = self.piece_length(index)
+        block_length = 1 << 14
+        for begin in range(0, piece_length, block_length):
+            yield begin, min(block_length, piece_length - begin)
+
+    @property
+    def left(self):
+        return sum(block[1] for blocks in self.wanted_blocks.values() for block in blocks)
+
+    def __init__(self, metainfo, data, socket, limiter=None, trackers=()):
+        import threading, os
+        self.closed = False
+        self.closed_cond = threading.Condition()
+        self.half_open = threading.Semaphore(20)
+        self.metainfo = metainfo
+        self.piece_count = len(self.metainfo.piece_hashes)
+        self.info_hash = metainfo.info_hash
+        self.socket = socket
+        self.peer_id = b'ET0001' + os.urandom(14)
+        self.data = data
+        self.wanted_blocks = {}
+        self.ip_address = 0
+        self.downloaded = 0
+        self.uploaded = 0
+        self.trackers = {}
+        for index in range(self.piece_count):
+            if not self.have_piece(index):
+                self.wanted_blocks[index] = set(self.piece_blocks(index))
+        self.swarm_addrs = set()
+        self.swarm_addrs_not_empty = threading.Condition()
+        self.active_peers = {} # peer_id: connection
+        self.trackers = trackers
+        self.name = metainfo.raw['info']['name'].decode()
+
+    @property
+    def port(self):
+        return self.socket.getsockname()[1]
+
+    def add_swarm_peers(self, peers):
+        if not peers:
+            return
+        with self.swarm_addrs_not_empty:
+            self.swarm_addrs.update(peers)
+            if self.swarm_addrs:
+                self.swarm_addrs_not_empty.notify_all()
+
+    def want_piece(self, index):
+        return index in self.wanted_blocks
+
    def all_requests(self):
        """Return a multiset of every outstanding request across all peers.

        ``multiset`` comes from .util; duplicates occur during endgame when
        several connections have requested the same block.
        """
        requests = multiset()
        with self.lock:
            for conn in self.active_peers.values():
                requests.update(conn.our_requests)
        return requests

    def top_up_requests(self, conn):
        """Keep *conn*'s request pipeline full, up to the peer's advertised
        queue depth (``peer_reqq``); sends not-interested once nothing
        suitable remains.
        """
        request_generator = self.generate_requests(conn)
        try:
            while not conn.am_choked and len(conn.our_requests) < conn.peer_reqq:
                try:
                    request = request_generator.__next__()
                except StopIteration:
                    conn.send_not_interested()
                    return
                conn.send_interested()
                conn.send_request(request)
        finally:
            # dispose of the generator so it releases self.lock (it suspends
            # inside a `with self.lock:` block)
            request_generator.close()
+
+    def generate_requests(self, conn):
+        with self.lock:
+            all_requests = set()
+            for conn in self.active_peers.values():
+                all_requests.update(conn.our_requests)
+            for index in self.wanted_blocks.keys() & conn.peer_bitfield:
+                choices = {(index,) + block for block in self.wanted_blocks[index]}
+                choices -= conn.our_requests
+                choices -= all_requests
+                while choices:
+                    yield choices.pop()
+            for request in (all_requests - conn.our_requests):
+                if request[0] in conn.peer_bitfield:
+                    yield request
+
+    def bitfield_bytes(self):
+        bytes = bytearray((self.piece_count + 7) // 8)
+        for index in range(self.piece_count):
+            if index not in self.wanted_blocks:
+                bytes[index // 8] |= 1 << 7 - index % 8
+        return bytes
+
+    def peer_sent_info_hash(self, conn):
+        if conn.peer_info_hash != self.info_hash:
+            self.logger.warning("%s: Peer %r's info hash does not match: %s", self, conn, conn.peer_info_hash)
+            conn.close()
+
    def routine_wrapper(routine):
        """Decorator for long-running worker-thread bodies.

        Logs any escaping exception (deliberately a bare ``except`` so no
        exception type can kill a worker silently) and always closes the
        torrent when the routine exits, for any reason.
        """
        def wrapper(self, *args, **kwds):
            try:
                return routine(self, *args, **kwds)
            except:
                self.logger.exception('%s', self)
            finally:
                self.logger.debug('Routine %s returned', routine)
                self.close()
        return wrapper
+
    @routine_wrapper
    def peer_routine(self, sock, addr):
        """Drive a single peer connection to completion.

        *sock* may be None, in which case an outbound connection to *addr*
        is attempted, bounded by the ``half_open`` semaphore.

        NOTE(review): ``ConnectionWrapper`` is not defined or imported in
        this module (it lived in the old top-level torrent.py) — confirm
        where it comes from in the rewrite.
        """
        # create and connect the socket if a socket wasn't given
        if sock is None:
            with self.half_open:
                sock = socket.socket()
                sock.settimeout(30)
                try:
                    sock.connect(addr)
                except socket.error as exc:
                    self.logger.info('Connecting to %s: %s', addr, exc)
                    return
                else:
                    self.logger.info('Connected to %s', addr)

        with sock:
            sock.setblocking(True)
            with ConnectionWrapper(sock, self) as conn:
                #~ threading.current_thread().name = conn
                conn.send_info_hash(self.info_hash)
                conn.send_peer_id(self.peer_id)
                conn.run()
+
    def peer_sent_peer_id(self, conn):
        """Handshake completion: register *conn* under its peer id.

        A second connection claiming an already-registered peer id is
        closed.  Newly registered peers receive our bitfield and — when
        they advertise extension-protocol support (bit 0x10 of reserved
        byte 5) — an extended handshake advertising ut_pex.
        """
        # add the connection to the active peers
        peer_id = conn.peer_peer_id
        with self.lock:
            if peer_id in self.active_peers:
                self.logger.warning(
                    '%s: Already connected to peer %s',
                    self,
                    peer_id)
                assert conn is not self.active_peers[peer_id]
                conn.close()
            else:
                self.active_peers[peer_id] = conn
                conn.send_bitfield(self.bitfield_bytes())
                if conn.peer_extensions[5] & 0x10:
                    conn.send_extended(0, {
                        'm': {'ut_pex': 1},
                        'reqq': 5,
                        'v': 'erutor-0.1',
                        'yourip': socket.inet_aton(conn.socket.getpeername()[0]),
                        'p': self.port})
+
+    def peer_disconnected(self, conn):
+        self.logger.debug('%s: Removing connection: %s', self, conn)
+        try:
+            peer_id = conn.peer_peer_id
+        except AttributeError:
+            return
+        with self.lock:
+            if self.active_peers.get(peer_id) is conn:
+                del self.active_peers[peer_id]
+                self.cond.notify_all()
+
    def activate_peer(self, sock, addr):
        """Spawn a peer routine for (sock, addr).

        NOTE(review): ``gthread`` is not imported in this module, while the
        rest of the rewrite uses ``threading`` — confirm which scheduler
        this should use.
        """
        gthread.spawn(self.peer_routine, sock, addr)
        return True

    @routine_wrapper
    def connect_routine(self):
        """Consume queued swarm addresses and dial them, forever.

        ``half_open`` bounds concurrent attempts; close() notifies the
        condition so the inner wait can observe the closed flag and exit.
        """
        while True:
            with self.swarm_addrs_not_empty:
                while not self.swarm_addrs:
                    self.swarm_addrs_not_empty.wait()
                    if self.closed:
                        return
                addr = self.swarm_addrs.pop()
            with self.half_open:
                self.activate_peer(None, addr)

    @routine_wrapper
    def accept_routine(self):
        """Accept inbound peer connections until the listener is shut down.

        close() shuts the listening socket down, which surfaces here as
        EINVAL and ends the loop.
        """
        import socket, errno
        while True:
            try:
                sock, addr = self.socket.accept()
            except socket.error as exc:
                if exc.errno == errno.EINVAL:
                    return
                raise
            self.logger.info('%s: Accepted connection from %s', self, addr)
            self.activate_peer(sock, addr)
+
+    @staticmethod
+    def conn_selection_key_func(conn):
+        return conn.peer_interested, conn.am_interested, conn.bytes_received, not conn.am_choked
+
+    def do_stats(self):
+        num = sum(len(conn.peer_bitfield) for conn in self.active_peers.values())
+        denom = len(self.active_peers) * self.piece_count
+        try:
+            av_completion = 100 * num / denom
+        except ZeroDivisionError:
+            av_completion = 0
+        self.logger.info(
+            '%s: %d active peers (%f%% average completion), %d pending addrs',
+            self, len(self.active_peers), av_completion, len(self.swarm_addrs))
+
+    def wait_closed(self, timeout=None):
+        '''Returns True if the torrent is closed before the timeout ends.'''
+        if self.closed:
+            return True
+        with self.closed_cond:
+            if self.closed:
+                return True
+            self.closed_cond.wait(timeout)
+            return self.closed
+
    @routine_wrapper
    def stat_routine(self):
        """Log swarm statistics every 5 seconds until the torrent closes."""
        while not self.wait_closed(5):
            self.do_stats()
+
+    @routine_wrapper
+    def choking_routine(self):
+        while True:
+            self.do_stats()
+            peers = sorted(self.active_peers.values(), key=self.conn_selection_key_func)
+            self.logger.debug(
+                '%s: Best unchoke candidates: %s',
+                self,
+                [self.conn_selection_key_func(peer) for peer in peers[-10:]])
+            for peer in peers[:-5]:
+                peer.send_choke()
+            for peer in peers[-5:]:
+                peer.send_unchoke()
+            self.wait(10)
+
+    def final_announce(self, tracker):
+        try:
+            tracker.announce(self, final=True, completed=not self.wanted_blocks)
+        except AnnounceError as exc:
+            self.logger.warning('%s: Error in final announce: %r', self, exc)
+
+    def close(self):
+        with self.closed_cond:
+            if self.closed:
+                return
+            self.closed = True
+            self.closed_cond.notify_all()
+        with self.swarm_addrs_not_empty:
+            self.swarm_addrs_not_empty.notify_all()
+        import socket
+        self.socket.shutdown(socket.SHUT_RDWR)
+
+    def completion(self):
+        return (
+            sum(1 for blocks in self.wanted_blocks.values() if blocks)
+            / self.piece_count)
+
+    def wait_for_deadline(self, deadline):
+        return self.wait(self.timeout_from_deadline(deadline))
+
+    def timeout_from_deadline(self, deadline):
+        import time
+        timeout = deadline - time.time()
+        if timeout < 0:
+            return 0
+        return timeout
+
    @routine_wrapper
    def tracker_routine(self, tracker):
        """Announce to *tracker* forever, honouring its returned interval.

        Failed announces retry with exponential backoff, 60s doubling up to
        an hour.
        """
        import time
        while True:
            timeout = 60
            while True:
                deadline = time.time() + timeout
                try:
                    peers, interval = tracker.announce(self, self.timeout_from_deadline(deadline))
                except AnnounceError as exc:
                    self.logger.warning('%s: Error announcing %s: %r', self, tracker, exc)
                else:
                    break
                # only reached on announce failure: sleep out the deadline,
                # then widen the backoff window
                self.wait_for_deadline(deadline)
                timeout = min(timeout * 2, 3600)
            peers = set(peers)
            self.logger.info('Got %d peers from tracker %s', len(peers), tracker)
            if peers:
                self.add_swarm_peers(peers)
            self.logger.debug('%s: Sleeping for interval %ss', tracker, interval)
            if self.wait_closed(interval):
                return

    def run(self):
        """Start tracker/accept/connect/stat threads and block until closed."""
        try:
            self.logger.info(
                'Missing pieces: %s',
                pretty_bitfield(self.wanted_blocks.keys()))
            import threading
            # tracker threads are daemons so a stuck announce can't block exit
            for tracker in self.trackers:
                thread = threading.Thread(target=self.tracker_routine, args=[tracker])
                thread.daemon = True
                thread.start()
            for target in self.accept_routine, self.connect_routine, self.stat_routine:
                thread = threading.Thread(target=target)
                thread.start()
            self.wait_closed()
        finally:
            self.close()
            self.logger.info('Closed %s', self)
+
+    def piece_pread(self, index, count, offset):
+        assert index not in self.wanted_blocks, index
+        return self.data.pread(count, self.piece_length(0) * index + offset)
+
+    def piece_pwrite(self, index, piece, offset):
+        assert offset + len(piece) <= self.piece_length(index)
+        return self.data.pwrite(piece, index * self.piece_length(0) + offset)
+
+    def get_piece_data(self, index):
+        return self.data.pread(self.piece_length(index), index * self.piece_length(0))
+
+    def have_piece(self, piece_index):
+        import hashlib
+        return hashlib.sha1(self.get_piece_data(piece_index)).digest() == self.metainfo.piece_hashes[piece_index]
+
+    def __str__(self):
+        return self.name
+
    def add_tracker(self, url):
        """Create (via the Tracker factory) and register a tracker for *url*,
        skipping duplicates.

        NOTE(review): this treats ``self.trackers`` as a url-keyed dict, but
        the new __init__ stores the ``trackers`` argument (a tuple) — confirm
        which container the rewrite intends.
        """
        if url in self.trackers:
            self.logger.info('%s: Duplicate tracker: %s', self, url)
            return
        self.logger.debug('%s: Adding tracker for %s', self, url)
        tracker = Tracker(url)
        self.trackers[url] = tracker
+
+    def got_block(self, index, begin, data):
+        '''Return True if the block was needed, otherwise False'''
+        request = index, begin, len(data)
+        with self.lock:
+            try:
+                self.wanted_blocks[index].remove(request[1:])
+            except KeyError:
+                self.logger.warning('%s: Received unwanted block %s', self, request)
+                return False
+            else:
+                self.piece_pwrite(index, data, begin)
+                if not self.wanted_blocks[index]:
+                    # there are no more blocks needed for this piece
+                    del self.wanted_blocks[index]
+                    if self.have_piece(index):
+                        self.logger.info('%s: Downloaded piece %d', self, index)
+                        self.print_completion()
+                        for conn in self.active_peers.values():
+                            conn.send_have(index)
+                    else:
+                        self.logger.error('%s: Piece %d failed checksum', self, index)
+                        self.wanted_blocks[index] = set(self.piece_blocks(index))
+                if request[1:] not in self.wanted_blocks.get(index, ()):
+                    for conn in self.active_peers.values():
+                        conn.send_cancel(request)
+                self.downloaded += request[2]
+                return True
+
def main():
    """Self-connecting smoke test: each torrent peers with the previous one.

    NOTE(review): appears stale relative to the rewritten Torrent.__init__
    (passes a path and port where metainfo/data/socket are now expected) and
    relies on ``gthread`` and a top-level ``util`` module that this file no
    longer imports — confirm before use.
    """
    import util
    util.configure_logging()

    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('torrent')
    parser.add_argument('peer_dests', nargs='+')
    namespace = parser.parse_args()

    torrents = [Torrent(namespace.torrent, 0, dest) for dest in namespace.peer_dests]
    def close(self):
        # closing any one torrent closes the whole chain
        for t in torrents:
            Torrent.close(t)
    for tor in torrents:
        tor.close = close.__get__(tor)
        tor.trackers.clear()
    for tor in torrents:
        gthread.spawn(tor.run)
    # add the previous torrent as the only peer for each torrent
    for ti in range(len(torrents)):
        torrents[ti].add_swarm_peers({('localhost', torrents[ti-1].port)})
    #~ def handler(signum, frame):
        #~ for tor in torrents:
            #~ tor.close()
    #~ import signal
    #~ signal.signal(signal.SIGINT, handler)
    #~ for thr in threads:
        #~ thr.join()
    gthread.switch()

if __name__ == '__main__':
    main()

File lib/trackers.py

+#!/usr/bin/env python3
+
+import errno
+import http
+import logging
+import pdb
+import random
+import signal
+import socket
+import struct
+import sys
+import threading
+import time
+import traceback
+import urllib
+import urllib.parse
+import urllib.request
+
+from . import bencode
+from .util import extract_packed_peer_addrs
+
+logger = logging.getLogger('tracker')
+
# Announce event codes as defined by the UDP tracker protocol (BEP 15);
# the HTTP tracker maps them onto the equivalent query-string values.
NONE_EVENT = 0
COMPLETED_EVENT = 1
STARTED_EVENT = 2
STOPPED_EVENT = 3


class AnnounceError(Exception):
    """Raised when a tracker announce fails for any reason."""
+
+
class _Tracker:
    """Base class for announce transports; subclasses implement _announce().

    Tracks which event to send: the first announce carries 'started',
    subsequent periodic announces carry the 'none' (update) event.
    """

    logger = logger

    def __init__(self, url):
        super().__init__()
        self.url = url
        # the first successful announce must carry the 'started' event
        self.event = STARTED_EVENT

    def __str__(self):
        return self.url

    # can't completed be determined by left == 0?
    def announce(self, torrent, timeout=None, final=False, completed=False):
        """Announce *torrent* and return whatever the transport's _announce
        returns (an (addrs, interval) pair in the current subclasses).

        *final* selects the terminal event: 'completed' if *completed*,
        otherwise 'stopped'.  Raises AnnounceError on failure.
        """
        if final:
            event = COMPLETED_EVENT if completed else STOPPED_EVENT
        else:
            event = self.event
        self.logger.info('%s: Announcing %s event', self, {
            STARTED_EVENT: 'started',
            NONE_EVENT: 'update',
            COMPLETED_EVENT: 'completed',
            STOPPED_EVENT: 'stopped',}[event].upper())
        retval = self._announce(torrent, event, timeout)
        # downgrade to the update event only after 'started' has succeeded
        if self.event == STARTED_EVENT:
            self.event = NONE_EVENT
        return retval
+
+
class HTTPTracker(_Tracker):
    """Announce over HTTP GET (BEP 3), requesting the compact peer format."""

    __slots__ = ()

    def _announce(self, torrent, event, timeout):
        """Build the announce URL, fetch it, and decode the bencoded reply.

        Returns (peer_addr_iterator, interval_seconds); raises AnnounceError
        on transport errors, undecodable bodies, or a 'failure reason' key.
        """
        split_url = urllib.parse.urlsplit(self.url)
        # preserve any query parameters baked into the announce URL
        parsed_qs = urllib.parse.parse_qsl(split_url.query)
        parsed_qs += [
            ('info_hash', torrent.info_hash),
            ('peer_id', torrent.peer_id),
            ('port', torrent.port),
            ('uploaded', torrent.uploaded),
            ('downloaded', torrent.downloaded),
            ('left', torrent.left),
            ('compact', 1),
            ('no_peer_id', 1),
            ('event', {
                NONE_EVENT: '',
                COMPLETED_EVENT: 'completed',
                STARTED_EVENT: 'started',
                STOPPED_EVENT: 'stopped'}[event]),]
        split_url_list = list(split_url)
        # index 3 of a SplitResult is the query component
        split_url_list[3] = urllib.parse.urlencode(parsed_qs)
        url = urllib.parse.urlunsplit(split_url_list)
        self.logger.debug('%s: Requesting %s', self, url)
        try:
            response = urllib.request.urlopen(url, timeout=timeout)
        except Exception as exc:
            raise AnnounceError(exc) from exc
        self.logger.debug('%s: Response from %s:\n%s', self, response.geturl(), response.info())
        try:
            response_body = bencode.decode(response).__next__()
        except Exception as exc:
            raise AnnounceError('Error buncoding response', exc)
        self.logger.debug('%s: Buncoded response: %s', self, response_body)
        try:
            failure_reason = response_body['failure reason']
        except KeyError:
            pass
        else:
            raise AnnounceError(failure_reason.decode())
        return extract_packed_peer_addrs(response_body['peers']), response_body['interval']
+
+
class UDPTracker(_Tracker):
    """Announce via the UDP tracker protocol (BEP 15)."""

    # action codes from BEP 15
    CONNECT_ACTION = 0
    ANNOUNCE_ACTION = 1
    SCRAPE_ACTION = 2
    ERROR_ACTION = 3

    __slots__ = 'socket', 'connected', 'addr', 'transaction_id', 'connection_id'

    def __init__(self, url):
        split_url = urllib.parse.urlsplit(url)
        host, port = split_url.netloc.split(':')
        self.addr = host, int(port)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # initialise the connection-state attributes to "not connected"
        self.disconnect()
        super().__init__(url)

    def connect(self, deadline):
        """Perform the BEP 15 connect exchange to obtain a connection id."""
        assert not self.connected
        try:
            self.socket.connect(self.addr)
        except socket.error as exc:
            raise AnnounceError('Error connecting: %s' % exc)
        self.transaction_id = random.getrandbits(32)
        self.logger.debug('%s: Generated transaction ID: %08x', self, self.transaction_id)
        # 0x41727101980 is the protocol's fixed connect magic constant
        request = struct.pack('>QII',
            0x41727101980,
            self.CONNECT_ACTION,
            self.transaction_id)
        self.logger.debug('%s: Sending connection request', self)
        self.send_request(request)
        response = self.receive_response(self.CONNECT_ACTION, deadline)
        self.connection_id = struct.unpack_from('>Q', response)[0]
        self.logger.debug('%s: Obtained connection ID: %x', self, self.connection_id)
        self.connected = True

    def disconnect(self):
        """Forget the current connection/transaction state."""
        self.connected = False
        self.transaction_id = None
        self.connection_id = None

    def send_request(self, data):
        # socket is connect()ed, so plain send suffices
        self.socket.send(data)

    def timeout_from_deadline(self, deadline):
        """Absolute deadline -> non-negative relative timeout (None passes through)."""
        if deadline is None:
            return None
        return max(deadline - time.time(), 0)

    def deadline_from_timeout(self, timeout):
        """Relative timeout -> absolute deadline (None passes through)."""
        if timeout is None:
            return None
        return time.time() + timeout

    def receive_response(self, action, deadline):
        """Receive a datagram matching our transaction id and *action*.

        Mismatched transactions/actions are logged and skipped; an error
        action or a socket failure raises AnnounceError.  Returns the
        payload following the 8-byte (action, transaction) header.
        """
        while True:
            timeout = self.timeout_from_deadline(deadline)
            self.logger.debug('%s: Setting socket timeout: %s', self, timeout)
            self.socket.settimeout(timeout)
            self.logger.debug('%s: Receiving at %s', self, self.socket.getsockname())
            try:
                data = self.socket.recv(4096)
            except (socket.timeout, socket.error) as exc:
                raise AnnounceError(str(exc)) from exc
            response_action, transaction_id = struct.unpack_from('>II', data)
            if transaction_id != self.transaction_id:
                self.logger.warning("%s: Transaction ID doesn't match: %08x", self, transaction_id)
                continue
            if response_action == self.ERROR_ACTION:
                raise AnnounceError(data[8:])
            if response_action != action:
                self.logger.warning(
                    '%s: Received response for different action: %s',
                    self, response_action)
                continue
            return data[8:]

    def _announce(self, torrent, event, timeout):
        """Run the announce exchange; connects lazily, resets state on failure.

        Returns (peer_addr_iterator, interval_seconds).
        """
        deadline = self.deadline_from_timeout(timeout)
        try:
            if not self.connected:
                self.connect(deadline)
            pack_args = [
                self.connection_id,
                self.ANNOUNCE_ACTION,
                self.transaction_id,
                torrent.info_hash,
                torrent.peer_id,
                torrent.downloaded,
                torrent.left,
                torrent.uploaded,
                event,
                torrent.ip_address,
                0,
                -1,
                torrent.port]
            self.logger.debug('%s: Packing announce input with %s', self, pack_args)
            data = struct.pack('>QII20s20sQQQIIIiH', *pack_args)
            self.logger.info('%s: Announcing %s', self, torrent)
            self.send_request(data)
            data = self.receive_response(self.ANNOUNCE_ACTION, deadline)
            interval, leechers, seeders = struct.unpack_from('>III', data)
            self.logger.info('%s: Tracker claims %d seeders and %d leechers', self, seeders, leechers)
            # peer list starts after the 12-byte interval/leechers/seeders header
            return extract_packed_peer_addrs(data, 12), interval
        except:
            # any failure invalidates the connection id; reconnect next time
            self.disconnect()
            raise
+
+
# registry mapping announce-URL schemes onto tracker implementations
tracker_schemes = {
    'udp': UDPTracker,
    'http': HTTPTracker,
}

def Tracker(url):
    """Factory: build the tracker implementation matching *url*'s scheme.

    Raises ValueError for unsupported schemes (previously a bare generic
    Exception; ValueError subclasses Exception, so existing callers that
    caught it still do).
    """
    scheme = urllib.parse.urlsplit(url).scheme
    try:
        cls = tracker_schemes[scheme]
    except KeyError:
        raise ValueError('Tracker scheme not supported', scheme) from None
    return cls(url)
+
def main():
    """Announce a torrent against one tracker URL and print the peers.

    NOTE(review): looks stale — it indexes ``torrent.trackers`` by URL and
    calls the Torrent constructor with (path, port, None), neither of which
    matches the rewritten Torrent class; ``import torrent`` also won't
    resolve from inside the package.  Confirm before relying on this entry
    point.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('torrent')
    parser.add_argument('url')
    args = parser.parse_args()

    logging.basicConfig(level=logging.NOTSET, stream=sys.stderr)
    import torrent
    torrent = torrent.Torrent(args.torrent, 17051, None)

    tracker = torrent.trackers[args.url]
    peers, interval = tracker.announce(torrent)
    print('Interval:', interval)
    for p in peers:
        print(p)

if __name__ == '__main__':
    main()
def configure_logging():
    """Install a root handler passing torrent.* at INFO+ and all else at WARNING+."""
    import logging

    def _wanted(record):
        top = record.name.partition('.')[0]
        if top == 'torrent' and record.levelno >= logging.INFO:
            return True
        return record.levelno >= logging.WARNING

    handler = logging.StreamHandler()
    handler.addFilter(_wanted)
    handler.setFormatter(logging.Formatter("{levelname}:{name}:{message}", style='{'))
    logging.root.addHandler(handler)

    # let the handler's filter do the level selection
    logging.root.setLevel(logging.NOTSET)
+
+
import collections
import collections.abc


# TODO: might delete this it's horribly slow
class multiset(collections.abc.MutableSet):
    """A MutableSet that counts duplicates, backed by a Counter.

    Fix: derives from ``collections.abc.MutableSet`` — the unqualified
    ``collections.MutableSet`` alias was removed in Python 3.10.
    """

    def __init__(self, iterable=None):
        super().__init__()
        self.__data = collections.Counter()
        if iterable is not None:
            for item in iterable:
                self.add(item)

    def __str__(self):
        return 'multiset({})'.format(self.__data)

    def __repr__(self):
        return 'multiset({!r})'.format(list(self))

    def __contains__(self, key):
        # a truthy count means membership
        return self.__data[key]

    def __iter__(self):
        # yield each key once per occurrence
        for key, value in self.__data.items():
            for i in range(value):
                yield key

    def __len__(self):
        return sum(self.__data.values())

    def add(self, item):
        self.__data[item] += 1

    def update(self, other):
        """Add every item from *other* (duplicates accumulate)."""
        for item in other:
            self.add(item)

    def discard(self, item):
        # removing an absent item is a no-op, per the set contract
        if self.__data[item]:
            self.__data[item] -= 1
+
import socket
import struct

# compact peer format: 4-byte packed IPv4 address plus a big-endian port
packed_peer_struct = struct.Struct('>4sH')

def extract_packed_peer_addrs(data, offset=0):
    """Yield (dotted_ip, port) pairs from compact peer bytes, from *offset* on."""
    record = packed_peer_struct
    step = record.size
    while offset + step <= len(data):
        packed_ip, port = record.unpack_from(data, offset)
        offset += step
        yield socket.inet_ntoa(packed_ip), port
+
def pretty_bitfield(bitfield):
    """Render an iterable of piece indices as compact ranges, e.g. '[0..3, 7]'.

    Bug fix: an empty *bitfield* previously hit ``next()`` exhaustion inside
    the nested generator — StopIteration, which PEP 479 converts to
    RuntimeError on Python 3.7+.  It now returns '[]'.
    """
    indices = sorted(bitfield)
    if not indices:
        return '[]'

    def ranges():
        # collapse consecutive runs into (start, end) pairs
        start = end = indices[0]
        for index in indices[1:]:
            if index == end + 1:
                end = index
            else:
                yield start, end
                start = end = index
        yield start, end

    return '[{}]'.format(
        ', '.join(
            str(start) if start == end else '{}..{}'.format(start, end)
            for start, end in ranges()))

File torrent.py

-import binascii
-import bisect
-import collections
-import errno
-import hashlib
-import io
-import itertools
-import logging
-import os
-import pdb
-import random
-import select
-import signal
-import struct
-import sys
-import time
-
-import gthread.socket as socket
-import gthread
-
-from bencoding import bencode, buncode
-from peer import Connection, ConnectionClosed
-from data import SingleFileTorrentData, MultipleFileTorrentData
-from metainfo import Metainfo
-from trackers import AnnounceError, Tracker
-from util import multiset, extract_packed_peer_addrs, pretty_bitfield
-
-logger = logging.getLogger('torrent')
-
-
-class ConnectionWrapper(Connection):
-
-    __slots__ = 'torrent',
-
-    def __init__(self, socket, torrent):
-        super().__init__(socket)
-        self.torrent = torrent
-
-    def got_info_hash(self, *args, **kwargs):
-        super().got_info_hash(*args, **kwargs)
-        self.torrent.peer_sent_info_hash(self)
-
-    def got_peer_id(self, *args, **kwargs):
-        super().got_peer_id(*args, **kwargs)
-        self.torrent.peer_sent_peer_id(self)
-
-    def on_close(self):
-        self.torrent.peer_disconnected(self)
-
-    #~ CHOKE = 0
-    #~ UNCHOKE = 1
-    #~ INTERESTED = 2
-    #~ NOT_INTERESTED = 3
-    #~ HAVE = 4
-    #~ BITFIELD = 5
-    #~ REQUEST = 6
-    #~ PIECE = 7
-    #~ CANCEL = 8
-    #~ EXTENDED = 20
-
-    def got_choke(self):
-        super().got_choke()
-        self.send_choke()
-
-    def got_unchoke(self):
-        super().got_unchoke()
-        self.torrent.top_up_requests(self)
-
-    def got_interested(self):
-        super().got_interested()
-        if self.am_interested:
-            self.send_unchoke()
-
-    def got_have(self, index):
-        super().got_have(index)
-        if self.torrent.want_piece(index):
-            self.send_interested()
-            self.torrent.top_up_requests(self)
-
-    def got_bitfield(self, bitfield):
-        for index in bitfield:
-            assert index in range(self.torrent.piece_count), index
-        super().got_bitfield(bitfield)
-        self.torrent.top_up_requests(self)
-
-    def got_request(self, request):
-        super().got_request(request)
-        if not self.peer_choked:
-            self.send_piece(
-                request[0],
-                request[1],
-                self.torrent.piece_pread(request[0], request[2], request[1]))
-
-    def got_piece(self, index, begin, data):
-        super().got_piece(index, begin, data)
-        self.torrent.got_block(index, begin, data)
-        self.torrent.top_up_requests(self)
-
-    def got_extended(self, type, msg):
-        super().got_extended(type, msg)
-        if type == 1:
-            peers = list(extract_packed_peer_addrs(msg.get('added', b'')))
-            self.logger.info('%s: Got %d new peers from peer', self, len(peers))
-            self.torrent.add_swarm_peers(peers)
-        elif type != 0:
-            self.logger.error('Received extended message with invalid ID: %s', type)
-
-
-class TorrentClosed(Exception): pass
-
-class Torrent:
-
-    logger = logger
-
-    @property
-    def total_length(self):
-        return self.data.total_bytes
-
-    def piece_length(self, index):
-        return min(
-            self.total_length - index * self.metainfo['info']['piece length'],
-            self.metainfo['info']['piece length'])
-
-    def piece_blocks(self, index):
-        piece_length = self.piece_length(index)
-        block_length = 1 << 14
-        for begin in range(0, piece_length, block_length):
-            yield begin, min(block_length, piece_length - begin)
-
-    @property
-    def info_hash(self):
-        return self.metainfo.info_hash
-
-    @property
-    def running(self):
-        return not self.closed
-
-    @property
-    def closed(self):
-        return self.closed_event.is_set()
-
-    @property
-    def left(self):
-        return sum(block[1] for blocks in self.wanted_blocks.values() for block in blocks)
-
-    def __init__(self, file, port=None, destination=None, upload_rate=50<<1024):
-        import gthread
-        self.closed_event = gthread.Event()
-        self.half_open = gthread.Semaphore(20)
-
-        self.lock = gthread.RLock()
-        self.cond = gthread.Condition(self.lock)
-
-        self.upload_rate = upload_rate
-
-        # urgh, wtf happened to my interface?
-        self.metainfo = Metainfo(buncode(open(file, 'rb')).__next__())
-
-        self.piece_count = len(self.metainfo['info']['pieces']) // 20
-
-        self.logger.info('%s: Hash is %s', self, binascii.b2a_hex(self.info_hash).decode())
-        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
-        self.socket.bind(('', 0 if port is None else port))
-        self.socket.listen(50)
-        self.peer_id = b'ET0001' + os.urandom(14)
-
-        if 'length' in self.metainfo['info']:
-            self.data = SingleFileTorrentData(
-                destination,
-                self.metainfo['info']['name'].decode(),
-                self.metainfo['info']['length'])
-        else:
-            self.data = MultipleFileTorrentData(
-                destination,
-                self.metainfo['info']['name'].decode(),
-                self.metainfo['info']['files'],)
-
-        self.wanted_blocks = {}
-        for index in range(self.piece_count):
-            if not self.have_piece(index):
-                self.wanted_blocks[index] = set(self.piece_blocks(index))
-
-        self.logger.info('Missing pieces: %s', pretty_bitfield(self.wanted_blocks.keys()))
-
-        self.ip_address = 0
-        self.downloaded = 0
-        self.uploaded = 0
-
-        self.trackers = {}
-
-        self.active_peers = {} # peer_id: connection
-        self.pending_peers = set()
-        self.pending_peers_not_empty = gthread.Condition()
-
-        self.add_tracker(self.metainfo['announce'].decode())
-        for url in self.metainfo['announce-list']:
-            assert isinstance(url, list), url
-            assert len(url) == 1, url
-            self.add_tracker(url[0].decode())
-
-    @property
-    def port(self):
-        if hasattr(self, 'socket'):
-            return self.socket.getsockname()[1]
-
-    def add_swarm_peers(self, peers):
-        with self.pending_peers_not_empty:
-            self.pending_peers.update(peers)
-            self.pending_peers_not_empty.notify_all()
-
-    def want_piece(self, index):
-        return index in self.wanted_blocks
-
-    def all_requests(self):
-        requests = multiset()
-        with self.lock:
-            for conn in self.active_peers.values():
-                requests.update(conn.our_requests)
-        return requests
-
-    def top_up_requests(self, conn):
-        request_generator = self.generate_requests(conn)
-        try:
-            while not conn.am_choked and len(conn.our_requests) < conn.peer_reqq:
-                try:
-                    request = request_generator.__next__()
-                except StopIteration:
-                    conn.send_not_interested()
-                    return
-                conn.send_interested()
-                conn.send_request(request)
-        finally:
-            request_generator.close()
-
-    def generate_requests(self, conn):
-        with self.lock:
-            all_requests = set()
-            for conn in self.active_peers.values():
-                all_requests.update(conn.our_requests)
-            for index in self.wanted_blocks.keys() & conn.peer_bitfield:
-                choices = {(index,) + block for block in self.wanted_blocks[index]}
-                choices -= conn.our_requests
-                choices -= all_requests
-                while choices:
-                    yield choices.pop()
-            for request in (all_requests - conn.our_requests):
-                if request[0] in conn.peer_bitfield:
-                    yield request
-
-    def bitfield_bytes(self):
-        bytes = bytearray((self.piece_count + 7) // 8)
-        for index in range(self.piece_count):
-            if index not in self.wanted_blocks:
-                bytes[index // 8] |= 1 << 7 - index % 8
-        return bytes
-
-    def peer_sent_info_hash(self, conn):
-        if conn.peer_info_hash != self.info_hash:
-            self.logger.warning("%s: Peer %r's info hash does not match: %s", self, conn, conn.peer_info_hash)
-            conn.close()
-
-    def routine_wrapper(target):
-        def _(self, *args, **kwds):
-            try:
-                return target(self, *args, **kwds)
-            except TorrentClosed:
-                self.close()
-            except:
-                self.logger.exception('%s', self)
-                self.close()
-                raise
-            finally:
-                self.logger.debug('Routine %s returned', target)
-        return _
-
-    @routine_wrapper
-    def peer_routine(self, sock, addr):
-        # create and connect the socket if a socket wasn't given
-        if sock is None:
-            with self.half_open:
-                sock = socket.socket()
-                sock.settimeout(30)
-                try:
-                    sock.connect(addr)
-                except socket.error as exc:
-                    self.logger.info('Connecting to %s: %s', addr, exc)
-                    return
-                else:
-                    self.logger.info('Connected to %s', addr)
-
-        with sock:
-            sock.setblocking(True)
-            with ConnectionWrapper(sock, self) as conn:
-                #~ threading.current_thread().name = conn
-                conn.send_info_hash(self.info_hash)
-                conn.send_peer_id(self.peer_id)
-                conn.run()
-
-    def peer_sent_peer_id(self, conn):
-        # add the connection to the active peers
-        peer_id = conn.peer_peer_id
-        with self.lock:
-            if peer_id in self.active_peers:
-                self.logger.warning(
-                    '%s: Already connected to peer %s',
-                    self,
-                    peer_id)
-                assert conn is not self.active_peers[peer_id]
-                conn.close()
-            else:
-                self.active_peers[peer_id] = conn
-                conn.send_bitfield(self.bitfield_bytes())
-                if conn.peer_extensions[5] & 0x10:
-                    conn.send_extended(0, {
-                        'm': {'ut_pex': 1},
-                        'reqq': 5,
-                        'v': 'erutor-0.1',
-                        'yourip': socket.inet_aton(conn.socket.getpeername()[0]),
-                        'p': self.port})
-
-    def peer_disconnected(self, conn):
-        self.logger.debug('%s: Removing connection: %s', self, conn)
-        try:
-            peer_id = conn.peer_peer_id
-        except AttributeError:
-            return
-        with self.lock:
-            if self.active_peers.get(peer_id) is conn:
-                del self.active_peers[peer_id]
-                self.cond.notify_all()
-
-    def activate_peer(self, sock, addr):
-        gthread.spawn(self.peer_routine, sock, addr)
-        return True
-
-    @routine_wrapper
-    def connect_routine(self):
-        while True:
-            with self.pending_peers_not_empty:
-                while not self.pending_peers:
-                    self.pending_peers_not_empty.wait()
-                addr = self.pending_peers.pop()
-            with self.half_open:
-                self.activate_peer(None, addr)
-
-    @routine_wrapper
-    def accept_routine(self):
-        while True:
-            sock, addr = self.socket.accept()
-            self.logger.info('%s: Accepted connection from %s', self, addr)
-            self.activate_peer(sock, addr)
-
-    @staticmethod
-    def conn_selection_key_func(conn):
-        return conn.peer_interested, conn.am_interested, conn.bytes_received, not conn.am_choked
-
-    def do_stats(self):
-        with self.lock:
-            num = sum(len(conn.peer_bitfield) for conn in self.active_peers.values())
-            denom = len(self.active_peers) * self.piece_count
-        try:
-            av_completion = 100 * num / denom
-        except ZeroDivisionError:
-            av_completion = 0
-        self.logger.info(
-            '%s: %d active peers (%f%% average completion), %d pending addrs',
-            self, len(self.active_peers), av_completion, len(self.pending_peers))
-
-    @routine_wrapper
-    def stat_routine(self):
-        while not self.wait(5):
-            self.do_stats()
-
-    @routine_wrapper
-    def choking_routine(self):
-        while True:
-            self.do_stats()
-            peers = sorted(self.active_peers.values(), key=self.conn_selection_key_func)
-            self.logger.debug(
-                '%s: Best unchoke candidates: %s',
-                self,
-                [self.conn_selection_key_func(peer) for peer in peers[-10:]])
-            for peer in peers[:-5]:
-                peer.send_choke()
-            for peer in peers[-5:]:
-                peer.send_unchoke()
-            self.wait(10)
-
-    def final_announce(self, tracker):
-        try:
-            tracker.announce(self, final=True, completed=not self.wanted_blocks)
-        except AnnounceError as exc:
-            self.logger.warning('%s: Error in final announce: %r', self, exc)
-
-    def close(self):
-        self.closed_event.set()
-        self.pending_peers.put(None)
-
-    def print_completion(self):
-        count = sum(
-            index not in self.wanted_blocks \
-            for index in range(self.piece_count))
-        print(
-            '{} is {}% completed.'.format(self, 100 * count / self.piece_count),
-            file=sys.stderr)
-
-    def wait(self, timeout):
-        if self.closed_event.wait(timeout):
-            raise TorrentClosed()
-        return False
-
-    def wait_for_deadline(self, deadline):
-        return self.wait(self.timeout_from_deadline(deadline))
-
-    def timeout_from_deadline(self, deadline):
-        timeout = deadline - time.time()
-        if timeout < 0:
-            return 0
-        return timeout
-
-    @routine_wrapper
-    def tracker_routine(self, tracker):
-        while True:
-            timeout = 60
-            while True:
-                deadline = time.time() + timeout
-                try:
-                    peers, interval = tracker.announce(self, self.timeout_from_deadline(deadline))
-                except AnnounceError as exc:
-                    self.logger.warning('%s: Error announcing %s: %r', self, tracker, exc)
-                else:
-                    break
-                self.wait_for_deadline(deadline)
-                timeout = min(timeout * 2, 3600)
-            peers = list(peers)
-            self.logger.info('Got %d peers from tracker %s', len(peers), tracker)
-            self.add_swarm_peers(peers)
-            self.logger.debug('%s: Waiting for interval %ss', tracker, interval)
-            self.wait(interval)
-
-    def run(self):
-        import threading
-        for tracker in self.trackers.values():
-            threading.Thread(target=self.tracker_routine, args=[tracker]).start()
-        import gthread
-        for target in self.accept_routine, self.connect_routine, self.stat_routine:
-            gthread.spawn(target)
-
-        self.closed_event.wait()
-        with self.lock:
-            self.cond.notify_all()
-
-        for tracker in self.trackers.values():
-            threading.Thread(target=self.final_announce, args=[tracker]).start()
-
-        self.logger.info('Closed %s', self)
-
-    __call__ = run
-
-    def piece_pread(self, index, count, offset):
-        assert index not in self.wanted_blocks, index
-        return self.data.pread(count, self.piece_length(0) * index + offset)
-
-    def piece_pwrite(self, index, piece, offset):
-        assert offset + len(piece) <= self.piece_length(index)
-        return self.data.pwrite(piece, index * self.piece_length(0) + offset)
-
-    def get_piece_data(self, index):
-        return self.data.pread(self.piece_length(index), index * self.piece_length(0))
-
-    def have_piece(self, piece_index):
-        hash_offset = piece_index * 20
-        hasher = hashlib.sha1(self.get_piece_data(piece_index))
-        digest = self.metainfo['info']['pieces'][hash_offset:hash_offset+20]
-        return hasher.digest() == digest
-
-    def __str__(self):
-        return self.metainfo['info']['name'].decode()
-
-    def add_tracker(self, url):
-        if url in self.trackers:
-            self.logger.info('%s: Duplicate tracker: %s', self, url)
-            return
-        self.logger.debug('%s: Adding tracker for %s', self, url)
-        tracker = Tracker(url)
-        self.trackers[url] = tracker
-
-    def got_block(self, index, begin, data):
-        '''Return True if the block was needed, otherwise False'''
-        request = index, begin, len(data)
-        with self.lock:
-            try:
-                self.wanted_blocks[index].remove(request[1:])
-            except KeyError:
-                self.logger.warning('%s: Received unwanted block %s', self, request)
-                return False
-            else:
-                self.piece_pwrite(index, data, begin)
-                if not self.wanted_blocks[index]:
-                    # there are no more blocks needed for this piece
-                    del self.wanted_blocks[index]
-                    if self.have_piece(index):
-                        self.logger.info('%s: Downloaded piece %d', self, index)
-                        self.print_completion()
-                        for conn in self.active_peers.values():
-                            conn.send_have(index)
-                    else:
-                        self.logger.error('%s: Piece %d failed checksum', self, index)
-                        self.wanted_blocks[index] = set(self.piece_blocks(index))
-                if request[1:] not in self.wanted_blocks.get(index, ()):
-                    for conn in self.active_peers.values():
-                        conn.send_cancel(request)
-                self.downloaded += request[2]
-                return True
-
-def main():
-    import util
-    util.configure_logging()
-
-    import argparse
-    parser = argparse.ArgumentParser()
-    parser.add_argument('torrent')
-    parser.add_argument('peer_dests', nargs='+')
-    namespace = parser.parse_args()
-
-    torrents = [Torrent(namespace.torrent, 0, dest) for dest in namespace.peer_dests]
-    def close(self):
-        for t in torrents:
-            Torrent.close(t)
-    for tor in torrents:
-        tor.close = close.__get__(tor)
-        tor.trackers.clear()
-    for tor in torrents:
-        gthread.spawn(tor.run)
-    # add the previous torrent as the only peer for each torrent
-    for ti in range(len(torrents)):
-        torrents[ti].add_swarm_peers({('localhost', torrents[ti-1].port)})
-    #~ def handler(signum, frame):
-        #~ for tor in torrents:
-            #~ tor.close()
-    #~ import signal
-    #~ signal.signal(signal.SIGINT, handler)
-    #~ for thr in threads:
-        #~ thr.join()
-    gthread.switch()
-
-if __name__ == '__main__':
-    main()

File trackers.py

-#!/usr/bin/env python3
-
-import errno
-import http
-import logging
-import pdb
-import random
-import signal
-import socket
-import struct
-import sys
-import threading
-import time
-import traceback
-import urllib
-import urllib.parse
-import urllib.request
-
-import bencoding
-from bencoding import bencode, buncode
-from util import extract_packed_peer_addrs
-
-logger = logging.getLogger('tracker')
-
-NONE_EVENT = 0
-COMPLETED_EVENT = 1
-STARTED_EVENT = 2
-STOPPED_EVENT = 3
-
-
-class AnnounceError(Exception): pass
-
-
-class _Tracker:
-
-    logger = logger
-
-    def __init__(self, url):
-        super().__init__()
-        self.url = url
-        self.event = STARTED_EVENT
-
-    def __str__(self):
-        return self.url
-
-    # can't completed be determined by left == 0?
-    def announce(self, torrent, timeout=None, final=False, completed=False):
-        if final:
-            event = COMPLETED_EVENT if completed else STOPPED_EVENT
-        else:
-            event = self.event
-        self.logger.info('%s: Announcing %s event', self, {
-            STARTED_EVENT: 'started',
-            NONE_EVENT: 'update',
-            COMPLETED_EVENT: 'completed',
-            STOPPED_EVENT: 'stopped',}[event].upper())
-        retval = self._announce(torrent, event, timeout)
-        if self.event == STARTED_EVENT:
-            self.event = NONE_EVENT
-        return retval
-
-
-class HTTPTracker(_Tracker):
-
-    __slots__ = ()
-
-    def _announce(self, torrent, event, timeout):
-        split_url = urllib.parse.urlsplit(self.url)
-        parsed_qs = urllib.parse.parse_qsl(split_url.query)
-        parsed_qs += [
-            ('info_hash', torrent.info_hash),
-            ('peer_id', torrent.peer_id),
-            ('port', torrent.port),
-            ('uploaded', torrent.uploaded),
-            ('downloaded', torrent.downloaded),
-            ('left', torrent.left),
-            ('compact', 1),
-            ('no_peer_id', 1),
-            ('event', {
-                NONE_EVENT: '',
-                COMPLETED_EVENT: 'completed',
-                STARTED_EVENT: 'started',
-                STOPPED_EVENT: 'stopped'}[event]),]
-        split_url_list = list(split_url)
-        split_url_list[3] = urllib.parse.urlencode(parsed_qs)
-        url = urllib.parse.urlunsplit(split_url_list)
-        self.logger.debug('%s: Requesting %s', self, url)
-        try:
-            response = urllib.request.urlopen(url, timeout=timeout)
-        except Exception as exc:
-            raise AnnounceError(exc) from exc
-        self.logger.debug('%s: Response from %s:\n%s', self, response.geturl(), response.info())
-        try:
-            response_body = bencoding.buncode(response).__next__()
-        except Exception as exc:
-            raise AnnounceError('Error buncoding response', exc)
-        self.logger.debug('%s: Buncoded response: %s', self, response_body)
-        try:
-            failure_reason = response_body['failure reason']
-        except KeyError:
-            pass
-        else:
-            raise AnnounceError(failure_reason.decode())
-        return extract_packed_peer_addrs(response_body['peers']), response_body['interval']
-
-
-class UDPTracker(_Tracker):
-
-    CONNECT_ACTION = 0
-    ANNOUNCE_ACTION = 1
-    SCRAPE_ACTION = 2
-    ERROR_ACTION = 3
-
-    __slots__ = 'socket', 'connected', 'addr', 'transaction_id', 'connection_id'
-
-    def __init__(self, url):
-        split_url = urllib.parse.urlsplit(url)
-        host, port = split_url.netloc.split(':')
-        self.addr = host, int(port)
-        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        self.disconnect()
-        super().__init__(url)
-
-    def connect(self, deadline):
-        assert not self.connected
-        try:
-            self.socket.connect(self.addr)
-        except socket.error as exc:
-            raise AnnounceError('Error connecting: %s' % exc)
-        self.transaction_id = random.getrandbits(32)
-        self.logger.debug('%s: Generated transaction ID: %08x', self, self.transaction_id)
-        request = struct.pack('>QII',
-            0x41727101980,
-            self.CONNECT_ACTION,
-            self.transaction_id)
-        self.logger.debug('%s: Sending connection request', self)
-        self.send_request(request)
-        response = self.receive_response(self.CONNECT_ACTION, deadline)
-        self.connection_id = struct.unpack_from('>Q', response)[0]
-        self.logger.debug('%s: Obtained connection ID: %x', self, self.connection_id)
-        self.connected = True
-
-    def disconnect(self):
-        self.connected = False
-        self.transaction_id = None
-        self.connection_id = None
-
-    def send_request(self, data):
-        self.socket.send(data)
-
-    def timeout_from_deadline(self, deadline):
-        if deadline is None:
-            return None
-        return max(deadline - time.time(), 0)
-
-    def deadline_from_timeout(self, timeout):
-        if timeout is None:
-            return None
-        return time.time() + timeout
-
-    def receive_response(self, action, deadline):
-        while True:
-            timeout = self.timeout_from_deadline(deadline)
-            self.logger.debug('%s: Setting socket timeout: %s', self, timeout)
-            self.socket.settimeout(timeout)
-            self.logger.debug('%s: Receiving at %s', self, self.socket.getsockname())
-            try:
-                data = self.socket.recv(4096)
-            except (socket.timeout, socket.error) as exc:
-                raise AnnounceError(str(exc)) from exc
-            response_action, transaction_id = struct.unpack_from('>II', data)
-            if transaction_id != self.transaction_id:
-                self.logger.warning("%s: Transaction ID doesn't match: %08x", self, transaction_id)
-                continue
-            if response_action == self.ERROR_ACTION:
-                raise AnnounceError(data[8:])
-            if response_action != action:
-                self.logger.warning(
-                    '%s: Received response for different action: %s',
-                    self, response_action)
-                continue
-            return data[8:]
-
-    def _announce(self, torrent, event, timeout):
-        deadline = self.deadline_from_timeout(timeout)
-        try:
-            if not self.connected:
-                self.connect(deadline)
-            pack_args = [
-                self.connection_id,
-                self.ANNOUNCE_ACTION,
-                self.transaction_id,
-                torrent.info_hash,
-                torrent.peer_id,
-                torrent.downloaded,
-                torrent.left,
-                torrent.uploaded,
-                event,
-                torrent.ip_address,
-                0,
-                -1,
-                torrent.port]
-            self.logger.debug('%s: Packing announce input with %s', self, pack_args)
-            data = struct.pack('>QII20s20sQQQIIIiH', *pack_args)
-            self.logger.info('%s: Announcing %s', self, torrent)
-            self.send_request(data)
-            data = self.receive_response(self.ANNOUNCE_ACTION, deadline)
-            interval, leechers, seeders = struct.unpack_from('>III', data)
-            self.logger.info('%s: Tracker claims %d seeders and %d leechers', self, seeders, leechers)
-            return extract_packed_peer_addrs(data, 12), interval
-        except:
-            self.disconnect()
-            raise
-
-
-tracker_schemes = {
-    'udp': UDPTracker,
-    'http': HTTPTracker,
-}
-
-def Tracker(url):
-    parts = urllib.parse.urlsplit(url)
-    scheme = parts.scheme
-    if scheme not in tracker_schemes:
-        raise Exception('Tracker scheme not supported', scheme)
-    return tracker_schemes[scheme](url)
-
-def main():
-    import argparse
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument('torrent')
-    parser.add_argument('url')
-    args = parser.parse_args()
-
-    logging.basicConfig(level=logging.NOTSET, stream=sys.stderr)
-    import torrent
-    torrent = torrent.Torrent(args.torrent, 17051, None)
-
-    tracker = torrent.trackers[args.url]
-    peers, interval = tracker.announce(torrent)
-    print('Interval:', interval)
-    for p in peers:
-        print(p)
-
-if __name__ == '__main__':
-    main()

File util.py

-def configure_logging():
-    import logging
-    formatter = logging.Formatter("{levelname}:{name}:{message}", style='{')
-    handler = logging.StreamHandler()
-    def filter(record):
-        if record.name.split('.')[0] == 'torrent':
-            if record.levelno >= logging.INFO:
-                return True
-        if record.levelno >= logging.WARNING:
-            return True
-    handler.addFilter(filter)
-    handler.setFormatter(formatter)
-    logging.root.addHandler(handler)
-
-    logging.root.setLevel(logging.NOTSET)
-
-
-import collections
-
-# TODO: might delete this it's horribly slow
-class multiset(collections.MutableSet):
-
-    def __init__(self, iterable=None):
-        super().__init__()
-        self.__data = collections.Counter()
-        if not iterable is None:
-            for item in iterable:
-                self.add(item)
-
-    def __str__(self):
-        return 'multiset({})'.format(self.__data)
-
-    def __repr__(self):
-        return 'multiset({!r})'.format(list(self))
-
-    def __contains__(self, key):
-        return self.__data[key]
-
-    def __iter__(self):
-        for key, value in self.__data.items():
-            for i in range(value):
-                yield key
-
-    def __len__(self):
-        return sum(self.__data.values())
-
-    def add(self, item):
-        self.__data[item] += 1
-
-    def update(self, other):
-        for item in other:
-            self.add(item)
-
-    def discard(self, item):
-        if self.__data[item]:
-            self.__data[item] -= 1
-
-import socket
-import struct
-packed_peer_struct = struct.Struct('>4sH')
-
-def extract_packed_peer_addrs(data, offset=0):
-    struct = packed_peer_struct
-    while len(data) - offset >= struct.size:
-        packed_ip, port = struct.unpack_from(data, offset)
-        offset += struct.size
-        yield socket.inet_ntoa(packed_ip), port
-
-def pretty_bitfield(bitfield):
-    def ranges(bitfield=bitfield):
-        bitfield = iter(sorted(bitfield))
-        start = end = next(bitfield)
-        for index in bitfield:
-            if index == end + 1:
-                end = index
-            else:
-                yield start, end
-                start = end = index
-        yield start, end
-    return '[{}]'.format(
-        ', '.join(
-            str(s) if s == e else '{s}..{e}'.format(**vars())
-            for s, e in ranges()))