Commits

howerton committed 4b4531f Merge

adding upstream changes

Comments (0)

Files changed (31)

 d528852d10962c7c8e0cc4d8b14625df10c84cea 0.9.10
 ce7ad90f6d038b56e91732e3be1eeda4504a401f 0.9.12
 5185591ef81340f4da5a508d37f15257d2defa68 0.9.13
+5140fe20544f0e74b14ef36ca9fdb6e7366920e4 0.9.14
 * Holger Krekel, websocket example small fix
 * mikepk, debugging MySQLdb/tpool issues
 * Malcolm Cleaton, patch for Event exception handling
+* Alexey Borzenkov, for finding and fixing issues with Windows error detection (#66, #69), reducing dependencies in zeromq hub (#71)
+* Anonymous, finding and fixing error in websocket chat example (#70)
+* Edward George, finding and fixing an issue in the [e]poll hubs (#74)
+* Ruijun Luo, figuring out incorrect openssl import for wrap_ssl (#73)
+* rfk, patch to get green zmq to respect noblock flag.
+* Soren Hansen, finding and fixing issue in subprocess (#77)
+* Stefano Rivera, making tests pass in absence of postgres (#78)
+* Joshua Kwan, fixing busy-wait in eventlet.green.ssl.
+0.9.14
+======
+* Many fixes to the ZeroMQ hub, which now requires version 2.0.10 or later.  Thanks to Ben Ford.
+* ZeroMQ hub no longer depends on pollhub, and thus works on Windows (thanks, Alexey Borzenkov)
+* Better handling of connect errors on Windows, thanks again to Alexey Borzenkov.
+* More-robust Event delivery, thanks to Malcolm Cleaton
+* wsgi.py now distinguishes between an empty query string ("") and a non-existent query string (no entry in environ).
+* wsgi.py handles ipv6 correctly (thanks, redbo)
+* Better behavior in tpool when you give it nonsensical numbers, thanks to R. Tyler for the nonsense.  :)
+* Fixed importing on 2.5 (#73, thanks to Ruijun Luo)
+* Hub doesn't hold on to invalid fds (#74, thanks to Edward George)
+* Documentation for eventlet.green.zmq, courtesy of Ben Ford
+
 0.9.13
 ======
 * ZeroMQ hub, and eventlet.green.zmq make supersockets green.  Thanks to Ben Ford!
     Lowest-common-denominator, available everywhere.
 **pyevent**
     This is a libevent-based backend and is thus the fastest.  It's disabled by default, because it does not support native threads, but you can enable it yourself if your use case doesn't require them.  (You have to install pyevent, too.)
+**zeromq**
+    `ZeroMQ <http://zeromq.org>`_-powered hub.  This hub supports all normal non-blocking socket operations, and it's required for :mod:`eventlet.green.zmq` to work.
 
 If the selected hub is not ideal for the application, another can be selected.  You can make the selection either with the environment variable :ref:`EVENTLET_HUB <env_vars>` or with use_hub.
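For example, a minimal, illustrative sketch of selecting the zeromq hub programmatically; it assumes pyzmq is installed and that ``use_hub`` runs before any greenthreads or I/O are started::

    import eventlet.hubs
    eventlet.hubs.use_hub('zeromq')     # same effect as setting EVENTLET_HUB=zeromq

    from eventlet.green import zmq      # green sockets now run on the zeromq hub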
 
    examples
    ssl
    threading
+   zeromq
    hubs
    testing
    environment
    modules/timeout
    modules/websocket
    modules/wsgi
+   modules/zmq
+:mod:`eventlet.green.zmq` -- ØMQ support
+========================================
+
+.. automodule:: eventlet.green.zmq
+    :show-inheritance:
+
+.. currentmodule:: eventlet.green.zmq
+
+.. autofunction:: Context
+
+.. autoclass:: _Context
+    :show-inheritance:
+
+    .. automethod:: socket
+
+.. autoclass:: Socket
+    :show-inheritance:
+    :inherited-members:
+
+    .. automethod:: recv
+
+    .. automethod:: send
+
+.. module:: zmq
+
+:mod:`zmq` -- The pyzmq ØMQ python bindings
+===========================================
+
+:mod:`pyzmq <zmq>` [1]_ is a Python binding to the C++ ØMQ [2]_ library, written in Cython [3]_. The following is
+auto-generated from :mod:`pyzmq's <zmq>` documentation.
+
+.. autoclass:: zmq.core.context.Context
+    :members:
+
+.. autoclass:: zmq.core.socket.Socket
+
+.. autoclass:: zmq.core.poll.Poller
+    :members:
+
+
+.. [1] http://github.com/zeromq/pyzmq
+.. [2] http://www.zeromq.org
+.. [3] http://www.cython.org
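For orientation, a small hypothetical sketch of the :class:`zmq.core.poll.Poller` API that the zeromq hub builds on; note that the timeout is given in milliseconds, and the socket address here is made up::

    import zmq

    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind("tcp://127.0.0.1:5557")     # illustrative address

    poller = zmq.Poller()
    poller.register(rep, zmq.POLLIN)     # plain file descriptors can be registered as well

    events = dict(poller.poll(100))      # timeout in milliseconds
    if events.get(rep, 0) & zmq.POLLIN:
        print rep.recv()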
 
 <p>Alternately, you can download the source tarball from <a href="http://pypi.python.org/pypi/eventlet/">PyPi</a>:
 <ul>
-<li><a href="http://pypi.python.org/packages/source/e/eventlet/eventlet-0.9.13.tar.gz">eventlet-0.9.13.tar.gz</a></li>
+<li><a href="http://pypi.python.org/packages/source/e/eventlet/eventlet-0.9.14.tar.gz">eventlet-0.9.14.tar.gz</a></li>
 </ul>
 </p>
 
+ZeroMQ
+######
+
+What is ØMQ?
+============
+
+"A ØMQ socket is what you get when you take a normal TCP socket, inject it with a mix of radioactive isotopes stolen
+from a secret Soviet atomic research project, bombard it with 1950-era cosmic rays, and put it into the hands of a drug-addled
+comic book author with a badly-disguised fetish for bulging muscles clad in spandex."
+
+Key differences to conventional sockets
+=======================================
+
+Generally speaking, conventional sockets present a synchronous interface to either connection-oriented reliable byte streams (SOCK_STREAM),
+or connection-less unreliable datagrams (SOCK_DGRAM). In comparison, 0MQ sockets present an abstraction of an asynchronous message queue,
+with the exact queueing semantics depending on the socket type in use. Where conventional sockets transfer streams of bytes or discrete datagrams,
+0MQ sockets transfer discrete messages.
+
+0MQ sockets being asynchronous means that the timings of the physical connection setup and teardown,
+reconnect and effective delivery are transparent to the user and organized by 0MQ itself.
+Further, messages may be queued in the event that a peer is unavailable to receive them.
+
+Conventional sockets allow only strict one-to-one (two peers), many-to-one (many clients, one server),
+or in some cases one-to-many (multicast) relationships. With the exception of ZMQ::PAIR,
+0MQ sockets may be connected to multiple endpoints using connect(),
+while simultaneously accepting incoming connections from multiple endpoints bound to the socket using bind(), thus allowing many-to-many relationships.
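As an illustration of the many-to-many point above, a hypothetical sketch using pyzmq directly (addresses and counts are invented): one bound PULL socket receives from several connected PUSH sockets::

    import zmq

    ctx = zmq.Context()

    pull = ctx.socket(zmq.PULL)
    pull.bind("tcp://127.0.0.1:5555")        # one socket bound to a single endpoint

    pushers = []
    for i in range(3):                       # many peers connect to that same endpoint
        push = ctx.socket(zmq.PUSH)
        push.connect("tcp://127.0.0.1:5555")
        push.send("hello from pusher %d" % i)
        pushers.append(push)

    for _ in pushers:
        print pull.recv()                    # deliveries are fair-queued across peers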
+
+API documentation
+=================
+
+ØMQ support is provided in the :mod:`eventlet.green.zmq` module.
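A minimal usage sketch, assuming the zeromq hub is active (for example via ``EVENTLET_HUB=zeromq``) and pyzmq is installed; the address and function names below are invented for illustration.  The green socket's send and recv cooperate with other greenthreads instead of blocking the process::

    import eventlet
    from eventlet.green import zmq

    CTX = zmq.Context()                      # per-thread Context obtained from the zeromq hub

    def responder():
        rep = CTX.socket(zmq.REP)
        rep.bind("tcp://127.0.0.1:5556")     # illustrative address
        msg = rep.recv()                     # yields to the hub until a message arrives
        rep.send("pong: " + msg)

    def requester():
        req = CTX.socket(zmq.REQ)
        req.connect("tcp://127.0.0.1:5556")
        req.send("ping")
        print req.recv()

    eventlet.spawn(responder)
    eventlet.spawn(requester).wait()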
-version_info = (0, 9, 14, "dev")
+version_info = (0, 9, 15, "dev")
 __version__ = ".".join(map(str, version_info))
 
 try:

eventlet/convenience.py

     wrap_ssl_impl = ssl.wrap_socket
 except ImportError:
     # < 2.6, trying PyOpenSSL
-    from eventlet.green.OpenSSL import SSL
     try:
+        from eventlet.green.OpenSSL import SSL
         def wrap_ssl_impl(sock, keyfile=None, certfile=None, server_side=False,
                           cert_reqs=None, ssl_version=None, ca_certs=None,
                           do_handshake_on_connect=True, 

eventlet/green/ssl.py

                     return func(*a, **kw)
                 except SSLError, exc:
                     if get_errno(exc) == SSL_ERROR_WANT_READ:
-                        trampoline(self, 
-                                   read=True, 
-                                   timeout=self.gettimeout(), 
+                        trampoline(self,
+                                   read=True,
+                                   timeout=self.gettimeout(),
                                    timeout_exc=timeout_exc('timed out'))
                     elif get_errno(exc) == SSL_ERROR_WANT_WRITE:
-                        trampoline(self, 
-                                   write=True, 
-                                   timeout=self.gettimeout(), 
+                        trampoline(self,
+                                   write=True,
+                                   timeout=self.gettimeout(),
                                    timeout_exc=timeout_exc('timed out'))
                     else:
                         raise
 
-        
     def write(self, data):
         """Write DATA to the underlying SSL channel.  Returns
         number of bytes of DATA actually transmitted."""
         """Read up to LEN bytes and return them.
         Return zero-length string on EOF."""
         return self._call_trampolining(
-            super(GreenSSLSocket, self).read,len)
+            super(GreenSSLSocket, self).read, len)
 
     def send (self, data, flags=0):
-        # *NOTE: gross, copied code from ssl.py becase it's not factored well enough to be used as-is
         if self._sslobj:
-            if flags != 0:
-                raise ValueError(
-                    "non-zero flags not allowed in calls to send() on %s" %
-                    self.__class__)
-            while True:
-                try:
-                    v = self._sslobj.write(data)
-                except SSLError, e:
-                    if get_errno(e) == SSL_ERROR_WANT_READ:
-                        return 0
-                    elif get_errno(e) == SSL_ERROR_WANT_WRITE:
-                        return 0
-                    else:
-                        raise
-                else:
-                    return v
+            return self._call_trampolining(
+                super(GreenSSLSocket, self).send, data, flags)
         else:
-            while True:
-                try:
-                    return socket.send(self, data, flags)
-                except orig_socket.error, e:
-                    if self.act_non_blocking:
-                        raise
-                    if get_errno(e) == errno.EWOULDBLOCK or \
-                       get_errno(e) == errno.ENOTCONN:
-                        return 0
-                    raise
+            trampoline(self, write=True, timeout_exc=timeout_exc('timed out'))
+            return socket.send(self, data, flags)
 
     def sendto (self, data, addr, flags=0):
         # *NOTE: gross, copied code from ssl.py because it's not factored well enough to be used as-is

eventlet/green/subprocess.py

             # eventlet.processes.Process.run() method.
             for attr in "stdin", "stdout", "stderr":
                 pipe = getattr(self, attr)
-                if pipe is not None:
+                if pipe is not None and not type(pipe) == greenio.GreenPipe:
                     wrapped_pipe = greenio.GreenPipe(pipe, pipe.mode, bufsize)
                     setattr(self, attr, wrapped_pipe)
         __init__.__doc__ = subprocess_orig.Popen.__init__.__doc__

eventlet/green/zmq.py

+"""The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non-blocking.
+"""
 __zmq__ = __import__('zmq')
 from eventlet import sleep
 from eventlet.hubs import trampoline, get_hub
 
 
 def get_hub_name_from_instance(hub):
+    """Get the string name that eventlet uses to refer to the hub
+
+    :param hub: An eventlet hub
+    """
     return hub.__class__.__module__.rsplit('.',1)[-1]
 
 def Context(io_threads=1):
+    """Factory function replacement for :class:`zmq.core.context.Context`
+
+    This factory ensures the :class:`zeromq hub <eventlet.hubs.zeromq.Hub>`
+    is the active hub, and defers creation (or retrieval) of the ``Context``
+    to the hub's :meth:`~eventlet.hubs.zeromq.Hub.get_context` method.
+
+    It's a factory function because there can only be one :class:`_Context`
+    instance per thread, due to the way :class:`zmq.core.poll.Poller`
+    works.
+    """
     hub = get_hub()
     hub_name = get_hub_name_from_instance(hub)
     if hub_name != 'zeromq':
     return hub.get_context(io_threads)
 
 class _Context(__zmq__.Context):
+    """Internal subclass of :class:`zmq.core.context.Context`
+
+    .. warning:: Do not grab one of these yourself, use the factory function
+        :func:`eventlet.green.zmq.Context`
+    """
 
     def socket(self, socket_type):
+        """Overridden method to ensure that the green version of socket is used
+
+        Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
+        that a :class:`Socket` with all of its send and recv methods set to be
+        non-blocking is returned
+        """
         return Socket(self, socket_type)
 
 class Socket(__zmq__.Socket):
-            
+    """Green version of :class:`zmq.core.socket.Socket`
 
-    def _send_message(self, data, flags=0, copy=True):
+    The following four methods are overridden:
+
+        * _send_message
+        * _send_copy
+        * _recv_message
+        * _recv_copy
+
+    To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or receiving
+    is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
+    ``zmq.EAGAIN`` (retry) error is raised.
+    """
+
+
+    def _send_message(self, msg, flags=0):
+        if flags & __zmq__.NOBLOCK:
+            super(Socket, self)._send_message(msg, flags)
+            return
         flags |= __zmq__.NOBLOCK
         while True:
             try:
-                super(Socket, self)._send_message(data, flags)
+                super(Socket, self)._send_message(msg, flags)
                 return
             except __zmq__.ZMQError, e:
                 if e.errno != EAGAIN:
                     raise
             trampoline(self, write=True)
 
-    def _send_copy(self, data, flags=0, copy=True):
+    def _send_copy(self, msg, flags=0):
+        if flags & __zmq__.NOBLOCK:
+            super(Socket, self)._send_copy(msg, flags)
+            return
         flags |= __zmq__.NOBLOCK
         while True:
             try:
-                super(Socket, self)._send_copy(data, flags)
+                super(Socket, self)._send_copy(msg, flags)
                 return
             except __zmq__.ZMQError, e:
                 if e.errno != EAGAIN:
                     raise
             trampoline(self, write=True)
 
-    def _recv_message(self, flags=0):
-
+    def _recv_message(self, flags=0, track=False):
+        if flags & __zmq__.NOBLOCK:
+            return super(Socket, self)._recv_message(flags, track)
         flags |= __zmq__.NOBLOCK
         while True:
             try:
-                m = super(Socket, self)._recv_message(flags)
-                if m:
+                m = super(Socket, self)._recv_message(flags, track)
+                if m is not None:
                     return m
             except __zmq__.ZMQError, e:
                 if e.errno != EAGAIN:
                     raise
             trampoline(self, read=True)
 
     def _recv_copy(self, flags=0):
+        if flags & __zmq__.NOBLOCK:
+            return super(Socket, self)._recv_copy(flags)
         flags |= __zmq__.NOBLOCK
         while True:
             try:
                 m = super(Socket, self)._recv_copy(flags)
-                if m:
+                if m is not None:
                     return m
             except __zmq__.ZMQError, e:
                 if e.errno != EAGAIN:
 
 
 
-    
 
 CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
 CONNECT_SUCCESS = set((0, errno.EISCONN))
+if sys.platform[:3]=="win":
+    CONNECT_ERR.add(errno.WSAEINVAL)   # Bug 67
 
 # Emulate _fileobject class in 3.x implementation
 # Eventually this internal socket structure could be replaced with makefile calls.
         raise socket.error(err, errno.errorcode[err])
     return descriptor
 
+def socket_checkerr(descriptor):
+    err = descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+    if err not in CONNECT_SUCCESS:
+        raise socket.error(err, errno.errorcode[err])
 
 def socket_accept(descriptor):
     """
         if self.gettimeout() is None:
             while not socket_connect(fd, address):
                 trampoline(fd, write=True)
+                socket_checkerr(fd)
         else:
             end = time.time() + self.gettimeout()
             while True:
                     raise socket.timeout("timed out")
                 trampoline(fd, write=True, timeout=end-time.time(),
                         timeout_exc=socket.timeout("timed out"))
+                socket_checkerr(fd)
 
     def connect_ex(self, address):
         if self.act_non_blocking:
             while not socket_connect(fd, address):
                 try:
                     trampoline(fd, write=True)
+                    socket_checkerr(fd)
                 except socket.error, ex:
                     return get_errno(ex)
         else:
                         raise socket.timeout(errno.EAGAIN)
                     trampoline(fd, write=True, timeout=end-time.time(),
                             timeout_exc=socket.timeout(errno.EAGAIN))
+                    socket_checkerr(fd)
                 except socket.error, ex:
                     return get_errno(ex)
 
         bucket = self.listeners[evtype]
         if fileno in bucket:
             if g_prevent_multiple_readers:
-               raise RuntimeError("Second simultaneous %s on fileno %s "\
+                raise RuntimeError("Second simultaneous %s on fileno %s "\
                      "detected.  Unless you really know what you're doing, "\
                      "make sure that only one greenthread can %s any "\
                      "particular socket.  Consider using a pools.Pool. "\
         self.listeners[evtype].pop(fileno, None)
         # migrate a secondary listener to be the primary listener
         if fileno in self.secondaries[evtype]:
-            sec = self.secondaries[evtype].get(fileno, ())
+            sec = self.secondaries[evtype].get(fileno, None)
             if not sec:
                 return
             self.listeners[evtype][fileno] = sec.pop(0)
     def remove_descriptor(self, fileno):
         """ Completely remove all listeners for this fileno.  For internal use
         only."""
-        self.listeners[READ].pop(fileno, None)
-        self.listeners[WRITE].pop(fileno, None)
-        self.secondaries[READ].pop(fileno, None)
-        self.secondaries[WRITE].pop(fileno, None)
+        listeners = []
+        listeners.append(self.listeners[READ].pop(fileno, noop))
+        listeners.append(self.listeners[WRITE].pop(fileno, noop))
+        listeners.extend(self.secondaries[READ].pop(fileno, ()))
+        listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
+        for listener in listeners:
+            try:
+                listener.cb(fileno)
+            except Exception, e:
+                self.squelch_generic_exception(sys.exc_info())
 
     def switch(self):
         cur = greenlet.getcurrent()
                 switch_out()
             except:
                 self.squelch_generic_exception(sys.exc_info())
-                clear_sys_exc_info()
         if self.greenlet.dead:
             self.greenlet = greenlet.greenlet(self.run)
         try:
         if self.debug_exceptions:
             traceback.print_exception(*exc_info)
             sys.stderr.flush()
+            clear_sys_exc_info()
 
     def squelch_timer_exception(self, timer, exc_info):
         if self.debug_exceptions:
             traceback.print_exception(*exc_info)
             sys.stderr.flush()
+            clear_sys_exc_info()
 
     def add_timer(self, timer):
         scheduled_time = self.clock() + timer.seconds

eventlet/hubs/poll.py

             mask |= READ_MASK | EXC_MASK
         if self.listeners[WRITE].get(fileno):
             mask |= WRITE_MASK | EXC_MASK
-        if mask:
-            if new:
-                self.poll.register(fileno, mask)
-            else:
+        try:
+            if mask:
+                if new:
+                    self.poll.register(fileno, mask)
+                else:
+                    try:
+                        self.modify(fileno, mask)
+                    except (IOError, OSError):
+                        self.poll.register(fileno, mask)
+            else: 
                 try:
-                    self.modify(fileno, mask)
-                except (IOError, OSError):
-                    self.poll.register(fileno, mask)
-        else: 
-            try:
-                self.poll.unregister(fileno)
-            except (KeyError, IOError, OSError):
-                # raised if we try to remove a fileno that was
-                # already removed/invalid
-                pass
+                    self.poll.unregister(fileno)
+                except (KeyError, IOError, OSError):
+                    # raised if we try to remove a fileno that was
+                    # already removed/invalid
+                    pass
+        except ValueError:
+            # fileno is bad, issue 74
+            self.remove_descriptor(fileno)
+            raise
 
     def remove_descriptor(self, fileno):
         super(Hub, self).remove_descriptor(fileno)

eventlet/hubs/selects.py

             try:
                 select.select([fd], [], [], 0)
             except select.error, e:
-                if get_errno(e) == errno.EBADF:
+                if get_errno(e) in BAD_SOCK:
                     self.remove_descriptor(fd)
 
     def wait(self, seconds=None):

eventlet/hubs/zeromq.py

 from eventlet import patcher
 from eventlet.green import zmq
-from eventlet.hubs import poll, _threadlocal
-from eventlet.hubs.hub import BaseHub, noop
-from eventlet.hubs.poll  import READ, WRITE
+from eventlet.hubs import _threadlocal
+from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
 from eventlet.support import clear_sys_exc_info
 import sys
 
 READ_MASK = zmq.POLLIN
 WRITE_MASK = zmq.POLLOUT
 
-class Hub(poll.Hub):
-
-
+class Hub(BaseHub):
     def __init__(self, clock=time.time):
         BaseHub.__init__(self, clock)
         self.poll = zmq.Poller()
             _threadlocal.context = zmq._Context(io_threads)
             return _threadlocal.context
 
+    def add(self, evtype, fileno, cb):
+        listener = super(Hub, self).add(evtype, fileno, cb)
+        self.register(fileno, new=True)
+        return listener
+
+    def remove(self, listener):
+        super(Hub, self).remove(listener)
+        self.register(listener.fileno)
+
     def register(self, fileno, new=False):
         mask = 0
         if self.listeners[READ].get(fileno):
         else:
             self.poll.unregister(fileno)
 
+    def remove_descriptor(self, fileno):
+        super(Hub, self).remove_descriptor(fileno)
+        try:
+            self.poll.unregister(fileno)
+        except (KeyError, ValueError, IOError, OSError):
+            # raised if we try to remove a fileno that was
+            # already removed/invalid
+            pass
+
+    def do_poll(self, seconds):
+        # zmq.Poller.poll expects milliseconds
+        return self.poll.poll(seconds * 1000.0)
 
     def wait(self, seconds=None):
         readers = self.listeners[READ]
 
         if self.debug_blocking:
             self.block_detect_post()
-
-
-#    def do_poll(self, seconds):
-#        print 'poll: ', seconds
-#        if seconds < 0:
-#            seconds = 500
-#        return self.poll.poll(seconds)
     cooperate with green threads by sticking them in native threads, at the cost
     of some overhead.
     """
-    global _threads
     setup()
     # if already in tpool, don't recurse into the tpool
     # also, call functions directly if we're inside an import lock, because
     # if meth does any importing (sadly common), it will hang
     my_thread = threading.currentThread()
-    if my_thread in _threads or imp.lock_held():
+    if my_thread in _threads or imp.lock_held() or _nthreads == 0:
         return meth(*args, **kwargs)
 
     cur = greenthread.getcurrent()
     k = hash(cur)
     k = k + 0x2c865fd + (k >> 5)
     k = k ^ 0xc84d1b7 ^ (k >> 7)
-    thread_index = k % len(_threads)
+    thread_index = k % _nthreads
     
     reqq, _thread = _threads[thread_index]
     e = event.Event()
 
     _rspq = Queue(maxsize=-1)
     assert _nthreads >= 0, "Can't specify negative number of threads"
-    for i in range(0,_nthreads):
+    if _nthreads == 0:
+        import warnings
+        warnings.warn("Zero threads in tpool.  All tpool.execute calls will\
+            execute in main thread.  Check the value of the environment \
+            variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
+    for i in xrange(_nthreads):
         reqq = Queue(maxsize=-1)
         t = threading.Thread(target=tworker, 
                              name="tpool_thread_%s" % i, 
         t.setDaemon(True)
         t.start()
         _threads.append((reqq, t))
+        
 
     _coro = greenthread.spawn_n(tpool_trampoline)
 
                     write('')
             except Exception:
                 self.close_connection = 1
-                exc = traceback.format_exc()
-                self.server.log_error(exc)
+                tb = traceback.format_exc()
+                self.server.log_error(tb)
                 if not headers_set:
+                    err_body = ""
+                    if(self.server.debug):
+                        err_body = tb
                     start_response("500 Internal Server Error",
                                    [('Content-type', 'text/plain'),
-                                    ('Content-length', len(exc))])
-                    write(exc)
+                                    ('Content-length', len(err_body))])
+                    write(err_body)
         finally:
             if hasattr(result, 'close'):
                 result.close()
             env['CONTENT_LENGTH'] = length
         env['SERVER_PROTOCOL'] = 'HTTP/1.0'
 
-        host, port = self.request.getsockname()
+        host, port = self.request.getsockname()[:2]
         env['SERVER_NAME'] = host
         env['SERVER_PORT'] = str(port)
         env['REMOTE_ADDR'] = self.client_address[0]
         self.connection.close()
 
 
+
 class Server(BaseHTTPServer.HTTPServer):
     def __init__(self,
                  socket,
                  minimum_chunk_size=None,
                  log_x_forwarded_for=True,
                  keepalive=True,
-                 log_format=DEFAULT_LOG_FORMAT):
+                 log_format=DEFAULT_LOG_FORMAT,
+                 debug=True):
 
         self.outstanding_requests = 0
         self.socket = socket
         if minimum_chunk_size is not None:
             protocol.minimum_chunk_size = minimum_chunk_size
         self.log_x_forwarded_for = log_x_forwarded_for
+        self.log_format = log_format
+        self.debug = debug
 
     def get_environ(self):
         d = {
            log_x_forwarded_for=True,
            custom_pool=None,
            keepalive=True,
-           log_format=DEFAULT_LOG_FORMAT):
+           log_format=DEFAULT_LOG_FORMAT,
+           debug=True):
     """  Start up a wsgi server handling requests from the supplied server
     socket.  This function loops forever.  The *sock* object will be closed after server exits,
     but the underlying file descriptor will remain open, so if you have a dup() of *sock*,
     :param custom_pool: A custom GreenPool instance which is used to spawn client green threads.  If this is supplied, max_size is ignored.
     :param keepalive: If set to False, disables keepalives on the server; all connections will be closed after serving one request.
     :param log_format: A python format string that is used as the template to generate log lines.  The following values can be formatted into it: client_ip, date_time, request_line, status_code, body_length, wall_seconds.  The default is a good example of how to use it.
+    :param debug: True if the server should send exception tracebacks to the clients on 500 errors.  If False, the server will respond with empty bodies.
     """
     serv = Server(sock, sock.getsockname(),
                   site, log,
                   minimum_chunk_size=minimum_chunk_size,
                   log_x_forwarded_for=log_x_forwarded_for,
                   keepalive=keepalive,
-                  log_format=log_format)
+                  log_format=log_format,
+                  debug=debug)
     if server_event is not None:
         server_event.send(serv)
     if max_size is None:
     else:
         pool = greenpool.GreenPool(max_size)
     try:
-        host, port = sock.getsockname()
+        host, port = sock.getsockname()[:2]
         port = ':%s' % (port, )
         if hasattr(sock, 'do_handshake'):
             scheme = 'https'

examples/websocket_chat.py

         return handle(environ, start_response)
     else:
         start_response('200 OK', [('content-type', 'text/html')])
-        return [open(os.path.join(
-                     os.path.dirname(__file__), 
-                     'websocket_chat.html')).read() % PORT]
+        html_path = os.path.join(os.path.dirname(__file__), 'websocket_chat.html')
+        return [open(html_path).read() % {'port': PORT}]
         
 if __name__ == "__main__":
     # run an example app from the command line            
     from eventlet.hubs import get_hub
     return 'pyevent' in type(get_hub()).__module__
 
+
+def using_zmq(_f):
+    try:
+        import zmq
+    except ImportError:
+        return False
+    from eventlet.hubs import get_hub
+    return zmq and 'zeromq' in type(get_hub()).__module__
+
+def skip_unless_zmq(func):
+    """ Decorator that skips a test if we're not using the zeromq hub."""
+    return skip_unless(using_zmq)(func)
+
+def skip_with_zmq(func):
+    """ Decorator that skips a test if we're using the zeromq hub."""
+    return skip_if(using_zmq)(func)
+
     
 def skip_with_pyevent(func):
     """ Decorator that skips a test if we're using the pyevent hub."""
         finally:
             del os.environ['EVENTLET_THREADPOOL_SIZE']
 
+    def test_tpool_zero(self):
+        new_mod = """from eventlet import tpool
+import eventlet
+import time
+def do():
+    print "ran it"
+tpool.execute(do)
+"""
+        os.environ['EVENTLET_THREADPOOL_SIZE'] = "0"
+        try:
+            self.write_to_tempfile("newmod", new_mod)
+            output, lines = self.launch_subprocess('newmod.py')
+            self.assertEqual(len(lines), 4, lines)
+            self.assertEqual(lines[-2], 'ran it', lines)
+            self.assert_('Warning' in lines[1] or 'Warning' in lines[0], lines)
+        finally:
+            del os.environ['EVENTLET_THREADPOOL_SIZE']
+
+
 
 class Hub(ProcessBase):
 

tests/greenio_test.py

 import socket as _orig_sock
-from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b
+from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if, skip_on_windows
 from eventlet import event
 from eventlet import greenio
 from eventlet import debug
     test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
     return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
 
+
+def using_epoll_hub(_f):
+        from eventlet.hubs import get_hub
+        try:
+            return 'epolls' in type(get_hub()).__module__
+        except Exception:
+            return False
+
+
 class TestGreenSocket(LimitedTestCase):
     def assertWriteToClosedFileRaises(self, fd):
         if sys.version_info[0]<3:
         s.sendall('b')
         a.wait()
 
-        
+    @skip_with_pyevent
+    @skip_if(using_epoll_hub)
+    def test_closure(self):
+        def spam_to_me(address):
+            sock = eventlet.connect(address)
+            while True:
+                try:
+                    sock.sendall('hello world')
+                except socket.error, e:
+                    if get_errno(e)== errno.EPIPE:
+                        return
+                    raise
+
+        server = eventlet.listen(('127.0.0.1', 0))
+        sender = eventlet.spawn(spam_to_me, server.getsockname())
+        client, address = server.accept()
+        server.close()
+
+        def reader():
+            try:
+                while True:
+                    data = client.recv(1024)
+                    self.assert_(data)
+            except socket.error, e:
+                # we get an EBADF because client is closed in the same process
+                # (but a different greenthread)
+                if get_errno(e) != errno.EBADF:
+                    raise
+
+        def closer():
+            client.close()
+
+        reader = eventlet.spawn(reader)
+        eventlet.spawn_n(closer)
+        reader.wait()
+        sender.wait()
+    
+    def test_invalid_connection(self):
+        # find an unused port by creating a socket then closing it
+        port = eventlet.listen(('127.0.0.1', 0)).getsockname()[1]
+        self.assertRaises(socket.error, eventlet.connect, ('127.0.0.1', port))
+    
 class TestGreenPipe(LimitedTestCase):
+    @skip_on_windows
     def setUp(self):
         super(self.__class__, self).setUp()
         self.tempdir = tempfile.mkdtemp('_green_pipe_test')
-from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer
+from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_with_zmq
 import time
 import eventlet
 from eventlet import hubs
         gt = eventlet.spawn(look_im_blocking)
         self.assertRaises(RuntimeError, gt.wait)
         debug.hub_blocking_detection(False)
+        
 
 class TestSuspend(LimitedTestCase):
     TEST_TIMEOUT=3
         shutil.rmtree(self.tempdir)
 
 
+class TestBadFilenos(LimitedTestCase):
+    @skip_with_pyevent
+    @skip_with_zmq
+    def test_repeated_selects(self):
+        from eventlet.green import select
+        self.assertRaises(ValueError, select.select, [-1], [], [])
+        self.assertRaises(ValueError, select.select, [-1], [], [])
+
+
 class Foo(object):
     pass
 

tests/patcher_psycopg_test.py

 import os
 
-from tests import patcher_test
+from tests import patcher_test, skip_unless
 from tests import get_database_auth
+from tests.db_pool_test import postgres_requirement
 
 psycopg_test_file = """
 import os
 """
 
 class PatchingPsycopg(patcher_test.ProcessBase):
+    @skip_unless(postgres_requirement)
     def test_psycopg_patched(self):
         if 'PSYCOPG_TEST_DSN' not in os.environ:
             # construct a non-json dsn for the subprocess

tests/patcher_test.py

         self.assertEqual(len(lines), 2, "\n".join(lines))
 
 
+class Subprocess(ProcessBase):
+    def test_monkeypatched_subprocess(self):
+        new_mod = """import eventlet
+eventlet.monkey_patch()
+from eventlet.green import subprocess
+
+subprocess.Popen(['/bin/true'], stdin=subprocess.PIPE)
+print "done"
+"""
+        self.write_to_tempfile("newmod", new_mod)
+        output, lines = self.launch_subprocess('newmod')
+        self.assertEqual(output, "done\n", output)
+
+
 if __name__ == '__main__':
     main()

tests/stdlib/test_ssl.py

 # external servers.
 NetworkedTests.testConnect = lambda s: None
 NetworkedTests.testFetchServerCert = lambda s: None
+NetworkedTests.test_algorithms = lambda s: None
 
 # these don't pass because nonblocking ssl sockets don't report
 # when the socket is closed uncleanly, per the docstring on 
 ThreadedTests.testSocketServer = lambda s: None
 
 if __name__ == "__main__":
-    test_main()
+    test_main()
             raise ConnectionClosed
         raise
     if not response_line:
-        raise ConnectionClosed
+        raise ConnectionClosed(response_line)
     
     header_lines = []
     while True:
         eventlet.sleep(0) # give previous server a chance to start
         if self.killer:
             greenthread.kill(self.killer)
-            eventlet.sleep(0) # give killer a chance to kill
 
         new_kwargs = dict(max_size=128,
                           log=self.logfile,
         self.assertEqual(headers['connection'], 'close')
         self.assert_('unicode' in body)
 
+    def test_ipv6(self):
+        try:
+            sock = eventlet.listen(('::1', 0), family=socket.AF_INET6)
+        except (socket.gaierror, socket.error):  # probably no ipv6
+            return
+        log = StringIO()
+        # first thing the server does is try to log the IP it's bound to
+        def run_server():
+            try:
+                server = wsgi.server(sock=sock, log=log, site=Site())
+            except ValueError:
+                log.write('broked')
+        eventlet.spawn_n(run_server)
+        logval = log.getvalue()
+        while not logval:
+            eventlet.sleep(0.0)
+            logval = log.getvalue()
+        if 'broked' in logval:
+            self.fail('WSGI server raised exception with ipv6 socket')
+
+    def test_debug(self):
+        self.spawn_server(debug=False)
+        def crasher(env, start_response):
+            raise RuntimeError("intentional crash")
+        self.site.application = crasher
+
+        sock = eventlet.connect(('localhost', self.port))
+        fd = sock.makefile('w')
+        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
+        response_line, headers, body = read_http(sock)
+        self.assert_(response_line.startswith('HTTP/1.1 500 Internal Server Error'))
+        self.assertEqual(body, '')
+        self.assertEqual(headers['connection'], 'close')
+        self.assert_('transfer-encoding' not in headers)
+
+        # verify traceback when debugging enabled
+        self.spawn_server(debug=True)
+        self.site.application = crasher
+        sock = eventlet.connect(('localhost', self.port))
+        fd = sock.makefile('w')
+        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+        fd.flush()
+        response_line, headers, body = read_http(sock)
+        self.assert_(response_line.startswith('HTTP/1.1 500 Internal Server Error'))
+        self.assert_('intentional crash' in body)
+        self.assert_('RuntimeError' in body)
+        self.assert_('Traceback' in body)
+        self.assertEqual(headers['connection'], 'close')
+        self.assert_('transfer-encoding' not in headers)
+
+
 def read_headers(sock):
     fd = sock.makefile()
     try:
             signal.alarm(0)
             signal.signal(signal.SIGALRM, signal.SIG_DFL)
 
-        assert not got_signal, "caught alarm signal. infinite loop detected."
+        assert not got_signal, "caught alarm signal. infinite loop detected."        
 
-        
 
 if __name__ == '__main__':
     main()
 from eventlet import event, spawn, sleep, patcher
 from eventlet.hubs import get_hub, _threadlocal, use_hub
 from nose.tools import *
-from tests import mock, LimitedTestCase, skip_unless
+from tests import mock, LimitedTestCase, skip_unless_zmq
 from unittest import TestCase
 
 from threading import Thread
     zmq = None
     Hub = None
 
-def using_zmq(_f):
-    return zmq and 'zeromq' in type(get_hub()).__module__
-
-def skip_unless_zmq(func):
-    """ Decorator that skips a test if we're using the pyevent hub."""
-    return skip_unless(using_zmq)(func)
 
 class TestUpstreamDownStream(LimitedTestCase):
 
         sleep()
         msg = dict(res=None)
         done = event.Event()
+
         def rx():
             msg['res'] = rep.recv()
             done.send('done')
+
         spawn(rx)
         req.send('test')
         done.wait()
     @skip_unless_zmq
     def test_close_socket_raises_enotsup(self):
         req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
+
         rep.close()
         req.close()
         self.assertRaisesErrno(zmq.ENOTSUP, rep.recv)
         req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
         sleep()
         done = event.Event()
+
         def tx():
             tx_i = 0
             req.send(str(tx_i))
             while req.recv() != 'done':
                 tx_i += 1
                 req.send(str(tx_i))
+
         def rx():
             while True:
                 rx_i = rep.recv()
     def test_send_1k_push_pull(self):
         down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
         sleep()
+
         done = event.Event()
+
         def tx():
             tx_i = 0
             while tx_i <= 1000:
                 tx_i += 1
                 down.send(str(tx_i))
+
         def rx():
             while True:
                 rx_i = up.recv()
         sub_all.setsockopt(zmq.SUBSCRIBE, '')
         sub1.setsockopt(zmq.SUBSCRIBE, 'sub1')
         sub2.setsockopt(zmq.SUBSCRIBE, 'sub2')
+
         sub_all_done = event.Event()
         sub1_done = event.Event()
         sub2_done = event.Event()
+
         sleep(0.2)
+
         def rx(sock, done_evt, msg_count=10000):
             count = 0
             while count < msg_count:
         pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
         sub.setsockopt(zmq.SUBSCRIBE, 'test')
 
+        sleep(0.2)
         sub_done = event.Event()
-        sleep(0.2)
+
         def rx(sock, done_evt):
             count = 0
             sub = 'test'
                     sock.setsockopt(zmq.UNSUBSCRIBE, 'test')
                     sock.setsockopt(zmq.SUBSCRIBE, 'done')
                     sub = 'done'
-                    #continue # We don't want to count this message
                 count += 1
             done_evt.send(count)
 
         rx_count = sub_done.wait()
         self.assertEqual(rx_count, 50)
 
+    @skip_unless_zmq
+    def test_recv_multipart_bug68(self):
+        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
+        msg = ['']
+        req.send_multipart(msg)
+        recieved_msg = rep.recv_multipart()
+        self.assertEqual(recieved_msg, msg)
+
+        # Send a message back the other way
+        msg2 = [""]
+        rep.send_multipart(msg2, copy=False)
+        # When receiving a copy it's a zmq.core.message.Message you get back
+        recieved_msg = req.recv_multipart(copy=False)
+        # So it needs to be converted to a string
+        # I'm calling str(m) consciously here; Message has a .data attribute
+        # but it's private, so __str__ appears to be the way to go
+        self.assertEqual([str(m) for m in recieved_msg], msg2)
+
+    @skip_unless_zmq
+    def test_recv_noblock_bug76(self):
+        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
+        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
+        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
+
+
 
 class TestThreadedContextAccess(TestCase):
     """zmq's Context must be unique within a hub
     All zmq sockets passed to the zmq_poll() function must share the same zmq
     context and must belong to the thread calling zmq_poll()
 
-    As zmq_poll is what's eventually being called then we need to insure that
+    As zmq_poll is what's eventually being called, we need to ensure that
     all sockets that are going to be passed to zmq_poll (via hub.do_poll) are
     in the same context
     """
             context = zmq.Context()
             test_result = []
             def assert_different(ctx):
-    #            assert not hasattr(_threadlocal, 'hub')
-    #            import os
-    #            os.environ['EVENTLET_HUB'] = 'zeromq'
                 hub = get_hub()
                 try:
                     this_thread_context = zmq.Context()
                 sleep(0.1)
             self.assertFalse(test_result[0])
 
+
 class TestCheckingForZMQHub(TestCase):
+
     @skip_unless_zmq
     def setUp(self):
         self.orig_hub = zmq.get_hub_name_from_instance(get_hub())
-        use_hub('poll')
+        use_hub('selects')
 
     def tearDown(self):
         use_hub(self.orig_hub)