Commits

Antoine Pitrou committed a71f3ca

Fix openat support after big API refactoring in 3.3beta

Comments (0)

Files changed (2)

         return parts[-1].partition('.')[0].upper() in self.reserved_names
 
 
-_NO_FD = getattr(os, "AT_FDCWD", -100)
+_NO_FD = None
 
 class _PosixFlavour(_Flavour):
     sep = '/'
                     # work to do than the last time.
                     raise ValueError("Symlink loop from %r" % cur)
                 try:
-                    target = accessor.readlinkat(resolved_fd, cur, part)
+                    target = accessor.readlink(cur, part, dir_fd=resolved_fd)
                 except OSError as e:
                     if e.errno != EINVAL:
                         raise
                     # Not a symlink
-                    resolved_fd = accessor.walk_down(resolved_fd, cur, part)
+                    resolved_fd = accessor.walk_down(cur, part, dir_fd=resolved_fd)
                     resolved = cur
                 else:
                     # Take note of remaining work from this symlink
     accessing paths on the filesystem."""
 
 
-# We need all of these
-_at_functions = [
-    'fchmodat', 'fdlistdir', 'fstatat', 'mkdirat', 'openat', 'readlinkat',
-    'renameat', 'symlinkat', 'unlinkat',
-    ]
-supports_openat = all(hasattr(os, fn) for fn in _at_functions)
+if sys.version_info >= (3, 3):
+    supports_openat = (os.listdir in os.supports_fd and
+        os.supports_dir_fd >= { os.chmod, os.stat, os.mkdir, os.open,
+                                os.readlink, os.rename, os.symlink, os.unlink, }
+    )
+else:
+    supports_openat = False
 
 if supports_openat:
 
-    _AT_SYMLINK_NOFOLLOW = os.AT_SYMLINK_NOFOLLOW
-    _AT_REMOVE_DIR = os.AT_REMOVEDIR
-
     def _fdnamepair(pathobj):
         """Get a (parent fd, name) pair from a path object, suitable for use
         with the various *at functions."""
 
         def _wrap_atfunc(atfunc):
             @wraps(atfunc)
-            def wrapped(pathobj, *args):
+            def wrapped(pathobj, *args, **kwargs):
                 parent_fd, name = _fdnamepair(pathobj)
-                return atfunc(parent_fd, name, *args)
+                return atfunc(name, *args, dir_fd=parent_fd, **kwargs)
             return staticmethod(wrapped)
 
         def _wrap_binary_atfunc(atfunc):
             @wraps(atfunc)
-            def wrapped(pathobjA, pathobjB, *args):
+            def wrapped(pathobjA, pathobjB, *args, **kwargs):
                 parent_fd_A, nameA = _fdnamepair(pathobjA)
                 # We allow pathobjB to be a plain str, for convenience
                 if isinstance(pathobjB, Path):
                 else:
                     # If it's a str then at best it's cwd-relative
                     parent_fd_B, nameB = _NO_FD, str(pathobjB)
-                return atfunc(parent_fd_A, nameA, parent_fd_B, nameB, *args)
+                return atfunc(nameA, nameB, *args,
+                              src_dir_fd=parent_fd_A, dst_dir_fd=parent_fd_B,
+                              **kwargs)
             return staticmethod(wrapped)
 
-        stat = _wrap_atfunc(os.fstatat)
+        stat = _wrap_atfunc(os.stat)
 
-        def lstat(self, pathobj):
-            return self.stat(pathobj, _AT_SYMLINK_NOFOLLOW)
+        lstat = _wrap_atfunc(os.lstat)
 
-        open = _wrap_atfunc(os.openat)
+        open = _wrap_atfunc(os.open)
 
-        chmod = _wrap_atfunc(os.fchmodat)
+        chmod = _wrap_atfunc(os.chmod)
 
         def lchmod(self, pathobj, mode):
-            return self.chmod(pathobj, mode, _AT_SYMLINK_NOFOLLOW)
+            return self.chmod(pathobj, mode, follow_symlinks=False)
 
-        mkdir = _wrap_atfunc(os.mkdirat)
+        mkdir = _wrap_atfunc(os.mkdir)
 
-        unlink = _wrap_atfunc(os.unlinkat)
+        unlink = _wrap_atfunc(os.unlink)
 
-        def rmdir(self, pathobj):
-            self.unlink(pathobj, _AT_REMOVE_DIR)
+        rmdir = _wrap_atfunc(os.rmdir)
 
         def listdir(self, pathobj):
             fd = self._make_fd(pathobj, tolerant=False)
-            return os.fdlistdir(fd)
+            return os.listdir(fd)
 
-        rename = _wrap_binary_atfunc(os.renameat)
+        rename = _wrap_binary_atfunc(os.rename)
 
         def symlink(self, target, pathobj, target_is_directory):
             parent_fd, name = _fdnamepair(pathobj)
-            os.symlinkat(str(target), parent_fd, name)
+            os.symlink(str(target), name, dir_fd=parent_fd)
 
         def _make_fd(self, pathobj, tolerant=True):
             fd = pathobj._cached_fd
                 return newpath
             parent_fd = pathfd
             for part in parts[:-1]:
-                fd = os.openat(parent_fd, part, os.O_RDONLY)
+                fd = os.open(part, os.O_RDONLY, dir_fd=parent_fd)
                 if parent_fd != pathfd:
                     # Forget intermediate fds
                     os.close(parent_fd)
             return newpath
 
         # Helpers for resolve()
-        def walk_down(self, dirfd, path, name):
-            if dirfd != _NO_FD:
+        def walk_down(self, path, name, *, dir_fd):
+            if dir_fd != _NO_FD:
                 try:
-                    return os.openat(dirfd, name, os.O_RDONLY)
+                    return os.open(name, os.O_RDONLY, dir_fd=dir_fd)
                 finally:
-                    os.close(dirfd)
+                    os.close(dir_fd)
             else:
                 return os.open(path, os.O_RDONLY)
 
-        def readlinkat(self, dirfd, path, name):
-            if dirfd != _NO_FD:
-                return os.readlinkat(dirfd, name)
+        def readlink(self, path, name, *, dir_fd):
+            if dir_fd != _NO_FD:
+                return os.readlink(name, dir_fd=dir_fd)
             else:
                 return os.readlink(path)
 
         return None
 
     # Helpers for resolve()
-    def walk_down(self, dirfd, path, name):
-        assert dirfd == _NO_FD
+    def walk_down(self, path, name, *, dir_fd):
+        assert dir_fd == _NO_FD
         return _NO_FD
 
-    def readlinkat(self, dirfd, path, name):
-        assert dirfd == _NO_FD
+    def readlink(self, path, name, *, dir_fd):
+        assert dir_fd == _NO_FD
         return os.readlink(path)
 
 _normal_accessor = _NormalAccessor()
             self._readlinkat_fds = []
             self._walk_fds = []
 
-        def readlinkat(self, dirfd, path, name):
-            self._readlinkat_fds.append((dirfd, name))
-            return super().readlinkat(dirfd, path, name)
+        def readlink(self, path, name, *, dir_fd):
+            self._readlinkat_fds.append((dir_fd, name))
+            return super().readlink(path, name, dir_fd=dir_fd)
 
-        def walk_down(self, dirfd, path, name):
-            self._walk_fds.append((dirfd, name))
-            return super().walk_down(dirfd, path, name)
+        def walk_down(self, path, name, *, dir_fd):
+            self._walk_fds.append((dir_fd, name))
+            return super().walk_down(path, name, dir_fd=dir_fd)
 
 
 class Mock:
     def __enter__(self):
         def wrapper(*args, **kwargs):
             self.calls += 1
+            self.call_params.append((args, kwargs))
             return self.orig_func(*args, **kwargs)
         self.calls = 0
+        self.call_params = []
         setattr(self.module, self.qualname, wrapper)
         return self
 
         self.assertEqual(len(pathlib.Path._wrs), n)
 
     def test_iter(self):
-        with Mock("os.fdlistdir") as mock_fdlistdir, \
-             Mock("os.listdir") as mock_listdir:
+        with Mock("os.listdir") as mock_listdir:
             super().test_iter()
-        self.assertEqual(mock_fdlistdir.calls, 1)
-        self.assertEqual(mock_listdir.calls, 0)
+        self.assertEqual(mock_listdir.calls, 1)
+        # A file descriptor was passed
+        args = mock_listdir.call_params[0][0]
+        fd = args[0]
+        self.assertIsInstance(fd, int)
+        self.assertGreaterEqual(fd, 0)
 
     def test_cwd(self):
         p = pathlib.PosixPath.cwd(use_openat=True)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.