Boris FELD avatar Boris FELD committed 65dd325

Add first version of Proof Of Concept

Comments (0)

Files changed (5)

+Running the Proof Of Concept
+============================
+
+Installation
+------------
+
+For running the poc, you need:
+- Python
+- Pyzmq
+
+In order to install pyzmq, run 'easy_install pyzmq', it shoud be sufficient.
+
+Control nodes
+-------------
+
+When you will launch a node, you cannot be able to interact directly, you must use the commander script.
+
+When you will launch a node, you will see this in stdout:
+
+INFO:4505a2b2-0ea0-4d27-aa3b-cd13996abf8d:Start 4505a2b2-0ea0-4d27-aa3b-cd13996abf8d, listen to 49209 and pub to 50238
+
+Copy the listen port, then launch the commander script with this port, like that:
+
+python commander 49209
+
+Then you can type command, there are 2 valid commands:
+send MSG: Send the message to a random node
+publish MSG: Publish the message
+
+For each of these commands, you should see some output on other nodes.
+
+Running in local
+----------------
+
+You need to launch broker in the background with 'python broker.py'.
+
+Then you can launch several nodes with 'python node.py'.
+
+You can then try to send message or publish message via the commander script.
+
+Running with hub
+----------------
+
+You need to launch broker in the background with 'python hub.py'.
+
+Then you can launch several nodes with 'python node.py --not-localt'.
+
+You can then try to send message or publish message via the commander script.
+
+
+Have fun!
+import zmq
+import json
+import socket
+import logging
+import threading
+
+logging.basicConfig(level=logging.DEBUG)
+
+# Socket part
+ANY = "0.0.0.0"
+MCAST_ADDR = "237.252.249.227"
+MCAST_PORT = 1600
+#create a UDP socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+#allow multiple sockets to use the same PORT number
+sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+#Bind to the port that we know will receive multicast data
+sock.bind((ANY, MCAST_PORT))
+#tell the kernel that we are a multicast socket
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)
+#Tell the kernel that we want to add ourselves to a multicast group
+#The address for the multicast group is the third param
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
+    socket.inet_aton(MCAST_ADDR) + socket.inet_aton(ANY))
+
+stop_event = threading.Event()
+started_event = threading.Event()
+
+
+def run(blocking=True):
+    logging.info('Multicast broker started on tcp://127.0.0.1:32001')
+    if blocking:
+        sock.settimeout(1)
+    else:
+        sock.settimeout(0)
+
+    # zeromq part
+    pub = zmq.Context.instance().socket(zmq.PUB)
+    pub.bind('tcp://127.0.0.1:32001')
+
+    stop_event.clear()
+    while not stop_event.is_set():
+        started_event.set()
+        try:
+            data, addr = sock.recvfrom(1024)
+            data = json.loads(data)
+            data.update({'address': addr[0]})
+            # data.update({'address': '127.0.0.1'})
+            logging.info('Get data %s', data)
+        except socket.error:
+            pass
+        else:
+            pub.send('register %s' % json.dumps(data))
+            del data, addr
+
+    logging.info('Multicast broker stopped')
+
+
+def stop():
+    stop_event.set()
+
+if __name__ == '__main__':
+    run()
+import zmq
+import sys
+import json
+
+c = zmq.Context()
+s = c.socket(zmq.DEALER)
+s.connect('tcp://127.0.0.1:%s' % sys.argv[1])
+
+while True:
+    cmd = raw_input('Enter cmd: ').split(' ', 1)
+    s.send_multipart(('', cmd[0], json.dumps({'msg': cmd[1]})))
+import zmq
+import logging
+
+HUB_SUB_PORT = 12345
+HUB_PUB_PORT = 12346
+HUB_RCV_PORT = 12347
+HUB_SND_PORT = 12348
+
+# Prepare our context and sockets
+context = zmq.Context()
+rcv = context.socket(zmq.ROUTER)
+rcv.bind("tcp://*:%s" % HUB_RCV_PORT)
+snd = context.socket(zmq.ROUTER)
+snd.bind("tcp://*:%s" % HUB_SND_PORT)
+sub = context.socket(zmq.SUB)
+sub.setsockopt(zmq.SUBSCRIBE, '')
+sub.bind("tcp://*:%s" % HUB_SUB_PORT)
+pub = context.socket(zmq.PUB)
+pub.bind("tcp://*:%s" % HUB_PUB_PORT)
+
+# Initialize poll set
+poller = zmq.Poller()
+poller.register(rcv, zmq.POLLIN)
+poller.register(snd, zmq.POLLIN)
+poller.register(sub, zmq.POLLIN)
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger()
+
+logger.info('Start logger')
+
+# Switch messages between sockets
+while True:
+    socks = dict(poller.poll())
+
+    if socks.get(rcv) == zmq.POLLIN:
+        message = rcv.recv_multipart()
+        logger.info("Message in rcv: %s", message)
+        snd.send_multipart(message[1:])
+
+    if socks.get(snd) == zmq.POLLIN:
+        message = snd.recv_multipart()
+        logger.info("Message in snd: %s", message)
+        rcv.send_multipart(message[1:])
+
+    if socks.get(sub) == zmq.POLLIN:
+        msg = sub.recv_multipart()
+        logger.info("Message in sub: %s", msg)
+        pub.send_multipart(msg)
+# -*- coding: utf-8 -*-
+import zmq
+import sys
+import uuid
+import json
+import socket
+import random
+import logging
+import threading
+from time import sleep
+
+from zmq.eventloop import ioloop, zmqstream
+
+logging.basicConfig(level=logging.DEBUG)
+
+# Come from excellent http://stefan.sofa-rockers.org/2012/02/01/designing-and-testing-pyzmq-applications-part-1/
+def stream(context, loop, sock_type, addr, bind, callback=None, subscribe=None,
+        random=False, identity=None):
+    """
+    Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
+
+    :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
+    :param addr: Address to bind or connect to formatted as *host:port*,
+            *(host, port)* or *host* (bind to random port).
+            If *bind* is ``True``, *host* may be:
+
+            - the wild-card ``*``, meaning all available interfaces,
+            - the primary IPv4 address assigned to the interface, in its
+              numeric representation or
+            - the interface name as defined by the operating system.
+
+            If *bind* is ``False``, *host* may be:
+
+            - the DNS name of the peer or
+            - the IPv4 address of the peer, in its numeric representation.
+
+            If *addr* is just a host name without a port and *bind* is
+            ``True``, the socket will be bound to a random port.
+    :param bind: Binds to *addr* if ``True`` or tries to connect to it
+            otherwise.
+    :param callback: A callback for
+            :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
+    :param subscribe: Subscription pattern for *SUB* sockets, optional,
+            defaults to ``b''``.
+    :returns: A tuple containg the stream and the port number.
+
+    """
+    sock = context.socket(sock_type)
+
+    # Bind/connect the socket
+    if bind:
+        if not random:
+            sock.bind(addr)
+        else:
+            port = sock.bind_to_random_port(addr)
+    else:
+        sock.connect(addr)
+
+    # Add a default subscription for SUB sockets
+    if sock_type == zmq.SUB:
+        if subscribe:
+            sock.setsockopt(zmq.SUBSCRIBE, subscribe)
+        else:
+            sock.setsockopt(zmq.SUBSCRIBE, '')
+
+    # Identity
+    if identity:
+        sock.setsockopt(zmq.IDENTITY, identity)
+
+    # Create the stream and add the callback
+    stream = zmqstream.ZMQStream(sock, loop)
+    if callback:
+        stream.on_recv(callback)
+
+    if random:
+        return stream, int(port)
+    else:
+        return stream
+
+
+class Service(object):
+
+    # Multicast/Local registration
+    ANY = "0.0.0.0"
+    SENDERPORT = 32000
+    MCAST_ADDR = "237.252.249.227"
+    MCAST_PORT = 1600
+
+    # Global registration
+    HUB_HOST = "127.0.0.1"
+    HUB_SUB_PORT = 12345
+    HUB_PUB_PORT = 12346
+    HUB_RCV_PORT = 12347
+    HUB_SND_PORT = 12348
+
+    def __init__(self):
+        # Service
+        self.service_id = str(uuid.uuid4())
+        self.services = {}
+        self.logger = logging.getLogger(self.service_id)
+
+        # Sockets
+        self.context = zmq.Context.instance()
+        self.loop = ioloop.IOLoop.instance()
+
+        self.pub, self.pub_port = stream(self.context, self.loop, zmq.PUB,
+            'tcp://*', bind=True, random=True)
+        self.sub = stream(self.context, self.loop, zmq.SUB,
+            'tcp://127.0.0.1:32001', bind=False, callback=self.process_sub)
+        self.server, self.server_port = stream(self.context, self.loop,
+            zmq.DEALER, 'tcp://*', bind=True, random=True,
+            callback=self.process_server, identity=self.service_id)
+
+        self.logger.info('Start %s, listen to %s and pub to %s' %
+            (self.service_id, self.server_port, self.pub_port))
+
+    def register_local(self):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
+            socket.IPPROTO_UDP)
+        sock.bind((self.ANY, 0))
+        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+        sock.sendto(json.dumps(self._register_info()),
+            (self.MCAST_ADDR, self.MCAST_PORT))
+
+    def register_global(self):
+        self.sub.connect('tcp://%s:%s' % (self.HUB_HOST, self.HUB_PUB_PORT))
+        self.pub.connect('tcp://%s:%s' % (self.HUB_HOST, self.HUB_SUB_PORT))
+        self.server.connect('tcp://%s:%s' % (self.HUB_HOST, self.HUB_SND_PORT))
+
+        sleep(1)
+        info = self._register_info()
+        info['address'] = 'hub'
+        self.pub.send('register %s' % json.dumps(info))
+
+    def _register_info(self):
+        return {'id': self.service_id, 'pub_port': self.pub_port,
+            'server_port': self.server_port}
+
+    def process_sub(self, messages):
+        for message in messages:
+            message_type, data = message.split(' ', 1)
+            data = json.loads(data)
+            if message_type == 'register' and data['id'] != self.service_id and\
+                    data['id'] not in self.services.keys():
+                self.process_register(data)
+                self.register_to(data['id'])
+            else:
+                self.logger.info('Get event %s: %s', message_type, data)
+
+    def process_server(self, messages):
+        message_type, data = messages[-2:]
+        data = json.loads(data)
+
+        if message_type == 'register' and data['id'] != self.service_id:
+            self.process_register(data)
+        elif message_type == 'send':
+            self.send(random.choice(self.services.keys()), data['msg'])
+        elif message_type == 'publish':
+            self.publish('MSG %s' % data['msg'])
+        else:
+            self.logger.info('Get message: %s', data)
+
+    def register_to(self, service_id):
+        register_info = self._register_info()
+        register_info['address'] = socket.gethostbyname(socket.gethostname())
+        self.send(service_id, register_info, type='register')
+
+    def process_register(self, data):
+        self.services[data['id']] = data
+        print "Data", data
+        if data['address'] != 'hub':
+            sub_address = 'tcp://%s:%s' % (data['address'], data['pub_port'])
+            self.logger.info('Connect sub to %s' % sub_address)
+            self.sub.connect(sub_address)
+
+        self.logger.info('Service %s is at %s:%s, publish at %s:%s ' % (data['id'],
+            data['address'], data['server_port'], data['address'],
+            data['pub_port']))
+
+    def start(self):
+        self.loop.start()
+
+    def send(self, destination, msg, type='msg'):
+        data = self.services[destination]
+
+        client = self.context.socket(zmq.DEALER)
+        if data['address'] == 'hub':
+            address = 'tcp://%s:%s' % (self.HUB_HOST, self.HUB_RCV_PORT)
+            client.connect(address)
+            client.send_multipart((str(destination), type, json.dumps(msg)))
+        else:
+            address = 'tcp://%s:%s' % (data['address'], data['server_port'])
+            client.connect(address)
+            client.send_multipart((type, json.dumps(msg)))
+
+    def publish(self, msg):
+        self.pub.send('MSG %s' % str(json.dumps(msg)))
+
+if __name__ == '__main__':
+    s = Service()
+    if not '--no-local' in sys.argv:
+        s.register_local()
+    s.register_global()
+    s.start()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.