Commits

Szymon Wróblewski  committed 9fc26e0

added socket Client and Server, corrected small bugs, updated message documentation

  • Participants
  • Parent commits 7201ab7

Comments (0)

Files changed (11)

File doc/source/message.rst

 
    .. note::
       
-      You can create instances of :class:`MessageFactory`, when you want to
-      separate messages for different connections in Client.
+      You can create more instances of :class:`MessageFactory`, when you want
+      to separate messages for different connections in Client.
       
+      
+   .. method:: get_by_name(name)
    
-   .. classmethod:: register(name, field_names[, kwargs])
+      Returns message class with given name
+      
+      :param name: name of message
+      :return: message class (namedtuple)
+      
+      
+   .. method:: get_by_type(type_id)
+      
+      Returns message class with given type_id
+
+      :param type_id: - type identifier of message
+      :return: message class (namedtuple)
+      
+      
+   .. method:: get_hash()
    
-      Register new message type
+      Calculate and return hash.
+
+      Hash depends on registered messages and used serializing library.
+      
+      :return: int
+      
+      
+   .. method:: get_params(message)
+      
+      Return tuple containing type_id, and sending keyword arguments
+
+      :param message: message class created by register
+      :return: int, dict
+   
+   
+   .. method:: pack(message)
+   
+      Pack data to string
+      
+      :param message: object of class created by register
+      :return: string
+      
+      
+   .. method:: register(name, field_names[, kwargs])
+   
+      Register new message type.
       
       :param name: name of message class
       :param field_names: list of names of message fields
-      :param kwargs: keyword arguments for send method
-      :return: packet (namedtuple)
+      :param kwargs: additional keyword arguments for send method
+      :return: message class (namedtuple)
       
+      
+   .. method:: reset_context(context)
+   
+      Prepares object to behave as context for stream unpacking.
+      
+      :param context: object which will be prepared
+      
+      
+   .. method:: set_frozen()
+   
+      Disable ability to register new messages to allow generation of hash.
+      
+      
+   .. method:: unpack(data)
+   
+      Unpack message from string
+      
+      :param data: packed message data as a string
+      :return: message
+      
+   
+   .. method:: unpack_all(data, context)
+   
+      Feed unpacker with data from stream and unpack all messages
+      
+      :param data: packed message(s) data as a string
+      :param context: object previously prepared with :meth:`reset_context`
+      :return: message generator
+
 
 Predefined messages
 -------------------
 
-.. class:: chat_msg(player, msg)
-
-   :param player: player name
-   :param msg: message
-
-
 .. class:: update_remoteobject(type_id, obj_id, variables)
 
    Message used by :class:`syncobject.SyncObjectManager` to update state of

File pygame_network/__init__.py

 from network import Server, Client
 
 _logger = logging.getLogger(__name__)
