Commits

Ginés Martínez Sánchez committed 4479ac9 Draft

circle full closed? perhaps!!

Comments (0)

Files changed (6)

ginsfsm/examples/router/test_router.ini

 use = call:ginsfsm.examples.router.test_router:gaplic1
 gaplic-name = TITI
 router_enabled = true
-GRouter.server = true
-GWebSocket.trace_mach = false
-GRouter.trace_mach = true
-#GObj.trace_mach = true
-GSock.trace_dump = true
+GRouter.server = false
+GRouter.static_routes = TOTO, toto, http://localhost:8000;
+GRouter.trace_router = false
+#GObj.trace_mach = false
+GSock.trace_dump = false
+Sample1.trace_mach = true
 
 [app:gaplic2]
 use = call:ginsfsm.examples.router.test_router:gaplic2
 gaplic-name = TOTO
 router_enabled = true
-#GRouter.server = true
-GWebSocket.trace_mach = false
-GRouter.trace_mach = true
-#GObj.trace_mach = true
-GSock.trace_dump = true
+GRouter.server = true
+GRouter.localhost_route_ports= 8000
+GRouter.trace_router = false
+#GObj.trace_mach = false
+GSock.trace_dump = false
+Sample2.trace_mach = true
 
 
 # Begin logging configuration

ginsfsm/examples/router/test_router.py

 # -*- encoding: utf-8 -*-
 """
-Router in own ports
-===================
+In this example:
+    TITI gaplic of main thread
+    TOTO gaplic another thread or subprocess
+
+TOTO enable router server in port 8000
+TITI add static router to localhost:8000
+
+TOTO has EV_SET_TIMEOUT Api.
+    Its action is set a timeout that broadcast EV_MESSAGE.
+
+TITI do two things:
+    - subscribe to event EV_MESSAGE of external TOTO
+    - send EV_SET_TIMEOUT to external TOTO
+
+    The EV_SET_TIMEOUT event reachs TOTO,
+    then it set the timer that broadcast EV_MESSAGE.
+
+    TITI receive the external EV_MESSAGE because it's subscribe it.
+
+    When TITI receive the EV_MESSAGE,
+    re-send EV_SET_TIMEOUT to external TOTO
+    and so on.
+
 """
 
 from ginsfsm.gaplic import GAplic
 #                   GAplic TITI
 #===============================================================
 def ac_message(self, event):
+    print "ORDENNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN"
     list_globals()
