Commits

Anonymous committed e3a2c7b Merge
  • Participants
  • Parent commits 645fbce, bd6dcb7

Comments (0)

Files changed (15)

 * Tavis Rudd
 * Sergey Shepelev
 * Chuck Thier
+* Nick V
 * Daniele Varrazzo
 * Ryan Williams
 
 * Favo Yang, twisted hub patch
 * Schmir, patch that fixes readline method with chunked encoding in wsgi.py, advice on patcher
 * Slide, for open-sourcing gogreen
-* Holger Krekel, websocket example small fix
+* Holger Krekel, websocket example small fix
+* mikepk, debugging MySQLdb/tpool issues

File eventlet/__init__.py

     TimeoutError = timeout.Timeout
     exc_after = greenthread.exc_after
     call_after_global = greenthread.call_after_global
-except ImportError:
-    # this is to make Debian packaging easier
-    import traceback
-    traceback.print_exc()
+except ImportError, e:
+    # This is to make Debian packaging easier, it ignores import
+    # errors of greenlet so that the packager can still at least
+    # access the version.  Also this makes easy_install a little quieter
+    if 'greenlet' not in str(e):
+        # any other exception should be printed
+        import traceback
+        traceback.print_exc()

File eventlet/green/MySQLdb.py

+# Mirror the real MySQLdb module: copy every public attribute into this
+# module's namespace so it can serve as a drop-in replacement.
+__MySQLdb = __import__('MySQLdb')
+globals().update(dict([(var, getattr(__MySQLdb, var))
+                       for var in dir(__MySQLdb)
+                       if not var.startswith('__')]))
+                       
+__all__ = __MySQLdb.__all__
+__patched__ = ["connect", "Connect", 'Connection', 'connections']
+
+from eventlet import tpool
+
+# Keep a handle on the unpatched connections module; the tpooled
+# Connection factory below delegates the real work to it.
+__orig_connections = __import__('MySQLdb.connections').connections
+
+def Connection(*args, **kw):
+    # Open the (blocking) connection inside a native thread via tpool, then
+    # wrap it in a Proxy so its methods -- and any cursor it hands out,
+    # per autowrap_names -- also run in the threadpool instead of blocking
+    # the hub.
+    conn = tpool.execute(__orig_connections.Connection, *args, **kw)
+    return tpool.Proxy(conn, autowrap_names=('cursor',))
+connect = Connect = Connection
+
+# replicate the MySQLdb.connections module but with a tpooled Connection factory
+class MySQLdbConnectionsModule(object):
+    pass
+connections = MySQLdbConnectionsModule()
+for var in dir(__orig_connections):
+    if not var.startswith('__'):
+        setattr(connections, var, getattr(__orig_connections, var))
+connections.Connection = Connection
+
+# Re-export the submodules callers commonly reach through MySQLdb.
+cursors = __import__('MySQLdb.cursors').cursors
+converters = __import__('MySQLdb.converters').converters
+
+# TODO support instantiating cursors.FooCursor objects directly
+# TODO though this is a low priority, it would be nice if we supported
+# subclassing eventlet.green.MySQLdb.connections.Connection

