Commits

Szymon Wróblewski committed 1f81c70

added discovery server, many samll changes

  • Participants
  • Parent commits c5c8bcb

Comments (0)

Files changed (11)

pygame_network/discovery/__init__.py

Empty file added.

pygame_network/discovery/messages.py

+from .. import message
+
+message_factory = message.MessageFactory()
+get_servers = message_factory.register('get_servers')
+register = message_factory.register('register', (
+    'name',
+    'address',
+    'port'
+))
+servers_list = chat_msg = message_factory.register('servers_list', (
+    'servers'
+))

pygame_network/discovery/server.py

+import sys
+import logging
+from .. import __init__ as net
+import messages
+
+_logger = logging.getLogger(__name__)
+
+
+class Handler(net.Handler):
+    def net_register(self, message, channel):
+        self.server.append()
+
+    def net_get_servers(self, message, channel):
+        pass
+
+
+class Server(net.Server):
+    handler = Handler
+    message_factory = messages.message_factory
+    servers = list()
+
+
+def run(address, port):
+    server = Server(address, port)
+    _logger.info('Listening')
+    try:
+        while True:
+            server.step(1000)
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser(description='Server discovery service.')
+    parser.add_argument('-a', '--address', type=str, default='')
+    parser.add_argument('-p', '--port', type=int, default=8000)
+    args = parser.parse_args()
+    print 'Server started (Use Control-C to exit)'
+    run(args.address, args.port)
+    print 'Exiting'

pygame_network/discovery/server_.py

+import logging
+from SimpleXMLRPCServer import SimpleXMLRPCServer
+
+_logger = logging.getLogger(__name__)
+
+
+class DiscoveryService(object):
+    def __init__(self):
+        self.servers = []
+
+    def register(self, name, address, port):
+        """register(name, address, port) => string
+
+        Register new server
+        """
+        try:
+            self.servers.append((str(name), str(address), int(port)))
+        except Exception as e:
+            return '%s: %s' % (e.__class__.__name__, e.message)
+
+    def list(self):
+        """list() => [<name, address, port>]
+
+        List all available servers
+        """
+        return self.servers
+
+
+def main(address, port):
+    server = SimpleXMLRPCServer((address, port), allow_none=True)
+    server.register_introspection_functions()
+    server.register_instance(DiscoveryService())
+    server.serve_forever()
+    try:
+        server.serve_forever()
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser(description='Server discovery service.')
+    parser.add_argument('-a', '--address', type=str, default='')
+    parser.add_argument('-p', '--port', type=int, default=8000)
+    args = parser.parse_args()
+    print 'Server started (Use Control-C to exit)'
+    main(args.address, args.port)
+    print 'Exiting'

pygame_network/event.py

 __all__ = ('NETWORK',
+           'NET_DISCONNECTED',
            'NET_CONNECTED',
-           'NET_DISCONNECTED',
+           'NET_ACCEPTED',
            'NET_RECEIVED')
 
 NETWORK = 30
-NET_CONNECTED = 0
-NET_DISCONNECTED = 1
-NET_RECEIVED = 2
+NET_DISCONNECTED = 0
+NET_CONNECTED = 1
+NET_ACCEPTED = 2
+NET_RECEIVED = 3
+
+
+def accepted(connection):
+    pass
 
 
 def connected(connection):
     from pygame.locals import USEREVENT
     NETWORK = USEREVENT + event_val
 
