Commits

Peter Ward committed 0102c9f

add initial synchronisation experimentation

Comments (0)

Files changed (5)

+from gi.repository import GLib
+
+# need to import Thing so it's available to the type system
+from common import Thing
+import sync
+
+def on_my_signal(arg):
+    print 'sync signal handler for my-signal with argument', arg
+
+def on_foo_change(obj, param):
+    print obj.foo
+
+def on_new_object(client, obj):
+    print 'Got new object', obj
+
+    obj.connect('notify::foo', on_foo_change)
+
+    GLib.timeout_add(
+        2000,
+        lambda: obj.emit('my-signal', 36),
+    )
+
+client = sync.Client()
+client.connect('new-object', on_new_object)
+
+loop = GLib.MainLoop()
+loop.run()
+from gi.repository import GObject
+
+class Thing(GObject.GObject):
+    __gsignals__ = {
+        'my-signal': (GObject.SIGNAL_RUN_FIRST, None, (int,)),
+    }
+
+#    def __new__(*args):
+#        print '__new__'
+#        return GObject.GObject.__new__(*args)
+#
+#    def __init__(*args):
+#        print '__init__'
+#        GObject.GObject.__init__(*args)
+
+    def do_my_signal(self, arg):
+        print 'class method for my_signal called with argument', arg
+
+    foo = GObject.property(type=str, default='bar')
+from gi.repository import GLib
+
+from common import Thing
+import sync
+
+t = Thing()
+
+server = sync.Server()
+
+def shortly():
+    # synchronise t with clients, sharing the foo property
+    # and the my-signal signal
+    server.add_object(t, ['foo'], ['my-signal'])
+
+    t.emit('my-signal', 42)
+    t.foo = 'quack'
+
+GLib.timeout_add(2000, shortly)
+
+loop = GLib.MainLoop()
+loop.run()
+from gi.repository import GObject
+#from gi.repository import Json
+
+import json
+import zmq
+from zmqglib import add_zmq_source
+
+FROM_SERVER = 'ipc:///tmp/from-server.sock'
+TO_SERVER = 'ipc:///tmp/to-server.sock'
+
+ctx = zmq.Context()
+
+class Server(GObject.GObject):
+    def __init__(self, from_socket=FROM_SERVER, to_socket=TO_SERVER):
+        super(Server, self).__init__()
+        self.objects = {}
+
+        self.from_sock = ctx.socket(zmq.PUB)
+        self.from_sock.bind(from_socket)
+        self.to_sock = ctx.socket(zmq.REP)
+        self.to_sock.bind(to_socket)
+
+    def add_object(self, obj, properties, signals):
+        type_name = obj.__gtype__.name
+
+        values = {}
+        for prop in properties:
+            obj.connect('notify::' + prop, self.on_notify)
+            value = obj.get_property(prop)
+            values[prop] = value
+
+        self.objects[obj] = (properties, signals)
+
+        self.from_sock.send(
+            'new-object\0%d\0%s\0%s' % (
+                id(obj),
+                type_name,
+                json.dumps(values),
+            )
+        )
+
+    def on_notify(self, obj, param):
+        prop = param.name
+        value = obj.get_property(prop)
+
+        self.from_sock.send(
+            'notify\0%d\0%s\0%s' % (
+                id(obj),
+                prop,
+                json.dumps(value),
+            )
+        )
+
+class Client(GObject.GObject):
+    __gsignals__ = {
+        'new-object': (GObject.SIGNAL_RUN_FIRST, None, (object,)),
+    }
+
+    def __init__(self, from_socket=FROM_SERVER, to_socket=TO_SERVER):
+        super(Client, self).__init__()
+
+        self.objects_by_id = {}
+        self.ids_by_object = {}
+
+        self.from_sock = ctx.socket(zmq.SUB)
+        self.from_sock.setsockopt(zmq.SUBSCRIBE, '')
+        self.from_sock.connect(from_socket)
+        add_zmq_source(self.from_sock, self.on_recv)
+
+        self.to_sock = ctx.socket(zmq.REQ)
+        self.to_sock.connect(to_socket)
+
+    def on_recv(self, events):
+        msg = self.from_sock.recv()
+
+        msg_class, rest = msg.split('\0', 1)
+
+        if msg_class == 'new-object':
+            obj_id, type_name, properties = rest.split('\0', 2)
+
+            properties = json.loads(properties)
+
+            cls = GObject.type_from_name(type_name)
+            obj = GObject.new(cls, **properties)
+
+            self.objects_by_id[obj_id] = obj
+            self.ids_by_object[obj] = obj_id
+
+            self.emit('new-object', obj)
+
+        elif msg_class == 'notify':
+            obj_id, parameter, value = rest.split('\0', 2)
+
+            value = json.loads(value)
+
+            obj = self.objects_by_id[obj_id]
+            obj.set_property(parameter, value)
+
+        else:
+            raise ValueError('unknown message class %r' % msg_class)
+
+        return True
+
+import zmq
+
+from gi.repository import GLib
+
+READ_EVENTS = (
+    GLib.IOCondition.IN |
+    GLib.IOCondition.HUP |
+    GLib.IOCondition.ERR
+)
+
+WRITE_EVENTS = (
+    GLib.IOCondition.OUT |
+    GLib.IOCondition.HUP |
+    GLib.IOCondition.ERR
+)
+
+def add_zmq_source(
+    sock, callback,
+    cond=READ_EVENTS,
+    emask=zmq.POLLIN,
+):
+    def func(source, condition):
+        events = sock.getsockopt(zmq.EVENTS) & emask
+        while events:
+            if not callback(events):
+                return False
+            events = sock.getsockopt(zmq.EVENTS) & emask
+        return True
+
+    fd = sock.getsockopt(zmq.FD)
+    GLib.io_add_watch(fd, cond, func)