File eventlet/greenthread.py

     occasionally; otherwise nothing else will run.
     """
     hub = hubs.get_hub()
-    current = greenlet.getcurrent()
+    current = getcurrent()
     assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
     timer = hub.schedule_call_global(seconds, current.switch)
     try:
                 g.main(just_raise, (), {})
             except:
                 pass
-    if getcurrent() is not hub.greenlet:
+    current = getcurrent()
+    if current is not hub.greenlet:
         # arrange to wake the caller back up immediately
-        hub.schedule_call_global(0,getcurrent().switch)
+        hub.schedule_call_global(0, current.switch)
     g.throw(*throw_args)

File eventlet/hubs/hub.py

             return None
         return t[0][0]
 
-    def run(self):
+    def run(self, *a, **kw):
         """Run the runloop until abort is called.
         """
+        # accept and discard variable arguments because they will be
+        # supplied if other greenlets have run and exited before the
+        # hub's greenlet gets a chance to run
         if self.running:
             raise RuntimeError("Already running!")
         try:
                 else:
                     self.wait(0)
             else:
-                self.canceled_timers = 0
+                self.timers_canceled = 0
                 del self.timers[:]
                 del self.next_timers[:]
         finally:

File eventlet/patcher.py

             _green_select_modules() +
             _green_socket_modules() +
             _green_thread_modules() +
-            _green_time_modules())
+            _green_time_modules()) 
+            #_green_MySQLdb()) # enable this after a short baking-in period
     
     # after this we are gonna screw with sys.modules, so capture the
     # state of all the modules we're going to mess with, and lock
     module if present; and thread, which patches thread, threading, and Queue.
 
     It's safe to call monkey_patch multiple times.
-    """
-    accepted_args = set(('os', 'select', 'socket', 'thread', 'time', 'psycopg'))
+    """    
+    accepted_args = set(('os', 'select', 'socket', 
+                         'thread', 'time', 'psycopg', 'MySQLdb'))
     default_on = on.pop("all",None)
     for k in on.iterkeys():
         if k not in accepted_args:
     if default_on is None:
         default_on = not (True in on.values())
     for modname in accepted_args:
+        if modname == 'MySQLdb':
+            # MySQLdb is only on when explicitly patched for the moment
+            on.setdefault(modname, False)
         on.setdefault(modname, default_on)
         
     modules_to_patch = []
     if on['time'] and not already_patched.get('time'):
         modules_to_patch += _green_time_modules()
         already_patched['time'] = True
+    if on.get('MySQLdb') and not already_patched.get('MySQLdb'):
+        modules_to_patch += _green_MySQLdb()
+        already_patched['MySQLdb'] = True
     if on['psycopg'] and not already_patched.get('psycopg'):
         try:
             from eventlet.support import psycopg2_patcher
     from eventlet.green import time
     return [('time', time)]
 
+def _green_MySQLdb():
+    # Returns [('MySQLdb', <green module>)] for the patcher machinery, or []
+    # when MySQLdb (and hence eventlet.green.MySQLdb) cannot be imported,
+    # making MySQLdb patching a silent no-op on systems without the driver.
+    try:
+        from eventlet.green import MySQLdb
+        return [('MySQLdb', MySQLdb)]
+    except ImportError:
+        return []
+
 
 if __name__ == "__main__":
     import sys

File eventlet/tpool.py

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import imp
 import os
 import sys
 
     _wfile.write(_bytetosend)
     _wfile.flush()
 
-_reqq = None
 _rspq = None
 
 def tpool_trampoline():
-    global _reqq, _rspq
+    global _rspq
     while(True):
         try:
             _c = _rfile.read(1)
             except Empty:
                 pass
 
-def esend(meth,*args, **kwargs):
-    global _reqq, _rspq
-    e = event.Event()
-    _reqq.put((e,meth,args,kwargs))
-    return e
 
 SYS_EXCS = (KeyboardInterrupt, SystemExit)
 
 
-def tworker():
-    global _reqq, _rspq
+def tworker(reqq):
+    global _rspq
     while(True):
         try:
-            msg = _reqq.get()
+            msg = reqq.get()
         except AttributeError:
             return # can't get anything off of a dud queue
         if msg is None:
             raise
         except Exception:
             rv = sys.exc_info()
-        _rspq.put((e,rv))               # @@tavis: not supposed to
-                                        # keep references to
-                                        # sys.exc_info() so it would
-                                        # be worthwhile testing
-                                        # if this leads to memory leaks
+        # test_leakage_from_tracebacks verifies that the use of
+        # exc_info does not lead to memory leaks
+        _rspq.put((e,rv))
         meth = args = kwargs = e = rv = None
         _signal_t2e()
 
 
-def erecv(e):
-    rv = e.wait()
-    if isinstance(rv,tuple) and len(rv) == 3 and isinstance(rv[1],Exception):
-        import traceback
-        (c,e,tb) = rv
-        if not QUIET:
-            traceback.print_exception(c,e,tb)
-            traceback.print_stack()
-        raise c,e,tb
-    return rv
-
-
 def execute(meth,*args, **kwargs):
     """
     Execute *meth* in a Python thread, blocking the current coroutine/
     """
     global _threads
     setup()
+    # if already in tpool, don't recurse into the tpool
+    # also, call functions directly if we're inside an import lock, because
+    # if meth does any importing (sadly common), it will hang
     my_thread = threading.currentThread()
-    if my_thread in _threads:
+    if my_thread in _threads or imp.lock_held():
         return meth(*args, **kwargs)
-    e = esend(meth, *args, **kwargs)
-    rv = erecv(e)
+
+    cur = greenthread.getcurrent()
+    # a mini mixing function to make up for the fact that hash(greenlet) doesn't
+    # have much variability in the lower bits
+    k = hash(cur)
+    k = k + 0x2c865fd + (k >> 5)
+    k = k ^ 0xc84d1b7 ^ (k >> 7)
+    thread_index = k % len(_threads)
+    
+    reqq, _thread = _threads[thread_index]
+    e = event.Event()
+    reqq.put((e,meth,args,kwargs))
+
+    rv = e.wait()
+    if isinstance(rv,tuple) and len(rv) == 3 and isinstance(rv[1],Exception):
+        import traceback
+        (c,e,tb) = rv
+        if not QUIET:
+            traceback.print_exception(c,e,tb)
+            traceback.print_stack()
+        raise c,e,tb
     return rv
 
 
         return proxy_call(self._autowrap, self._obj.next)
 
 
-
 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
-_threads = set()
+_threads = []
 _coro = None
 _setup_already = False
 def setup():
-    global _rfile, _wfile, _threads, _coro, _setup_already, _reqq, _rspq
+    global _rfile, _wfile, _threads, _coro, _setup_already, _rspq
     if _setup_already:
         return
     else:
         _rfile = greenio.GreenSocket(csock).makefile('rb', 0)
         _wfile = nsock.makefile('wb',0)
 
-    _reqq = Queue(maxsize=-1)
     _rspq = Queue(maxsize=-1)
     assert _nthreads >= 0, "Can't specify negative number of threads"
     for i in range(0,_nthreads):
-        t = threading.Thread(target=tworker, name="tpool_thread_%s" % i)
+        reqq = Queue(maxsize=-1)
+        t = threading.Thread(target=tworker, 
+                             name="tpool_thread_%s" % i, 
+                             args=(reqq,))
         t.setDaemon(True)
         t.start()
-        _threads.add(t)
+        _threads.append((reqq, t))
 
     _coro = greenthread.spawn_n(tpool_trampoline)
 
 
 def killall():
-    global _setup_already, _reqq, _rspq, _rfile, _wfile
+    global _setup_already, _rspq, _rfile, _wfile
     if not _setup_already:
         return
-    for i in _threads:
-        _reqq.put(None)
-    for thr in _threads:
+    for reqq, _ in _threads:
+        reqq.put(None)
+    for _, thr in _threads:
         thr.join()
-    _threads.clear()
+    del _threads[:]
     if _coro is not None:
         greenthread.kill(_coro)
     _rfile.close()
     _wfile.close()
     _rfile = None
     _wfile = None
-    _reqq = None
     _rspq = None
     _setup_already = False

File eventlet/websocket.py

                     environ.get('HTTP_ORIGIN'),
                     location))
         elif self.protocol_version == 76:
-            handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
+            handshake_reply = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
                                "Upgrade: WebSocket\r\n"
                                "Connection: Upgrade\r\n"
                                "Sec-WebSocket-Origin: %s\r\n"

File tests/__init__.py

     should return True to skip the test.
     """
     def skipped_wrapper(func):