+    def _accepted(connection):
+        pygame.event.post(Event(NETWORK, {
+            'net_type': NET_ACCEPTED,
+            #'connection': proxy(connection)
+            'connection': connection
+        }))
+    accepted = _accepted
+
     def _connected(connection):
         pygame.event.post(Event(NETWORK, {
             'net_type': NET_CONNECTED,

pygame_network/message.py

         data - packed message data as a string
         """
         _logger.debug("Unpacking message (length: %d)", len(data))
-        message = s_lib.unpack(data)
+        try:
+            message = s_lib.unpack(data)
+        except StopIteration:  # end of stream
+            _logger.warning('Not enough data to unpack')
+        except:
+            _logger.error('Data corrupted (No streaming support in library)')
+            return
         sys_data_l = len(sys_data)
         try:
             type_id = message[0]
             sys_data[:] = message[1:1 + sys_data_l]
             return cls._message_types[type_id](*message[1 + sys_data_l:])
         except KeyError:
-            _logger.warning('Unknown message type_id: %s', type_id)
+            # should not happen
+            # prevented by hash control during connection
+            _logger.error('Unknown message type_id: %s', type_id)
         except:
-            _logger.warning('Message unpacking error: %s', message)
-        return None
+            _logger.error('Message unpacking error: %s', message)
+        return
 
     @classmethod
     def get_by_name(cls, name):

pygame_network/network/base_adapter.py

     * using send method with message as argument
 
     """
+    message_factory = MessageFactory
 
-    def __init__(self, parent, message_factory=MessageFactory):
+    def __init__(self, parent, message_factory=None, *args, **kwargs):
+        super(Connection, self).__init__(*args, **kwargs)
         self.parent = proxy(parent)
-        self._message_cnt = 0
-        self._message_factory = message_factory
+        if message_factory is not None:
+            self.message_factory = message_factory
         self._handlers = []
+        self.data_sent = 0
+        self.data_received = 0
+        self.messages_sent = 0
+        self.messages_received = 0
 
     def __getattr__(self, name):
         parts = name.split('_', 1)
                 (message.__doc__, int(e) - 1, int(f) - 1))
         data = self._message_factory.pack(message_)
         _logger.info('Sent %s message to %s:%s', name, *self.address)
+        self.data_sent += len(data)
+        self.messages_sent += 1
         return self._send_data(data, **params)
 
     def _receive(self, data, channel=0):
         message = self._message_factory.unpack(data)
+        if message is None:
+            return
         name = message.__class__.__name__
         _logger.info('Received %s message from %s:%s', name, *self.address)
         event.received(self, message, channel)
 
 class Server(object):
     message_factory = MessageFactory
-    handler_cls = None
+    handler = None
 
     def __init__(self, address='', port=0, connections_limit=4, *args, **kwargs):
         _logger.debug('Server created %s, connections limit: %d', address, connections_limit)
         self.message_factory._frozen = True
         _logger.debug('MessageFactory frozen')
-        self.peers = {}
+        self.conn_map = {}
 
     def step(self, timeout=0):
         pass
 
     def connections(self, exclude=None):
         if exclude is None:
-            return self.peers.itervalues()
+            return self.conn_map.itervalues()
         else:
-            return (c for c in self.peers.itervalues() if c not in exclude)
+            return (c for c in self.conn_map.itervalues() if c not in exclude)
 
     def handlers(self, exclude=None):
         if exclude is None:
-            return (c._handlers[0] for c in self.peers.itervalues())
+            return (c._handlers[0] for c in self.conn_map.itervalues())
         else:
-            return (c._handlers[0] for c in self.peers.itervalues() if c not in exclude)
+            return (c._handlers[0] for c in self.conn_map.itervalues() if c not in exclude)
 

pygame_network/network/enet_adapter/connection.py

 import enet
-from ...message import MessageFactory
 from .. import base_adapter
 
 _state_mapping = {
         peer - Enet peer instance
     """
 
-    def __init__(self, parent, peer, message_factory=MessageFactory):
+    def __init__(self, parent, peer, message_factory=None):
         super(Connection, self).__init__(parent, message_factory)
         self.peer = peer
 

pygame_network/network/enet_adapter/server.py

 
 class Server(object):
     message_factory = MessageFactory
-    handler_cls = None
+    handler = None
 
-    def __init__(self, address='', port=0, connections_limit=4, *args, **kwargs):
+    def __init__(self, address='', port=0, con_limit=4, *args, **kwargs):
         address = enet.Address(address, port)
-        self.host = enet.Host(address, connections_limit, *args, **kwargs)
-        self.peers = {}
+        self.host = enet.Host(address, con_limit, *args, **kwargs)
+        self.conn_map = {}
         self._peer_cnt = 0
-        _logger.debug('Server created %s, connections limit: %d', address, connections_limit)
+        _logger.debug('Server created %s, connections limit: %d', address, con_limit)
         self.message_factory._frozen = True
         _logger.debug('MessageFactory frozen')
 
                     peer_id = str(peer_id)
                     event.peer.data = peer_id
                     connection = Connection(self, event.peer, self.message_factory)
-                    if issubclass(self.handler_cls, Handler):
-                        handler = self.handler_cls()
+                    if issubclass(self.handler, Handler):
+                        handler = self.handler()
                         handler.server = proxy(self)
                         connection.add_handler(handler)
-                    self.peers[peer_id] = connection
+                    self.conn_map[peer_id] = connection
                     connection._connect()
                 else:
                     _logger.warning('Connection with %s refused, MessageFactory'\
                                     ' hash incorrect', event.peer.address)
                     event.peer.disconnect_now()
             elif event.type == enet.EVENT_TYPE_DISCONNECT:
-                self.peers[event.peer.data]._disconnect()
-                del self.peers[event.peer.data]
+                self.conn_map[event.peer.data]._disconnect()
+                del self.conn_map[event.peer.data]
             elif event.type == enet.EVENT_TYPE_RECEIVE:
-                self.peers[event.peer.data]._receive(event.packet.data, event.channelID)
+                self.conn_map[event.peer.data]._receive(event.packet.data, event.channelID)
             event = host.check_events()
 
     def connections(self, exclude=None):
         if exclude is None:
-            return self.peers.itervalues()
+            return self.conn_map.itervalues()
         else:
-            return (c for c in self.peers.itervalues() if c not in exclude)
+            return (c for c in self.conn_map.itervalues() if c not in exclude)
 
     def handlers(self, exclude=[]):
-        return (c._handlers[0] for c in self.peers.itervalues() if c not in exclude)
+        return (c._handlers[0] for c in self.conn_map.itervalues() if c not in exclude)

pygame_network/network/socket_adapter/connection.py

 import logging
 from collections import deque
 from asyncore import dispatcher
+from .. import base_adapter
 from ...message import MessageFactory
 
 _logger = logging.getLogger(__name__)
 
 
-class Connection(dispatcher):
+class Connection(base_adapter.Connection, dispatcher):
     __send = dispatcher.send
+    rcvr_buffer_size = 2048
 
     def __init__(self, parent, socket, message_factory=MessageFactory):
         super(Connection, self).__init__(parent, message_factory)
-        self._buffer = deque()
+        self._queue = deque()
+        self.state = base_adapter.State.DISCONNECTED
 
     def _send_data(self, data, **kwargs):
-        pass
+        self._message_queue.append(data)
 
     def disconnect(self, *args):
         """Request a disconnection."""
         pass
 
     @property
-    def state(self):
-        """Connection state."""
-        return None
-
-    @property
     def address(self):
         """Connection address."""
         return None, None
 
     def handle_read(self):
-        self.log_info('unhandled read event', 'warning')
+        self._receive(self.recv(self.rcvr_buffer_size))
 
     def handle_write(self):
-        self.log_info('unhandled write event', 'warning')
+        #queue, self._queue = self._queue, deque()
+        for data in self._queue:
+            self.__send(data)
+        self._queue = deque()
 
     def handle_connect(self):
-        self.log_info('unhandled connect event', 'warning')
-
-    def handle_accept(self):
-        self.log_info('unhandled accept event', 'warning')
+        self.state = base_adapter.State.CONNECTED
+        self._connect()
 
     def handle_close(self):
         self.log_info('unhandled close event', 'warning')
         logging.info('message @ch%d: %s', channel, message)
 
 
+class Server(net.Server):
+    handler = EchoHandler
+
+
 def main():
     net.init(logging_lvl=logging.DEBUG)
     net.register('echo', ('msg', 'msg_id'))
-    server = net.Server(port=54301)
-    server.handler_cls = EchoHandler
+    server = Server(port=54301)
     logging.info('Listening')
     run = True
     while run: