Commits

Matt Joiner committed 03b3b88

Move peer to lib.peer.

Comments (0)

Files changed (2)

+import errno
+import logging
+import pdb
+import random
+import struct
+
+from gthread import queue
+import gthread.socket as socket
+import gthread
+
+from bencoding import buncode, bencode
+from util import pretty_bitfield
+
+
+class ConnectionClosed(Exception): pass
+
+class Connection:
+
+    CHOKE = 0
+    UNCHOKE = 1
+    INTERESTED = 2
+    NOT_INTERESTED = 3
+    HAVE = 4
+    BITFIELD = 5
+    REQUEST = 6
+    PIECE = 7
+    CANCEL = 8
+    EXTENDED = 20
+
+    __slots__ = (
+        'logger', 'socket', 'peer_bitfield',
+        'last_recv_time', 'last_send_time',
+        'peer_requests', 'our_requests',
+        'peer_choked', 'am_choked',
+        'peer_interested', 'am_interested',
+        'bytes_sent', 'bytes_received',
+        'out_queue', 'in_buffer', 'send_routine_exc',
+        'peer_extensions', 'peer_info_hash', 'peer_peer_id',
+        'peer_extended_protocols', 'peer_reqq')
+
+    protocol = b'\x13BitTorrent protocol'
+    our_extensions = b'\0\0\0\0\0\x10\0\0'
+
+    def __init__(self, sock):
+        self.logger = logging.getLogger('peer.{:x}'.format(id(self)))
+        self.logger.addHandler(logging.FileHandler('log/peer.{:x}'.format(id(self))))
+        self.logger.setLevel(1)
+        self.socket = sock
+        self.peer_bitfield = set()
+        self.last_recv_time = None
+        self.last_send_time = None
+        self.peer_requests = set()
+        self.our_requests = set()
+        self.peer_choked = True
+        self.am_choked = True
+        self.peer_interested = False
+        self.am_interested = False
+        self.bytes_sent = 0
+        self.bytes_received = 0
+        self.out_queue = queue.Queue()
+        self.send(self.protocol + self.our_extensions)
+        self.in_buffer = b''
+        self.send_routine_exc = None
+        import collections
+        self.peer_extended_protocols = collections.defaultdict(int)
+        self.peer_reqq = 20
+        try:
+            localaddr = sock.getsockname()
+            peeraddr = sock.getpeername()
+        except socket.error as exc:
+            self.logger.warning('Error getting socket endpoints: %s', exc)
+        else:
+            self.logger.info('New connection: %s-%s', localaddr, peeraddr)
+
+    def close(self):
+        self.socket.close()
+        self.out_queue.put(None)
+        self.on_close()
+
+    def __del__(self):
+        self.logger.debug('Deleting %r', self)
+        handlers = self.logger.handlers
+        assert len(handlers) == 1, handlers
+        for h in handlers:
+            self.logger.removeHandler(h)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def send(self, data):
+        self.out_queue.put(data)
+
+    def send_routine(self):
+        with self:
+            while True:
+                data = self.out_queue.get()
+                if data is None:
+                    return
+                try:
+                    self.socket.sendall(data)
+                except socket.error as exc:
+                    if exc.errno in {errno.EPIPE, errno.ECONNRESET}:
+                        return
+                self.logger.debug('Sent %r bytes', len(data))
+
+    def recv(self, count):
+        data = b''
+        while len(data) < count:
+            try:
+                data1 = self.socket.recv(count - len(data))
+            except socket.error as exc:
+                # TODO ECONNRESET, EAGAIN and EWOULDBLOCK might want special treatment here
+                raise ConnectionClosed(exc)
+            if not data1:
+                raise ConnectionClosed()
+            data += data1
+        self.logger.debug('Received %d bytes', len(data))
+        return data
+
+    def recv_routine(self):
+        import functools
+        self.recv_protocol()
+        self.recv_extensions()
+        self.recv_info_hash()
+        self.recv_peer_id()
+        while True:
+            length, = struct.unpack('>I', self.recv(4))
+            if length == 0:
+                self.logger.info('Received keep-alive')
+                continue
+            self.logger.debug('Next message length: %s', length)
+            type = self.recv(1)[0]
+            handler = {
+                self.CHOKE: self.recv_choke,
+                self.UNCHOKE: self.recv_unchoke,
+                self.INTERESTED: self.recv_interested,
+                self.NOT_INTERESTED: self.recv_not_interested,
+                self.HAVE: self.recv_have,
+                self.BITFIELD: self.recv_bitfield,
+                self.REQUEST: self.recv_request,
+                self.PIECE: self.recv_piece,
+                self.CANCEL: self.recv_cancel,
+                self.EXTENDED: self.recv_extended,
+            }.get(type, functools.partial(self.recv_unknown, type))
+            handler(length - 1)
+
+    def run(self):
+        try:
+            gthread.spawn(self.send_routine)
+            self.recv_routine()
+        except ConnectionClosed as exc:
+            if exc.args:
+                self.logger.error('%s: Connection terminated: %s', self, exc)
+        finally:
+            self.close()
+            if self.send_routine_exc:
+                raise self.send_routine_exc
+
+
+    def recv_protocol(self):
+        self.got_protocol(self.recv(20))
+
+    def got_protocol(self, protocol):
+        if protocol != self.protocol:
+            raise ConnectionClosed('Unknown protocol %s' % protocol)
+
+    def recv_extensions(self):
+        self.got_extensions(self.recv(8))
+
+    def got_extensions(self, extensions):
+        self.logger.debug('Received extensions: %s', extensions)
+        self.peer_extensions = extensions
+
+    # INFO HASH
+
+    def send_info_hash(self, info_hash):
+        assert len(info_hash) == 20, len(info_hash)
+        self.send(info_hash)
+
+    def recv_info_hash(self):
+        self.got_info_hash(self.recv(20))
+
+    def got_info_hash(self, info_hash):
+        self.peer_info_hash = info_hash
+
+    # PEER ID
+
+    def send_peer_id(self, peer_id):
+        assert len(peer_id) == 20, peer_id
+        self.send(peer_id)
+
+    def recv_peer_id(self):
+        self.got_peer_id(self.recv(20))
+
+    def got_peer_id(self, peer_id):
+        self.peer_peer_id = peer_id
+
+    # keep alive
+
+    def send_keep_alive(self):
+        self.send(struct.pack('>I', 0))
+
+    def recv_unknown(self, type, length):
+        self.logger.warning('Received message with unknown type %r, length %r', type, length)
+        max_length = 0x100
+        data = self.recv(min(0x100, length))
+        self.logger.debug('Unknown message begins: %s', data)
+        if length > max_length:
+            raise ConnectionClosed('Unknown message is too long')
+        else:
+            self.got_unknown(type, data)
+
+    def got_unknown(self, type, data):
+        pass
+
+    # choke
+
+    def send_choke(self):
+        if not self.peer_choked:
+            self.logger.debug('%s: Choking peer', self)
+            self.peer_choked = True
+            self.send_message(self.CHOKE)
+        self.peer_requests.clear()
+
+    def recv_choke(self, length):
+        if length != 0:
+            raise ConnectionClosed('Received choke with length=%d' % length)
+        self.got_choke()
+
+    def got_choke(self):
+        self.logger.debug('%s: Peer choked us', self)
+        self.am_choked = True
+        self.our_requests.clear()
+
+    # unchoke
+
+    def send_unchoke(self):
+        if self.peer_choked:
+            self.logger.debug('%s: Unchoking peer', self)
+            self.peer_choked = False
+            self.send_message(self.UNCHOKE)
+
+    def recv_unchoke(self, length):
+        if length != 0:
+            raise ConnectionClosed('Received unchoked with length %s' % length)
+        self.got_unchoke()
+
+    def got_unchoke(self):
+        self.logger.debug('%s: Peer unchoked us', self)
+        self.am_choked = False
+
+    # interested
+
+    def send_interested(self):
+        if not self.am_interested:
+            self.logger.debug('Sending interest')
+            self.am_interested = True
+            self.send_message(self.INTERESTED)
+
+    def recv_interested(self, length):
+        if length != 0:
+            raise ConnectionClosed('Received interested with length=%d' % length)
+        self.got_interested()
+
+    def got_interested(self):
+        self.logger.debug('Peer interested')
+        self.peer_interested = True
+
+    # not interested
+
+    def send_not_interested(self):
+        if self.am_interested:
+            self.logger.debug('Sending not interested')
+            self.am_interested = False
+            self.send_message(self.NOT_INTERESTED)
+
+    def recv_not_interested(self, length):
+        assert length == 0, length
+        self.got_not_interested()
+
+    def got_not_interested(self):
+        self.logger.debug('Peer not interested')
+        self.peer_interested = False
+
+    # have
+
+    def send_have(self, index):
+        self.logger.debug('%s: Sending HAVE(%d)', self, index)
+        self.send_message(self.HAVE, struct.pack('>I', index))
+
+    def recv_have(self, length):
+        index = struct.unpack('>I', self.recv(length))[0]
+        self.got_have(index)
+
+    def got_have(self, index):
+        self.logger.debug('Peer has piece %r', index)
+        self.peer_bitfield.add(index)
+
+    # bitfield
+
+    def send_bitfield(self, bytes):
+        if any(bytes):
+            self.logger.debug('%s: Sending bitfield %s', self, bytes)
+            self.send_message(self.BITFIELD, bytes)
+
+    def recv_bitfield(self, length):
+        import itertools
+        index_iter = itertools.count()
+        bitfield = set()
+        for byte in self.recv(length):
+            for bit_shift in range(7, -1, -1):
+                index = index_iter.__next__()
+                if byte >> bit_shift & 1:
+                    bitfield.add(index)
+        self.got_bitfield(bitfield)
+
+    def got_bitfield(self, bitfield):
+        self.logger.debug('%s: Got bitfield: %s', self, pretty_bitfield(bitfield))
+        self.peer_bitfield = bitfield
+
+    def recv_request(self, length):
+        request = struct.unpack('>III', self.recv(length))
+        self.peer_requests.add(request)
+        self.got_request(request)
+
+    def got_request(self, request):
+        self.logger.debug('Got request %s', request)
+        self.peer_requests.add(request)
+
+    def recv_piece(self, length):
+        index, offset = struct.unpack_from('>II', self.recv(8))
+        self.got_piece(index, offset, self.recv(length - 8))
+
+    def got_piece(self, index, offset, data):
+        request = index, offset, len(data)
+        try:
+            self.our_requests.remove(request)
+        except KeyError:
+            self.logger.warning('%s: Received unrequested block %s', self, request)
+        else:
+            self.logger.info('%s: Received block %s', self, request)
+
+    # cancel
+
+    def recv_cancel(self, length):
+        request = struct.unpack('>III', self.recv(length))
+        self.got_cancel(request)
+
+    def got_cancel(self, request):
+        try:
+            self.peer_requests.remove(request)
+        except KeyError:
+            self.logger.warning('%s: Peer canceled unknown request %s', self, request)
+
+    # extended
+
+    def recv_extended(self, length):
+        import io
+        type = self.recv(1)[0]
+        self.logger.debug('Receiving extended message type: %s', type)
+        message = self.recv(length - 1)
+        self.logger.debug('Received extended message payload: %s', message)
+        self.got_extended(type, buncode(io.BytesIO(message)).__next__())
+
+    def got_extended(self, type, msg):
+        import pprint
+        self.logger.debug('%s: Got extended message (type=%s):\n%s', self, type, pprint.pformat(msg))
+        if type == 0:
+            self.peer_extended_protocols.update(msg['m'])
+            if 'reqq' in msg:
+                self.peer_reqq = msg['reqq']
+
+    def send_message(self, type, payload=b''):
+        self.send(struct.pack('>IB', len(payload) + 1, type) + payload)
+
+    def send_request(self, request):
+        self.logger.info('%s: Sending request: %s', self, request)
+        assert not self.am_choked
+        if request in self.our_requests:
+            self.logger.warning('%s: Tried to send duplicate request: %s', self, request)
+        else:
+            self.our_requests.add(request)
+            self.send_message(self.REQUEST, struct.pack('>III', *request))
+
+    def send_piece(self, index, begin, data):
+        request = index, begin, len(data)
+        self.logger.debug('%s: Sending piece %s', self, request)
+        self.peer_requests.remove(request)
+        self.send_message(self.PIECE, struct.pack('>II', index, begin) + data)
+        self.bytes_sent += len(data)
+
+    def send_cancel(self, request):
+        try:
+            self.our_requests.remove(request)
+        except KeyError:
+            pass
+        else:
+            self.logger.info('%s: Sending cancel: %s', self, request)
+            self.send_message(self.CANCEL, struct.pack('>III', *request))
+
+    def send_extended(self, type, msg):
+        assert self.peer_extensions[5] & 0x10, self.peer_extensions
+        assert self.our_extensions[5] & 0x10, self.our_extensions
+        assert 'm' in msg, msg
+        data = bytes([type]) + bencode(msg)
+        import pprint
+        self.logger.debug(
+            'Sending extended message (type=%s):\n%s',
+            type,
+            pprint.pformat(msg))
+        self.send_message(self.EXTENDED, data)
+