-        if isinstance(condition, bool):
-            result = condition
-        else:
-            result = condition(func)
-        if result:
-            return skipped(func)
-        else:
-            return func
+        def wrapped(*a, **kw):
+            if isinstance(condition, bool):
+                result = condition
+            else:
+                result = condition(func)
+            if result:
+                return skipped(func)(*a, **kw)
+            else:
+                return func(*a, **kw)
+        wrapped.__name__ = func.__name__
+        return wrapped
     return skipped_wrapper
 
 
     should return True if the condition is satisfied.
     """    
     def skipped_wrapper(func):
-        if isinstance(condition, bool):
-            result = condition
-        else:
-            result = condition(func)
-        if not result:
-            return skipped(func)
-        else:
-            return func
+        def wrapped(*a, **kw):
+            if isinstance(condition, bool):
+                result = condition
+            else:
+                result = condition(func)
+            if not result:
+                return skipped(func)(*a, **kw)
+            else:
+                return func(*a, **kw)
+        wrapped.__name__ = func.__name__
+        return wrapped
     return skipped_wrapper
 
 
 def using_pyevent(_f):
     from eventlet.hubs import get_hub
     return 'pyevent' in type(get_hub()).__module__
+
     
 def skip_with_pyevent(func):
     """ Decorator that skips a test if we're using the pyevent hub."""

