1. Aldona Majorek
  2. eventlet-py3k

Commits

amajorek  committed 354b6d8

Reimplemented GreenPipe without using file object.
file.write is not returning number of bytes writen and partial writes were not handled properly.
New implementation is using os module calls which support partial writes.
It also implements missing calls from file object (like seek, tell, truncate, ...).
The later is not very usefull, because regular files never return EAGAIN.
New GreenPipe can be constructed from int, string or file object.

  • Participants
  • Parent commits e35514a
  • Branches default

Comments (0)

Files changed (7)

File eventlet/green/os.py

View file
 for var in dir(os_orig):
     exec "%s = os_orig.%s" % (var, var)
 
-__original_fdopen__ = os_orig.fdopen
-def fdopen(*args, **kw):
+def fdopen(fd, *args, **kw):
     """fdopen(fd [, mode='r' [, bufsize]]) -> file_object
     
     Return an open file object connected to a file descriptor."""
-    return greenio.GreenPipe(__original_fdopen__(*args, **kw))
+    if not isinstance(fd, int):
+        raise TypeError('fd should be int, not %r' % fd)
+    return greenio.GreenPipe(fd, *args, **kw)
 
 __original_read__ = os_orig.read
 def read(fd, n):

File eventlet/green/subprocess.py

View file
     # this __init__() override is to wrap the pipes for eventlet-friendly
     # non-blocking I/O, don't even bother overriding it on Windows.
     if not subprocess_orig.mswindows:
-        def __init__(self, *args, **kwds):
+        def __init__(self, args, bufsize=0, *argss, **kwds):
             # Forward the call to base-class constructor
-            subprocess_orig.Popen.__init__(self, *args, **kwds)
+            subprocess_orig.Popen.__init__(self, args, 0, *argss, **kwds)
             # Now wrap the pipes, if any. This logic is loosely borrowed from 
             # eventlet.processes.Process.run() method.
             for attr in "stdin", "stdout", "stderr":
                 pipe = getattr(self, attr)
                 if pipe is not None:
-                    greenio.set_nonblocking(pipe)
-                    wrapped_pipe = greenio.GreenPipe(pipe)
-                    # The default 'newlines' attribute is '\r\n', which aren't
-                    # sent over pipes.
-                    wrapped_pipe.newlines = '\n'
+                    wrapped_pipe = greenio.GreenPipe(pipe, pipe.mode, bufsize)
                     setattr(self, attr, wrapped_pipe)
         __init__.__doc__ = subprocess_orig.Popen.__init__.__doc__
 

File eventlet/greenio.py

View file
     def gettimeout(self):
         return self._timeout
 
+class _SocketDuckForFd(object):
+    """ Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls."""
+    def __init__(self, fileno):
+        self._fileno = fileno
 
-class GreenPipe(object):
-    """ GreenPipe is a cooperatively-yielding wrapper around OS pipes.
+    @property
+    def _sock(self):
+        return self
+
+    def fileno(self):
+        return self._fileno
+
+    def recv(self, buflen):
+        while True:
+            try:
+                data = os.read(self._fileno, buflen)
+                return data
+            except OSError, e:
+                if get_errno(e) != errno.EAGAIN:
+                    raise IOError(*e.args)
+            trampoline(self, read=True)
+
+    def sendall(self, data):
+        len_data = len(data)
+        os_write = os.write
+        fileno = self._fileno
+        try:
+            total_sent = os_write(fileno, data)
+        except OSError, e:
+            if get_errno(e) != errno.EAGAIN:
+                raise IOError(*e.args)
+            total_sent = 0
+        while total_sent <len_data:
+            trampoline(self, write=True)
+            try:
+                total_sent += os_write(fileno, data[total_sent:])
+            except OSError, e:
+                if get_errno(e) != errno. EAGAIN:
+                    raise IOError(*e.args)
+
+    def __del__(self):
+        os.close(self._fileno)
+
+    def __repr__(self):  
+        return "%s:%d" % (self.__class__.__name__, self._fileno)
+
+def _operationOnClosedFile(*args, **kwargs):
+    raise ValueError("I/O operation on closed file")
+
+class GreenPipe(_fileobject):
     """
