Commits

Szymon Wróblewski committed 6cc1df4

recreated discovery server and client

Comments (0)

Files changed (7)

pygnetic/discovery.py

+import logging
+import socket
+import select
+import SocketServer
+from collections import OrderedDict
+from itertools import islice
+from .. import __init__ as net
+from .. import message
+
+_logger = logging.getLogger(__name__)
+
+message_factory = message.MessageFactory()
+# server messages
+register = message_factory.register('register', (
+    'name', 'host', 'port'
+))
+get_servers = message_factory.register('get_servers', (
+    'current', 'list_step'
+))
+servers_list = message_factory.register('servers_list', (
+    'servers'
+))
+ping = message_factory.register('ping', (
+    'sid'
+))
+error = message_factory.register('error', (
+    'type'
+))
+ack = message_factory.register('ack', (
+    'value'
+))
+
+
+class Errors(object):
+    TIMEOUT = 1
+
+
+class ServerHandler(SocketServer.BaseRequestHandler):
+    def handle(self):
+        data, socket = self.request
+        message = message_factory.unpack(data)
+        if message is None:
+            return
+        ret_value = getattr(self, 'net_' + message.__class__.__name__)(message)
+        ack_data = message_factory.pack(ack(ret_value))
+        socket.sendto(ack_data, self.client_address)
+
+    def unknown_msg(self, message):
+        _logger.warning('Received unknown message: %s', message.__calss__.__name__)
+
+    def net_register(self, message):
+        sid = self.server.id_cnt = self.server.id_cnt + 1
+        self.server.servers[sid] = [message, 0]
+        return sid
+
+    def net_get_servers(self, message):
+        s = message.current * message.list_step
+        e = s + message.list_step
+        servers = self.server.servers
+        return {k: servers[k][0] for k in islice(servers.iterkeys(), s, e)}
+
+
+class DiscoveryServer(SocketServer.UDPServer):
+    allow_reuse_address = True
+    max_packet_size = 8192
+
+    def __init__(self, host, port):
+        super(DiscoveryServer, self).__init__((host, port), ServerHandler)
+        self.servers = OrderedDict()
+        self.id_cnt = 0
+
+
+class DiscoveryClient(object):
+    max_packet_size = 8192
+
+    def __init__(self, host, port, s_adapter=net.serialization.selected_adapter):
+        self.address = host, port
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self.response = None
+        self.sid = 0
+
+    def _send_msg(self, message):
+        data = message_factory.pack(message)
+        self.socket.sendto(data, self.address)
+
+    def update(self, timeout=0):
+        r = select.select([self.socket], [], [], timeout / 1000.0)[0]
+        if len(r) > 0:
+            data, address = self.socket.recvfrom(self.max_packet_size)
+            if address == self.address:
+                self.response = message_factory.unpack(data)
+
+    def close(self):
+        self.socket.close()
+
+    def register(self, name, host, port):
+        self._send_msg(register(name, host, port))
+
+    def ping(self):
+        if self.sid == 0:
+            return False
+        self._make_connection()
+        self.connection.net_ping(self.sid)
+        return True

pygnetic/discovery/__init__.py

Empty file removed.

pygnetic/discovery/client.py

-from weakref import proxy
-from .. import __init__ as net
-import messages
-
-
-class Handler(net.Handler):
-    def net_register_ack(self, message, **kwargs):
-        self.parent.sid = message.sid
-
-    def net_servers_list(self, message, **kwargs):
-        pass
-
-    def net_error(self, message, **kwargs):
-        if message.type == messages.Errors.TIMEOUT:
-            self._make_connection()
-            self.parent.net_register(self.parent.info)
-
-    def net_ping(self, message, **kwargs):
-        self._make_connection()
-        self.parent.net_ping(message.sid)
-
-
-class Client(object):
-    def __init__(self, host, port=5000, n_adapter=net.network.selected_adapter,
-                 s_adapter=net.serialization.selected_adapter):
-        messages.message_factory.s_adapter = s_adapter
-        self.client = n_adapter.Client()
-        self.connection = None
-        self.handler = Handler()
-        self.handler.parent = proxy(self)
-        self.address = host, port
-        self.info = ('', '', 0)
-        self.sid = 0
-
-    def _make_connection(self):
-        if self.connection is None or not self.connection.connected:
-            self.connection = self.client.connect(*self.address)
-            self.connection.add_handler(self.handler)
-
-    def register(self, name, host, port):
-        self.info = name, host, port
-        self._make_connection()
-        self.connection.net_register(name, host, port)
-
-    def ping(self):
-        if self.sid == 0:
-            return False
-        self._make_connection()
-        self.connection.net_ping(self.sid)
-        return True

pygnetic/discovery/messages.py