-_network_module = None
-_serialization_module = None
 register = message.message_factory.register
 
 
 def init(events=False, event_val=1, logging_lvl=logging.INFO,
-         network_m=('enet',), serialization_m=('msgpack', 'json')):
+         n_module=('enet',), s_module=('msgpack', 'json')):
     """Initialize network library.
 
     events - allow sending Pygame events (default False)
     event_val - set event ID as event_val + pygame.USEREVENT (default 1)
     logging_lvl - level of logging messages (default logging.INFO, None to skip
                   initializing logging module
-    network_m - string or list of strings with names of network
+    n_module - string or list of strings with names of network
               library adapters, first available will be used
-    serialization_m - string or list of strings with names of serialization
+    s_module - string or list of strings with names of serialization
                     library adapters, first available will be used
 
     Note: Because of the dynamic loading of network library adapter, Client,
         logging.basicConfig(level=logging_lvl,
                             format='%(asctime)-8s %(levelname)-8s %(message)s',
                             datefmt='%H:%M:%S')
-    network.select_adapter(network_m)
+    network.select_adapter(n_module)
     if network._selected_adapter is not None:
         _logger.info("Using %s",
             network._selected_adapter.__name__.split('.')[-1])
     else:
         _logger.critical("Can't find any network module")
-    serialization.select_adapter(serialization_m)
+    serialization.select_adapter(s_module)
     if serialization._selected_adapter is not None:
         _logger.info("Using %s",
             serialization._selected_adapter.__name__.split('.')[-1])

File pygame_network/client.py

         self.conn_map = {}
         _logger.info('Client created, connections limit: %d', conn_limit)
 
-    def connect(self, address, port, message_factory=None, **kwargs):
+    def connect(self, host, port, message_factory=None, **kwargs):
         if message_factory is None:
             message_factory = self.message_factory
-        _logger.info('Connecting to %s:%d', address, port)
+        _logger.info('Connecting to %s:%d', host, port)
         message_factory.set_frozen()
-        socket, c_id = self._create_connection(address, port,
-            message_factory.get_hash(), **kwargs)
-        conn = self.connection(self, socket, message_factory)
-        self.conn_map[c_id] = conn
-        return conn
+        connection, c_id = self._create_connection(host, port,
+            message_factory, **kwargs)
+        self.conn_map[c_id] = connection
+        return connection
 
     def update(self, timeout=0):
         pass

File pygame_network/connection.py

             raise TypeError('%s takes exactly %d arguments (%d given)' %
                 (message.__doc__, int(e) - 1, int(f) - 1))
         data = self.message_factory.pack(message_)
-        _logger.info('Sent %s message to %s:%s', name, *self.address)
+        _logger.info('Sent %s message to %s', name, self.address)
         self.data_sent += len(data)
         self.messages_sent += 1
         return self._send_data(data, **params)
     def _receive(self, data, **kwargs):
         for message in self.message_factory.unpack_all(data, self):
             name = message.__class__.__name__
-            _logger.info('Received %s message from %s:%s', name, *self.address)
+            _logger.info('Received %s message from %s', name, self.address)
             event.received(self, message)
             for h in self.handlers:
                 getattr(h, 'net_' + name, h.on_recive)(message, **kwargs)
 
     def _connect(self):
-        _logger.info('Connected to %s:%s', *self.address)
+        _logger.info('Connected to %s', self.address)
         event.connected(self)
         for h in self.handlers:
             h.on_connect()
 
     def _disconnect(self):
-        _logger.info('Disconnected from %s:%s', *self.address)
+        _logger.info('Disconnected from %s', self.address)
         event.disconnected(self)
         for h in self.handlers:
             h.on_disconnect()
     @property
     def address(self):
         """Connection address."""
-        return None, None
+        return '', ''

File pygame_network/message.py

         return data
 
     def set_frozen(self):
+        """Disable ability to register new messages to allow generation of hash
+        """
         self._frozen = True
 
     def reset_context(self, context):
             _logger.error('Message unpacking error: %s', message)
 
     def unpack(self, data):
-        """Unpack data from string and buffer, return message
+        """Unpack message from string
 
         MessageFactory.unpack(data): return message
 
     'obj_id',
     'variables'
 ), channel=1, flags=0)
-chat_msg = message_factory.register('chat_msg', (
-    'player',
-    'msg'
-))

File pygame_network/network/enet_adapter.py

 class Server(server.Server):
     connection = Connection
 
-    def __init__(self, address='', port=0, con_limit=4, *args, **kwargs):
-        super(Server, self).__init__(address, port, con_limit, *args, **kwargs)
-        address = enet.Address(address, port)
-        self.host = enet.Host(address, con_limit, *args, **kwargs)
+    def __init__(self, host='', port=0, con_limit=4, *args, **kwargs):
+        super(Server, self).__init__(host, port, con_limit, *args, **kwargs)
+        host = enet.Address(host, port)
+        self.host = enet.Host(host, con_limit, *args, **kwargs)
         self._peer_cnt = 0
 
     def update(self, timeout=0):
                 self._receive(event.peer.data, event.packet.data, channel=event.channelID)
             event = host.check_events()
 
+    @property
+    def address(self):
+        """Server address."""
+        address = self.host.address
+        return address.host, address.port
+
 
 class Client(client.Client):
     connection = Connection
         self.host = enet.Host(None, conn_limit)
         self._peer_cnt = 0
 
-    def _create_connection(self, address, port, mf_hash, channels=1, **kwargs):
-        address = enet.Address(address, port)
+    def _create_connection(self, host, port, message_factory, channels=1, **kwargs):
+        host = enet.Address(host, port)
         peer_id = self._peer_cnt = self._peer_cnt + 1
         peer_id = str(peer_id)
-        peer = self.host.connect(address, channels, mf_hash)
+        peer = self.host.connect(host, channels, message_factory.get_hash())
         peer.data = peer_id
-        return peer, peer_id
+        connection = Connection(self, peer, message_factory)
+        return connection, peer_id
 
     def update(self, timeout=0):
         if len(self.conn_map) == 0:

File pygame_network/network/socket_adapter.py

 import logging
 import socket
+import struct
 import asyncore
 from collections import deque
 from .. import connection, server, client
 
 _logger = logging.getLogger(__name__)
+_connect_struct = struct.Struct('!I')
 
+class Dispacher(asyncore.dispatcher, object):
+    pass
 
-class Connection(connection.Connection, asyncore.dispatcher):
+class Connection(connection.Connection, Dispacher):
     __send = asyncore.dispatcher.send
     # maximum amount of data received / sent at once
     recv_buffer_size = 4096
     def __init__(self, parent, socket, message_factory, *args, **kwargs):
         super(Connection, self).__init__(
             parent, message_factory, # params for base_adapter.Connection
-            socket, parent.conn_map, # params for asyncore.dispatcher
+            socket, None, # params for asyncore.dispatcher
             * args, **kwargs)
         #self.send_queue = deque()
         self.send_buffer = bytearray()
 
     def _send_part(self):
         try:
-            num_sent = self.__send(self.send_buffer)
+            num_sent = asyncore.dispatcher.send(self, self.send_buffer)
         except socket.error:
             self.handle_error()
             return
         return (not self.connected) or len(self.send_buffer)
 
     def handle_read(self):
-        # tinkering with dispatcher internal variables,
-        # because it doesn't support socket.recv_into
-        try:
-            data = self.socket.recv_into(self.recv_buffer)
-            if not data:
-                self.handle_close()
-            else:
-                self._receive(data)
-        except socket.error, why:
-            if why.args[0] in asyncore._DISCONNECTED:
-                self.handle_close()
-            else:
-                self.handle_error()
-            return
-        self._receive(self.recv(self.recv_buffer_size))
+        data = self.recv(self.recv_buffer_size)
+        if data:
+            self._receive(data)
+
+#    def handle_read2(self):
+#        # tinkering with dispatcher internal variables,
+#        # because it doesn't support socket.recv_into
+#        try:
+#            num_rcvd = self.socket.recv_into(self.recv_buffer)
+#            if not num_rcvd:
+#                self.handle_close()
+#            else:
+#                self._receive(self.recv_buffer[:num_rcvd])
+#        except socket.error, why:
+#            if why.args[0] in asyncore._DISCONNECTED:
+#                self.handle_close()
+#            else:
+#                self.handle_error()
+#            return
 
     def handle_connect(self):
+        self.addr = self.socket.getpeername()
         self._connect()
 
     def handle_close(self):
         return getattr(_logger, type)(message)
 
     def disconnect(self, *args):
-        pass
+        self.parent._disconnect(self.socket.fileno())
+        self.close()
 
     @property
     def address(self):
         return self.addr
+
+
+class Server(server.Server, Dispacher):
+    connection = Connection
+
+    def __init__(self, host='', port=0, con_limit=4, *args, **kwargs):
+        super(Server, self).__init__(
+            host, port, con_limit,
+            None, None,
+            *args, **kwargs)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        # disable Nagle buffering algorithm
+        self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        self.set_reuse_addr()
+        self.bind((host, port))
+        self.listen(con_limit)
+
+    def handle_accept(self):
+        pair = self.accept()
+        if pair is not None:
+            sock, addr = pair
+            sock.settimeout(0.5)  # TODO: sth better?
+            mf_hash = _connect_struct.unpack(sock.recv(4))[0]
+            sock.setblocking(0)
+            self._accept(mf_hash, sock, self._fileno, addr)
+
+    def update(self, timeout=0):
+        asyncore.loop(timeout / 1000, False, None, 1)
+
+
+class Client(client.Client):
+    def __init__(self, conn_limit=0, *args, **kwargs):
+        super(Client, self).__init__(*args, **kwargs)
+        self._sock_cnt = 0
+
+    def _create_connection(self, host, port, message_factory, **kwargs):
+        conn = Connection(self, None, message_factory)
+        conn.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        # disable Nagle buffering algorithm
+        conn.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        conn.connect((host, port))
+        conn.socket.send(_connect_struct.pack(message_factory.get_hash()))
+        return conn, conn.socket.fileno()
+
+    def update(self, timeout=0):
+        asyncore.loop(timeout / 1000, False, self.conn_map, 1)

File pygame_network/server.py

     handler = None
     connection = None
 
-    def __init__(self, address='', port=0, conn_limit=4, *args, **kwargs):
+    def __init__(self, host='', port=0, conn_limit=4, *args, **kwargs):
         super(Server, self).__init__(*args, **kwargs)
         _logger.info('Server created %s:%d, connections limit: %d',
-                     address, port, conn_limit)
+                     host, port, conn_limit)
         self.message_factory.set_frozen()
         self.conn_map = {}
+        self.conn_limit = conn_limit
 
     def update(self, timeout=0):
         pass
         else:
             return (c.handlers[0] for c in self.conn_map.itervalues()
                     if c not in exclude)
+
+    @property
+    def address(self):
+        """Server address."""
+        return None, None

File test_client_2.py

 
 
 def main():
-    net.init(logging_lvl=logging.DEBUG)
+    net.init(logging_lvl=logging.DEBUG, n_module='socket')
     net.register('echo', ('msg', 'msg_id'))
     client = net.Client()
     connection = client.connect("localhost", 54301)

File test_client_3.py

     pygame.display.flip()
 
     # Network init
-    net.init(events=True, logging_lvl=logging.DEBUG)  # enable Pygame events
+    net.init(events=True, logging_lvl=logging.DEBUG, n_module='socket')  # enable Pygame events
     echo = net.register('echo', ('msg', 'msg_id'))
     client = net.Client()
     connection = None

File test_server.py

 
 
 def main():
-    net.init(logging_lvl=logging.DEBUG)
+    net.init(logging_lvl=logging.DEBUG, n_module='socket')
     net.register('echo', ('msg', 'msg_id'))
     server = Server(port=54301)
     logging.info('Listening')