Commits

benoitc  committed e2c0024

new branch threadsafe. httpc is now threadsafe and could handle multiple
connection by reusing them via a queue. This is a naive implementation.
It may be better to share socket rather than HTTPConnection object which
isn't pickable.

  • Participants
  • Parent commits 7635881
  • Branches threadsafe

Comments (0)

Files changed (1)

File restkit/httpc.py

 import urllib
 import urlparse
 
+
+"""try:
+    from multiprocessing import Lock, Queue
+    from Queue import Empty, Full
+except ImportError:
+    from threading import Lock 
+    from Queue import Queue, Empty, Full"""
+  
+
+from threading import Lock 
+from Queue import Queue, Empty, Full 
+
 import restkit
 from restkit import errors
 from restkit.utils import to_bytestring
 class HttpClient(object):
     MAX_REDIRECTIONS = 5
     
-    def __init__(self, follow_redirect=True, force_follow_redirect=False):
+    def __init__(self, follow_redirect=True, force_follow_redirect=False, 
+            max_connections=100, block=False, timeout=None):
         self.authorizations = []
         self.use_proxy = False
         self.follow_redirect = follow_redirect
         self.force_follow_redirect = force_follow_redirect
+        self.connections = {}
+        self.max_connections = max_connections
+        self.block = block
+        self.timeout = None
+        self.lock = Lock()
         
     def add_authorization(self, obj_auth):
         self.authorizations.append(obj_auth)
         
     def _get_connection(self, uri, headers=None):
         connection = None
-        if uri.scheme == 'https':
-            if not uri.port:
-                connection = httplib.HTTPSConnection(uri.hostname)
-            else:
-                connection = httplib.HTTPSConnection(uri.hostname, uri.port)
-        else:
-            if not uri.port:
-                connection = httplib.HTTPConnection(uri.hostname)
-            else:
-                connection = httplib.HTTPConnection(uri.hostname, uri.port)
+        conn_key = (uri.scheme, uri.netloc)
+        self.lock.acquire()
+        try:
+            pool = self.connections.setdefault(conn_key, Queue(self.max_connections))
+            try:
+                connection = pool.get(block=self.block)
+                if connection.sock is False:
+                    connection.connect()
+            except Empty:
+                if uri.scheme == 'https':
+                    if not uri.port:
+                        connection = httplib.HTTPSConnection(uri.hostname)
+                    else:
+                        connection = httplib.HTTPSConnection(uri.hostname, uri.port)
+                else:
+                    if not uri.port:
+                        connection = httplib.HTTPConnection(uri.hostname)
+                    else:
+                        connection = httplib.HTTPConnection(uri.hostname, uri.port)
+        finally:
+            self.lock.release()
         return connection
         
+    def _release_connection(self, uri, connection):
+        conn_key = (uri.scheme, uri.netloc)
+        self.lock.acquire()
+        try:
+            pool = self.connections.setdefault(conn_key, Queue(self.max_connections))
+            pool.put(connection, block=self.block)
+        finally:
+            self.lock.release()
         
-    def _make_request(self, uri, method, body, headers):
-        connection = self._get_connection(uri, headers)
-        connection.debuglevel = restkit.debuglevel
-        
+    def _make_request(self, uri, method, body, headers): 
         for i in range(2):
+            connection = self._get_connection(uri, headers)
+            connection.debuglevel = restkit.debuglevel
             try:
                 if connection.host != uri.hostname:
                     connection.putrequest(method, uri.geturl())
                         if not new_uri.netloc: # we got a relative url
                             absolute_uri = "%s://%s" % (uri.scheme, uri.netloc)
                             new_url = urlparse.urljoin(absolute_uri, new_url)
+                        self._release_connection(uri, connection)
                         response, connection = self._request(url_parser(new_url), method, body, 
                             headers, nb_redirections + 1)
                         self.final_url = new_url
                         absolute_uri = "%s://%s" % (uri.scheme, uri.netloc)
                         new_uri = url_parser(new_url)
                         new_url = urlparse.urljoin(absolute_uri, new_url)
+                    self._release_connection(uri, connection)
                     response, connection = self._request(url_parser(new_url), 'GET', headers, nb_redirections + 1)
                     self.final_url = new_url
             else:
             connection.close()
             return resp, ""
         else:
-            return resp, _decompress_content(resp, response, connection, stream, stream_size)
+            return resp, _decompress_content(resp, response, 
+                lambda: self._release_connection(uri, connection), 
+                stream, stream_size)
         
 class ProxiedHttpClient(HttpClient):
     """ HTTP Client with simple proxy management """
             return httplib.HTTPConnection(proxy_uri.hostname, proxy_uri.port)
         return None
             
-def _decompress_content(resp, response, connection, stream=False, stream_size=16384):
+def _decompress_content(resp, response, release_callback, stream=False, stream_size=16384):
     try:
         encoding = resp.get('content-encoding', None)
         if encoding in ['gzip', 'deflate']:
             
             if encoding == 'gzip':
                 compressedstream = StringIO.StringIO(response.read())
-                connection.close()
+                release_callback()
                 data = gzip.GzipFile(fileobj=compressedstream)
                 if stream:
                     return ResponseStream(data, stream_size)
                     return data.read()
             else:
                 data =  zlib.decompress(response.read())
-                connection.close()
+                release_callback()
                 if stream:
                     return ResponseStream(StringIO.StringIO(data), stream_size)
                 else:
                     return data
         else:
             if stream:
-                return ResponseStream(response, stream_size, connection)
+                return ResponseStream(response, stream_size, release_callback)
             else:
                 data = response.read()
-                connection.close()
+                release_callback()
                 return data
     except Exception, e:
         raise errors.ResponseError("Decompression failed %s" % str(e))
         
 class ResponseStream(object):
     
-    def __init__(self, response, amnt=16384, connection=None):
+    def __init__(self, response, amnt=16384, release_callback=None):
         self.response = response
         self.amnt = amnt
-        self.connection = connection
+        self.callback = release_callback
         
     def next(self):
         return self.response.read(self.amnt)
                 yield data
             else:
                 break
-        if hasattr(self.connection, 'close'):
-            self.connection.close()
+        if self.callback is not None:
+            self.callback()
         
 class HTTPResponse(dict):
     """An object more like email.Message than httplib.HTTPResponse.