Commits

coady  committed 022869b

Resource pooling.

  • Participants
  • Parent commits 9edc15d

Comments (0)

Files changed (5)

 ==================
 Lupyne should run anywhere PyLucene does, though it is primarily tested on popular Unix variants.
 
- * Python 2.6.5+, 2.7
+ * Python 2.6.6+, 2.7
  * PyLucene 3.1, 3.2, 3.3, 3.4, 3.5
  * CherryPy 3.1.2+, 3.2 (only required for server)
 

File docs/client.rst

   :members:
   :exclude-members: response_class
 
+Pool
+---------
+.. autoclass:: Pool
+  :show-inheritance:
+  :members:
+
 Resources
 ---------
 .. autoclass:: Resources

File lupyne/client.py

 `Replicas`_ will automatically retry if host is unreachable.
 """
 
-from future_builtins import map
+from future_builtins import map, zip
 import warnings
 import random
 import time
     content_type = 'application/json'
     def end(self):
         self.body = self.read()
-        self.close()
         self.time = float(self.getheader('x-response-time', 'nan'))
         if 'gzip' in self.getheader('content-encoding', ''):
             self.body = gzip.GzipFile(fileobj=io.BytesIO(self.body)).read()
         "Return response body from DELETE request."
         return self.call('DELETE', path, params=params)()
 
-class Resources(dict):
-    """Thread-safe mapping of hosts to optionally persistent resources.
-    
-    :param hosts: host[:port] strings
-    :param limit: maximum number of cached connections per host
-    """
-    class queue(collections.deque):
-        "Queue of prioritized resources."
-    def __init__(self, hosts, limit=0):
-        self.update((host, self.queue(maxlen=limit)) for host in hosts)
-    def request(self, host, method, path, body=None):
-        "Send request to given host and return exclusive `resource`_."
+class Pool(collections.deque):
+    "Thread-safe resource pool for one host."
+    def __init__(self, host, limit=0):
+        collections.deque.__init__(self, maxlen=limit)
+        self.host = host
+    def stream(self, method, path, body=None):
+        "Generate resource, initial response, and final response while handling timeouts."
         try:
-            resource = self[host].popleft()
+            resource = self.popleft()
         except IndexError:
-            resource = Resource(host)
+            resource = Resource(self.host)
         resource.request(method, path, body)
-        return resource
-    def getresponse(self, host, resource):
-        """Return `response`_ and release `resource`_ if request completed.
-        Return None if it appears request may be repeated."""
+        response = yield resource
         try:
             response = resource.getresponse()
         except httplib.BadStatusLine:
         except socket.error as exc:
             if exc.errno != errno.ECONNRESET:
                 raise
+        if response is None or response.status == httplib.REQUEST_TIMEOUT or (response.status == httplib.BAD_REQUEST and response.body == 'Illegal end of headers.'):
+            resource.close()
+            resource.request(method, path, body)
+            yield response
+            yield resource.getresponse()
         else:
-            if response.status != httplib.REQUEST_TIMEOUT and (response.status != httplib.BAD_REQUEST or response.body != 'Illegal end of headers.'):
-                self[host].append(resource)
-                return response
-        resource.close()
+            self.append(resource)
+            yield response
+            yield response
+    def call(self, method, path, body=None):
+        "Send request and return completed `response`_."
+        return list(self.stream(method, path, body))[-1]
+
+class Resources(dict):
+    """Thread-safe mapping of hosts to resource pools.
+    
+    :param hosts: host[:port] strings
+    :param limit: maximum number of cached resources per host
+    """
+    def __init__(self, hosts, limit=0):
+        self.update((host, Pool(host, limit)) for host in hosts)
     def priority(self, host):
         "Return priority for host.  None may be used to eliminate from consideration."
         return -len(self[host])
             priorities[self.priority(host)].append(host)
         priorities.pop(None, None)
         return random.choice(priorities[min(priorities)])
-    def stream(self, host, method, path, body=None):
-        resource = self.request(host, method, path, body)
-        yield
-        response = self.getresponse(host, resource)
-        if response is None:
-            resource.request(method, path, body)
-        yield
-        if response is None:
-            response = resource.getresponse()
-        yield response
     def unicast(self, method, path, body=None, hosts=()):
         "Send request and return `response`_ from any host, optionally from given subset."
         host = self.choice(tuple(hosts) or self)
-        return list(self.stream(host, method, path, body))[-1]
+        return self[host].call(method, path, body)
     def broadcast(self, method, path, body=None, hosts=()):
         "Send requests and return responses from all hosts, optionally from given subset."
         hosts = tuple(hosts) or self
-        streams = [self.stream(host, method, path, body) for host in hosts]
-        for attempt in range(3):
-            responses = list(map(next, streams))
-        return responses
+        streams = [self[host].stream(method, path, body) for host in hosts]
+        return list(zip(*streams))[-1]
 
 class Shards(dict):
     """Mapping of keys to host clusters, with associated `resources`_.
     Reads are balanced among all remaining hosts.
     """
     get, post, put, delete = map(Resource.__dict__.__getitem__, ['get', 'post', 'put', 'delete'])
-    class queue(Resources.queue):
-        failure = 0
     def __init__(self, hosts, limit=0):
         self.hosts = collections.deque(hosts)
         Resources.__init__(self, self.hosts, limit)
+        for pool in self.values():
+            pool.failure = 0
     def priority(self, host):
-        queue = self[host]
-        if not queue.failure:
-            return -len(queue)
+        pool = self[host]
+        if not pool.failure:
+            return -len(pool)
     def call(self, method, path, body=None, params=(), retry=False):
         """Send request and return completed `response`_, even if hosts are unreachable.
         
             path += '?' + urllib.urlencode(params, doseq=True)
         host = self.choice(self) if method == 'GET' else self.hosts[0]
         try:
-            response = list(self.stream(host, method, path, body))[-1]
+            response = self[host].call(method, path, body)
         except socket.error:
             self[host].failure = time.time()
             if method != 'GET':

File lupyne/server.py

         path = '/' + '{0}/update/{1}/'.format(path, uuid.uuid1()).lstrip('/')
         directory = lucene.FSDirectory.cast_(self.searcher.directory).directory.path
         resource = client.Resource(host)
-        names = set(resource.put(path)).difference(self.searcher.indexCommit.fileNames)
+        names = sorted(set(resource.put(path)).difference(os.listdir(directory)))
         try:
             for name in names:
                 resource.download(path + name, os.path.join(directory, name))

File test/distributed.py

         assert resources.broadcast('GET', '/')
         assert list(map(len, resources.values())) == counts[::-1]
         host = self.hosts[1]
-        resource = resources.request(host, 'GET', '/')
+        stream = resources[host].stream('GET', '/')
+        resource = next(stream)
         resource.getresponse = lambda: getresponse(socket.error)
-        self.assertRaises(socket.error, resources.getresponse, host, resource)
-        resource = resources.request(host, 'GET', '/')
+        self.assertRaises(socket.error, next, stream)
+        stream = resources[host].stream('GET', '/')
+        resource = next(stream)
         resource.getresponse = lambda: getresponse(httplib.BadStatusLine)
-        assert resources.getresponse(host, resource) is None
+        assert next(stream) is None
         resources.clear()
         self.assertRaises(ValueError, resources.unicast, 'GET', '/')