Commits

Szymon Wróblewski committed 0e7b0f4

updated discovery module, added discovery_server script, small changes in message and json_adapter

  • Participants
  • Parent commits 6cc1df4

Comments (0)

Files changed (7)

File discovery_server.py

+import logging
+import pygnetic
+from pygnetic.discovery import DiscoveryServer
+
+_logger = logging.getLogger(__name__)
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser(description='Server discovery service.')
+    parser.add_argument('-i', '--host', type=str, default='',
+                        help='Server host IP')
+    parser.add_argument('-p', '--port', type=int, default=5000,
+                        help='Server port')
+    parser.add_argument('-d', '--debug', action='store_true',
+                        help='Enable debugging')
+    args = parser.parse_args()
+    if args.debug:
+        pygnetic.init(logging_lvl=logging.DEBUG)
+    else:
+        pygnetic.init()
+    _logger.info('Server started (Use Control-C to exit)')
+    try:
+        DiscoveryServer(args.host, args.port).serve_forever()
+    except KeyboardInterrupt:
+        pass
+    _logger.info('Exiting')

File doc/source/api/index.rst

       
       Default instance of :class:`MessageFactory` used by other modules.
    
-   .. autoclass:: MessageFactory
+   .. autoclass:: MessageFactory([s_adapter])
    
       Example::
       

File doc/source/conf.py

 # Theme options are theme-specific and customize the look and feel of a theme
 # further.  For a list of options available for each theme, see the
 # documentation.
-html_theme_options = {
-    'stickysidebar': 'true',
-    'collapsiblesidebar': 'true',
-}
+#html_theme_options = {}
 
 # Add any paths that contain custom themes here, relative to this directory.
 #html_theme_path = []

File pygnetic/__init__.py

                             format='%(asctime)-8s %(levelname)-8s %(message)s',
                             datefmt='%H:%M:%S')
     network.select_adapter(n_module)
-    if network._selected_adapter is not None:
+    if network.selected_adapter is not None:
         _logger.info("Using %s",
-            network._selected_adapter.__name__.split('.')[-1])
+            network.selected_adapter.__name__.split('.')[-1])
     else:
         _logger.critical("Can't find any network module")
         return False
     serialization.select_adapter(s_module)
-    if serialization._selected_adapter is not None:
+    if serialization.selected_adapter is not None:
         _logger.info("Using %s",
-            serialization._selected_adapter.__name__.split('.')[-1])
+            serialization.selected_adapter.__name__.split('.')[-1])
     else:
         _logger.critical("Can't find any serialization module")
         return False

File pygnetic/discovery.py

 import SocketServer
 from collections import OrderedDict
 from itertools import islice
-from .. import __init__ as net
-from .. import message
+import message
+import serialization
 
 _logger = logging.getLogger(__name__)
 
 message_factory = message.MessageFactory()
 # server messages
 register = message_factory.register('register', (
-    'name', 'host', 'port'
+    'oid', 'name', 'host', 'port'
 ))
 get_servers = message_factory.register('get_servers', (
-    'current', 'list_step'
-))
-servers_list = message_factory.register('servers_list', (
-    'servers'
+    'oid', 'current', 'list_step'
 ))
 ping = message_factory.register('ping', (
-    'sid'
+    'oid', 'sid'
 ))
