Szymon Wróblewski avatar Szymon Wróblewski committed 59fc2b7

discovery module updated, small changes in message module

Comments (0)

Files changed (2)

pygnetic/discovery.py

 message_factory = message.MessageFactory()
 
 register = message_factory.register('register', (
-    'oid', 'name', 'host', 'port'
+    'oid', 'name', 'port'
 ))
 get_servers = message_factory.register('get_servers', (
     'oid', 'current', 'part_size'
     'oid', 'sid'
 ))
 response = message_factory.register('response', (
-    'oid', 'value', 'error'
+    'oid', 'mid', 'value', 'error'
 ))
 
 
         name = message.__class__.__name__
         _logger.info('Received %s message from %s', name, self.client_address)
         ret_val, err = getattr(self, 'net_' + name)(message)
-        ack_data = message_factory.pack(response(message.oid, ret_val, err))
+        mid = message_factory.get_params(message.__class__)[0]
+        ack_data = message_factory.pack(response(message.oid, mid, ret_val, err))
         cnt = socket.sendto(ack_data, self.client_address)
         _logger.info('Sent response to %s', self.client_address)
         _logger.debug('Sent %d bytes: %r', cnt, ack_data)
 
     def net_register(self, message):
         sid = self.server.id_cnt = self.server.id_cnt + 1
-        self.server.servers[sid] = [message[1:], 0]
+        self.server.servers[sid] = [(message.name, self.client_address[0], message.port), 0]
         return sid, Errors.NO_ERROR
 
     def net_get_servers(self, message):
         self.address = socket.gethostbyname(host), port
         self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         self.response = None
-        self.callback = None
+        self.callbacks = {}
+        self.default_callback = lambda r: None
         self.oid = 0
 
-    def _send_msg(self, message, *args):
+    def _send_msg(self, message_cls, *args):
         oid = self.oid = self.oid + 1
-        data = message_factory.pack(message(oid, *args))
+        data = message_factory.pack(message_cls(oid, *args))
         cnt = self.socket.sendto(data, self.address)
         self.response = None
-        name = message.__class__.__name__
-        _logger.info('Sent %s message to %s', name, self.address)
+        _logger.info('Sent %s message to %s', message_cls.__name__, self.address)
         _logger.debug('Sent %d bytes: %r', cnt, data)
         return oid
 
     def update(self, timeout=0):
         r = select.select([self.socket], [], [], timeout / 1000.0)[0]
+        # sending to closed socket puts some data in receive buffer causing
+        # error while calling recvfrom
         if len(r) > 0:
-            data, address = self.socket.recvfrom(self.max_packet_size)
+            try:
+                data, address = self.socket.recvfrom(self.max_packet_size)
+            except Exception as e:
+                _logger.debug('Error: %s', e)
+                return
             if address == self.address:
                 _logger.debug('Received data: %r', data)
                 r = self.response = message_factory.unpack(data)
+                if r.__class__ != response:
+                    return
                 _logger.info('Received response from %s', address)
-                if callable(self.callback):
-                    self.callback(r)
+                self.callbacks.pop(r.oid, self.default_callback)(r)
             else:
                 _logger.info('Unexpected data from %s', address)
 
     def close(self):
         self.socket.close()
 
-    def register(self, name, host, port):
-        return self._send_msg(register, name, host, port)
+    def add_callback(self, oid, callback):
+        if callback is not None and callable(callback):
+            self.callbacks[oid] = callback
+        if len(self.callbacks) > 10:
+            keep = sorted(self.callbacks.iterkeys())[:-10]
+            self.callbacks = {k: v for k, v in self.callbacks.iter(keep)}
+
+    def register(self, name, port):
+        return self._send_msg(register, name, port)
 
     def ping(self, sid):
         return self._send_msg(ping, sid)

pygnetic/message.py

         """Returns message class with given name.
 
         :param name: name of message
-        :return: message class (namedtuple)
+        :return: message class (namedtuple) or None if not found
         """
-        return self._message_names[name]
+        return self._message_names.get(name)
 
     def get_by_type(self, type_id):
         """Returns message class with given type_id.
 
         :param type_id: type identifier of message
-        :return: message class (namedtuple)
+        :return: message class (namedtuple) or None if not found
         """
-        return self._message_types[type_id]
+        return self._message_types.get(type_id)
 
     def get_params(self, message_cls):
         """Return tuple containing type_id, and sending keyword arguments
 
         :param message_cls: message class created by register
-        :return: int, dict
+        :return: int, dict or None if not found
         """
-        return self._message_params[message_cls]
+        return self._message_params.get(message_cls)
 
     def get_hash(self):
         """Calculate and return hash.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.