Commits

Mike Bayer committed b2f5b7f

migrated Queue.Queue to its own module here, to assure RLock compatibility

Comments (0)

Files changed (3)

 - fixed bug when specifying explicit module to mysql dialect
 - when QueuePool times out it raises a TimeoutError instead of
 erroneously making another connection
-- attempting to fix a rare hang that can occur with Queue.Queue
+- Queue.Queue usage in pool has been replaced with a locally
+modified version (works in py2.3/2.4!) that uses a threading.RLock 
+for a mutex.  this is to fix a reported case where a ConnectionFairy's 
+__del__() method got called within the Queue's get() method, which 
+then returns its connection to the Queue via the the put() method, 
+causing a reentrant hang unless threading.RLock is used.
 
 0.2.3
 - overhaul to mapper compilation to be deferred.  this allows mappers

lib/sqlalchemy/pool.py

 be managed automatically, based on module type and connect arguments,
  simply by calling regular DBAPI connect() methods."""
 
-import Queue, weakref, string, cPickle
-import util, exceptions
+import weakref, string, cPickle
+from sqlalchemy import util, exceptions
+import sqlalchemy.queue as Queue
 
 try:
     import thread
-    import threading
 except:
     import dummythread as thread
-    import dummythreading as threading
 
 proxies = {}
 
         self._creator = creator
         self._pool = Queue.Queue(pool_size)
 
-        # modify the pool's mutex to be an RLock.  this is because a rare condition can
-        # occur where a ConnectionFairy's __del__ method gets called within the get() method
-        # of the Queue (and then tries to do a put() within the get()), causing a re-entrant hang.
-        # the RLock allows the mutex to be reentrant within that case.
-        self._pool.mutex = threading.RLock()
-        self._pool.not_empty = threading.Condition(self._pool.mutex)
-        self._pool.not_full = threading.Condition(self._pool.mutex)
-        
         self._overflow = 0 - pool_size
         self._max_overflow = max_overflow
         self._timeout = timeout

lib/sqlalchemy/queue.py

+"""an adaptation of Py2.3/2.4's Queue module which supports reentrant behavior,
+using RLock instead of Lock for its mutex object.
+this is to support the connection pool's usage of __del__ to return connections
+to the underlying Queue, which can apparently in extremely rare cases be invoked
+within the get() method of the Queue itself, producing a put() inside the get()
+and therefore a reentrant condition."""
+
+from time import time as _time
+
+try:
+    # py2.4 deque class
+    from collections import deque
+except:
+    # roll our own...
+    class deque(list):
+        def popleft(self):
+            return self.pop(0)
+
+__all__ = ['Empty', 'Full', 'Queue']
+
+class Empty(Exception):
+    "Exception raised by Queue.get(block=0)/get_nowait()."
+    pass
+
+class Full(Exception):
+    "Exception raised by Queue.put(block=0)/put_nowait()."
+    pass
+
+class Queue:
+    def __init__(self, maxsize=0):
+        """Initialize a queue object with a given maximum size.
+
+        If maxsize is <= 0, the queue size is infinite.
+        """
+        try:
+            import threading
+        except ImportError:
+            import dummy_threading as threading
+        self._init(maxsize)
+        # mutex must be held whenever the queue is mutating.  All methods
+        # that acquire mutex must release it before returning.  mutex
+        # is shared between the two conditions, so acquiring and
+        # releasing the conditions also acquires and releases mutex.
+        self.mutex = threading.RLock()
+        # Notify not_empty whenever an item is added to the queue; a
+        # thread waiting to get is notified then.
+        self.not_empty = threading.Condition(self.mutex)
+        # Notify not_full whenever an item is removed from the queue;
+        # a thread waiting to put is notified then.
+        self.not_full = threading.Condition(self.mutex)
+
+    def qsize(self):
+        """Return the approximate size of the queue (not reliable!)."""
+        self.mutex.acquire()
+        n = self._qsize()
+        self.mutex.release()
+        return n
+
+    def empty(self):
+        """Return True if the queue is empty, False otherwise (not reliable!)."""
+        self.mutex.acquire()
+        n = self._empty()
+        self.mutex.release()
+        return n
+
+    def full(self):
+        """Return True if the queue is full, False otherwise (not reliable!)."""
+        self.mutex.acquire()
+        n = self._full()
+        self.mutex.release()
+        return n
+
+    def put(self, item, block=True, timeout=None):
+        """Put an item into the queue.
+
+        If optional args 'block' is true and 'timeout' is None (the default),
+        block if necessary until a free slot is available. If 'timeout' is
+        a positive number, it blocks at most 'timeout' seconds and raises
+        the Full exception if no free slot was available within that time.
+        Otherwise ('block' is false), put an item on the queue if a free slot
+        is immediately available, else raise the Full exception ('timeout'
+        is ignored in that case).
+        """
+        self.not_full.acquire()
+        try:
+            if not block:
+                if self._full():
+                    raise Full
+            elif timeout is None:
+                while self._full():
+                    self.not_full.wait()
+            else:
+                if timeout < 0:
+                    raise ValueError("'timeout' must be a positive number")
+                endtime = _time() + timeout
+                while self._full():
+                    remaining = endtime - _time()
+                    if remaining <= 0.0:
+                        raise Full
+                    self.not_full.wait(remaining)
+            self._put(item)
+            self.not_empty.notify()
+        finally:
+            self.not_full.release()
+
+    def put_nowait(self, item):
+        """Put an item into the queue without blocking.
+
+        Only enqueue the item if a free slot is immediately available.
+        Otherwise raise the Full exception.
+        """
+        return self.put(item, False)
+
+    def get(self, block=True, timeout=None):
+        """Remove and return an item from the queue.
+
+        If optional args 'block' is true and 'timeout' is None (the default),
+        block if necessary until an item is available. If 'timeout' is
+        a positive number, it blocks at most 'timeout' seconds and raises
+        the Empty exception if no item was available within that time.
+        Otherwise ('block' is false), return an item if one is immediately
+        available, else raise the Empty exception ('timeout' is ignored
+        in that case).
+        """
+        self.not_empty.acquire()
+        try:
+            if not block:
+                if self._empty():
+                    raise Empty
+            elif timeout is None:
+                while self._empty():
+                    self.not_empty.wait()
+            else:
+                if timeout < 0:
+                    raise ValueError("'timeout' must be a positive number")
+                endtime = _time() + timeout
+                while self._empty():
+                    remaining = endtime - _time()
+                    if remaining <= 0.0:
+                        raise Empty
+                    self.not_empty.wait(remaining)
+            item = self._get()
+            self.not_full.notify()
+            return item
+        finally:
+            self.not_empty.release()
+
+    def get_nowait(self):
+        """Remove and return an item from the queue without blocking.
+
+        Only get an item if one is immediately available. Otherwise
+        raise the Empty exception.
+        """
+        return self.get(False)
+
+    # Override these methods to implement other queue organizations
+    # (e.g. stack or priority queue).
+    # These will only be called with appropriate locks held
+
+    # Initialize the queue representation
+    def _init(self, maxsize):
+        self.maxsize = maxsize
+        self.queue = deque()
+
+    def _qsize(self):
+        return len(self.queue)
+
+    # Check whether the queue is empty
+    def _empty(self):
+        return not self.queue
+
+    # Check whether the queue is full
+    def _full(self):
+        return self.maxsize > 0 and len(self.queue) == self.maxsize
+
+    # Put a new item in the queue
+    def _put(self, item):
+        self.queue.append(item)
+
+    # Get an item from the queue
+    def _get(self):
+        return self.queue.popleft()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.