Commits

Armin Rigo committed 8275a73

Add os.fchown() and os.fchmod().

Comments (0)

Files changed (5)

pypy/module/posix/__init__.py

         interpleveldefs['chown'] = 'interp_posix.chown'
     if hasattr(os, 'lchown'):
         interpleveldefs['lchown'] = 'interp_posix.lchown'
+    if hasattr(os, 'fchown'):
+        interpleveldefs['fchown'] = 'interp_posix.fchown'
+    if hasattr(os, 'fchmod'):
+        interpleveldefs['fchmod'] = 'interp_posix.fchmod'
     if hasattr(os, 'ftruncate'):
         interpleveldefs['ftruncate'] = 'interp_posix.ftruncate'
     if hasattr(os, 'fsync'):

pypy/module/posix/interp_posix.py

     except OSError, e:
         raise wrap_oserror2(space, e, w_path)
 
+@unwrap_spec(mode=c_int)
+def fchmod(space, w_fd, mode):
+    """Change the access permissions of the file given by file
+descriptor fd."""
+    fd = space.c_filedescriptor_w(w_fd)
+    try:
+        os.fchmod(fd, mode)
+    except OSError, e:
+        raise wrap_oserror(space, e)
+
 def rename(space, w_old, w_new):
     "Rename a file or directory."
     try:
 
 @unwrap_spec(path='str0', uid=c_uid_t, gid=c_gid_t)
 def chown(space, path, uid, gid):
+    """Change the owner and group id of path to the numeric uid and gid."""
     check_uid_range(space, uid)
     check_uid_range(space, gid)
     try:
         os.chown(path, uid, gid)
     except OSError, e:
         raise wrap_oserror(space, e, path)
-    return space.w_None
 
 @unwrap_spec(path='str0', uid=c_uid_t, gid=c_gid_t)
 def lchown(space, path, uid, gid):
+    """Change the owner and group id of path to the numeric uid and gid.
+This function will not follow symbolic links."""
     check_uid_range(space, uid)
     check_uid_range(space, gid)
     try:
         os.lchown(path, uid, gid)
     except OSError, e:
         raise wrap_oserror(space, e, path)
-    return space.w_None
+
+@unwrap_spec(uid=c_uid_t, gid=c_gid_t)
+def fchown(space, w_fd, uid, gid):
+    """Change the owner and group id of the file given by file descriptor
+fd to the numeric uid and gid."""
+    fd = space.c_filedescriptor_w(w_fd)
+    check_uid_range(space, uid)
+    check_uid_range(space, gid)
+    try:
+        os.fchown(fd, uid, gid)
+    except OSError, e:
+        raise wrap_oserror(space, e)
 
 def getloadavg(space):
     try:

pypy/module/posix/test/test_posix2.py

             os.symlink('foobar', self.path)
             os.lchown(self.path, os.getuid(), os.getgid())
 
+    if hasattr(os, 'fchown'):
+        def test_fchown(self):
+            os = self.posix
+            f = open(self.path, "w")
+            os.fchown(f.fileno(), os.getuid(), os.getgid())
+            f.close()
+
+    if hasattr(os, 'chmod'):
+        def test_chmod(self):
+            os = self.posix
+            os.unlink(self.path)
+            raises(OSError, os.chmod, self.path, 0600)
+            f = open(self.path, "w")
+            f.write("this is a test")
+            f.close()
+            os.chmod(self.path, 0200)
+            assert (os.stat(self.path).st_mode & 0777) == 0200
+
+    if hasattr(os, 'fchmod'):
+        def test_fchmod(self):
+            os = self.posix
+            f = open(self.path, "w")
+            os.fchmod(f.fileno(), 0200)
+            assert (os.fstat(f.fileno()).st_mode & 0777) == 0200
+            f.close()
+            assert (os.stat(self.path).st_mode & 0777) == 0200
+
     if hasattr(os, 'mkfifo'):
         def test_mkfifo(self):
             os = self.posix

pypy/rpython/module/ll_os.py

         return extdef([str0, int, int], None, "ll_os.ll_os_lchown",
                       llimpl=os_lchown_llimpl)
 
+    @registering_if(os, 'fchown')
+    def register_os_fchown(self):
+        os_fchown = self.llexternal('fchown',[rffi.INT, rffi.INT, rffi.INT],
+                                    rffi.INT)
+
+        def os_fchown_llimpl(fd, uid, gid):
+            res = os_fchown(fd, uid, gid)
+            if res == -1:
+                raise OSError(rposix.get_errno(), "os_fchown failed")
+
+        return extdef([int, int, int], None, "ll_os.ll_os_fchown",
+                      llimpl=os_fchown_llimpl)
+
     @registering_if(os, 'readlink')
     def register_os_readlink(self):
         os_readlink = self.llexternal('readlink',
         return extdef([traits.str0, int], s_None, llimpl=chmod_llimpl,
                       export_name=traits.ll_os_name('chmod'))
 
+    @registering_if(os, 'fchmod')
+    def register_os_fchmod(self):
+        os_fchmod = self.llexternal('fchmod', [rffi.INT, rffi.MODE_T],
+                                    rffi.INT)
+
+        def fchmod_llimpl(fd, mode):
+            mode = rffi.cast(rffi.MODE_T, mode)
+            res = rffi.cast(lltype.Signed, os_fchmod(fd, mode))
+            if res < 0:
+                raise OSError(rposix.get_errno(), "os_fchmod failed")
+
+        return extdef([int, int], s_None, "ll_os.ll_os_fchmod",
+                      llimpl=fchmod_llimpl)
+
     @registering_str_unicode(os.rename)
     def register_os_rename(self, traits):
         os_rename = self.llexternal(traits.posix_function_name('rename'),

pypy/translator/c/test/test_extfunc.py

     os.chmod(tmpfile2, 0644)
     assert os.stat(tmpfile).st_mode & 0777 == os.stat(tmpfile2).st_mode & 0777
 
+if hasattr(os, 'fchmod'):
+    def test_os_fchmod():
+        tmpfile1 = str(udir.join('test_os_fchmod.txt'))
+        def does_stuff():
+            fd = os.open(tmpfile1, os.O_WRONLY | os.O_CREAT, 0777)
+            os.fchmod(fd, 0200)
+            os.close(fd)
+        f1 = compile(does_stuff, [])
+        f1()
+        assert os.stat(tmpfile1).st_mode & 0777 == 0200
+
 def test_os_rename():
     tmpfile1 = str(udir.join('test_os_rename_1.txt'))
     tmpfile2 = str(udir.join('test_os_rename_2.txt'))
         f1 = compile(does_stuff, [])
         f1()
 
+if hasattr(os, 'fchown'):
+    def test_os_fchown():
+        path1 = udir.join('test_os_fchown.txt')
+        tmpfile1 = str(path1)
+        def does_stuff():
+            # xxx not really a test, just checks that it is callable
+            fd = os.open(tmpfile1, os.O_WRONLY | os.O_CREAT, 0777)
+            os.fchown(fd, os.getuid(), os.getgid())
+            os.close(fd)
+        f1 = compile(does_stuff, [])
+        f1()
+
 if hasattr(os, 'getlogin'):
     def test_os_getlogin():
         def does_stuff():