Commits

Floris Bruynooghe committed 0f2c8fd

Refactor to fix notifying a greenthread from a thread

The problem was that is was possible for greenthread to be released
before it switched to the hub. This would mean it would go straight
to switching to the hub to block, but never be switched back anymore.
This has been fixed by creating a waiter object for greenthreads to
block on which keeps state. This way it will not switch away anymore
if it gets notified before it got to blocking. Since the notification
always happens in the same thread (thanks to .schedule_call_global
being safe to do from another thread) the state kept in the waiter
instance is thread-safe.

  • Participants
  • Parent commits 6dacb39

Comments (0)

Files changed (1)

 # HG changeset patch
 # Parent e2a73a7f7e459d258010b40d0ce3b55aca4aa179
 # User Floris Bruynooghe <flub@devork.be>
-# Date 1337897116 -3600
+# Date 1337898924 -3600
 
 Create xthread package
 
 new file mode 100644
 --- /dev/null
 +++ b/eventlet/xthread.py
-@@ -0,0 +1,624 @@
+@@ -0,0 +1,662 @@
 +"""Cross-thread syncronisation primitives for eventlet
 +
 +This contains various syncronisation primitives for use with eventlet.
 +may or may not be running eventlet hubs.
 +"""
 +
++from __future__ import with_statement
++
 +# XXX Integration TODO:
 +#
 +# * Check what versions of Python need to be supported.
 +import eventlet
 +
 +
++__all__ = ['HubCache', 'GLOBAL_HUBCACHE', 'Event', 'Lock', 'RLock',
++           'Message', 'Condition', 'Queue', 'PriorityQueue', 'LifoQueue']
++
++
 +# In Python 3.2 threading.Lock finally grew a timeout parameter.
 +_l = threading.Lock()
 +try:
 +"""
 +
 +
++class Waiter(object):
++    """A waiter object
++
++    This must be instantiated in the thread/greenthread which is going
++    to call .block().  The main feature of this class is that you can
++    release it before you have entered .block(), entering .block() in
++    this case just returns immediately.  But note that both .block()
++    and .release() can only be called once.
++
++    Do not create this directly, use the Notifier.create_waiter()
++    method instead.
++
++    Attributes
++    ----------
++
++    :released: Keeps track of the state, True if .release() was
++       called, False otherwise.  See Notifier.wait() to see how this
++       can be used to avoid reace conditions.
++    """
++    # Private attributes:
++    # :_hub: The hub this waiter was created for
++    # :_coro: The greenthread which wants to block, if created for a coro
++    # :_lock: The lock to control blocking, if created for a thread
++    __slots__ = ['hubcache', 'released', '_hub', '_coro', '_lock']
++
++    def __init__(self, hubcache=GLOBAL_HUBCACHE):
++        self.hubcache = hubcache
++        self._hub = hub = eventlet.hubs.get_hub()
++        self.released = False
++        if self._hub.running:
++            self._coro = eventlet.getcurrent()
++            if hub not in hubcache:
++                self._create_pipe()
++        else:
++            self._lock = threading.Lock()
++            self._lock.acquire()
++
++    def block(self, timeout=None):
++        """Block until .release() is called
++
++        If the waiter has already been released it will return
++        immediately.  Should only be called once, must not be called
++        concurrently.
++        """
++        if self.released:
++            return
++        if self._hub.running:
++            return self._gblock(timeout)
++        else:
++            return self._tblock(timeout)
++
++    def _gblock(self, timeout=None):
++        """Block a greenthread until released
++
++        This does not check ._released and instead will
++        unconditionally switch to the hub, so make sure it's only
++        called once, you should use .block() normally.
++        """
++        if self._coro is not eventlet.getcurrent():
++            raise RuntimeError('Mismatching greenthread')
++        if timeout is None:
++            self._hub.switch()
++            return True
++        else:
++            try:
++                with eventlet.Timeout(timeout):
++                    self._hub.switch()
++            except eventlet.Timeout:
++                return False
++            else:
++                return True
++
++    def _tblock(self, timeout=None):
++        """Block a thread until released"""
++        if timeout is None:
++            self._lock.acquire()
++            return True
++        elif _LOCK_HAS_TIMEOUT:
++            self._lock.acquire(timeout=timeout)
++        else:
++            # Spin around a little, just like the stdlib does
++            _time = time.time
++            _sleep = time.sleep
++            min = __builtin__.min
++            _lock = self._lock
++            endtime = _time() + timeout
++            delay = 0.0005      # 500 us -> initial delay of 1 ms
++            while True:
++                gotit = _lock.acquire(0)
++                if gotit:
++                    break
++                remaining = endtime - _time()
++                if remaining <= 0:
++                    break
++                delay = min(delay * 2, remaining, .05)
++                _sleep(delay)
++            if not gotit:
++                return False
++            else:
++                return True
++
++    def release(self):
++        """Release this waiter
++
++        Can be called before .block() has been entered.  Should only be
++        called once.
++        """
++        if self._hub.running:
++
++            def switcher():
++                if not self.released:
++                    self.released = True
++                    self._coro.switch()
++
++            self._hub.schedule_call_global(0, switcher)
++            if self._hub is not eventlet.hubs.get_hub():
++                self._kick_hub()
++        else:
++            self.released = True
++            try:
++                self._lock.release()
++            except thread.error:
++                pass            # Already released
++
++    def _create_pipe(self):
++        """Create a pipe for the hub
++
++        This creates a pipe (read and write fd) and registers it with
++        the hub so that ._kick_hub() can use this to signal the hub
++        from another thread.
++
++        This keeps a cache of hubs on ``self.hubcache`` so that only
++        one pipe is created per hub.  Furthermore this dict is never
++        cleared implicitly to avoid creating new sockets all the time.
++
++        This method is always called in the same thread as the running
++        hub.  Thus for this thread/hub it is atomic.
++        """
++        if self._hub in self.hubcache:
++            return
++
++        def read_callback(fd):
++            # This just reads the (bogus) data just written to empty
++            # the OS queues.  The only purpose was to kick the hub
++            # round it's loop which is now has.  The notif function
++            # scheduled by .notify() will now do it's work.
++            os.read(fd, 512)
++
++        rfd, wfd = os.pipe()
++        listener = self._hub.add(eventlet.hubs.hub.READ, rfd, read_callback)
++        self.hubcache[self._hub] = (rfd, wfd, listener)
++
++    def _kick_hub(self):
++        """Kick the hub around it's loop
++
++        Threads need to be able to kick a hub around their loop by
++        interrupting the sleep.  This is done with the help of a
++        filedescriptor to which the thread writes a byte (using this
++        method) which will then wake up the hub.
++        """
++        rfd, wfd, r_listener = self.hubcache[self._hub]
++        current_hub = eventlet.hubs.get_hub()
++        if current_hub.running:
++            def write(fd):
++                os.write(fd, 'A')
++                current_hub.remove(w_listener)
++            w_listener = current_hub.add(eventlet.hubs.hub.WRITE, wfd, write)
++        else:
++            os.write(wfd, 'A')
++
++    def __repr__(self):
++        return ('<gsync.Waiter object at 0x%x (%s)>' %
++                (id(self), 'blocked' if not self.released else 'released'))
++
++
 +class Notifier(object):
 +    """Notify one or more waiters
 +
 +        The waiter will already be available to the .notify*() methods
 +        when returned.  If they get notified before you passed them
 +        onto one of the .wait() methods .wait() will just return
