Kirill Simonov avatar Kirill Simonov committed c6b17c8

Added connection pool.

Also, fixed conversion from `NUMBER` for Oracle; added `weight` indicator
to `Utility` adapters.

Comments (0)

Files changed (5)

src/htsql/adapter.py

         Hello, World!
     """
 
+    weight = 0.0
+
+    @classmethod
+    def dominates(component, other):
+        if issubclass(component, other):
+            return True
+        if component.weight > other.weight:
+            return True
+        return False
+
     @classmethod
     def matches(component, dispatch_key):
         # For an utility, the dispatch key is always a 0-tuple.
         return ()
 
 
+def weigh(value):
+    assert isinstance(value, (int, float))
+    frame = sys._getframe(1)
+    frame.f_locals['weight'] = value
+
+
 class Adapter(Component):
     """
     Implements adapter interfaces.

src/htsql/application.py

         self.component_registry = ComponentRegistry()
         # A cached copy of the introspected catalog (FIXME: catalog_registry?).
         self.cached_catalog = None
+        # A cached copy of the connection pool (FIXME: connection_pool?).
+        self.cached_pool = None
 
     def __enter__(self):
         """

src/htsql/connect.py

 
 
 from __future__ import with_statement
-from .adapter import Adapter, Utility, adapts
+from .adapter import Adapter, Utility, adapts, weigh
 from .domain import Domain
+from .context import context
+import threading
 
 
 class DBError(Exception):
     def __init__(self, connection, guard):
         self.connection = connection
         self.guard = guard
+        self.is_busy = True
+        self.is_valid = True
 
     def cursor(self):
         """
         """
         Close the connection.
         """
+        self.is_valid = False
         with self.guard:
             return self.connection.close()
 
+    def invalidate(self):
+        self.is_valid = False
+
+    def acquire(self):
+        assert not self.is_busy
+        self.is_busy = True
+
+    def release(self):
+        assert self.is_busy
+        self.is_busy = False
+
 
 class CursorProxy(object):
     """
         return None
 
 
+class Pool(object):
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.items = []
+
+
+class PoolConnect(Connect):
+
+    weigh(1.0)
+
+    def __call__(self, with_autocommit=False):
+        if with_autocommit:
+            return super(PoolConnect, self).__call__(with_autocommit)
+        app = context.app
+        if app.cached_pool is None:
+            app.cached_pool = Pool()
+        pool = app.cached_pool
+        with pool.lock:
+            for connection in pool.items[:]:
+                if not connection.is_valid:
+                    pool.items.remove(connection)
+            for connection in pool.items:
+                if not connection.is_busy:
+                    connection.acquire()
+                    return connection
+            connection = super(PoolConnect, self).__call__(with_autocommit)
+            pool.items.append(connection)
+            return connection
+
+
 class Normalize(Adapter):
 
     adapts(Domain)

src/htsql/request.py

         profile = RequestProfile(plan)
         records = None
         if plan.sql:
+            select = plan.frame.segment.select
+            normalizers = []
+            for phrase in select:
+                normalize = Normalize(phrase.domain)
+                normalizers.append(normalize)
+            connection = None
             try:
                 connect = Connect()
                 connection = connect()
                 cursor = connection.cursor()
                 cursor.execute(plan.sql)
-                rows = cursor.fetchall()
-                connection.close()
+                records = []
+                for row in cursor:
+                    values = []
+                    for item, normalize in zip(row, normalizers):
+                        value = normalize(item)
+                        values.append(value)
+                    records.append((values))
+                connection.release()
             except DBError, exc:
                 raise EngineError("error while executing %r: %s"
                                   % (plan.sql, exc), plan.mark)
-            records = []
-            select = plan.frame.segment.select
-            normalizers = []
-            for phrase in select:
-                normalize = Normalize(phrase.domain)
-                normalizers.append(normalize)
-            for row in rows:
-                values = []
-                for item, normalize in zip(row, normalizers):
-                    value = normalize(item)
-                    values.append(value)
-                records.append((values))
+            except:
+                if connection is not None:
+                    connection.invalidate()
         return Product(profile, records)
 
     def render(self, environ):

src/htsql_oracle/connect.py

     Implementation of the connection adapter for Oracle.
     """
 
+    @classmethod
+    def outconverter(cls, value):
+        value = value.replace(',', '.')
+        if '.' in value:
+            return decimal.Decimal(value)
+        return int(value)
+
+    @classmethod
+    def outputtypehandler(cls, cursor, name, defaultType,
+                          size, precision, scale):
+        if defaultType == cx_Oracle.NUMBER:
+            return cursor.var(str, 100, cursor.arraysize,
+                              outconverter=cls.outconverter)
+
     def open_connection(self, with_autocommit=False):
         db = context.app.db
         parameters = {}
         connection = cx_Oracle.connect(**parameters)
         if with_autocommit:
             connection.autocommit = True
+        connection.outputtypehandler = self.outputtypehandler
         return connection
 
     def normalize_error(self, exception):
 
     def __call__(self, value):
         if isinstance(value, cx_Oracle.LOB):
-            value = value.read()
+            try:
+                value = value.read()
+            except cx_Oracle.Error, exc:
+                message = str(exc)
+                raise OracleError(message, exc)
         if isinstance(value, unicode):
             value = value.encode('utf-8')
         return value
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.