-error = message_factory.register('error', (
-    'type'
-))
-ack = message_factory.register('ack', (
-    'value'
+response = message_factory.register('response', (
+    'oid', 'value', 'error'
 ))
 
 
 class Errors(object):
+    NO_ERROR = 0
     TIMEOUT = 1
 
 
 class ServerHandler(SocketServer.BaseRequestHandler):
     def handle(self):
         data, socket = self.request
+        _logger.debug('Received data: %r', data)
         message = message_factory.unpack(data)
         if message is None:
             return
-        ret_value = getattr(self, 'net_' + message.__class__.__name__)(message)
-        ack_data = message_factory.pack(ack(ret_value))
-        socket.sendto(ack_data, self.client_address)
+        name = message.__class__.__name__
+        _logger.info('Received %s message from %s', name, self.client_address)
+        ret_val, err = getattr(self, 'net_' + name)(message)
+        ack_data = message_factory.pack(response(message.oid, ret_val, err))
+        cnt = socket.sendto(ack_data, self.client_address)
+        _logger.info('Sent response to %s', self.client_address)
+        _logger.debug('Sent %d bytes: %r', cnt, ack_data)
 
     def unknown_msg(self, message):
-        _logger.warning('Received unknown message: %s', message.__calss__.__name__)
+        name = message.__class__.__name__
+        _logger.warning('Received unknown %s message from %s',
+                        name, self.client_address)
 
     def net_register(self, message):
         sid = self.server.id_cnt = self.server.id_cnt + 1
         self.server.servers[sid] = [message, 0]
-        return sid
+        return sid, Errors.NO_ERROR
 
     def net_get_servers(self, message):
         s = message.current * message.list_step
         e = s + message.list_step
         servers = self.server.servers
-        return {k: servers[k][0] for k in islice(servers.iterkeys(), s, e)}
+        d = {k: servers[k][0] for k in islice(servers.iterkeys(), s, e)}
+        return d, Errors.NO_ERROR
 
 
-class DiscoveryServer(SocketServer.UDPServer):
+class DiscoveryServer(SocketServer.UDPServer, object):
     allow_reuse_address = True
     max_packet_size = 8192
 
-    def __init__(self, host, port):
+    def __init__(self, host='', port=5000):
         super(DiscoveryServer, self).__init__((host, port), ServerHandler)
         self.servers = OrderedDict()
         self.id_cnt = 0
 class DiscoveryClient(object):
     max_packet_size = 8192
 
-    def __init__(self, host, port, s_adapter=net.serialization.selected_adapter):
-        self.address = host, port
+    def __init__(self, host, port=5000, s_adapter=None):
+        self.address = socket.gethostbyname(host), port
         self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         self.response = None
-        self.sid = 0
+        self.oid = 0
+        if s_adapter is None:
+            s_adapter = serialization.selected_adapter
 
-    def _send_msg(self, message):
-        data = message_factory.pack(message)
-        self.socket.sendto(data, self.address)
+    def _send_msg(self, message, *args):
+        oid = self.oid = self.oid + 1
+        data = message_factory.pack(message(oid, *args))
+        cnt = self.socket.sendto(data, self.address)
+        self.response = None
+        name = message.__class__.__name__
+        _logger.info('Sent %s message to %s', name, self.address)
+        _logger.debug('Sent %d bytes: %r', cnt, data)
+        return oid
 
     def update(self, timeout=0):
         r = select.select([self.socket], [], [], timeout / 1000.0)[0]
         if len(r) > 0:
             data, address = self.socket.recvfrom(self.max_packet_size)
             if address == self.address:
-                self.response = message_factory.unpack(data)
+                _logger.debug('Received data: %r', data)
+                r = self.response = message_factory.unpack(data)
+                _logger.info('Received response from %s', address)
+            else:
+                _logger.info('Unexpected data from %s', address)
 
     def close(self):
         self.socket.close()
 
     def register(self, name, host, port):
-        self._send_msg(register(name, host, port))
+        return self._send_msg(register, name, host, port)
 
-    def ping(self):
-        if self.sid == 0:
-            return False
-        self._make_connection()
-        self.connection.net_ping(self.sid)
-        return True
+    def ping(self, sid):
+        return self._send_msg(ping, sid)

File pygnetic/message.py

 
 
 class MessageFactory(object):
-    """Class allowing to register new message types and pack/unpack them."""
-    def __init__(self, s_adater=serialization._selected_adapter):
+    """Class allowing to register new message types and pack/unpack them.
+
+    :param s_adapter:
+        :term:`serialization adapter`
+        (default: None - library selected with :func:`.init`)
+    """
+    def __init__(self, s_adapter=None):
         self._message_names = {}  # name -> message
         self._message_types = WeakValueDictionary()  # type_id -> message
-        self._message_params = WeakKeyDictionary()  # message -> type_id, send par
-        self.s_adapter = s_adater
+        self._message_params = WeakKeyDictionary()  # message -> type_id, send kwargs
+        if s_adapter is None:
+            self.s_adapter = serialization
+        else:
+            self.s_adapter = s_adapter
         self._type_id_cnt = 0
         self._frozen = False
         self._hash = None
         :return: message
         """
         _logger.debug("Unpacking message (length: %d)", len(data))
-        try:
-            message = self.s_adapter.unpack(data)
-        except:
+        message = self.s_adapter.unpack(data)
+        if message is not None:
+            return self._process_message(message)
+        else:
             _logger.error('Data corrupted')
-            self._reset_unpacker()  # prevent from corrupting next data
-            return
-        return self._process_message(message)
+            _logger.debug('Data: %r', data)
 
     def unpack_all(self, data, context):
         """Feed unpacker with data from stream and unpack all messages.
                 ids = self._message_types.keys()
                 ids.sort()
                 l = list()
-                l.append(self.s_adapter.__name__)
+                a = getattr(self.s_adapter, 'selected_adapter', self.s_adapter)
+                l.append(a.__name__)
                 for i in ids:
                     p = self._message_types[i]
                     l.append((i, p.__name__, p._fields))

File pygnetic/serialization/json_adapter.py

     next = decode
 
 
+def unpack(*args):
+    try:
+        return json.loads(*args)
+    except ValueError:
+        pass
+
+
 pack = json.dumps
-unpack = json.loads
 unpacker = JSONDecoder