-+        immediately.
++        immediately.  Be aware that when you are a greenlet you must
++        not switch until you use the waiter.  In theory this is only
++        required if your waiter is a thread, but in practice you might
++        not now whether the waiter is a thread or greenthread.
 +
-+        Returns the (waiter, hub) tuple.
++        Returns a Waiter object which you should treat as opaque.
 +        """
-+        if eventlet.hubs.has_running_hub():
-+            return self.create_gwaiter()
-+        else:
-+            return self.create_twaiter()
++        waiter = Waiter(self.hubcache)
++        self._waiters.add(waiter)
++        return waiter
 +
 +    def cancel_waiter(self, waiter):
 +        """Cancel an earlier created waiter
 +        This blocks the current thread/eventlet until it gets woken up
 +        by a call to .notify() or .notify_all().
 +
-+        This will automatically dispatch to .gwait() or .twait() as
-+        needed so that the blocking will be cooperative for greenlets.
-+
 +        The waiter argument, if used, must be the item returned by
-+        .g_create_waiter().  This is an API which allows you to create
-+        a waiter while holding a lock, release the lock and then wait
-+        on the waiter without race conditions.  See Condition for an
++        .create_waiter().  This is an API which allows you to create a
++        waiter while holding a lock, release the lock and then wait on
++        the waiter without race conditions.  See Condition for an
 +        example.
 +
 +        Returns True if this thread/eventlet was notified and False
 +        when a timeout occurred.
 +        """
-+        if eventlet.hubs.has_running_hub():
-+            return self.gwait(timeout, waiter)
-+        else:
-+            return self.twait(timeout, waiter)
-+
-+    def create_gwaiter(self):
-+        waiter = eventlet.getcurrent()
-+        hub = eventlet.hubs.get_hub()
-+        self._create_pipe(hub)
-+        self._waiters.add((waiter, hub))
-+        return (waiter, hub)
-+
-+    def gwait(self, timeout=None, waiter=None):
-+        """Wait from an eventlet
-+
-+        This cooperatively blocks the current eventlet by switching to
-+        the hub.  The hub will switch back to this eventlet when it
-+        gets notified.
-+
-+        Usually you can just call .wait() which will dispatch to this
-+        method if you are in an eventlet.
-+
-+        The waiter argument, if used, must be the item returned by
-+        .g_create_waiter().  This is an API which allows you to create
-+        a waiter while holding a lock, release the lock and then wait
-+        on the waiter without race conditions.  See Condition for an
-+        example.
-+
-+        Returns True if this thread/eventlet was notified and False
-+        when a timeout occurred.
-+        """
++        # Note it is important to use waiter.released as return value
++        # as there is a race condition between the return value of
++        # .block() and discarding the waiter.
 +        if waiter is None:
-+            waiter, hub = self.create_gwaiter()
-+        else:
-+            waiter, hub = waiter
-+        if timeout is not None:
-+            timeout = eventlet.Timeout(timeout)
-+            try:
-+                with timeout:
-+                    hub.switch()
-+            except eventlet.Timeout, t:
-+                if t is not timeout:
-+                    raise
-+                self._waiters.discard((waiter, hub))
-+                return False
-+            else:
-+                return True
-+        else:
-+            hub.switch()
-+            return True
-+
-+    def create_twaiter(self):
-+        waiter = threading.Lock()
-+        waiter.acquire()
-+        self._waiters.add((waiter, None))
-+        return (waiter, None)
-+
-+    def twait(self, timeout=None, waiter=None):
-+        """Wait from an thread
-+
-+        This blocks the current thread by using a conventional lock.
-+
-+        Usually you can just call .wait() which will dispatch to this
-+        method if you are in an eventlet.
-+
-+        Returns True if this thread/eventlet was notified and False
-+        when a timeout occurred.
-+        """
-+        if waiter is None:
-+            waiter, _ = self.create_twaiter()
-+        else:
-+            waiter, _ = waiter
-+        if timeout is None:
-+            waiter.acquire()
-+            return True
-+        elif _LOCK_HAS_TIMEOUT:
-+            waiter.acquire(timeout=timeout)
-+        else:
-+            # Spin around a little, just like the stdlib does
-+            _time = time.time
-+            _sleep = time.sleep
-+            min = __builtin__.min
-+            endtime = _time() + timeout
-+            delay = 0.0005      # 500 us -> initial delay of 1 ms
-+            while True:
-+                gotit = waiter.acquire(0)
-+                if gotit:
-+                    break
-+                remaining = endtime - _time()
-+                if remaining <= 0:
-+                    break
-+                delay = min(delay * 2, remaining, .05)
-+                _sleep(delay)
-+            if not gotit:
-+                self._waiters.discard((waiter, None))
-+                return False
-+            else:
-+                return True
++            waiter = self.create_waiter()
++        waiter.block(timeout)
++        if not waiter.released:
++            self._waiters.discard(waiter)
++        return waiter.released
 +
 +    def notify(self):
 +        """Notify one waiter
 +        thread if an eventlet from it is notified.
 +        """
 +        if self._waiters:
-+            waiter, hub = self._waiters.pop()
-+            if hub is None:
-+                # This is a waiting thread
-+                try:
-+                    waiter.release()
-+                except thread.error:
-+                    pass
-+            else:
-+                # This is a waiting greenlet
-+                hub.schedule_call_global(0, waiter.switch)
-+                if hub is not eventlet.hubs.get_hub():
-+                    self._kick_hub(hub)
++            waiter = self._waiters.pop()
++            waiter.release()
 +
 +    def notify_all(self):
 +        """Notify all waiters
 +        Similar to .notify() but will notify all waiters instead of
 +        just one.
 +        """
++        # XXX Should this be "while self._waiters: ..."?
 +        for i in xrange(len(self._waiters)):
 +            self.notify()
 +
-+    def _create_pipe(self, hub):
-+        """Create a pipe for a hub
-+
-+        This creates a pipe (read and write fd) and registers it with
-+        the hub so that ._kick_hub() can use this to signal the hub.
-+
-+        This keeps a cache of hubs on ``self.hubcache`` so that only
-+        one pipe is created per hub.  Furthermore this dict is never
-+        cleared implicitly to avoid creating new sockets all the time.
-+
-+        This method is always called from .gwait() and therefore can
-+        only run once for a given hub at the same time.  Thus it is
-+        threadsave.
-+        """
-+        if hub in self.hubcache:
-+            return
-+        def read_callback(fd):
-+            # This just reads the (bogus) data just written to empty
-+            # the os queues.  The only purpose was to kick the hub
-+            # round it's loop which is now has.  The notif function
-+            # scheduled by .notify() will now do it's work.
-+            os.read(fd, 512)
-+        rfd, wfd = os.pipe()
-+        listener = hub.add(eventlet.hubs.hub.READ, rfd, read_callback)
-+        self.hubcache[hub] = (rfd, wfd, listener)
-+
-+    def _kick_hub(self, hub):
-+        """Kick the hub around it's loop
-+
-+        Threads need to be able to kick a hub around their loop by
-+        interrupting the sleep.  This is done with the help of a
-+        filedescriptor to which the thread writes a byte (using this
-+        method) which will then wake up the hub.
-+        """
-+        rfd, wfd, r_listener = self.hubcache[hub]
-+        current_hub = eventlet.hubs.get_hub()
-+        if current_hub.running:
-+            def write(fd):
-+                os.write(fd, 'A')
-+                current_hub.remove(w_listener)
-+            w_listener = current_hub.add(eventlet.hubs.hub.WRITE, wfd, write)
-+        else:
-+            os.write(wfd, 'A')
-+
 +    def __repr__(self):
 +        return ('<xthread.Notifier object at 0x%x (%d waiters)>' %
 +                (id(self), len(self._waiters)))
 new file mode 100644
 --- /dev/null
 +++ b/tests/xthread_test.py
-@@ -0,0 +1,762 @@
+@@ -0,0 +1,1229 @@
++# Copyright (C) 2011-2012 Abilisoft Ltd.
++# http://abilisoft.com
++
++from __future__ import with_statement, absolute_import
++
 +import sys
 +import threading
 +import time
++import traceback
 +
 +import eventlet
-+import nose.tools
++import eventlet.debug
++import pytest
 +
 +from eventlet import xthread
 +
 +
++eventlet.debug.hub_listener_stacks(True) # XXX
++
++
++def mkthread(func):
++    """Simple helper to quickly make and start a daemon thread"""
++    t = threading.Thread(name=func.func_name, target=func)
++    t.daemon = True
++    t.start()
++    return t
++
++
++def join(item, timeout=3):
++    """Simple join method with a timeout
++
++    Can join either a thread or a greenthread and raises an
++    AssertionError if it failed to join after the timeout.  It uses
++    eventlet.sleep() so other greenthreads will have a chance to run.
++    """
++    __tracebackhide__ = True
++    if hasattr(item, 'start'):  # A thread
++        t = 0.0
++        while t <= timeout:
++            if not item.is_alive():
++                break
++            eventlet.sleep(0.01)
++            t += 0.01
++        else:
++            frame = sys._current_frames()[item.ident]
++            sys.stderr.write('Stack of (blocking) %r:\n' % item)
++            traceback.print_stack(frame)
++            sys.stderr.write('\n')
++            pytest.fail('Did not join %r' % item)
++    elif hasattr(item, 'wait'): # A greenthread
++        with eventlet.Timeout(timeout, False):
++            item.wait()
++        if not item.dead:
++            sys.stderr.write('Stack of (blocking) %r:\n' % item)
++            traceback.print_stack(item.gr_frame)
++            sys.stderr.write('\n')
++            pytest.fail('Did not join %r' % item)
++    else:
++        raise ValueError('Neither a thread or greenthread: %r' % item)
++
++
++class Evt(object):
++    """Simple and inefficient cross thread-greenthread event
++
++    This is very inefficient but works for the tests.
++    """
++    SNOOZE = 0.002
++
++    def __init__(self):
++        self._flag = False
++
++    def is_set(self):
++        return self._flag
++
++    def set(self):
++        self._flag = True
++
++    def clear(self):
++        self._flag = False
++
++    def wait(self, timeout=None):
++        hub = eventlet.hubs.get_hub()
++        if hub.running:
++            sleep = eventlet.sleep
++        else:
++            sleep = time.sleep
++        t = 0.0
++        while True:
++            if self._flag or timeout is not None and t >= timeout:
++                return self._flag
++            sleep(self.SNOOZE)
++            t += self.SNOOZE
++
++    def __repr__(self):
++        return '<Evt object at 0x%x (flag=%s)>' % (id(self), self._flag)
++
++
++def pytest_funcarg__shr(request):
++    """Return a simple "shared" object
++
++    The idea is that you can just assign something as an
++    attribute.  This facilitates sharing objects created in a
++    (green)thread with another (green)thread.
++    """
++    class SharedObject(object):
++
++        def _wait(self, attr, delay=0.005):
++            """Wait for an attribute to appear and return it
++
++            :delay: sleep for this much after the object appeared.
++               This is a fiddle factor in case you can't do any
++               better synchronisation.  E.g. when you still need
++               to call a method on the object just assigned as an
++               attribute.
++            """
++            if eventlet.hubs.get_hub().running:
++                sleep = eventlet.sleep
++            else:
++                sleep = time.sleep
++            while not hasattr(self, attr):
++                sleep(max(0.001, delay))
++            sleep(delay)
++            return getattr(self, attr)
++
++    return SharedObject()
++
++
++################ Waiter ################
++
++
++class TestWaiter:
++
++    def test_gblock_grelease_ordered(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(hubcache=xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = eventlet.spawn_after(0.001, release)
++        join(r)
++        join(b)
++
++    def test_gblock_grelease_early_release(self, shr):
++        block_ready = Evt()
++        release_ready = Evt()
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            block_ready.set()
++            release_ready.wait()
++            shr.waiter.block()
++        def release():
++            block_ready.wait()
++            shr.waiter.release()
++            release_ready.set()
++        b = eventlet.spawn(block)
++        r = eventlet.spawn(release)
++        join(r)
++        join(b)
++
++    def test_gblock_grelease_multiple_block(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++            shr.waiter.block()
++        def release():
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = eventlet.spawn_after(0.001, release)
++        join(r)
++        join(b)
++
++    def test_gblock_grelease_multiple_release(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr.waiter.release()
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = eventlet.spawn_after(0.001, release)
++        join(r)
++        join(b)
++
++    def test_gblock_trelease_ordered(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_gblock_trelease_early_release(self, shr): # XXX
++        block_ready = Evt()
++        release_ready = Evt()
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            block_ready.set()
++            release_ready.wait()
++            shr.waiter.block()
++        def release():
++            block_ready.wait()
++            shr.waiter.release()
++            release_ready.set()
++        b = eventlet.spawn(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_gblock_trelease_multiple_block(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_gblock_trelease_multiple_release(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_tblock_trelease_ordered(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = mkthread(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_tblock_trelease_early_release(self, shr):
++        block_ready = Evt()
++        release_ready = Evt()
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            block_ready.set()
++            release_ready.wait()
++            shr.waiter.block()
++        def release():
++            block_ready.wait()
++            shr.waiter.release()
++            release_ready.set()
++        b = mkthread(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_tblock_trelease_multiple_block(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = mkthread(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++
++    def test_tblock_trelease_multiple_release(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++            shr.waiter.release()
++
++    def test_tblock_grelease_ordered(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = mkthread(block)
++        r = eventlet.spawn(release)
++        join(r)
++        join(b)
++
++    def test_tblock_grelease_early_release(self, shr):
++        block_ready = Evt()
++        release_ready = Evt()
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            block_ready.set()
++            release_ready.wait()
++            shr.waiter.block()
++        def release():
++            block_ready.wait()
++            shr.waiter.release()
++            release_ready.set()
++        b = mkthread(block)
++        r = eventlet.spawn(release)
++        join(r)
++        join(b)
++
++    def test_tblock_grelease_multiple_block(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = mkthread(block)
++        r = eventlet.spawn(release)
++        join(r)
++        join(b)
++
++    def test_tblock_grelease_multiple_release(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.waiter.block()
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = mkthread(block)
++        r = eventlet.spawn(release)
++        join(r)
++        join(b)
++
++    def test_gblock_retval(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.ret = shr.waiter.block(1)
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = eventlet.spawn(block)
++        r = eventlet.spawn(release)
++        join(r)
++        join(b)
++        assert shr.ret is True
++
++    def test_gblock_timeout(self, shr):
++        def block():
++            waiter = xthread.Waiter(xthread.HubCache())
++            shr.ret = waiter.block(0.001)
++        b = eventlet.spawn(block)
++        join(b)
++        assert shr.ret is False
++
++    def test_tblock_retval(self, shr):
++        def block():
++            shr.waiter = xthread.Waiter(xthread.HubCache())
++            shr.ret = shr.waiter.block(1)
++        def release():
++            shr._wait('waiter')
++            shr.waiter.release()
++        b = mkthread(block)
++        r = mkthread(release)
++        join(r)
++        join(b)
++        assert shr.ret is True
++
++    def test_tblock_timeout(self, shr):
++        def block():
++            waiter = xthread.Waiter(xthread.HubCache())
++            shr.ret = waiter.block(0.001)
++        b = mkthread(block)
++        join(b)
++        assert shr.ret is False
++
++    def test_released(self):
++        # In it's thread to isolate from a possibly running hub in main thread
++        l = []
++        def func():
++            waiter = xthread.Waiter(xthread.HubCache())
++            l.append(waiter.released)
++            waiter.release()
++            l.append(waiter.released)
++        f = mkthread(func)
++        join(f)
++        assert l == [False, True]
++
++
 +################ Notifier ################
 +
 +
 +    return xthread.Notifier(hubcache=xthread.HubCache())
 +
 +
-+def test_notify_none():
-+    notif = pytest_funcarg__notif(None)
++def test_notify_none(notif):
 +    notif.notify()
 +    notif.notify_all()
-+    assert True
 +
 +
 +class TestNotifyFromEventletGwait:
 +
-+    def test_notify(self):
-+        notif = pytest_funcarg__notif(None)
-+        l = []
++    def test_notify(self, notif, shr):
 +        def waiter():
-+            notif.gwait()
-+            l.append(True)
++            shr.ret = notif.wait()
 +        def notifier():
 +            notif.notify()
 +        w = eventlet.spawn(waiter)
 +        n = eventlet.spawn_after(0.01, notifier)
-+        w.wait()
-+        assert len(l) == 1
++        join(n)
++        join(w)
++        assert shr.ret is True
 +        assert len(notif._waiters) == 0
 +
-+    def test_notify_all(self):
-+        notif = pytest_funcarg__notif(None)
-+        l = []
++    def test_notify_all(self, notif, shr):
 +        def waiter():
-+            notif.gwait()
-+            l.append(True)
++            shr.ret = notif.wait()
 +        def notifier():
 +            notif.notify_all()
 +        w1 = eventlet.spawn(waiter)
 +        w2 = eventlet.spawn(waiter)
 +        n = eventlet.spawn_after(0.01, notifier)
-+        w1.wait()
-+        w2.wait()
-+        assert len(l) == 2
++        join(n)
++        join(w1)
++        join(w2)
++        assert shr.ret is True
 +        assert len(notif._waiters) == 0
 +
-+    def test_notify_timeout(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_notify_timeout(self, notif, shr):
++        def waiter():
++            shr.ret = notif.wait(0.001)
++        w = eventlet.spawn(waiter)
++        join(w)
++        assert shr.ret is False
++        assert len(notif._waiters) == 0
++
++    def test_notify_multiple_waiters(self, notif, shr):
++        # Wait with a timeout needs to be working before this test works
++        # This also tests the return value of .wait()
 +        l = []
 +        def waiter():
-+            notif.gwait(0.01)
-+            l.append(True)
-+        w = eventlet.spawn(waiter)
-+        w.wait()
-+        assert len(l) == 1
-+        assert len(notif._waiters) == 0
-+
-+    def test_notify_multiple_waiters(self):
-+        # Wait with a timeout needs to be working before this test works
-+        # This also tests the return value of .gwait()
-+        notif = pytest_funcarg__notif(None)
-+        l = []
-+        def waiter():
-+            if notif.gwait(0.1):
-+                l.append(True)
++            l.append(notif.wait(0.1))
 +        def notifier():
 +            notif.notify()
 +        w1 = eventlet.spawn(waiter)
 +        w2 = eventlet.spawn(waiter)
 +        n = eventlet.spawn_after(0.005, notifier)
-+        w1.wait()
-+        w2.wait()
-+        assert len(l) == 1
++        join(n)
++        join(w1)
++        join(w2)
++        assert len(l) == 2
++        assert l.count(True) == 1
++        assert l.count(False) == 1
 +        assert len(notif._waiters) == 0
 +
 +
 +        """This is used in test_notify as well as the hubcache tests"""
 +        l = []
 +        def waiter():
-+            notif.gwait()
++            notif.wait()
 +            l.append(True)
 +        def notifier():
++            while not notif._waiters:
++                time.sleep(0.005)
 +            notif.notify()
 +        w = eventlet.spawn(waiter)
-+        n = threading.Timer(0.01, notifier)
-+        ne = eventlet.spawn(n.start)
-+        w.wait()
-+        ne.wait()
-+        n.join()
++        n = mkthread(notifier)
++        join(n)
++        join(w)
 +        return l
 +
-+    def test_notify(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_notify(self, notif):
 +        l = self.do_simple_notif(notif)
 +        assert len(l) == 1
 +        assert len(notif._waiters) == 0
 +
-+    def test_notify_twice(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_notify_twice(self, notif):
 +        l = []
 +        def waiter():
-+            notif.gwait()
++            notif.wait()
 +            l.append(True)
 +        def notifier():
++            while not notif._waiters:
++                time.sleep(0.005)
 +            notif.notify()
 +        w1 = eventlet.spawn(waiter)
-+        n1 = threading.Timer(0.01, notifier)
-+        n1e = eventlet.spawn(n1.start)
-+        w1.wait()
-+        n1e.wait()
-+        n1.join()
++        n1 = mkthread(notifier)
++        join(n1)
++        join(w1)
 +        w2 = eventlet.spawn(waiter)
-+        n2 = threading.Timer(0.01, notifier)
-+        n2e = eventlet.spawn(n2.start)
-+        w2.wait()
-+        n2e.wait()
-+        n2.join()
++        n2 = mkthread(notifier)
++        join(n2)
++        join(w2)
 +        assert len(l) == 2
 +        assert len(notif._waiters) == 0
 +
-+    def test_hubcache(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_hubcache(self, notif):
 +        self.do_simple_notif(notif)
 +        hub = eventlet.hubs.get_hub()
 +        rfd, wfd, listener = notif.hubcache.get(hub)
 +        READ = eventlet.hubs.hub.READ
-+
 +        assert len(notif.hubcache) == 1, '%s == 1' % len(notif.hubcache)
 +        assert listener in hub.listeners[READ].itervalues()
 +
-+    def test_hubcache_clear(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_hubcache_clear(self, notif):
 +        self.do_simple_notif(notif)
 +        hub = eventlet.hubs.get_hub()
 +        rfd, wfd, listener = notif.hubcache.get(hub)
 +        READ = eventlet.hubs.hub.READ
-+
 +        notif.hubcache.clear()
-+
 +        assert len(notif.hubcache) == 0
 +        assert listener not in hub.listeners[READ].itervalues()
 +
-+    def test_hubcache_used(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_hubcache_used(self, notif):
 +        def waiter():
-+            notif.gwait(0.01)
++            notif.wait(0.01)
 +        w1 = eventlet.spawn(waiter)
 +        w2 = eventlet.spawn(waiter)
-+        w1.wait()
-+        w2.wait()
++        join(w1)
++        join(w2)
 +        assert len(notif.hubcache) == 1
 +
 +
 +class TestNotifyFromThreadTwait:
 +
-+    def test_notify(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_notify(self, notif, shr):
++        def waiter():
++            shr.ret = notif.wait()
++        def notifier():
++            time.sleep(0.01)
++            notif.notify()
++        w = mkthread(waiter)
++        n = mkthread(notifier)
++        join(n)
++        join(w)
++        assert shr.ret is True
++        assert len(notif._waiters) == 0
++
++    def test_notify_all(self, notif):
 +        l = []
 +        def waiter():
-+            notif.twait()
++            notif.wait()
 +            l.append(True)
 +        def notifier():
++            time.sleep(0.01)
++            notif.notify_all()
++        w1 = mkthread(waiter)
++        w2 = mkthread(waiter)
++        n = mkthread(notifier)
++        join(n)
++        join(w1)
++        join(w2)
++        assert len(l) == 2
++        assert len(notif._waiters) == 0
++
++    def test_notify_timeout(self, notif, shr):
++        def waiter():
++            shr.ret = notif.wait(0.01)
++        w = mkthread(waiter)
++        join(w)
++        assert shr.ret is False
++        assert len(notif._waiters) == 0
++
++    def test_notify_multiple(self, notif):
++        # Wait with a timeout needs to be working before this test works
++        # This also tests the return value of .wait()
++        l = []
++        def waiter():
++            if notif.wait(0.1):
++                l.append(True)
++        def notifier():
++            while len(notif._waiters) < 2:
++                time.sleep(0.001)
 +            notif.notify()
-+        w = threading.Thread(target=waiter)
-+        n = threading.Timer(0.01, notifier)
-+        w.start()
-+        n.start()
-+        w.join()
-+        n.join()
++        n = mkthread(notifier)
++        w1 = mkthread(waiter)
++        w2 = mkthread(waiter)
++        join(n)
++        join(w1)
++        join(w2)
 +        assert len(l) == 1
 +        assert len(notif._waiters) == 0
 +
-+    def test_notify_all(self):
-+        notif = pytest_funcarg__notif(None)
-+        l = []
-+        def waiter():
-+            notif.twait()
-+            l.append(True)
-+        def notifier():
-+            notif.notify_all()
-+        w1 = threading.Thread(target=waiter)
-+        w2 = threading.Thread(target=waiter)
-+        w1.start()
-+        w2.start()
-+        n = threading.Timer(0.01, notifier)
-+        n.start()
-+        w1.join()
-+        w2.join()
-+        assert len(l) == 2
-+        assert len(notif._waiters) == 0
-+
-+    def test_notify_timeout(self):
-+        notif = pytest_funcarg__notif(None)
-+        l = []
-+        def waiter():
-+            notif.twait(0.01)
-+            l.append(True)
-+        w = threading.Thread(target=waiter)
-+        w.start()
-+        w.join()
-+        assert len(l) == 1
-+        assert len(notif._waiters) == 0
-+
-+    def test_notify_multiple(self):
-+        # Wait with a timeout needs to be working before this test works
-+        # This also tests the return value of .twait()
-+        notif = pytest_funcarg__notif(None)
-+        l = []
-+        ready = []
-+        def waiter():
-+            ready.append(True)
-+            if notif.twait(0.1):
-+                l.append(True)
-+        def notifier():
-+            while len(ready) < 2:
-+                time.sleep(0.001)
-+            notif.notify()
-+        w1 = threading.Thread(target=waiter)
-+        w2 = threading.Thread(target=waiter)
-+        n = threading.Timer(0.005, notifier)
-+        w1.start()
-+        n.start()
-+        w2.start()
-+        w1.join()
-+        n.join()
-+        w2.join()
-+        assert len(l) == 1
-+        assert len(notif._waiters) == 0
-+
-+    def test_notify_multiple_hubs(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_notify_multiple_hubs(self, notif):
 +        l = []
 +        h = []
 +        def waiter():
-+            notif.gwait()
++            notif.wait()
 +            l.append(True)
 +        def notifier():
++            while not notif._waiters:
++                time.sleep(0.005)
 +            def gnotifier():
 +                h.append(eventlet.hubs.get_hub())
 +                notif.notify()
 +            gn = eventlet.spawn(gnotifier)
 +            gn.wait()
 +        w = eventlet.spawn(waiter)
-+        n = threading.Timer(0.01, notifier)
-+        ne = eventlet.spawn(n.start)
-+        w.wait()
-+        ne.wait()
-+        n.join()
++        n = mkthread(notifier)
++        join(n)
++        join(w)
 +        assert len(l) == 1
 +        assert len(notif._waiters) == 0
 +        assert len(h[0].listeners[eventlet.hubs.hub.WRITE]) == 0
 +
 +class TestNotifyMixedWaiters:
 +
-+    def test_eventlet(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_eventlet(self, notif):
 +        l = []
-+        ready = []
 +        def gwait():
-+            ready.append(True)
-+            notif.gwait()
++            notif.wait()
 +            l.append(True)
 +        def twait():
-+            ready.append(True)
-+            notif.twait()
++            notif.wait()
 +            l.append(True)
 +        def notifier():
-+            while len(ready) < 2:
++            while len(notif._waiters) < 2:
 +                eventlet.sleep(0.001)
 +            notif.notify_all()
 +        w1 = eventlet.spawn(gwait)
-+        w2 = threading.Thread(target=twait)
-+        w2e = eventlet.spawn(w2.start)
-+        n = eventlet.spawn_after(0.01, notifier)
-+        w1.wait()
-+        w2e.wait()
-+        w2.join()
++        eventlet.sleep(0)       # start w1
++        w2 = mkthread(twait)
++        n = eventlet.spawn(notifier)
++        join(n)
++        join(w1)
++        join(w2)
 +        assert len(l) == 2
 +
-+    def test_thread(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_thread(self, notif):
 +        l = []
-+        ready = []
 +        def gwait():
-+            ready.append(True)
-+            notif.gwait()
++            notif.wait()
 +            l.append(True)
 +        def twait():
-+            ready.append(True)
-+            notif.twait()
++            notif.wait()
 +            l.append(True)
 +        def notifier():
-+            while len(ready) < 2:
++            while len(notif._waiters) < 2:
 +                time.sleep(0.001)
 +            notif.notify_all()
 +        w1 = eventlet.spawn(gwait)
-+        w2 = threading.Thread(target=twait)
-+        w2e = eventlet.spawn(w2.start)
-+        n = threading.Timer(0.01, notifier)
-+        ne = eventlet.spawn(n.start)
-+        w1.wait()
-+        w2e.wait()
-+        w2.join()
-+        ne.wait()
-+        n.join()
++        eventlet.sleep(0)       # start w1
++        w2 = mkthread(twait)
++        n = mkthread(notifier)
++        join(n)
++        join(w1)
++        join(w2)
 +        assert len(l) == 2
 +
 +
 +class TestUnifiedWait:
 +
-+    def test_eventlet_eventlet(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_eventlet_eventlet(self, notif):
 +        l = []
 +        def waiter():
 +            notif.wait()
 +            l.append(True)
 +        def notifier():
++            while not notif._waiters:
++                eventlet.sleep(0.001)
 +            notif.notify()
 +        w = eventlet.spawn(waiter)
 +        n = eventlet.spawn_after(0.01, notifier)
-+        w.wait()
++        join(n)
++        join(w)
 +        assert len(l) == 1
 +
-+    def test_thread_eventlet(self):
-+        notif = pytest_funcarg__notif(None)
++    def test_thread_eventlet(self, notif):
 +        l = []
-+        started = []
 +        def waiter():
-+            started.append(True)
 +            notif.wait()
 +            l.append(True)
 +        def notifier():
-+            while not started:
++            while not notif._waiters:
 +                eventlet.sleep(0.001)
 +            notif.notify()
-+        w = threading.Thread(target=waiter)
-+        we = eventlet.spawn(w.start)
++        w = mkthread(waiter)
 +        n = eventlet.spawn_after(0.01, notifier)
-+        we.wait()
-+        n.wait()
-+        w.join()
++        join(n)
++        join(w)
 +        assert len(l) == 1
 +
 +
++class TestCreateWaiter:
++
++    def test_create_gwaiter_normal(self, notif):
++        def wait():
++            eventlet.sleep(0)   # ensure a running hub
++            waiter = notif.create_waiter()
++            notif.wait(waiter=waiter)
++        def notify():
++            notif.notify()
++        w = eventlet.spawn(wait)
++        eventlet.sleep(0)       # start w
++        n = eventlet.spawn_after(0.01, notify)
++        join(n)
++        join(w)
++
++    def test_create_twaiter_normal(self, notif):
++        def wait():
++            waiter = notif.create_waiter()
++            notif.wait(waiter=waiter)
++        def notify():
++            while not notif._waiters:
++                time.sleep(0.001)
++            notif.notify()
++        w = mkthread(wait)
++        n = mkthread(notify)
++        join(n)
++        join(w)
++
++    def test_create_twaiter_early(self, notif):
++        wait_ready = Evt()
++        notify_ready = Evt()
++        def wait():
++            waiter = notif.create_waiter()
++            wait_ready.set()
++            notify_ready.wait()
++            notif.wait(waiter=waiter)
++        def notify():
++            wait_ready.wait()
++            notif.notify()
++            notify_ready.set()
++        n = mkthread(notify)
++        w = mkthread(wait)
++        join(n)
++        join(w)
++
++    def test_create_gwaiter_tnotif_normal(self, notif):
++        def wait():
++            eventlet.sleep(0)   # ensure running hub
++            waiter = notif.create_waiter()
++            notif.wait(waiter=waiter)
++        def notify():
++            time.sleep(0.01)
++            notif.notify()
++        w = eventlet.spawn(wait)
++        eventlet.sleep(0)       # start w
++        n = mkthread(notify)
++        join(n)
++        join(w)
++
++    def test_create_gwaiter_tnotif_early(self, notif, shr):
++        wait_ready = Evt()
++        notify_ready = Evt()
++        def wait():
++            eventlet.sleep(0)   # ensure running hub
++            shr.waiter = notif.create_waiter()
++            wait_ready.set()
++            notify_ready.wait()
++            notif.wait(waiter=shr.waiter)
++        def notify():
++            wait_ready.wait()
++            notif.notify()
++            notify_ready.set()
++        w = eventlet.spawn(wait)
++        n = mkthread(notify)
++        join(n)
++        join(w)
++
++    def test_create_twaiter_gnotif_normal(self, notif):
++        ready = Evt()
++        def wait():
++            waiter = notif.create_waiter()
++            ready.set()
++            notif.wait(waiter=waiter)
++        def notify():
++            notif.notify()
++        w = mkthread(wait)
++        ready.wait()
++        n = eventlet.spawn_after(0.01, notify)
++        join(n)
++        join(w)
++
++    def test_create_twaiter_gnotif_early(self, notif):
++        wait_ready = Evt()
++        notify_ready = Evt()
++        def wait():
++            waiter = notif.create_waiter()
++            wait_ready.set()
++            notify_ready.wait()
++            notif.wait(waiter=waiter)
++        def notify():
++            wait_ready.wait()
++            notif.notify()
++            notify_ready.set()
++        w = mkthread(wait)
++        n = eventlet.spawn(notify)
++        join(n)
++        join(w)
++
++
 +################ Event ################
 +
 +# Note that there is no longer a need to test everything with both
 +    return xthread.Lock()
 +
 +
-+def test_lock():
-+    lock = pytest_funcarg__lock(None)
++def test_lock(lock):
 +    l = []
 +    def acquire():
 +        lock.acquire()
 +    assert len(l) == 2
 +
 +
-+def test_lock_unlocked():
-+    lock = pytest_funcarg__lock(None)
++def test_lock_unlocked(lock):
 +    assert lock.owner is None
 +
 +
-+def test_lock_owner():
-+    lock = pytest_funcarg__lock(None)
++def test_lock_owner(lock):
 +    lock.acquire()
 +    assert lock.owner == eventlet.getcurrent()
 +
 +
-+def test_lock_timeout():
-+    lock = pytest_funcarg__lock(None)
++def test_lock_timeout(lock):
 +    gotit = lock.acquire()
 +    assert gotit
 +    gotit = lock.acquire(timeout=0.01)
 +    assert not gotit1
 +
 +
-+@nose.tools.raises(RuntimeError)
 +def test_rlock_release_other():
 +    rlock = pytest_funcarg__rlock(None)
 +    gt = eventlet.spawn(rlock.acquire)
 +    gt.wait()
-+    rlock.release()
++    pytest.raises(RuntimeError, rlock.release)
 +
 +
 +def test_rlock_release_other_acquire():
 +    return xthread.Condition()
 +
 +
-+@nose.tools.raises(RuntimeError)
-+def test_cond_wait_notowner():
-+    cond = pytest_funcarg__cond(None)
-+    cond.wait()
++def test_cond_wait_notowner(cond):
++    pytest.raises(RuntimeError, cond.wait)
 +
 +
-+@nose.tools.raises(RuntimeError)
-+def test_cond_notify_notowner():
-+    cond = pytest_funcarg__cond(None)
-+    cond.notify()
++def test_cond_notify_notowner(cond):
++    pytest.raises(RuntimeError, cond.notify)
 +
 +
-+@nose.tools.raises(RuntimeError)
-+def test_cond_notify_all_notowner():
-+    cond = pytest_funcarg__cond(None)
-+    cond.notify_all()
++def test_cond_notify_all_notowner(cond):
++    pytest.raises(RuntimeError, cond.notify_all)
 +
 +
-+def test_cond_wait_notify_with():
-+    cond = pytest_funcarg__cond(None)
++def test_cond_wait_notify_with(cond):
++    l = []
++    def wait():
++        with cond:
++            l.append(0)
++            cond.wait()
++            l.append(3)
++    def notif():
++        with cond:
++            l.append(1)
++            cond.notify()
++            l.append(2)
++    w = eventlet.spawn(wait)
++    n = eventlet.spawn_after(0.01, notif)
++    w.wait()
++    assert l == [0, 1, 2, 3]
++
++
++def test_cond_wait_notify_with_sequencing(cond):
 +    l = []
 +    def wait():
 +        with cond:
 +            cond.wait()
-+            l.append(None)
++            l.append(2)
 +    def notif():
 +        with cond:
 +            cond.notify()
-+            l.append(None)
++            l.append(1)
 +    w = eventlet.spawn(wait)
-+    n = eventlet.spawn_after(0.01, notif)
++    n = eventlet.spawn(notif)
 +    w.wait()
-+    assert len(l) == 2
++    n.wait()
++    assert l == [1, 2]
++
++
++def test_cond_wait_notify_with_wrong_sequence(cond):
++    l = []
++    def wait():
++        with cond:
++            cond.wait(0.1)
++            l.append(2)
++    def notif():
++        with cond:
++            l.append(0)
++            cond.notify()
++            l.append(1)
++    n = eventlet.spawn(notif)
++    w = eventlet.spawn_after(0.01, wait)
++    n.wait()
++    w.wait()
++    assert l == [0, 1, 2]
 +
 +
 +def test_cond_wait_notify_aquire():
 +    return xthread.Queue()
 +
 +
-+def test_queue_put():
-+    queue = pytest_funcarg__queue(None)
++def test_queue_put(queue):
 +    queue.put(None)
 +    assert queue.qsize() == 1
 +
 +
-+def test_queue_get():
-+    queue = pytest_funcarg__queue(None)
++def test_queue_get(queue):
 +    it = object()
 +    queue.put(it)
 +    r = queue.get()
 +    assert it is r
 +
 +
-+def test_queue_get_block():
-+    queue = pytest_funcarg__queue(None)
++def test_queue_get_block(queue):
 +    it = object()
 +    def get():
 +        return queue.get()
 +    assert g.wait() is it
 +
 +
-+@nose.tools.raises(xthread.Empty)
-+def test_queue_get_timeout():
-+    queue = pytest_funcarg__queue(None)
-+    queue.get(timeout=0.01)
++def test_queue_get_timeout(queue):
++    pytest.raises(xthread.Empty, queue.get, timeout=0.01)
 +
 +
-+def test_queue_get_thread():
-+    queue = pytest_funcarg__queue(None)
++def test_queue_get_thread(queue):
 +    it = object()
 +    rl = []
 +    queue.put(it)
 +    assert rl[0] is it
 +
 +
-+def test_queue_get_thread_block():
-+    queue = pytest_funcarg__queue(None)
++def test_queue_get_thread_block(queue):
 +    it = object()
 +    rl = []
 +    thread_ready = []