File tests/env_test.py

 class Tpool(ProcessBase):
     @skip_with_pyevent
     def test_tpool_size(self):
+        expected = "40"
+        normal = "20"
         new_mod = """from eventlet import tpool
 import eventlet
 import time
     if current[0] > highwater[0]:
         highwater[0] = current[0]
     current[0] -= 1
-expected = 40
+expected = %s
+normal = %s
 p = eventlet.GreenPool()
-for i in xrange(expected):
-    p.spawn(tpool.execute,count)
+for i in xrange(expected*2):
+    p.spawn(tpool.execute, count)
 p.waitall()
-assert highwater[0] == expected, "%s != %s" % (highwater[0], expected)"""
-        os.environ['EVENTLET_THREADPOOL_SIZE'] = "40"
+assert highwater[0] > 20, "Highwater %%s  <= %%s" %% (highwater[0], normal)
+"""
+        os.environ['EVENTLET_THREADPOOL_SIZE'] = expected
         try:
-            self.write_to_tempfile("newmod", new_mod)
+            self.write_to_tempfile("newmod", new_mod % (expected, normal))
             output, lines = self.launch_subprocess('newmod.py')
             self.assertEqual(len(lines), 1, lines)
         finally:

File tests/hub_test.py

         for i in xrange(2000):
             t = hubs.get_hub().schedule_call_global(60, noop)
             t.cancel()
-            self.assert_less_than_equal(hub.timers_canceled - scanceled,
-                                  hub.get_timers_count() - stimers + 1)
+            self.assert_less_than_equal(hub.timers_canceled,
+                                  hub.get_timers_count() + 1)
         # there should be fewer than 1000 new timers and canceled
         self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
         self.assert_less_than_equal(hub.timers_canceled, 1000)
         for i in xrange(2000):
             t = hubs.get_hub().schedule_call_global(60, noop)
             eventlet.sleep()
-            self.assert_less_than_equal(hub.timers_canceled - scanceled,
-                                  hub.get_timers_count() - stimers + 1)
+            self.assert_less_than_equal(hub.timers_canceled,
+                                  hub.get_timers_count() + 1)
             t.cancel()
-            self.assert_less_than_equal(hub.timers_canceled - scanceled,
-                                  hub.get_timers_count() - stimers + 1)
+            self.assert_less_than_equal(hub.timers_canceled,
+                                  hub.get_timers_count() + 1, hub.timers)
         # there should be fewer than 1000 new timers and canceled
         self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
         self.assert_less_than_equal(hub.timers_canceled, 1000)
             t2 = hubs.get_hub().schedule_call_global(60, noop)
             t3 = hubs.get_hub().schedule_call_global(60, noop)
             eventlet.sleep()
-            self.assert_less_than_equal(hub.timers_canceled - scanceled,
-                                        hub.get_timers_count() - stimers + 1)
+            self.assert_less_than_equal(hub.timers_canceled,
+                                        hub.get_timers_count() + 1)
             t.cancel()
-            self.assert_less_than_equal(hub.timers_canceled - scanceled,
-                                        hub.get_timers_count() - stimers + 1)
+            self.assert_less_than_equal(hub.timers_canceled,
+                                        hub.get_timers_count() + 1)
             uncanceled_timers.append(t2)
             uncanceled_timers.append(t3)
         # 3000 new timers, plus a few extras
         self.assertEqual(hub.timers_canceled, 1000)
         for t in uncanceled_timers:
             t.cancel()
-            self.assert_less_than_equal(hub.timers_canceled - scanceled,
-                                        hub.get_timers_count() - stimers + 1)
+            self.assert_less_than_equal(hub.timers_canceled,
+                                        hub.get_timers_count())
         eventlet.sleep()
         
 

File tests/mysqldb_test.py

+import os
+import sys
+import time
+from tests import skipped, skip_unless, using_pyevent, get_database_auth, LimitedTestCase
+import eventlet
+from eventlet import event
+try:
+    from eventlet.green import MySQLdb
+except ImportError:
+    MySQLdb = False
+
+def mysql_requirement(_f):
+    """We want to skip tests if using pyevent, MySQLdb is not installed, or if
+    there is no database running on the localhost that the auth file grants
+    us access to.
+    
+    This errs on the side of skipping tests if everything is not right, but
+    it's better than a million tests failing when you don't care about mysql
+    support."""
+    if using_pyevent(_f):
+        return False
+    if MySQLdb is False:
+        print "Skipping mysql tests, MySQLdb not importable"
+        return False
+    try:
+        auth = get_database_auth()['MySQLdb'].copy()
+        MySQLdb.connect(**auth)
+        return True
+    except MySQLdb.OperationalError:
+        # imported here because the module never imports traceback at the
+        # top level; without this the failure path raises NameError instead
+        # of reporting why the connection failed
+        import traceback
+        print "Skipping mysql tests, error when connecting:"
+        traceback.print_exc()
+        return False
+
+class MySQLdbTester(LimitedTestCase):
+    """Exercises the tpool-backed eventlet.green.MySQLdb wrapper against a
+    live local MySQL server (skipped entirely when one is unavailable --
+    see mysql_requirement)."""
+    def setUp(self):
+        self._auth = get_database_auth()['MySQLdb']
+        self.create_db()
+        self.connection = None
+        self.connection = MySQLdb.connect(**self._auth)
+        cursor = self.connection.cursor()
+        cursor.execute("""CREATE TABLE gargleblatz
+        (
+        a INTEGER
+        );""")
+        self.connection.commit()
+        cursor.close()
+
+    def tearDown(self):
+        if self.connection:
+            self.connection.close()
+        self.drop_db()
+
+    @skip_unless(mysql_requirement)    
+    def create_db(self):
+        # Creates a uniquely-named scratch database (pid + timestamp) and
+        # records it in self._auth so later connections use it.
+        auth = self._auth.copy()
+        try:
+            self.drop_db()
+        except Exception:
+            pass
+        dbname = 'test_%d_%d' % (os.getpid(), time.time()*1000)
+        db = MySQLdb.connect(**auth).cursor()
+        db.execute("create database "+dbname)
+        db.close()
+        self._auth['db'] = dbname
+        del db
+
+    def drop_db(self):
+        db = MySQLdb.connect(**self._auth).cursor()
+        db.execute("drop database "+self._auth['db'])
+        db.close()
+        del db
+
+    def set_up_dummy_table(self, connection=None):
+        # Only close the connection if we opened it ourselves here.
+        close_connection = False
+        if connection is None:
+            close_connection = True
+            if self.connection is None:
+                connection = MySQLdb.connect(**self._auth)
+            else:
+                connection = self.connection
+
+        cursor = connection.cursor()
+        cursor.execute(self.dummy_table_sql)
+        connection.commit()
+        cursor.close()
+        if close_connection:
+            connection.close()
+
+    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
+        (
+        row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
+        value_int INTEGER,
+        value_float FLOAT,
+        value_string VARCHAR(200),
+        value_uuid CHAR(36),
+        value_binary BLOB,
+        value_binary_string VARCHAR(200) BINARY,
+        value_enum ENUM('Y','N'),
+        created TIMESTAMP
+        ) ENGINE=InnoDB;"""
+
+    def assert_cursor_yields(self, curs):
+        # Verifies that other greenthreads run while the cursor executes,
+        # i.e. the database call does not block the hub.
+        counter = [0]
+        def tick():
+            while True:
+                counter[0] += 1
+                eventlet.sleep()
+        gt = eventlet.spawn(tick)
+        curs.execute("select 1")
+        rows = curs.fetchall()
+        self.assertEqual(rows, ((1L,),))
+        self.assert_(counter[0] > 0, counter[0])
+        gt.kill()
+
+    def assert_cursor_works(self, cursor):
+        cursor.execute("select 1")
+        rows = cursor.fetchall()
+        self.assertEqual(rows, ((1L,),))
+        self.assert_cursor_yields(cursor)
+        
+    def assert_connection_works(self, conn):
+        curs = conn.cursor()
+        self.assert_cursor_works(curs)
+
+    def test_module_attributes(self):
+        # The green module should expose everything the real module does.
+        import MySQLdb as orig
+        for key in dir(orig):
+            if key not in ('__author__', '__path__', '__revision__',
+                           '__version__'):
+                self.assert_(hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key)))
+
+    def test_connecting(self):
+        self.assert_(self.connection is not None)
+        
+    def test_connecting_annoyingly(self):
+        # All three connection entry points should behave identically.
+        self.assert_connection_works(MySQLdb.Connect(**self._auth))
+        self.assert_connection_works(MySQLdb.Connection(**self._auth))
+        self.assert_connection_works(MySQLdb.connections.Connection(**self._auth))
+
+    def test_create_cursor(self):
+        cursor = self.connection.cursor()
+        cursor.close()
+
+    def test_run_query(self):
+        cursor = self.connection.cursor()
+        self.assert_cursor_works(cursor)
+        cursor.close()
+
+    def test_run_bad_query(self):
+        # Any exception type is acceptable for bad SQL; only the absence of
+        # an exception (or our own AssertionError) is a failure.
+        cursor = self.connection.cursor()
+        try:
+            cursor.execute("garbage blah blah")
+            self.assert_(False)
+        except AssertionError:
+            raise
+        except Exception:
+            pass
+        cursor.close()
+
+    def fill_up_table(self, conn):
+        curs = conn.cursor()
+        for i in range(1000):
+            curs.execute('insert into test_table (value_int) values (%s)' % i)
+        conn.commit()
+
+    def test_yields(self):
+        conn = self.connection
+        self.set_up_dummy_table(conn)
+        self.fill_up_table(conn)
+        curs = conn.cursor()
+        results = []
+        SHORT_QUERY = "select * from test_table"
+        evt = event.Event()
+        def a_query():
+            self.assert_cursor_works(curs)
+            curs.execute(SHORT_QUERY)
+            results.append(2)
+            evt.send()
+        eventlet.spawn(a_query)
+        results.append(1)
+        self.assertEqual([1], results)
+        evt.wait()
+        self.assertEqual([1, 2], results)
+
+    def test_visibility_from_other_connections(self):
+        conn = MySQLdb.connect(**self._auth)
+        conn2 = MySQLdb.connect(**self._auth)
+        curs = conn.cursor()
+        try:
+            curs2 = conn2.cursor()
+            curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
+            self.assertEqual(curs2.rowcount, 1)
+            conn2.commit()
+            selection_query = "select * from gargleblatz"
+            curs2.execute(selection_query)
+            self.assertEqual(curs2.rowcount, 1)
+            del curs2, conn2
+            # create a new connection, it should see the addition
+            conn3 = MySQLdb.connect(**self._auth)
+            curs3 = conn3.cursor()
+            curs3.execute(selection_query)
+            self.assertEqual(curs3.rowcount, 1)
+            # now, does the already-open connection see it?
+            curs.execute(selection_query)
+            self.assertEqual(curs.rowcount, 1)
+            del curs3, conn3
+        finally:
+            # clean up my litter
+            curs.execute("delete from gargleblatz where a=314159")
+            conn.commit()
+
+from tests import patcher_test
+
+class MonkeyPatchTester(patcher_test.ProcessBase):
+    """Checks that monkey_patch(MySQLdb=True) installs the green MySQLdb."""
+    @skip_unless(mysql_requirement)    
+    def test_monkey_patching(self):
+        output, lines = self.run_script("""
+from eventlet import patcher
+import MySQLdb as m
+from eventlet.green import MySQLdb as gm
+patcher.monkey_patch(all=True, MySQLdb=True)
+print "mysqltest", ",".join(sorted(patcher.already_patched.keys()))
+print "connect", m.connect == gm.connect
+""")
+        self.assertEqual(len(lines), 3)
+        # strip "psycopg," (with its comma, matching patcher_test.py) so the
+        # assertion passes whether or not psycopg is installed; stripping
+        # just "psycopg" would leave a stray double comma and fail
+        self.assertEqual(lines[0].replace("psycopg,", ""),
+                         'mysqltest MySQLdb,os,select,socket,thread,time')
+        self.assertEqual(lines[1], "connect True")

