Commits

Geert Jansen  committed 0d0a717

Adding new source files that i forgot to add in the previous
commit.

  • Participants
  • Parent commits 2ccffd0

Comments (0)

Files changed (2)

File gevent/os.py

+# Wrapper module for stdlib os. Written by Geert Jansen.
+
+"""
+This module provides cooperative versions of os.read() and os.write().
+On Posix platforms this uses non-blocking IO, on Windows a threadpool
+is used.
+"""
+
+from __future__ import absolute_import
+
+import os
+import sys
+from gevent.hub import get_hub, reinit
+from gevent.socket import EBADF, EAGAIN
+
+try:
+    import fcntl
+except ImportError:
+    fcntl = None
+
+__implements__ = ['read', 'write', 'fork']
+__all__ = __implements__
+
+os_read = os.read
+os_write = os.write
+os_fork = os.fork
+
+
+def _map_errors(func, *args):
+    """Map IOError to OSError."""
+    try:
+        return func(*args)
+    except IOError, e:
+        # IOError is structered like OSError in that it has two args: an error
+        # number and a error string. So we can just re-raise OSError passing it
+        # the IOError args. If at some point we want to catch other errors and
+        # map those to OSError as well, we need to make sure that it follows
+        # the OSError convention and it gets passed a valid error number and
+        # error string.
+        raise OSError(*e.args), None, sys.exc_info()[2]
+
+
+def posix_read(fd, n):
+    """Read up to `n` bytes from file descriptor `fd`. Return a string
+    containing the bytes read. If end-of-file is reached, an empty string
+    is returned."""
+    while True:
+        flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
+        if not flags & os.O_NONBLOCK:
+            _map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
+        try:
+            return os_read(fd, n)
+        except OSError, e:
+            if e.errno != EAGAIN:
+                raise
+            sys.exc_clear()
+        finally:
+            # Be sure to restore the fcntl flags before we switch into the hub.
+            # Sometimes multiple file descriptors share the same fcntl flags
+            # (e.g. when using ttys/ptys). Those other file descriptors are
+            # impacted by our change of flags, so we should restore them
+            # before any other code can possibly run.
+            if not flags & os.O_NONBLOCK:
+                _map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
+        hub = get_hub()
+        event = hub.loop.io(fd, 1)
+        _map_errors(hub.wait, event)
+
+def posix_write(fd, buf):
+    """Write bytes from buffer `buf` to file descriptor `fd`. Return the
+    number of bytes written."""
+    while True:
+        flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
+        if not flags & os.O_NONBLOCK:
+            _map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
+        try:
+            return os_write(fd, buf)
+        except OSError, e:
+            if e.errno != EAGAIN:
+                raise
+            sys.exc_clear()
+        finally:
+            # See note in posix_read().
+            if not flags & os.O_NONBLOCK:
+                _map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
+        hub = get_hub()
+        event = hub.loop.io(fd, 2)
+        _map_errors(hub.wait, event)
+
+
+def threadpool_read(fd, n):
+    """Read up to `n` bytes from file descriptor `fd`. Return a string
+    containing the bytes read. If end-of-file is reached, an empty string
+    is returned."""
+    threadpool = get_hub().threadpool
+    return _map_errors(threadpool.apply, os_read, (fd, n))
+
+def threadpool_write(fd, buf):
+    """Write bytes from buffer `buf` to file descriptor `fd`. Return the
+    number of bytes written."""
+    threadpool = get_hub().threadpool
+    return _map_errors(threadpool.apply, os_write, (fd, buf))
+
+
+if fcntl is None:
+    read = threadpool_read
+    write = threadpool_write
+else:
+    read = posix_read
+    write = posix_write
+
+
+if hasattr(os, 'fork'):
+
+    def fork():
+        result = os_fork()
+        if not result:
+            _map_errors(reinit)
+        return result
+
+else:
+    __all__.remove('fork')

File greentest/test_os.py

+from gevent import monkey; monkey.patch_all()
+
+import os
+from select import PIPE_BUF
+from greentest import TestCase, main
+from gevent import Greenlet, joinall
+from gevent.socket import EAGAIN
+from errno import EINTR
+try:
+    import fcntl
+except ImportError:
+    fcntl = None
+
+
+class TestOS(TestCase):
+
+    __timeout__ = 5
+
+    def test_if_pipe_blocks(self):
+        r, w = os.pipe()
+        # set nbytes such that for sure it is > maximum pipe buffer
+        nbytes = 1000000
+        block = 'x' * 4096
+        buf = buffer(block)
+        # Lack of "nonlocal" keyword in Python 2.x:
+        bytesread = [0]
+        byteswritten = [0]
+        def produce():
+            while byteswritten[0] != nbytes:
+                bytesleft = nbytes - byteswritten[0]
+                try:
+                    byteswritten[0] += os.write(w, buf[:min(bytesleft, 4096)])
+                except OSError:
+                    code = sys.exc_info()[1].args[0]
+                    assert code != EAGAIN
+                    if code == EINTR:
+                        continue
+                    raise
+        def consume():
+            while bytesread[0] != nbytes:
+                bytesleft = nbytes - bytesread[0]
+                try:
+                    bytesread[0] += len(os.read(r, min(bytesleft, 4096)))
+                except OSError:
+                    code = sys.exc_info()[1].args[0]
+                    assert code != EAGAIN
+                    if code == EINTR:
+                        continue
+                    raise
+        producer = Greenlet(produce)
+        producer.start()
+        consumer = Greenlet(consume)
+        consumer.start_later(1)
+        # If patching was not succesful, the producer will have filled
+        # the pipe before the consumer starts, and would block the entire
+        # process. Therefore the next line would never finish.
+        joinall([producer, consumer])
+        assert bytesread[0] == nbytes
+        assert bytesread[0] == byteswritten[0]
+ 
+    def test_fd_flags_restored(self):
+        if fcntl is None:
+            return
+        r, w = os.pipe()
+        flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
+        assert not flags & os.O_NONBLOCK
+        flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
+        assert not flags & os.O_NONBLOCK
+        os.write(w, 'foo')
+        buf = os.read(r, 3)
+        assert buf == 'foo'
+        flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
+        assert not flags & os.O_NONBLOCK
+        flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
+        assert not flags & os.O_NONBLOCK
+
+
+if __name__ == '__main__':
+    main()