Source

ppenet / ppenet / peer.py

import random
import protocol

DEFAULT_ROUND_TRIP_TIME = 500
DEFAULT_PACKET_THROTTLE = 32
PACKET_THROTTLE_SCALE = 32
PACKET_THROTTLE_COUNTER = 7
PACKET_THROTTLE_ACCELERATION = 2
PACKET_THROTTLE_DECELERATION = 2
PACKET_THROTTLE_INTERVAL = 5000
PACKET_LOSS_SCALE = (1 << 16)
PACKET_LOSS_INTERVAL = 10000
WINDOW_SIZE_SCALE = 64 * 1024
TIMEOUT_LIMIT = 32
TIMEOUT_MINIMUM = 5000
TIMEOUT_MAXIMUM = 30000
PING_INTERVAL = 500
UNSEQUENCED_WINDOWS = 64
UNSEQUENCED_WINDOW_SIZE = 1024
FREE_UNSEQUENCED_WINDOWS = 32
RELIABLE_WINDOWS = 16
RELIABLE_WINDOW_SIZE = 0x1000
FREE_RELIABLE_WINDOWS = 8


class State:
    (DISCONNECTED,
    CONNECTING,
    ACKNOWLEDGING_CONNECT,
    CONNECTION_PENDING,
    CONNECTION_SUCCEEDED,
    CONNECTED,
    DISCONNECT_LATER,
    DISCONNECTING,
    ACKNOWLEDGING_DISCONNECT,
    ZOMBIE) = range(10)


class PacketFlag:
    RELIABLE = 1 << 0
    UNSEQUENCED = 1 << 1
    UNRELIABLE_FRAGMENT = 1 << 3


class Channel(object):
    def __init__(self):
        self.outgoing_reliable_seq_num = 0
        self.outgoing_unreliable_seq_num = 0
        self.used_reliable_windows = 0
        self.reliable_windows = [0] * RELIABLE_WINDOWS
        self.incoming_reliable_seq_num = 0
        self.incoming_unreliable_seq_num = 0
        self.incoming_reliable_commands = []
        self.incoming_unreliable_commands = []


class Acknowledgement(object):
    def __init__(self):
        self.ack_list = []
        self.sent_lime = 0
        self.command = 0


class OutgoingCommand(object):
    def __init__(self, command, data='', offset=0, length=0):
        self.outgoing_command_list = []
        self.reliable_seq_num = 0
        self.unreliable_seq_num = 0
        self.sent_time = 0
        self.round_trip_timeout = 0
        self.round_trip_timeout_limit = 0
        self.fragment_offset = offset
        self.fragment_length = length
        self.send_attempts = 0
        self.command = command
        self.data = data

    def push_to_queue(self, peer): #setup_outgoing_command
        command = self.command
        channel = peer._get_channel(command.channel_id)

        peer.outgoing_data_total += command.size + self.fragment_length
        if command.channel_id == 0xff:
            peer.outgoing_reliable_seq_num += 1
            self.reliable_seq_num = peer.outgoing_reliable_seq_num
            self.unreliable_seq_num = 0
        elif command.command & protocol.Flag.COMMAND_ACKNOWLEDGE:
            channel.outgoing_reliable_seq_num += 1
            channel.outgoing_unreliable_seq_num = 0
            self.reliable_seq_num = channel.outgoing_reliable_seq_num
            self.unreliable_seq_num = 0
        elif command.command & protocol.Flag.COMMAND_UNSEQUENCED:
            peer.outgoing_unsequenced_group += 1
            self.reliable_seq_num = 0
            self.unreliable_seq_num = 0
        else:
            if self.fragment_offset == 0:
                channel.outgoing_unreliable_seq_num += 1
            self.reliable_seq_num = channel.outgoing_reliable_seq_num
            self.unreliable_seq_num = channel.outgoing_unreliable_seq_num
        command.reliable_seq_num = self.reliable_seq_num

        command_type = command.command & protocol.COMMAND_MASK
        if command_type == protocol.Command.SEND_UNRELIABLE:
            command.unreliable_seq_num = self.unreliable_seq_num
        elif command_type == protocol.Command.SEND_UNSEQUENCED:
            command.unsequenced_group = peer.outgoing_unsequenced_group

        if command.command & protocol.Flag.COMMAND_ACKNOWLEDGE:
            peer.outgoing_reliable_commands.append(self)
        else:
            peer.outgoing_unreliable_commands.append(self)