File tests/patcher_test.py

         python_path = os.pathsep.join(sys.path + [self.tempdir])
         new_env = os.environ.copy()
         new_env['PYTHONPATH'] = python_path
+        if not filename.endswith('.py'):
+            filename = filename + '.py'
         p = subprocess.Popen([sys.executable, 
                               os.path.join(self.tempdir, filename)],
                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
         lines = output.split("\n")
         return output, lines
 
+    def run_script(self, contents, modname=None):
+        # Convenience wrapper: writes *contents* to a temp module (default
+        # name "testmod") and runs it in a subprocess, returning the
+        # (output, lines) pair from launch_subprocess.
+        if modname is None:
+            modname = "testmod"
+        self.write_to_tempfile(modname, contents)
+        return self.launch_subprocess(modname)
+
 
 class ImportPatched(ProcessBase):
     def test_patch_a_module(self):
         patched_modules = lines[0][len(ap):].strip()
         # psycopg might or might not be patched based on installed modules
         patched_modules = patched_modules.replace("psycopg,", "")
+        # ditto for MySQLdb
+        patched_modules = patched_modules.replace("MySQLdb,", "")
         self.assertEqual(patched_modules, expected,
                          "Logic:%s\nExpected: %s != %s" %(call, expected,
                                                           patched_modules))

File tests/tpool_test.py

 from sys import stdout
 import time
 import re
+import gc
 from tests import skipped, skip_with_pyevent, LimitedTestCase, main
 
 from eventlet import tpool, debug
             self.assert_(item >= previtem)
         # make sure the tick happened at least a few times so that we know
         # that our iterations in foo() were actually tpooled
-        print counter[0]
         self.assert_(counter[0] > 10, counter[0])
         gt.kill()
 
                 self.assertEquals(token, rv)
                 eventlet.sleep(random.random()/200.0)
 
-        pile = eventlet.GreenPile(10)
-        for i in xrange(10):
+        cnt = 10
+        pile = eventlet.GreenPile(cnt)
+        for i in xrange(cnt):
             pile.spawn(sender_loop,i)
         results = list(pile)
-        self.assertEquals(len(results), 10)
+        self.assertEquals(len(results), cnt)
         tpool.killall()
         
     @skipped
             iterations, tpool_overhead, best_normal, best_tpool)
         tpool.killall()
 
