Commits

Steve Losh committed 387a4ac

protocols.redis: Add publish/subscribe functionality.

This needs a lot of work, but at least it might get the ball rolling.

Comments (0)

Files changed (1)

diesel/protocols/redis.py

         resp = yield self._get_response()
         yield response(resp)
 
+    @call
+    def subscribe(self, *channels):
+        '''Subscribe to the given channels.
+        
+        Returns the number of channels this client is currently subscribed to.
+        
+        NOTE: You probably shouldn't run multiple subscribe commands, because
+              between the time you run the first and the time you run the next
+              a published message may come in which will mess up the response
+              to the subscribe command (and cause you to lose a message).
+        '''
+        if not channels:
+            yield response(None)
+        else:
+            yield self._send('SUBSCRIBE', *channels)
+            for channel in channels:
+                s, c, n = yield self._get_response()
+            yield response(n)
+
+    @call
+    def unsubscribe(self, *channels):
+        '''Unsubscribe from the given channels, or all of them if none are given.
+        
+        Returns the number of channels this client is still subscribed to.
+        
+        NOTE: This is not really working.  A published message may come in before
+              the unsubscribe command's response and mess things up.  This is
+              something we'll need to fix.
+        '''
+        yield self._send('UNSUBSCRIBE', *channels)
+        
+        resp_iter = xrange(len(channels)) if channels else itertools.cycle([None])
+        for _ in resp_iter:
+            s, c, n = yield self._get_response()
+            if not n:
+                break
+        yield response(n)
+
+    @call
+    def get_from_subscriptions(self):
+        '''Wait for a published message on a subscribed channel.
+        
+        Returns a tuple consisting of:
+        
+            * The channel the message was received from.
+            * The message itself.
+        
+        NOTE: The message will always be a string.  Handle this as you see fit.
+        '''
+        m, channel, payload = yield self._get_response()
+        yield response((channel, payload))
+
+    @call
+    def publish(self, channel, message):
+        '''Publish a message on the given channel.
+        
+        Returns the number of clients that received the message.
+        '''
+        yield self._send_bulk_multi('PUBLISH', channel, str(message))
+        resp = yield self._get_response()
+        yield response(resp)
+
     def _send_bulk(self, cmd, data, *args):
         yield '%s %s%s\r\n' % (cmd, 
         (' '.join(args) + ' ') if args else '', len(data))