+    self.gaplic.send_event_to_external_gaplic(
+        'TOTO',             # gaplic_name
+        'sample2',          # gobj_name
+        'EV_SET_TIMEOUT',   # event_name
+        self,               # subscriber_gobj
+        timeout=2,
+    )
 
 
 SAMPLE1_FSM = {
     def start_up(self):
         """
         """
-        self.gaplic.subscribe_event_from_external_gaplic(
-            'TOTO',  # gaplic_name
-            'sample2',  # gobj_name
-            'EV_MESSAGE',  # event_name
-            self  # subscriber_gobj
+        ret = self.gaplic.subscribe_event_from_external_gaplic(
+            'TOTO',         # gaplic_name
+            'sample2',      # gobj_name
+            'EV_MESSAGE',   # event_name
+            self,           # subscriber_gobj
+        )
+        if not ret:
+            print "Eiiiiiiiiiiiiiiiiiiiiii something wrong!"
+
+        self.gaplic.send_event_to_external_gaplic(
+            'TOTO',             # gaplic_name
+            'sample2',          # gobj_name
+            'EV_SET_TIMEOUT',   # event_name
+            self,               # subscriber_gobj
+            timeout=2,
         )
 
 
 #===============================================================
 #                   GAplic TOTO
 #===============================================================
+def ac_set_timeout(self, event):
+    # TODO: get the seconds
+    timeout = event.timeout
+    self.set_timeout(timeout)
+
+
 def ac_timeout(self, event):
-    if self.count == 0:
-        ret = self.send_event(
-            'router',
-            'EV_ADD_EXTERNAL_ROUTER',
-            url='ws://127.0.0.1:2690',
-        )
-        if ret >= 1:
-            self.count += 1
-        self.set_timeout(8)
-    else:
-        self.broadcast_event('EV_MESSAGE')
+    self.broadcast_event('EV_MESSAGE')
 
 
 SAMPLE2_FSM = {
     'event_list': (
         'EV_MESSAGE: top output',
         'EV_TIMEOUT: bottom input',
+        'EV_SET_TIMEOUT: top input',
     ),
     'state_list': ('ST_IDLE',),
     'machine': {
         'ST_IDLE':
         (
-            ('EV_TIMEOUT', ac_timeout, None),
+            ('EV_SET_TIMEOUT',  ac_set_timeout,     None),
+            ('EV_TIMEOUT',      ac_timeout,         None),
         ),
     }
 }
             None,
             GTimer,
             self)
-        self.set_timeout(10)
-        """
-        self.gaplic.subscribe_event_from_external_gaplic(
-            self.gaplic,  # gaplic_name
-            self,  # gobj_name
-            'EV_MESSAGE',  # event_name
-            self  # subscriber_gobj
-        )
-        """
 
     def set_timeout(self, seconds):
         self.send_event(self.timer, 'EV_SET_TIMER', seconds=seconds)
     else:
         raise Exception('You must supply an gaplic name ("gaplic-name")')
 
-    ga = GAplic(gaplic_name, **local_conf)
+    ga = GAplic(name=gaplic_name, roles=('titi',), **local_conf)
     ga.create_gobj(
         'sample1',
         Sample1,
     else:
         raise Exception('You must supply an gaplic name ("gaplic-name")')
 
-    ga = GAplic(gaplic_name, **local_conf)
+    ga = GAplic(name=gaplic_name, roles=('toto',), **local_conf)
     ga.create_gobj(
         'sample2',
         Sample2,
 
     # simulate running from ini file
     local_conf = {
-        'gaplic-name': 'TITI',
+        'gaplic-name': 'TOTO',
         'router_enabled': True,
         'GRouter.server': True,
-        'GSock.trace_dump': True,
+        'GRouter.trace_router': True,
+        'GRouter.localhost_route_ports': 8000,
+        'GSock.trace_dump': False,
         'GObj.trace_mach': False,
         'GObj.logger': logging,
     }
 
     # run main process
     local_conf = {
-        'gaplic-name': 'TOTO',
+        'gaplic-name': 'TITI',
         'router_enabled': True,
         'GRouter.server': False,
-        'GSock.trace_dump': True,
+        'GRouter.trace_router': True,
+        'GRouter.static_routes': 'TOTO, toto, http://localhost:8000',
+        'GSock.trace_dump': False,
         'GObj.trace_mach': False,
         'GObj.logger': logging,
     }

ginsfsm/gaplic.py

         if logger is None:
             # TODO use package caller
             self.logger = logging.getLogger(__name__)
+            if not self.logger:
+                self.logger = logging
         else:
             if isinstance(logger, string_types):
                 self.logger = logging.getLogger(logger)
               response of external executed action.
               Received with the same event.
         """
-        self.router.mt_send_event_to_external_role(
+        if not isinstance(role, string_types):
+            raise TypeError(
+                'Destination role %r must be a string'
+                % (role)
+            )
+        if not isinstance(gobj_name, string_types):
+            raise TypeError(
+                'Destination gobj %r must be a string'
+                % (gobj_name)
+            )
+
+        if not isinstance(event_name, string_types):
+            raise TypeError(
+                'Event name %r must be a string'
+                % (event_name)
+            )
+
+        if not isinstance(subscriber_gobj, (string_types, GObj)):
+            raise TypeError(
+                'Subscriber gobj %r must be a string'
+                % (subscriber_gobj)
+            )
+
+        if role in self.roles:
+            raise TypeError(
+                "Please don't use external methods to self gaplic."
+                % (role)
+            )
+
+        if isinstance(subscriber_gobj, (GObj)):
+            subscriber_gobj = subscriber_gobj.name
+
+        subs_gobj = self.find_unique_gobj(subscriber_gobj)
+        if not subs_gobj:
+            raise TypeError(
+                'Subscriber gobj %r must be a __unique_named__ gobj'
+                % (subscriber_gobj)
+            )
+
+        return self.router.mt_send_event_to_external_role(
             role,
             gobj_name,
             event_name,
               response of external executed action.
               Received with the same event.
         """
-        self.router.mt_send_event_to_external_gaplic(
+        if not isinstance(gaplic_name, string_types):
+            raise TypeError(
+                'Destination gaplic %r must be a string'
+                % (gaplic_name)
+            )
+        if not isinstance(gobj_name, string_types):
+            raise TypeError(
+                'Destination gobj %r must be a string'
+                % (gobj_name)
+            )
+
+        if not isinstance(event_name, string_types):
+            raise TypeError(
+                'Event name %r must be a string'
+                % (event_name)
+            )
+
+        if not isinstance(subscriber_gobj, (string_types, GObj)):
+            raise TypeError(
+                'Subscriber gobj %r must be a string'
+                % (subscriber_gobj)
+            )
+
+        if gaplic_name == self.name:
+            raise TypeError(
+                "Please don't use external methods to self gaplic."
+                % (gaplic_name)
+            )
+
+        if isinstance(subscriber_gobj, (GObj)):
+            subscriber_gobj = subscriber_gobj.name
+
+        subs_gobj = self.find_unique_gobj(subscriber_gobj)
+        if not subs_gobj:
+            raise TypeError(
+                'Subscriber gobj %r must be a __unique_named__ gobj'
+                % (subscriber_gobj)
+            )
+
+        return self.router.mt_send_event_to_external_gaplic(
             gaplic_name,
             gobj_name,
             event_name,
             self, gaplic_name, gobj_name, event_name, subscriber_gobj, **kw):
         """ Subscribe an event of an external gaplic.
         """
-        if not (isinstance(gobj_name, string_types) and
-                isinstance(gaplic_name, string_types)):
-            raise TypeError(
-                'Destination %r %r must be a string instance'
-                % (gaplic_name, gobj_name)
-            )
-
-        if gaplic_name == self.name:
-            named_gobj = self.find_unique_gobj(gobj_name)
-            if named_gobj:
-                named_gobj.subscribe_event(event_name, subscriber_gobj, **kw)
-                return
-
-        self.router.mt_subscribe_external_event(
+        kw.update({'__subscribe_event__': True})
+        return self.send_event_to_external_gaplic(
             gaplic_name,
             gobj_name,
             event_name,
             subscriber_gobj,
-            kw
+            **kw
+        )
+
+    def subscribe_event_from_external_role(
+            self, role, gobj_name, event_name, subscriber_gobj, **kw):
+        """ Subscribe an event of an external gaplic.
+        """
+        kw.update({'__subscribe_event__': True})
+        return self.send_event_to_external_role(
+            role,
+            gobj_name,
+            event_name,
+            subscriber_gobj,
+            **kw
         )
 
     def delete_all_references(self, gobj):
             parent_name = "%s:%s" % (parent.__class__.__name__, parent.name)
             name = parent_name + '.' + name
             parent = parent.parent
-        return "%s: %s" % (name, 'Destroyed' if self._destroyed else "Lived")
+        return "'%s: %s'" % (name, 'Destroyed' if self._destroyed else "Lived")
 
     def create_gobj(self, name, gclass, parent, **kw):
         """ Factory function to create gobj's instances.
                     )
                     oevent.event_name = sub.__rename_event_name__
 
+                if hasattr(sub, '__subscriptor_reference__'):
+                    oevent.kw.update(
+                        {
+                            '__subscriptor_reference__':
+                            sub.__subscriptor_reference__,
+                        }
+                    )
+                    oevent.event_name = sub.__rename_event_name__
+
                 ret = False
                 if isinstance(sub.subscriber_gobj, Deferred):
                     # outside world
             otherwise otherwise a :exc:`GObjError` will be raised.
 
         :param kw: keyword arguments.
+        :return: the subscription object.
 
         Possible values for **kw** arguments:
             * `__use_post_event__`: ``bool``
               the sent event with the value of the original event name.
 
 
-            * `__hard_subscription__`: permanent subscription.
+            * `__hard_subscription__`:  ``bool``
+
+              True for permanent subscription.
 
               This subscription cannot be remove,
               neither with delete_all_subscriptions().
+              (Well, with force you can)
+
+            * `__subscriptor_reference__`:  ``str``
+
+              If exists, it will be added as kw in the event broadcast.
+              Can be used by the subscriptor for general purposes.
 
         """
         #if subscriber_gobj is not None:
             self.logger.debug('NEW %r' % subscription)
         self._dl_subscriptions.add(subscription)
         self._some_subscriptions = True
+        return subscription
 
     def _find_subscription(self, event_name, subscriber_gobj):
         """ Find a subscription by event_name and subscriber gobj.
                     return sub
         return None
 
+    def delete_subscription_by_object(self, subscription):
+        if subscription in self._dl_subscriptions:
+            self._dl_subscriptions.remove(subscription)
+            if len(self._dl_subscriptions) == 0:
+                self._some_subscriptions = False
+            return True
+        return False
+
     def delete_subscription(self, event_name, subscriber_gobj):
         """ Remove `subscription`.
 

ginsfsm/protocols/sockjs/server/c_websocket.py

                         message = message.decode("utf-8")
                     except:
                         pass
-                    self.logger.debug(
-                        "%r: RECEIVE Websocket text FRAME: %r" % (
-                        self, message)
-                    )
+                    if self.trace_mach:
+                        self.logger.debug(
+                            "%r: RECEIVE Websocket text FRAME: %r" % (
+                            self, message)
+                        )
                     self.gaplic.add_callback(
                         self.broadcast_event,
                         'EV_ON_MESSAGE',
                     )
             elif operation == OPCODE_BINARY_FRAME:
                 if message:
-                    self.logger.debug(
-                        "%r: RECEIVE Websocket binary FRAME: %r" % (
-                        self, message)
-                    )
+                    if self.trace_mach:
+                        self.logger.debug(
+                            "%r: RECEIVE Websocket binary FRAME: %r" % (
+                            self, message)
+                        )
                     self.gaplic.add_callback(
                         self.broadcast_event,
                         'EV_ON_MESSAGE',
             elif operation == OPCODE_CONTROL_CLOSE:
                 self.close()
             elif operation == OPCODE_CONTROL_PING:
-                self.logger.debug("%r: RECEIVE Websocket Control PING" % (
-                    self)
-                )
+                if self.trace_mach:
+                    self.logger.debug("%r: RECEIVE Websocket Control PING" % (
+                        self)
+                    )
                 self.pong()
             elif operation == OPCODE_CONTROL_PONG:
-                self.logger.debug("%r: RECEIVE Websocket Control PONG" % (
-                    self)
-                )
+                if self.trace_mach:
+                    self.logger.debug("%r: RECEIVE Websocket Control PONG" % (
+                        self)
+                    )
             else:
                 self.logger.error("ERROR %r: Websocket BAD OPCODE %r" % (
                     self, operation)

ginsfsm/router.py

 .. autoclass:: GRouter
     :members: mt_subscribe_gobj_event
 
+NEW IDEAS:
 Los eventos se garantiza que se entregan al siguiente nodo.
 Si se ha podido entregar entonces llega el ack
 que provoca que se borre de la cola.
 from ginsfsm.utils import (
     get_host_port_from_urls,
     get_path_from_url,
-)
-from ginsfsm.protocols.sockjs.server.proto import json_decode
-from ginsfsm.utils import (
     hexdump,
     random_key,
 )
+from ginsfsm.protocols.sockjs.server.proto import json_decode
+
+
+class ExternalSubscription(object):
+    """ Container of external subscription.
+    """
+    def __init__(self, intra_event, route_ref):
+        self.event_name = intra_event.event_name
+        self.destination_gobj = intra_event.destination_gobj
+        self.origin_gaplic = intra_event.origin_gaplic
+        self.origin_gobj = intra_event.origin_gobj
+        self.route_ref = route_ref
 
 
 class IntraEvent(object):
         'message_type',
         'serial',
         'event_name',
+        'reference',
     ]
     event_nack_fields = [
         'message_type',
             my_gaplic_name=None,
             my_roles=None,
             kw=None,
-            error_message=None):
+            error_message=None,
+            reference=None):
         self.message_type = message_type
         self.route_ref = route_ref
         self.serial = serial
         else:
             self.kw = {}
         self.error_message = error_message
+        self.reference = reference
 
     def toJSON(self):
         message_type = self.message_type
     def fire_pending_events(self, route):
         if not route.route_ref:
             return
-        if not route.identity_ack:
-            # TODO: improve sending identity card with timeouts
-            self.router.send_identity_card(route.write)
-            return
         write = route.write
         trace_router = self.router.config.trace_router
 
         if not write:
             if trace_router:
-                self.logger.info("    * route DISCONNECTED!!")
+                self.router.logger.info("    * route DISCONNECTED!!")
             return False
+
+        if not route.identity_ack:
+            # TODO: improve sending identity card with timeouts
+            self.router.send_identity_card(route.write)
+            return
+
         if route.cur_pending_event:
-
             try:
                 msg = route.cur_pending_event.toJSON()
             except Exception as e:
             if trace_router:
                 self.logger.info("   !!! IntraEvent MINE !!!")
 
+            if intra_event.kw.get('__subscribe_event__', False):
+                # it's a external subscription
+                external_subscriptor_ref = self.make_external_subscription(
+                    intra_event,
+                    this_route.route_ref
+                )
+                if not external_subscriptor_ref:
+                    ack = intra_event.copy('__event_nack__')
+                    ack.error_message = "Cannot make external subscription"
+                    if trace_router:
+                        prefix = '%s ==> %s' % (
+                            registry.my_gaplic_name, this_route.route_ref)
+                        self.trace_intra_event(prefix, ack)
+                    write(ack.toJSON())
+                    return
+
+                subs_kw = {
+                    '__rename_event_name__': 'EV_SUBSCRIPTION',
+                    '__subscriptor_reference__': external_subscriptor_ref,
+                }
+                try:
+                    named_gobj.subscribe_event(
+                        intra_event.event_name,
+                        self,
+                        **subs_kw
+                    )
+                except Exception as e:
+                    self.remove_external_subscription(external_subscriptor_ref)
+                    ack = intra_event.copy('__event_nack__')
+                    ack.error_message = "Cannot subscribe event: " + e
+                    if trace_router:
+                        prefix = '%s ==> %s' % (
+                            registry.my_gaplic_name, this_route.route_ref)
+                        self.trace_intra_event(prefix, ack)
+                    write(ack.toJSON())
+                    return
+
+                # simple ack, no checkout, for the sender could remove the msg.
+                ack = intra_event.copy('__event_ack__')
+                ack.reference = external_subscriptor_ref
+                if trace_router:
+                    prefix = '%s ==> %s' % (
+                        registry.my_gaplic_name, this_route.route_ref)
+                    self.trace_intra_event(prefix, ack)
+                write(ack.toJSON())
+                return
+
             # simple ack, no checkout, for the sender could remove the msg.
             ack = intra_event.copy('__event_ack__')
             if trace_router:
     return ret
 
 
+def ac_subscription(self, event):
+    subscriptor_reference = event.__subscriptor_reference__
+    subscription = self.external_subscriptions.get(subscriptor_reference, None)
+    if subscription:
+        self.mt_send_event_to_external_route(
+            subscription.route_ref,
+            subscription.origin_gobj,
+            subscription.event_name,
+            subscription.destination_gobj,
+            event.kw)
+
+
 GROUTER_FSM = {
     'event_list': (
         'EV_ADD_STATIC_ROUTE: top input',
         'EV_ON_MESSAGE: bottom input',
         'EV_INPUT_MESSAGE: bottom input',
         'EV_TIMEOUT: bottom input',
+        'EV_SUBSCRIPTION: bottom input',
     ),
     'state_list': ('ST_IDLE',),
     'machine': {
             ('EV_ON_MESSAGE',           ac_on_message,          None),
             ('EV_INPUT_MESSAGE',        ac_input_message,       None),
             ('EV_TIMEOUT',              ac_timeout,             None),
+            ('EV_SUBSCRIPTION',         ac_subscription,        None),
         ),
     }
 }
         GObj.__init__(self, GROUTER_FSM, GROUTER_GCONFIG)
         self.server_sock = None  # server sock of tcp router
         self.local_router = None
-        self.gaplic_registry = {}
+        self.external_subscriptions = {}
 
     def start_up(self):
         """ Initialization zone.
             timeout_event_name='EV_TIMEOUT')
         self.set_timeout(self.config.timeout_idle)
 
-        self.config.trace_router = True  # TODO: REMOVE!!!
         if self.config.static_routes:
             routes = self.config.static_routes
             for r in routes:
             self.send_event(self._timer, 'EV_SET_TIMER', seconds=-1)
 
     def mt_execute_command(self, command):
+        """ TODO
+        """
         ret = 'EXECUTED ' + command + ' Ok'
         return ret
 
             r = dynamic_routes[key]
             name = r.gaplic_name
             roles = r.roles
-            s += '  route_ref: %-12s | name:%s | roles:%s\n' % (key, name, roles)
+            s += '  route_ref: %-12s | name:%s | roles:%s\n' % (
+                key, name, roles)
         s += '\n'
         return s
 
             event_name,
             subscriber_gobj,
             kw):
-        """
+        """ Send event to external role
         """
         registry = self.registry
         route = self.search_route_by_role(role)
             destination_role=role,
             destination_gobj=gobj_name,
             origin_gaplic=registry.my_gaplic_name,
-            origin_gobj=subscriber_gobj.name,
+            origin_gobj=subscriber_gobj,
             kw=kw)
 
         registry.enqueue_pending_event(route, intra_event)
             event_name,
             subscriber_gobj,
             kw):
-        """
+        """ Send event to external gaplic
         """
         registry = self.registry
         route = self.search_route_by_gaplic(gaplic_name)
             destination_gaplic=gaplic_name,
             destination_gobj=gobj_name,
             origin_gaplic=registry.my_gaplic_name,
-            origin_gobj=subscriber_gobj.name,
+            origin_gobj=subscriber_gobj,
             kw=kw)
 
         registry.enqueue_pending_event(route, intra_event)
             event_name,
             subscriber_gobj,
             kw):
-        """
+        """ Send event to route.
         """
         registry = self.registry
         route = registry.get_route_by_ref(route_ref)
             registry.fire_pending_events(route)
         return True
 
-    def xxx_send_event_to_external_gaplic(
-            self, gaplic_name, gobj_name, event_name, subscriber_gobj, **kw):
-        """ Send an event to an external gaplic.
+    def make_external_subscription(self, intra_event, route_ref):
+        k = random_key()
+        subscription = ExternalSubscription(intra_event, route_ref)
+        self.external_subscriptions[k] = subscription
+        return k
 
-        :param gaplic_name: name of external gaplic.
-        :param gobj_name: name of external gobj.
-        :param event_name: name of the event to send.
-        :param subscriber_gobj: subscriber obj that wants receive the response.
-        :param kw: keyword arguments.
-
-        Possible values for **kw** arguments:
-            * `__shot_subscription__`: ``int`` Number of subcriptions
-              to response of external executed action.
-        """
-        print "SENNNNNNNDDD", gaplic_name, gobj_name
-
-    def xxx_send_event_to_external_role(
-            self, role, gobj_name, event_name, subscriber_gobj, **kw):
-        """ Send an event to an external gaplic.
-
-        :param gaplic_name: name of external gaplic.
-        :param gobj_name: name of external gobj.
-        :param event_name: name of the event to send.
-        :param subscriber_gobj: subscriber obj that wants receive the response.
-        :param kw: keyword arguments.
-
-        Possible values for **kw** arguments:
-            * `__shot_subscription__`: ``int`` Number of subcriptions
-              to response of external executed action.
-        """
-        print "SENNNNNNNDDD", role, gobj_name
-
-    def xxx_subscribe_external_event(
-            self, gaplic_name, gobj_name, event_name, subscriber_gobj, **kw):
-        """ Subscribe to an gobj event of external gaplic.
-
-        :param gobj: it must be an compoust name: `gaplic-name.gobj-name`
-        """
-        print "SUBSCRIBEEEE", gaplic_name, gobj_name
+    def remove_external_subscription(self, external_subscriptor_reference):
+        if external_subscriptor_reference in self.external_subscriptions:
+            subs = self.external_subscriptions[external_subscriptor_reference]
+            if subs:
+                subs.destination_gobj.delete_subscription_by_object(
+                    subs.subscription)
+            del self.external_subscriptions[external_subscriptor_reference]