+    @skip_with_pyevent
+    def test_leakage_from_tracebacks(self):
+        tpool.execute(noop)  # get it started
+        gc.collect()
+        initial_objs = len(gc.get_objects())
+        for i in xrange(10):
+            self.assertRaises(RuntimeError, tpool.execute, raise_exception)
+        gc.collect()
+        middle_objs = len(gc.get_objects())
+        # some objects will inevitably be created by the previous loop
+        # now we test to ensure that running the loop an order of
+        # magnitude more doesn't generate additional objects
+        for i in xrange(100):
+            self.assertRaises(RuntimeError, tpool.execute, raise_exception)
+        first_created = middle_objs - initial_objs
+        gc.collect()
+        second_created = len(gc.get_objects()) - middle_objs
+        self.assert_(second_created - first_created < 10,
+                     "first loop: %s, second loop: %s" % (first_created,
+                                                          second_created))
+        tpool.killall()
+
 
 if __name__ == '__main__':
     main()

File tests/websocket_test.py

 from eventlet.websocket import WebSocket, WebSocketWSGI
 from eventlet import wsgi
 from eventlet import event
+from eventlet import greenio
 
 from tests import mock, LimitedTestCase, certificate_file, private_key_file
 from tests.wsgi_test import _TestBase
         result = sock.recv(1024)
         ## The server responds the correct Websocket handshake
         self.assertEqual(result,
-                         '\r\n'.join(['HTTP/1.1 101 Web Socket Protocol Handshake',
+                         '\r\n'.join(['HTTP/1.1 101 WebSocket Protocol Handshake',
                                       'Upgrade: WebSocket',
                                       'Connection: Upgrade',
                                       'Sec-WebSocket-Origin: http://localhost:%s' % self.port,
         sock.sendall(' end\xff')
         result = sock.recv(1024)
         self.assertEqual(result, '\x00start end\xff')
-        sock.shutdown(socket.SHUT_RDWR)
+        greenio.shutdown_safe(sock)
         sock.close()
         eventlet.sleep(0.01)