-    newlines = '\n'
-    def __init__(self, fd):
-        set_nonblocking(fd)
-        self.fd = fd
-        self.closed = False
-        self.recvbuffer = ''
+    GreenPipe is a cooperative replacement for file class.
+    It will cooperate on pipes. It will block on regular file.
+    Differneces from file class:
+    - mode is r/w property. Should re r/o
+    - encoding property not implemented
+    - write/writelines will not raise TypeError exception when non-string data is written
+      it will write str(data) instead
+    - Universal new lines are not supported and newlines property not implementeded
+    - file argument can be descriptor, file name or file object.
+    """
+    def __init__(self, f, mode='r', bufsize=-1):
+        if not isinstance(f, (basestring, int, file)):
+            raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
+
+        if isinstance(f, basestring):
+            f = open(f, mode, bufsize=0)
+ 
+        if isinstance(f, int):
+            fileno = f
+            self._name = "<fd:%d>" % fileno
+        else:
+            fileno = os.dup(f.fileno())
+            self._name = f.name
+            if f.mode != mode:
+                raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
+            self._name = f.name
+            f.close()
+
+        super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
+        set_nonblocking(self)
+        self.softspace = 0
+
+    @property
+    def name(self): return self._name
+
+    def __repr__(self):
+        return "<%s %s %r, mode %r at 0x%x>" % (
+            self.closed and 'closed' or 'open',
+            self.__class__.__name__,
+            self.name,
+            self.mode,
+            (id(self) < 0) and (sys.maxint +id(self)) or id(self))
 
     def close(self):
-        self.fd.close()
-        self.closed = True
+        super(GreenPipe, self).close()
+        for method in ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
+                   'readline', 'readlines', 'seek', 'tell', 'truncate',
+                   'write', 'xreadlines', '__iter__', 'writelines']:
+            setattr(self, method, _operationOnClosedFile)
 
-    def fileno(self):
-        return self.fd.fileno()
+    if getattr(file, '__enter__', None):
+        def __enter__(self):
+            return self
 
-    def _recv(self, buflen):
-        fd = self.fd
-        buf = self.recvbuffer
-        if buf:
-            chunk, self.recvbuffer = buf[:buflen], buf[buflen:]
-            return chunk
-        while True:
+        def __exit__(self, *args):
+            self.close()
+
+    def xreadlines(self, buffer):
+        return iterator(self)
+
+    def readinto(self, buf):
+        data = self.read(len(buf)) #FIXME could it be done without allocating intermediate?
+        n = len(data)
+        try:
+            buf[:n] = data
+        except TypeError, err:
+            if not isinstance(buf, array.array):
+                raise err
+            buf[:n] = array.array('c', data)
+        return n
+
+    def _get_readahead_len(self):
+        try:
+            return len(self._rbuf.getvalue()) # StringIO in 2.5
+        except AttributeError:
+            return len(self._rbuf) # str in 2.4
+
+    def _clear_readahead_buf(self):
+        len = self._get_readahead_len()
+        if len>0:
+            self.read(len)
+
+    def tell(self):
+        self.flush()
+        try:
+            return os.lseek(self.fileno(), 0, 1) - self._get_readahead_len()
+        except OSError, e:
+            raise IOError(*e.args)
+
+    def seek(self, offset, whence=0):
+        self.flush()
+        if whence == 1 and offset==0: # tell synonym
+            return self.tell()
+        if whence == 1: # adjust offset by what is read ahead
+            offset -= self.get_readahead_len()
+        try:
+            rv = os.lseek(self.fileno(), offset, whence)
+        except OSError, e:
+            raise IOError(*e.args)
+        else:
+            self._clear_readahead_buf()
+            return rv
+
+    if getattr(file, "truncate", None): # not all OSes implement truncate
+        def truncate(self, size=-1):
+            self.flush()
+            if size ==-1:
+                size = self.tell()
             try:
-                return fd.read(buflen)
-            except IOError, e:
-                if get_errno(e) != errno.EAGAIN:
-                    return ''
-            except socket.error, e:
-                if get_errno(e) == errno.EPIPE:
-                    return ''
-                raise
-            trampoline(fd, read=True)
+                rv = os.ftruncate(self.fileno(), size)
+            except OSError, e:
+                raise IOError(*e.args)
+            else:
+                self.seek(size) # move position&clear buffer
+                return rv
 
-
-    def read(self, size=None):
-        """read at most size bytes, returned as a string."""
-        accum = ''
-        while True:
-            if size is None:
-                recv_size = BUFFER_SIZE
-            else:
-                recv_size = size - len(accum)
-            chunk =  self._recv(recv_size)
-            accum += chunk
-            if chunk == '':
-                return accum
-            if size is not None and len(accum) >= size:
-                return accum
-
-    def write(self, data):
-        fd = self.fd
-        while True:
-            try:
-                fd.write(data)
-                fd.flush()
-                return len(data)
-            except IOError, e:
-                if get_errno(e) != errno.EAGAIN:
-                    raise
-            except ValueError, e:
-                # what's this for?
-                pass
-            except socket.error, e:
-                if get_errno(e) != errno.EPIPE:
-                    raise
-            trampoline(fd, write=True)
-
-    def flush(self):
-        pass
-
-    def readuntil(self, terminator, size=None):
-        buf, self.recvbuffer = self.recvbuffer, ''
-        checked = 0
-        if size is None:
-            while True:
-                found = buf.find(terminator, checked)
-                if found != -1:
-                    found += len(terminator)
-                    chunk, self.recvbuffer = buf[:found], buf[found:]
-                    return chunk
-                checked = max(0, len(buf) - (len(terminator) - 1))
-                d = self._recv(BUFFER_SIZE)
-                if not d:
-                    break
-                buf += d
-            return buf
-        while len(buf) < size:
-            found = buf.find(terminator, checked)
-            if found != -1:
-                found += len(terminator)
-                chunk, self.recvbuffer = buf[:found], buf[found:]
-                return chunk
-            checked = len(buf)
-            d = self._recv(BUFFER_SIZE)
-            if not d:
-                break
-            buf += d
-        chunk, self.recvbuffer = buf[:size], buf[size:]
-        return chunk
-
-    def readline(self, size=None):
-        return self.readuntil(self.newlines, size=size)
-
-    def __iter__(self):
-        return self.xreadlines()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *exc_info):
-        self.close()
-
-    def xreadlines(self, size=None):
-        if size is None:
-            while True:
-                line = self.readline()
-                if not line:
-                    break
-                yield line
-        else:
-            while size > 0:
-                line = self.readline(size)
-                if not line:
-                    break
-                yield line
-                size -= len(line)
-
-    def writelines(self, lines):
-        for line in lines:
-            self.write(line)
+    def isatty(self):
+        try:
+            return os.isatty(self.fileno())
+        except OSError, e:
+            raise IOError(*e.args)
 
 
 # import SSL module here so we can refer to greenio.SSL.exceptionclass

File eventlet/processes.py

View file
         self.popen4 = popen2.Popen4([self.command] + self.args)
         child_stdout_stderr = self.popen4.fromchild
         child_stdin = self.popen4.tochild
-        greenio.set_nonblocking(child_stdout_stderr)
-        greenio.set_nonblocking(child_stdin)
-        self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr)
-        self.child_stdout_stderr.newlines = '\n'  # the default is
-                                        # \r\n, which aren't sent over
-                                        # pipes
-        self.child_stdin = greenio.GreenPipe(child_stdin)
-        self.child_stdin.newlines = '\n'
+        self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr, child_stdout_stderr.mode, 0)
+        self.child_stdin = greenio.GreenPipe(child_stdin, child_stdin.mode, 0)
 
         self.sendall = self.child_stdin.write
         self.send = self.child_stdin.write

File eventlet/tpool.py

View file
         _setup_already = True
     try:
         _rpipe, _wpipe = os.pipe()
-        _wfile = os.fdopen(_wpipe,"wb",0)
-        _rfile = os.fdopen(_rpipe,"rb",0)
-        ## Work whether or not wrap_pipe_with_coroutine_pipe was called
-        if not isinstance(_rfile, greenio.GreenPipe):
-            _rfile = greenio.GreenPipe(_rfile)
+        _wfile = greenio.GreenPipe(_wpipe, 'wb', 0)
+        _rfile = greenio.GreenPipe(_rpipe, 'rb', 0)
     except ImportError:
         # This is Windows compatibility -- use a socket instead of a pipe because
         # pipes don't really exist on Windows.

File tests/greenio_test.py

View file
         # also ensures that readline() terminates on '\n' and '\r\n'
         r, w = os.pipe()
 
-        r = os.fdopen(r)
-        w = os.fdopen(w, 'w')
-
         r = greenio.GreenPipe(r)
-        w = greenio.GreenPipe(w)
+        w = greenio.GreenPipe(w, 'w')
 
         def writer():
             eventlet.sleep(.1)
     def test_pipe_writes_large_messages(self):
         r, w = os.pipe()
 
-        r = os.fdopen(r)
-        w = os.fdopen(w, 'w', 0)
-
         r = greenio.GreenPipe(r)
-        w = greenio.GreenPipe(w)
+        w = greenio.GreenPipe(w, 'w')
 
         large_message = "".join([1024*chr(i) for i in xrange(65)])
         def writer():

File tests/greenpipe_test_with_statement.py

View file
         # ensure using a pipe as a context actually closes it.
         r, w = os.pipe()
 
-        r = os.fdopen(r)
-        w = os.fdopen(w, 'w')
-
         r = greenio.GreenPipe(r)
-        w = greenio.GreenPipe(w)
+        w = greenio.GreenPipe(w, 'w')
 
         with r:
             pass