Ned Batchelder avatar Ned Batchelder committed e4704f5

A correct implementation of os.fstat, with a test.

Comments (0)

Files changed (2)

pypy/translator/sandbox/sandlib.py

     def __init__(self, *args, **kwds):
         super(VirtualizedSandboxedProc, self).__init__(*args, **kwds)
         self.virtual_root = self.build_virtual_root()
-        self.open_fds = {}   # {virtual_fd: real_file_object}
+        self.open_fds = {}   # {virtual_fd: (real_file_object, node)}
 
     def build_virtual_root(self):
         raise NotImplementedError("must be overridden")
     def do_ll_os__ll_os_isatty(self, fd):
         return self.virtual_console_isatty and fd in (0, 1, 2)
 
-    def allocate_fd(self, f):
+    def allocate_fd(self, f, node=None):
         for fd in self.virtual_fd_range:
             if fd not in self.open_fds:
-                self.open_fds[fd] = f
+                self.open_fds[fd] = (f, node)
                 return fd
         else:
             raise OSError(errno.EMFILE, "trying to open too many files")
 
-    def get_file(self, fd):
+    def get_fd(self, fd, throw=True):
+        """Get the objects implementing file descriptor `fd`.
+
+        Returns a pair, (open file, vfs node)
+
+        `throw`: if true, raise OSError for bad fd, else return (None, None).
+        """
         try:
-            return self.open_fds[fd]
+            f, node = self.open_fds[fd]
         except KeyError:
-            raise OSError(errno.EBADF, "bad file descriptor")
+            if throw:
+                raise OSError(errno.EBADF, "bad file descriptor")
+            return None, None
+        return f, node
+
+    def get_file(self, fd, throw=True):
+        """Return the open file for file descriptor `fd`."""
+        return self.get_fd(fd, throw)[0]
 
     def do_ll_os__ll_os_open(self, vpathname, flags, mode):
         node = self.get_node(vpathname)
             raise OSError(errno.EPERM, "write access denied")
         # all other flags are ignored
         f = node.open()
-        return self.allocate_fd(f)
+        return self.allocate_fd(f, node)
 
     def do_ll_os__ll_os_close(self, fd):
         f = self.get_file(fd)
         f.close()
 
     def do_ll_os__ll_os_read(self, fd, size):
-        try:
-            f = self.open_fds[fd]
-        except KeyError:
+        f = self.get_file(fd, throw=False)
+        if f is None:
             return super(VirtualizedSandboxedProc, self).do_ll_os__ll_os_read(
                 fd, size)
         else:
             return f.read(min(size, 256*1024))
 
     def do_ll_os__ll_os_fstat(self, fd):
-        try:
-            f = self.open_fds[fd]
-        except KeyError:
-            return super(VirtualizedSandboxedProc, self).do_ll_os__ll_os_fstat(fd)
-        else:
-            return os.stat(f.name)      # Isn't there a better way to do this?
+        f, node = self.get_fd(fd)
+        return node.stat()
     do_ll_os__ll_os_fstat.resulttype = s_StatResult
 
     def do_ll_os__ll_os_lseek(self, fd, pos, how):
 
     def do_ll_os__ll_os_read(self, fd, size):
         if fd in self.sockets:
-            return self.open_fds[fd].recv(size)
+            return self.get_file(fd).recv(size)
         return super(VirtualizedSocketProc, self).do_ll_os__ll_os_read(
             fd, size)
 
     def do_ll_os__ll_os_write(self, fd, data):
         if fd in self.sockets:
-            return self.open_fds[fd].send(data)
+            return self.get_file(fd).send(data)
         return super(VirtualizedSocketProc, self).do_ll_os__ll_os_write(
             fd, data)
 

pypy/translator/sandbox/test/test_sandlib.py

     output, error = proc.communicate("")
     assert output == "All ok!\n"
     assert error == ""
+
+def test_fstat():
+    def compare(a, b, i):
+        if a != b:
+            print "stat and fstat differ @%d: %s != %s" % (i, a, b)
+
+    def entry_point(argv):
+        try:
+            # Open a file, and compare stat and fstat
+            fd = os.open('/hi.txt', os.O_RDONLY, 0777)
+            st = os.stat('/hi.txt')
+            fs = os.fstat(fd)
+            compare(st[0], fs[0], 0)
+            compare(st[1], fs[1], 1)
+            compare(st[2], fs[2], 2)
+            compare(st[3], fs[3], 3)
+            compare(st[4], fs[4], 4)
+            compare(st[5], fs[5], 5)
+            compare(st[6], fs[6], 6)
+            compare(st[7], fs[7], 7)
+            compare(st[8], fs[8], 8)
+            compare(st[9], fs[9], 9)
+        except OSError, e:
+            print "OSError: %s" % (e.errno,)
+        print "All ok!"
+        return 0
+    exe = compile(entry_point)
+
+    proc = SandboxedProcWithFiles([exe])
+    output, error = proc.communicate("")
+    assert output == "All ok!\n"
+    assert error == ""
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.