Commits

Anonymous committed d6f0427 Draft

use marshalling

Comments (0)

Files changed (1)

araldo_redis/endpoints/redis_endpoint.py

 """ araldo Endpoint for communication with Redis PubSub
 """
 
+import json
 import gevent
 from gevent.monkey import patch_all
 
         """
         messages = self._client.listen()
         while True:
-            message = messages.next()
-            print "%s: received: %s" % (self, message)
-            self.gevent_queue.put((self.name(), message))
+            try:
+                redis_message = messages.next()
+                self._logger.debug(
+                    "%s: received: %s",
+                    self, redis_message)
+                data = redis_message["data"]
+                self._logger.debug("data: %s", data)
+                message = self._marshalling.to_internal_format(data)
+                self._logger.debug(
+                    "unmarshalled: %s",
+                    message)
+                self.gevent_queue.put((self.name(), message))
+            except TypeError as error:
+                self._logger.debug(
+                    "Unable to unmarshal message; discarding (%s)",
+                    error)
             gevent.sleep(0)
 
     def check(self):
         return "outbound-redis"
 
     def send(self, message):
+        message_str = self._marshalling.to_external_format(message)
+        self._logger.debug("message_str: %s", message_str)
         self._server.publish(
             self._channel_name,
-            message)
+            message_str)