Commits

Jacob Söndergaard committed cf98d3f

On reconnect, try to re-establish the watched tubes list and the tube used. Don't raise a custom exception when there is no connection.

Comments (0)

Files changed (3)

 ### Connection methods
 
 **`connect(callback=None)`**  
-Establish the client's connection to beanstalkd. Calls back when connection has been established. When first connected, the client will attempt to re-connect if the socket is closed.
+Establish the client's connection to beanstalkd. Calls back when connection has been established. After first attempt to connect, the client will automatically attempt to re-connect (with 1 second intervals) if the socket is closed unexpectedly.
 
 **`close(callback=None)`**  
 Close the client's connection to beanstalkd. Calls back when connection has been closed.
 
+**`closed()`**
+Return True if the connection is established, otherwise returns False.
+
+If the connection is down (also while re-connecting), any attempt to communicate with beanstalkd, using methods in the following sections, will raise an IOError exception.
+
 ### Producer methods
 
 **`put(body, priority=DEFAULT_PRIORITY, delay=0, ttr=120, callback=None)`**  

beanstalktc/beanstalktc.py

 limitations under the License.
 '''
 
-__version__ = '0.2.0'
+__version__ = '0.2.1'
 
 import socket
 import time
 class CommandFailed(BeanstalktcException): pass
 class Buried(BeanstalktcException): pass
 class DeadlineSoon(BeanstalktcException): pass
-class NotConnected(BeanstalktcException): pass
 
 
 class Client(object):
         self.port = port
         self.io_loop = io_loop or IOLoop.instance()
         self._stream = None
+        self._using = 'default'  # current tube
+        self._watching = set()   # set of watched tubes
 
     def _reconnect(self, callback):
+        # wait some time before trying to re-connect
         self.io_loop.add_timeout(time.time() + RECONNECT_TIMEOUT,
-                lambda: self.connect(callback))
+                lambda: self.connect(self._reconnected))
+
+    def _reconnected(self):
+        # re-establish the used tube and tubes being watched
+        watch_list = list(self._watching)
+
+        def do_next(arg):
+            try:
+                if watch_list:
+                    self.watch(watch_list.pop(), do_next)
+                elif self._using != 'default':
+                    self.use(self._using)
+            except:
+                # ignored, as next re-connect will retry the operation
+                pass
+
+        do_next()
 
     def connect(self, callback=None):
         """Connect to beanstalkd server."""
-        if not self._stream or self._stream.closed():
+        if self.closed():
             self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                     socket.IPPROTO_TCP)
             self._stream = IOStream(self._socket, io_loop=self.io_loop)
-            self._stream.set_close_callback(lambda: self._reconnect(callback))
+            self._stream.set_close_callback(self._reconnect)
         self._stream.connect((self.host, self.port), callback)
 
     def close(self, callback=None):
         self._stream.write('quit\r\n', self._stream.close)
 
     def closed(self):
-        '''Returns true if the connection has been closed.'''
-        return self._stream.closed()
+        '''Returns True if the connection is closed.'''
+        return not self._stream or self._stream.closed()
 
     def _interact(self, command, callback, expected_ok=[], expected_err=[]):
-        if self.closed():
-            raise NotConnected('Not connected to the beanstalkd server'
-                    ' - attempting to reconnect')
-
         # write command to socket stream
         self._stream.write(command,
                 # when command is written: read line from socket stream
     
         Calls back with the name of the tube now being used.
         """
-        self._interact_value('use %s\r\n' % name, callback, ['USING'])
+        def using(name):
+            self._using = name
+            if callback:
+                callback(name)
+
+        self._interact_value('use %s\r\n' % name, using, ['USING'])
     
     #
     #  Worker commands
     
         Call back with number of tubes currently in the watch list.
         """
+        def watching(count):
+            self._watching.add(name)
+            if callback:
+                callback(count)
+
         self._interact_value('watch %s\r\n' % name, callback, ['WATCHING'])
 
     def ignore(self, name, callback=None):
         A CommandFailed exception is raised on an attempt to ignore the only
         tube in the watch list.
         """
-        self._interact_value('ignore %s\r\n' % name, callback, ['WATCHING'],
+        def ignoring(count):
+            self._watching.remove(name)
+            if callback:
+                callback(count)
+
+        self._interact_value('ignore %s\r\n' % name, ignoring, ['WATCHING'],
             ['NOT_IGNORED'])
 
     #
 from setuptools import setup
 
 setup(name='beanstalktc',
-      version='0.2.0',
+      version='0.2.1',
       description='An async beanstalkd client for Tornado',
       author='Jacob Sondergaard',
       author_email='jacob@nephics.com',