peer.py

-import errno
-import logging
-import pdb
-import random
-import struct
-
-from gthread import queue
-import gthread.socket as socket
-import gthread
-
-from bencoding import buncode, bencode
-from util import pretty_bitfield
-
-
-class ConnectionClosed(Exception): pass
-
-class Connection:
-
-    CHOKE = 0
-    UNCHOKE = 1
-    INTERESTED = 2
-    NOT_INTERESTED = 3
-    HAVE = 4
-    BITFIELD = 5
-    REQUEST = 6
-    PIECE = 7
-    CANCEL = 8
-    EXTENDED = 20
-
-    __slots__ = (
-        'logger', 'socket', 'peer_bitfield',
-        'last_recv_time', 'last_send_time',
-        'peer_requests', 'our_requests',
-        'peer_choked', 'am_choked',
-        'peer_interested', 'am_interested',
-        'bytes_sent', 'bytes_received',
-        'out_queue', 'in_buffer', 'send_routine_exc',
-        'peer_extensions', 'peer_info_hash', 'peer_peer_id',
-        'peer_extended_protocols', 'peer_reqq')
-
-    protocol = b'\x13BitTorrent protocol'
-    our_extensions = b'\0\0\0\0\0\x10\0\0'
-
-    def __init__(self, sock):
-        self.logger = logging.getLogger('peer.{:x}'.format(id(self)))
-        self.logger.addHandler(logging.FileHandler('log/peer.{:x}'.format(id(self))))
-        self.logger.setLevel(1)
-        self.socket = sock
-        self.peer_bitfield = set()
-        self.last_recv_time = None
-        self.last_send_time = None
-        self.peer_requests = set()
-        self.our_requests = set()
-        self.peer_choked = True
-        self.am_choked = True
-        self.peer_interested = False
-        self.am_interested = False
-        self.bytes_sent = 0
-        self.bytes_received = 0
-        self.out_queue = queue.Queue()
-        self.send(self.protocol + self.our_extensions)
-        self.in_buffer = b''
-        self.send_routine_exc = None
-        import collections
-        self.peer_extended_protocols = collections.defaultdict(int)
-        self.peer_reqq = 20
-        try:
-            localaddr = sock.getsockname()
-            peeraddr = sock.getpeername()
-        except socket.error as exc:
-            self.logger.warning('Error getting socket endpoints: %s', exc)
-        else:
-            self.logger.info('New connection: %s-%s', localaddr, peeraddr)
-
-    def close(self):
-        self.socket.close()
-        self.out_queue.put(None)
-        self.on_close()
-
-    def __del__(self):
-        self.logger.debug('Deleting %r', self)
-        handlers = self.logger.handlers
-        assert len(handlers) == 1, handlers
-        for h in handlers:
-            self.logger.removeHandler(h)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args):
-        self.close()
-
-    def send(self, data):
-        self.out_queue.put(data)
-
-    def send_routine(self):
-        with self:
-            while True:
-                data = self.out_queue.get()
-                if data is None:
-                    return
-                try:
-                    self.socket.sendall(data)
-                except socket.error as exc:
-                    if exc.errno in {errno.EPIPE, errno.ECONNRESET}:
-                        return
-                self.logger.debug('Sent %r bytes', len(data))
-
-    def recv(self, count):
-        data = b''
-        while len(data) < count:
-            try:
-                data1 = self.socket.recv(count - len(data))
-            except socket.error as exc:
-                # TODO ECONNRESET, EAGAIN and EWOULDBLOCK might want special treatment here
-                raise ConnectionClosed(exc)
-            if not data1:
-                raise ConnectionClosed()
-            data += data1
-        self.logger.debug('Received %d bytes', len(data))
-        return data
-
-    def recv_routine(self):
-        import functools
-        self.recv_protocol()
-        self.recv_extensions()
-        self.recv_info_hash()
-        self.recv_peer_id()
-        while True:
-            length, = struct.unpack('>I', self.recv(4))
-            if length == 0:
-                self.logger.info('Received keep-alive')
-                continue
-            self.logger.debug('Next message length: %s', length)
-            type = self.recv(1)[0]
-            handler = {
-                self.CHOKE: self.recv_choke,
-                self.UNCHOKE: self.recv_unchoke,
-                self.INTERESTED: self.recv_interested,
-                self.NOT_INTERESTED: self.recv_not_interested,
-                self.HAVE: self.recv_have,
-                self.BITFIELD: self.recv_bitfield,
-                self.REQUEST: self.recv_request,
-                self.PIECE: self.recv_piece,
-                self.CANCEL: self.recv_cancel,
-                self.EXTENDED: self.recv_extended,
-            }.get(type, functools.partial(self.recv_unknown, type))
-            handler(length - 1)
-
-    def run(self):
-        try:
-            gthread.spawn(self.send_routine)
-            self.recv_routine()
-        except ConnectionClosed as exc:
-            if exc.args:
-                self.logger.error('%s: Connection terminated: %s', self, exc)
-        finally:
-            self.close()
-            if self.send_routine_exc:
-                raise self.send_routine_exc
-
-
-    def recv_protocol(self):
-        self.got_protocol(self.recv(20))
-
-    def got_protocol(self, protocol):
-        if protocol != self.protocol:
-            raise ConnectionClosed('Unknown protocol %s' % protocol)
-
-    def recv_extensions(self):
-        self.got_extensions(self.recv(8))
-
-    def got_extensions(self, extensions):
-        self.logger.debug('Received extensions: %s', extensions)
-        self.peer_extensions = extensions
-
-    # INFO HASH
-
-    def send_info_hash(self, info_hash):
-        assert len(info_hash) == 20, len(info_hash)
-        self.send(info_hash)
-
-    def recv_info_hash(self):
-        self.got_info_hash(self.recv(20))
-
-    def got_info_hash(self, info_hash):
-        self.peer_info_hash = info_hash
-
-    # PEER ID
-
-    def send_peer_id(self, peer_id):
-        assert len(peer_id) == 20, peer_id
-        self.send(peer_id)
-
-    def recv_peer_id(self):
-        self.got_peer_id(self.recv(20))
-
-    def got_peer_id(self, peer_id):
-        self.peer_peer_id = peer_id
-
-    # keep alive
-
-    def send_keep_alive(self):
-        self.send(struct.pack('>I', 0))
-
-    def recv_unknown(self, type, length):
-        self.logger.warning('Received message with unknown type %r, length %r', type, length)
-        max_length = 0x100
-        data = self.recv(min(0x100, length))
-        self.logger.debug('Unknown message begins: %s', data)
-        if length > max_length:
-            raise ConnectionClosed('Unknown message is too long')
-        else:
-            self.got_unknown(type, data)
-
-    def got_unknown(self, type, data):
-        pass
-
-    # choke
-
-    def send_choke(self):
-        if not self.peer_choked:
-            self.logger.debug('%s: Choking peer', self)
-            self.peer_choked = True
-            self.send_message(self.CHOKE)
-        self.peer_requests.clear()
-
-    def recv_choke(self, length):
-        if length != 0:
-            raise ConnectionClosed('Received choke with length=%d' % length)
-        self.got_choke()
-
-    def got_choke(self):
-        self.logger.debug('%s: Peer choked us', self)
-        self.am_choked = True
-        self.our_requests.clear()
-
-    # unchoke
-
-    def send_unchoke(self):
-        if self.peer_choked:
-            self.logger.debug('%s: Unchoking peer', self)
-            self.peer_choked = False
-            self.send_message(self.UNCHOKE)
-
-    def recv_unchoke(self, length):
-        if length != 0:
-            raise ConnectionClosed('Received unchoked with length %s' % length)
-        self.got_unchoke()
-
-    def got_unchoke(self):
-        self.logger.debug('%s: Peer unchoked us', self)
-        self.am_choked = False
-
-    # interested
-
-    def send_interested(self):
-        if not self.am_interested:
-            self.logger.debug('Sending interest')
-            self.am_interested = True
-            self.send_message(self.INTERESTED)
-
-    def recv_interested(self, length):
-        if length != 0:
-            raise ConnectionClosed('Received interested with length=%d' % length)
-        self.got_interested()
-
-    def got_interested(self):
-        self.logger.debug('Peer interested')
-        self.peer_interested = True
-
-    # not interested
-
-    def send_not_interested(self):
-        if self.am_interested:
-            self.logger.debug('Sending not interested')
-            self.am_interested = False
-            self.send_message(self.NOT_INTERESTED)
-
-    def recv_not_interested(self, length):
-        assert length == 0, length
-        self.got_not_interested()
-
-    def got_not_interested(self):
-        self.logger.debug('Peer not interested')
-        self.peer_interested = False
-
-    # have
-
-    def send_have(self, index):
-        self.logger.debug('%s: Sending HAVE(%d)', self, index)
-        self.send_message(self.HAVE, struct.pack('>I', index))
-
-    def recv_have(self, length):
-        index = struct.unpack('>I', self.recv(length))[0]
-        self.got_have(index)
-
-    def got_have(self, index):
-        self.logger.debug('Peer has piece %r', index)
-        self.peer_bitfield.add(index)
-
-    # bitfield
-
-    def send_bitfield(self, bytes):
-        if any(bytes):
-            self.logger.debug('%s: Sending bitfield %s', self, bytes)
-            self.send_message(self.BITFIELD, bytes)
-
-    def recv_bitfield(self, length):
-        import itertools
-        index_iter = itertools.count()
-        bitfield = set()
-        for byte in self.recv(length):
-            for bit_shift in range(7, -1, -1):
-                index = index_iter.__next__()
-                if byte >> bit_shift & 1:
-                    bitfield.add(index)
-        self.got_bitfield(bitfield)
-
-    def got_bitfield(self, bitfield):
-        self.logger.debug('%s: Got bitfield: %s', self, pretty_bitfield(bitfield))
-        self.peer_bitfield = bitfield
-
-    def recv_request(self, length):
-        request = struct.unpack('>III', self.recv(length))
-        self.peer_requests.add(request)
-        self.got_request(request)
-
-    def got_request(self, request):
-        self.logger.debug('Got request %s', request)
-        self.peer_requests.add(request)
-
-    def recv_piece(self, length):
-        index, offset = struct.unpack_from('>II', self.recv(8))
-        self.got_piece(index, offset, self.recv(length - 8))
-
-    def got_piece(self, index, offset, data):
-        request = index, offset, len(data)
-        try:
-            self.our_requests.remove(request)
-        except KeyError:
-            self.logger.warning('%s: Received unrequested block %s', self, request)
-        else:
-            self.logger.info('%s: Received block %s', self, request)
-
-    # cancel
-
-    def recv_cancel(self, length):
-        request = struct.unpack('>III', self.recv(length))
-        self.got_cancel(request)
-
-    def got_cancel(self, request):
-        try:
-            self.peer_requests.remove(request)
-        except KeyError:
-            self.logger.warning('%s: Peer canceled unknown request %s', self, request)
-
-    # extended
-
-    def recv_extended(self, length):
-        import io
-        type = self.recv(1)[0]
-        self.logger.debug('Receiving extended message type: %s', type)
-        message = self.recv(length - 1)
-        self.logger.debug('Received extended message payload: %s', message)
-        self.got_extended(type, buncode(io.BytesIO(message)).__next__())
-
-    def got_extended(self, type, msg):
-        import pprint
-        self.logger.debug('%s: Got extended message (type=%s):\n%s', self, type, pprint.pformat(msg))
-        if type == 0:
-            self.peer_extended_protocols.update(msg['m'])
-            if 'reqq' in msg:
-                self.peer_reqq = msg['reqq']
-
-    def send_message(self, type, payload=b''):
-        self.send(struct.pack('>IB', len(payload) + 1, type) + payload)
-
-    def send_request(self, request):
-        self.logger.info('%s: Sending request: %s', self, request)
-        assert not self.am_choked
-        if request in self.our_requests:
-            self.logger.warning('%s: Tried to send duplicate request: %s', self, request)
-        else:
-            self.our_requests.add(request)
-            self.send_message(self.REQUEST, struct.pack('>III', *request))
-
-    def send_piece(self, index, begin, data):
-        request = index, begin, len(data)
-        self.logger.debug('%s: Sending piece %s', self, request)
-        self.peer_requests.remove(request)
-        self.send_message(self.PIECE, struct.pack('>II', index, begin) + data)
-        self.bytes_sent += len(data)
-
-    def send_cancel(self, request):
-        try:
-            self.our_requests.remove(request)
-        except KeyError:
-            pass
-        else:
-            self.logger.info('%s: Sending cancel: %s', self, request)
-            self.send_message(self.CANCEL, struct.pack('>III', *request))
-
-    def send_extended(self, type, msg):
-        assert self.peer_extensions[5] & 0x10, self.peer_extensions
-        assert self.our_extensions[5] & 0x10, self.our_extensions
-        assert 'm' in msg, msg
-        data = bytes([type]) + bencode(msg)
-        import pprint
-        self.logger.debug(
-            'Sending extended message (type=%s):\n%s',
-            type,
-            pprint.pformat(msg))
-        self.send_message(self.EXTENDED, data)
-