class IncomingCommand(object):
    def __init__(self):
        self.incoming_command_list = []
        self.reliable_seq_num = 0
        self.unreliable_seq_num = 0
        self.command = 0
        self.fragment_count = 0
        self.fragments_remaining = 0
        self.fragments = 0
        self.data = ''


class Peer(object):
    def __init__(self, host, peer_id, channel_count):
        self.host = host
        self.outgoing_peer_id = protocol.MAXIMUM_PEER_ID
        self.incoming_peer_id = peer_id
        self.connect_id = 0
        self.outgoing_session_id = self.incoming_session_id = 0xFF
        self.address = None
        self.data = 0
        self.state = State.DISCONNECTED
        self.channels = {}
        self.channel_count = 0
        self.incoming_bandwidth = 0
        self.outgoing_bandwidth = 0
        self.incoming_bandwidth_throttle_epoch = 0
        self.outgoing_bandwidth_throttle_epoch = 0
        self.incoming_data_total = 0
        self.outgoing_data_total = 0
        self.last_send_time = 0
        self.last_receive_time = 0
        self.next_timeout = 0
        self.earliest_timeout = 0
        self.packet_loss_epoch = 0
        self.packets_sent = 0
        self.packets_lost = 0
        self.packet_loss = 0
        self.packet_loss_variance = 0
        self.packet_throttle = DEFAULT_PACKET_THROTTLE
        self.packet_throttle_limit = PACKET_THROTTLE_SCALE
        self.packet_throttle_counter = 0
        self.packet_throttle_epoch = 0
        self.packet_throttle_acceleration = PACKET_THROTTLE_ACCELERATION
        self.packet_throttle_deceleration = PACKET_THROTTLE_DECELERATION
        self.packet_throttle_interval = PACKET_THROTTLE_INTERVAL
        self.ping_interval = PING_INTERVAL
        self.timeout_limit = TIMEOUT_LIMIT
        self.timeout_minimum = TIMEOUT_MINIMUM
        self.timeout_maximum = TIMEOUT_MAXIMUM
        self.last_round_trip_time = DEFAULT_ROUND_TRIP_TIME
        self.lowest_round_trip_time = DEFAULT_ROUND_TRIP_TIME
        self.last_round_trip_time_variance = 0
        self.highest_round_trip_time_variance = 0
        self.round_trip_time = DEFAULT_ROUND_TRIP_TIME
        self.round_trip_time_variance = 0
        self.mtu = host.mtu
        self.window_size = protocol.MAXIMUM_WINDOW_SIZE
        self.reliable_data_in_transit = 0
        self.outgoing_reliable_seq_num = 0
        self.acknowledgements = []
        self.sent_reliable_commands = []
        self.sent_unreliable_commands = []
        self.outgoing_reliable_commands = []
        self.outgoing_unreliable_commands = []
        self.dispatched_commands = []
        self.needs_dispatch = 0
        self.incoming_unsequenced_group = 0
        self.outgoing_unsequenced_group = 0
        self.unsequenced_window = []  # probably bytearray or memoryview
        self.event_data = 0

    def connect(self, address, data=0):
        host = self.host
        self.state = State.CONNECTING
        self.address = address
        self.connect_id = random.getrandbits(32)
        if host.outgoing_bandwidth == 0:
            window_size = protocol.MAXIMUM_WINDOW_SIZE
        else:
            window_size = host.outgoing_bandwidth / WINDOW_SIZE_SCALE\
                          * protocol.MAXIMUM_WINDOW_SIZE
        self.window_size = max(min(window_size, protocol.MAXIMUM_WINDOW_SIZE),
                               protocol.MINIMUM_WINDOW_SIZE)
        connect = protocol.Connect(
            protocol.Command.CONNECT | protocol.Flag.COMMAND_ACKNOWLEDGE,
            0xFF,
            0,
            self.incoming_peer_id,
            self.incoming_session_id,
            self.outgoing_session_id,
            self.mtu,
            self.window_size,
            self.channel_count,
            self.incoming_bandwidth,
            self.outgoing_bandwidth,
            self.packet_throttle_interval,
            self.packet_throttle_acceleration,
            self.packet_throttle_deceleration,
            self.connect_id,
            self.data
        )
        self._setup_outgoing_command(OutgoingCommand(connect))

    def _get_channel(self, channel_id, by_user=0):
        try:
            channel = self.channels[channel_id]
        except KeyError:
            if 0 <= channel_id <= protocol.MAXIMUM_CHANNEL_COUNT - by_user:
                self.channels[channel_id] = channel = Channel()
            else:
                raise ValueError('Wrong channel_id: %d' % channel_id)
        return channel

    def _setup_outgoing_command(self, outgoing_command):
        command = outgoing_command.command
        channel = self._get_channel(command.channel_id)

        self.outgoing_data_total += command.size + outgoing_command.fragment_length
        if command.channel_id == 0xff:
            self.outgoing_reliable_seq_num += 1
            outgoing_command.reliable_seq_num = self.outgoing_reliable_seq_num
            outgoing_command.unreliable_seq_num = 0
        elif command.command & protocol.Flag.COMMAND_ACKNOWLEDGE:
            channel.outgoing_reliable_seq_num += 1
            channel.outgoing_unreliable_seq_num = 0
            outgoing_command.reliable_seq_num = channel.outgoing_reliable_seq_num
            outgoing_command.unreliable_seq_num = 0
        elif command.command & protocol.Flag.COMMAND_UNSEQUENCED:
            self.outgoing_unsequenced_group += 1
            outgoing_command.reliable_seq_num = 0
            outgoing_command.unreliable_seq_num = 0
        else:
            if outgoing_command.fragment_offset == 0:
                channel.outgoing_unreliable_seq_num += 1
            outgoing_command.reliable_seq_num = channel.outgoing_reliable_seq_num
            outgoing_command.unreliable_seq_num = channel.outgoing_unreliable_seq_num
        command.reliable_seq_num = outgoing_command.reliable_seq_num

        command_type = command.command & protocol.COMMAND_MASK
        if command_type == protocol.Command.SEND_UNRELIABLE:
            command.unreliable_seq_num = outgoing_command.unreliable_seq_num
        elif command_type == protocol.Command.SEND_UNSEQUENCED:
            command.unsequenced_group = self.outgoing_unsequenced_group

        if command.command & protocol.Flag.COMMAND_ACKNOWLEDGE:
            self.outgoing_reliable_commands.append(outgoing_command)
        else:
            self.outgoing_unreliable_commands.append(outgoing_command)

    def send(self, channel_id, data, flags=0):
        channel = self._get_channel(channel_id, 1)
        if self.state != State.CONNECTED:
            raise IOError('Not connected')
        data_len = len(data)
        if data_len > protocol.MAXIMUM_PACKET_SIZE:
            raise ValueError('Data too large')
        fragment_len = self.mtu - protocol.SendFragment.size
        if self.host.checksum is not None:
            fragment_len -= 4  # 32bit unsigned int

        if data_len > fragment_len:
            # fragmenting
            fragment_count = (data_len + fragment_len - 1) / fragment_len
            if fragment_count > protocol.MAXIMUM_FRAGMENT_COUNT:
                raise ValueError('Data too large')
            if (flags & PacketFlag.RELIABLE | PacketFlag.UNRELIABLE_FRAGMENT) \
                    == PacketFlag.UNRELIABLE_FRAGMENT and \
                    channel.outgoing_unreliable_seq_num < 0xffff:
                command_type = protocol.Command.SEND_UNRELIABLE_FRAGMENT
                start_seq_num = channel.outgoing_unreliable_seq_num + 1
            else:
                command_type = protocol.Command.SEND_FRAGMENT | protocol.Flag.COMMAND_ACKNOWLEDGE
                start_seq_num = channel.outgoing_reliable_seq_num + 1

            for fragment_number, fragment_offset in enumerate(xrange(0, data_len, fragment_len)):
                if data_len - fragment_offset < fragment_len:
                    # calculate size of last fragment
                    fragment_len = data_len - fragment_offset

                fragment = OutgoingCommand(
                    protocol.SendFragment(
                        command_type,
                        channel_id,
                        0,
                        start_seq_num,
                        fragment_len,
                        fragment_count,
                        fragment_number,
                        data_len,
                        fragment_offset
                    ),
                    fragment_offset,
                    fragment_len
                )

                self._setup_outgoing_command(fragment)
        else:
            if flags & (PacketFlag.RELIABLE | PacketFlag.UNSEQUENCED) == PacketFlag.UNSEQUENCED:
                command = protocol.SendUnsequenced()
            elif flags & PacketFlag.RELIABLE or channel.outgoing_unreliable_seq_num >= 0xffff:
                command = protocol.SendReliable()
            else:
                command = protocol.SendUnreliable()
            command.channel_id = channel_id
            command.data_length = data_len
            self._setup_outgoing_command(OutgoingCommand(command, data, 0, data_len))