Commits

Floris Bruynooghe committed 0d30735

Add patch to replace semaphore

This patch starts replacing eventlet.semaphore with the xthread.Semaphore
emplementation. Thus making everything which is built on top of that
(which is a lot) cross-thread automatically.

This still needs some work for the CappedSemaphore but the base Semaphore
should be fine.

  • Participants
  • Parent commits f864294

Comments (0)

Files changed (2)

+# HG changeset patch
+# Parent e5585ccc1001a2d07dafad524a048cf96ddec33d
+Use xthread-based semaphore by default
+
+This uses the xthread-based semaphore for the eventlet.semaphore module.
+Since this is the base for most locking primitives it allows all of eventlet
+to be used in a cross-thread fasion transparently.
+
+XXX Also need to use xthread in eventlet.event (xthread.Message) and check
+    other possible locations.
+
+diff --git a/eventlet/semaphore.py b/eventlet/semaphore.py
+--- a/eventlet/semaphore.py
++++ b/eventlet/semaphore.py
+@@ -1,7 +1,9 @@
+ from eventlet import greenthread
+ from eventlet import hubs
++from eventlet import xthread
+ 
+-class Semaphore(object):
++
++class Semaphore(xthread.Semaphore):
+     """An unbounded semaphore.
+     Optionally initialize with a resource *count*, then :meth:`acquire` and
+     :meth:`release` resources as needed. Attempting to :meth:`acquire` when
+@@ -19,132 +21,82 @@ class Semaphore(object):
+     If not specified, *value* defaults to 1.
+     """
+ 
+-    def __init__(self, value=1):
+-        self.counter  = value
+-        if value < 0:
+-            raise ValueError("Semaphore must be initialized with a positive "
+-                             "number, got %s" % value)
+-        self._waiters = set()
+-
+-    def __repr__(self):
+-        params = (self.__class__.__name__, hex(id(self)),
+-                  self.counter, len(self._waiters))
+-        return '<%s at %s c=%s _w[%s]>' % params
+-
+-    def __str__(self):
+-        params = (self.__class__.__name__, self.counter, len(self._waiters))
+-        return '<%s c=%s _w[%s]>' % params
+-
+     def locked(self):
+-        """Returns true if a call to acquire would block."""
+-        return self.counter <= 0
++        """Returns true if a call to acquire would block"""
++        return self._value <= 0
+ 
+     def bounded(self):
+-        """Returns False; for consistency with
+-        :class:`~eventlet.semaphore.CappedSemaphore`."""
++        """Returns False
++
++        For consistency with :class:`~eventlet.semaphore.CappedSemaphore`.
++        """
+         return False
+ 
+-    def acquire(self, blocking=True):
+-        """Acquire a semaphore.
+-
+-        When invoked without arguments: if the internal counter is larger than
+-        zero on entry, decrement it by one and return immediately. If it is zero
+-        on entry, block, waiting until some other thread has called release() to
+-        make it larger than zero. This is done with proper interlocking so that
+-        if multiple acquire() calls are blocked, release() will wake exactly one
+-        of them up. The implementation may pick one at random, so the order in
+-        which blocked threads are awakened should not be relied on. There is no
+-        return value in this case.
+-
+-        When invoked with blocking set to true, do the same thing as when called
+-        without arguments, and return true.
+-
+-        When invoked with blocking set to false, do not block. If a call without
+-        an argument would block, return false immediately; otherwise, do the
+-        same thing as when called without arguments, and return true."""
+-        if not blocking and self.locked():
+-            return False
+-        if self.counter <= 0:
+-            self._waiters.add(greenthread.getcurrent())
+-            try:
+-                while self.counter <= 0:
+-                    hubs.get_hub().switch()
+-            finally:
+-                self._waiters.discard(greenthread.getcurrent())
+-        self.counter -= 1
+-        return True
+-
+-    def __enter__(self):
+-        self.acquire()
+-
+-    def release(self, blocking=True):
+-        """Release a semaphore, incrementing the internal counter by one. When
+-        it was zero on entry and another thread is waiting for it to become
+-        larger than zero again, wake up that thread.
+-
+-        The *blocking* argument is for consistency with CappedSemaphore and is
+-        ignored"""
+-        self.counter += 1
+-        if self._waiters:
+-            hubs.get_hub().schedule_call_global(0, self._do_acquire)
+-        return True
+-
+-    def _do_acquire(self):
+-        if self._waiters and self.counter>0:
+-            waiter = self._waiters.pop()
+-            waiter.switch()
+-
+-    def __exit__(self, typ, val, tb):
+-        self.release()
+-
+     @property
+     def balance(self):
+-        """An integer value that represents how many new calls to
+-        :meth:`acquire` or :meth:`release` would be needed to get the counter to
+-        0.  If it is positive, then its value is the number of acquires that can
+-        happen before the next acquire would block.  If it is negative, it is
+-        the negative of the number of releases that would be required in order
+-        to make the counter 0 again (one more release would push the counter to
+-        1 and unblock acquirers).  It takes into account how many greenthreads
+-        are currently blocking in :meth:`acquire`.
++        """Integer indicating number of free/wanted items
++
++        An integer value that represents how many new calls to
++        :meth:`acquire` or :meth:`release` would be needed to get the
++        counter to 0.  If it is positive, then its value is the number
++        of acquires that can happen before the next acquire would
++        block.  If it is negative, it is the negative of the number of
++        releases that would be required in order to have no calls
++        waiting on :meth:`acquire` (one more release would push the
++        internal counter to 1).  It takes into account how may
++        (green)threads are currently blocking in :meth:`acquire`.
++
++        Note that this value is inherently racy and should probably
++        not be relied upon in production code.  In certain situations
++        it could be useful for debugging purposes however.
+         """
+-        # positive means there are free items
+-        # zero means there are no free items but nobody has requested one
+-        # negative means there are requests for items, but no items
+-        return self.counter - len(self._waiters)
++        # Positive means there are free items.  Zero means there are
++        # no free items but nobody has requested one.  Negative means
++        # there are requests for items, but no items.
++        return self._value - len(self._cond._notif._waiters)
+ 
+ 
+ class BoundedSemaphore(Semaphore):
+-    """A bounded semaphore checks to make sure its current value doesn't exceed
+-    its initial value. If it does, ValueError is raised. In most situations
+-    semaphores are used to guard resources with limited capacity. If the
+-    semaphore is released too many times it's a sign of a bug. If not given,
+-    *value* defaults to 1."""
+-    def __init__(self, value=1):
+-        super(BoundedSemaphore, self).__init__(value)
+-        self.original_counter = value
++    """Semaphore which ensures the initial value is not exceeded
++
++    A bounded semaphore checks to make sure its current value doesn't
++    exceed its initial value. If it does, ValueError is raised. In
++    most situations semaphores are used to guard resources with
++    limited capacity. If the semaphore is released too many times it's
++    a sign of a bug. If not given, *value* defaults to 1.
++    """
++
++    def __init__(self, value=1, hubcache=xthread.GLOBAL_HUBCACHE):
++        Semaphore.__init__(self, value, hubcache=hubcache)
++        self._initial_value = value
+ 
+     def release(self, blocking=True):
+-        """Release a semaphore, incrementing the internal counter by one. If
+-        the counter would exceed the initial value, raises ValueError.  When
+-        it was zero on entry and another thread is waiting for it to become
+-        larger than zero again, wake up that thread.
++        """Release a semaphore, incrementing the internal counter by one
+ 
+-        The *blocking* argument is for consistency with :class:`CappedSemaphore`
+-        and is ignored"""
+-        if self.counter >= self.original_counter:
+-            raise ValueError, "Semaphore released too many times"
+-        return super(BoundedSemaphore, self).release(blocking)
++        If the counter would exceed the initial value, raises
++        ValueError.  When it was zero on entry and another thread is
++        waiting for it to become larger than zero again, wake up that
++        thread.
+ 
++        The *blocking* argument is for consistency with
++        :class:`CappedSemaphore` and is ignored
++        """
++        if self._value >= self._initial_value:
++            raise ValueError("Semaphore released too many times")
++        return Semaphore.release(self)
++
++
++# XXX Still convert this --flub
+ class CappedSemaphore(object):
+-    """A blockingly bounded semaphore.
++    """A blockingly bounded semaphore
+ 
+-    Optionally initialize with a resource *count*, then :meth:`acquire` and
+-    :meth:`release` resources as needed. Attempting to :meth:`acquire` when
+-    *count* is zero suspends the calling greenthread until count becomes nonzero
+-    again.  Attempting to :meth:`release` after *count* has reached *limit*
+-    suspends the calling greenthread until *count* becomes less than *limit*
+-    again.
++    Optionally initialize with a resource *count*, then
++    :meth:`acquire` and :meth:`release` resources as
++    needed. Attempting to :meth:`acquire` when *count* is zero
++    suspends the calling greenthread until count becomes nonzero
++    again.  Attempting to :meth:`release` after *count* has reached
++    *limit* suspends the calling greenthread until *count* becomes
++    less than *limit* again.
+ 
+     This has the same API as :class:`threading.Semaphore`, though its
+     semantics and behavior differ subtly due to the upper limit on calls
+diff --git a/tests/semaphore_test.py b/tests/semaphore_test.py
+--- a/tests/semaphore_test.py
++++ b/tests/semaphore_test.py
+@@ -3,6 +3,7 @@ import eventlet
+ from eventlet import semaphore
+ from tests import LimitedTestCase
+ 
++
+ class TestSemaphore(LimitedTestCase):
+     def test_bounded(self):
+         sem = semaphore.CappedSemaphore(2, limit=3)
+@@ -19,7 +20,7 @@ class TestSemaphore(LimitedTestCase):
+         self.assertEqual(3, sem.balance)
+         gt1.wait()
+         gt2.wait()
+-   
++
+     def test_bounded_with_zero_limit(self):
+         sem = semaphore.CappedSemaphore(0, 0)
+         gt = eventlet.spawn(sem.acquire)
+@@ -27,5 +28,5 @@ class TestSemaphore(LimitedTestCase):
+         gt.wait()
+ 
+ 
+-if __name__=='__main__':
++if __name__ == '__main__':
+     unittest.main()
 xthread
+semaphore
 tpool
 docs
 # Placed by Bitbucket