-# -*- coding: utf-8 -*-
-
-from .. import message
-
-message_factory = message.MessageFactory()
-# server messages
-register = message_factory.register('register', (
-    'name',
-    'host',
-    'port'
-))
-get_servers = message_factory.register('get_servers', (
-    'list_step'
-))
-next_servers = message_factory.register('next_servers', (
-    'current'
-))
-servers_list = message_factory.register('servers_list', (
-    'servers'
-))
-ping = message_factory.register('ping', (
-    'sid'
-))
-error = message_factory.register('error', (
-    'type'
-))
-ack = message_factory.register('ack', (
-    'value'
-))
-
-
-class Errors(object):
-    TIMEOUT = 1

pygnetic/discovery/server.py

-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-from .. import __init__ as net
-import messages
-
-_logger = logging.getLogger(__name__)
-
-
-class Handler(net.Handler):
-    def __init__(self):
-        self.curr = 0
-        self.list_step = 10
-        self.servers = None
-
-    def net_register(self, message, **kwargs):
-        sid = self.server.id_cnt = self.server.id_cnt + 1
-        self.server.servers[sid] = [message, 0]
-        self.connection.net_register_ack(sid)
-        self.connection.disconnect()
-
-    def net_get_servers(self, message, **kwargs):
-        self.list_step = int(message.list_step)
-        self.servers = tuple(m for m, _ in self.server.servers.itervalues())
-        self.connection.net_servers_list(self.servers[:self.list_step])
-        self.curr = 1
-
-    def net_next_servers(self, message, **kwargs):
-        if self.servers is not None:
-            s = self.curr * self.list_step
-            e = s + self.list_step
-            self.curr += 1
-            t = self.servers[s:e]
-            self.connection.net_servers_list(t)
-            if len(t) < self.list_step:
-                self.connection.disconnect()
-
-    def net_ping(self, message, **kwargs):
-        s = self.server.servers.get(message.sid)
-        if s is not None:
-            s[1] = 0
-        else:
-            self.connection.net_error(messages.Errors.TIMEOUT)
-        self.connection.disconnect()
-
-
-class DiscoveryServer(net.Server):
-    handler = Handler
-    message_factory = messages.message_factory
-    servers = {}
-    id_cnt = 0
-    ping_timeout = 30
-
-    def update(self, *args):
-        super(DiscoveryServer, self).update(*args)
-        self.servers = {k: [v[0], v[1] + 1]
-                        for k, v in self.servers.iteritems()
-                        if v[1] + 1 < self.ping_timeout}
-
-
-def run(host, port):
-    server = DiscoveryServer(host, port)
-    _logger.info('Listening')
-    try:
-        while True:
-            server.update(1000)
-    except KeyboardInterrupt:
-        pass
-
-
-if __name__ == '__main__':
-    import argparse
-    parser = argparse.ArgumentParser(description='Server discovery service.')
-    parser.add_argument('-a', '--address', type=str, default='')
-    parser.add_argument('-p', '--port', type=int, default=5000)
-    args = parser.parse_args()
-    print 'Server started (Use Control-C to exit)'
-    run(args.address, args.port)
-    print 'Exiting'

pygnetic/discovery/server_.py

-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-from SimpleXMLRPCServer import SimpleXMLRPCServer
-
-_logger = logging.getLogger(__name__)
-
-
-class DiscoveryService(object):
-    def __init__(self):
-        self.servers = []
-
-    def register(self, name, address, port):
-        """register(name, address, port) => string
-
-        Register new server
-        """
-        try:
-            self.servers.append((str(name), str(address), int(port)))
-        except Exception as e:
-            return '%s: %s' % (e.__class__.__name__, e.message)
-
-    def list(self):
-        """list() => [<name, address, port>]
-
-        List all available servers
-        """
-        return self.servers
-
-
-def main(address, port):
-    server = SimpleXMLRPCServer((address, port), allow_none=True)
-    server.register_introspection_functions()
-    server.register_instance(DiscoveryService())
-    server.serve_forever()
-    try:
-        server.serve_forever()
-    except KeyboardInterrupt:
-        pass
-
-
-if __name__ == '__main__':
-    import argparse
-    parser = argparse.ArgumentParser(description='Server discovery service.')
-    parser.add_argument('-a', '--address', type=str, default='')
-    parser.add_argument('-p', '--port', type=int, default=8000)
-    args = parser.parse_args()
-    print 'Server started (Use Control-C to exit)'
-    main(args.address, args.port)
-    print 'Exiting'

pygnetic/network/socket_adapter.py

                 sock.close()
 
     def update(self, timeout=0):
-        asyncore.loop(timeout / 1000, False, None, 1)
+        asyncore.loop(timeout / 1000.0, False, None, 1)
 
 
 class Client(client.Client):
         return conn, conn.socket.fileno()
 
     def update(self, timeout=0):
-        asyncore.loop(timeout / 1000, False, self.conn_map, 1)
+        asyncore.loop(timeout / 1000.0, False, self.conn_map, 1)