Commits

Armin Rigo  committed e1895c7

Merge branch/more-posix, adding some more functions to the 'os' module.

  • Participants
  • Parent commits 4969a85

Comments (0)

Files changed (6)

File pypy/module/posix/__init__.py

 
     if hasattr(os, 'chown'):
         interpleveldefs['chown'] = 'interp_posix.chown'
+    if hasattr(os, 'lchown'):
+        interpleveldefs['lchown'] = 'interp_posix.lchown'
     if hasattr(os, 'ftruncate'):
         interpleveldefs['ftruncate'] = 'interp_posix.ftruncate'
     if hasattr(os, 'fsync'):
     if hasattr(os, 'kill') and sys.platform != 'win32':
         interpleveldefs['kill'] = 'interp_posix.kill'
         interpleveldefs['abort'] = 'interp_posix.abort'
+    if hasattr(os, 'killpg'):
+        interpleveldefs['killpg'] = 'interp_posix.killpg'
     if hasattr(os, 'getpid'):
         interpleveldefs['getpid'] = 'interp_posix.getpid'
     if hasattr(os, 'link'):
         interpleveldefs['ttyname'] = 'interp_posix.ttyname'
     if hasattr(os, 'getloadavg'):
         interpleveldefs['getloadavg'] = 'interp_posix.getloadavg'
+    if hasattr(os, 'mkfifo'):
+        interpleveldefs['mkfifo'] = 'interp_posix.mkfifo'
+    if hasattr(os, 'mknod'):
+        interpleveldefs['mknod'] = 'interp_posix.mknod'
+    if hasattr(os, 'nice'):
+        interpleveldefs['nice'] = 'interp_posix.nice'
 
     for name in ['setsid', 'getuid', 'geteuid', 'getgid', 'getegid', 'setuid',
                  'seteuid', 'setgid', 'setegid', 'getpgrp', 'setpgrp',

File pypy/module/posix/interp_posix.py

 ftruncate.unwrap_spec = [ObjSpace, "c_int", r_longlong]
 
 def fsync(space, w_fd):
+    """Force write of file with filedescriptor to disk."""
     fd = space.c_filedescriptor_w(w_fd)
     try:
         os.fsync(fd)
 fsync.unwrap_spec = [ObjSpace, W_Root]
 
 def fdatasync(space, w_fd):
+    """Force write of file with filedescriptor to disk.
+Does not force update of metadata."""
     fd = space.c_filedescriptor_w(w_fd)
     try:
         os.fdatasync(fd)
 fdatasync.unwrap_spec = [ObjSpace, W_Root]
 
 def fchdir(space, w_fd):
+    """Change to the directory of the given file descriptor.  fildes must be
+opened on a directory, not a file."""
     fd = space.c_filedescriptor_w(w_fd)
     try:
         os.fchdir(fd)
         raise wrap_oserror(space, e) 
 rename.unwrap_spec = [ObjSpace, W_Root, W_Root]
 
+def mkfifo(space, w_filename, mode=0666):
+    """Create a FIFO (a POSIX named pipe)."""
+    try:
+        dispatch_filename(rposix.mkfifo)(space, w_filename, mode)
+    except OSError, e: 
+        raise wrap_oserror2(space, e, w_filename)
+mkfifo.unwrap_spec = [ObjSpace, W_Root, "c_int"]
+
+def mknod(space, w_filename, mode=0600, device=0):
+    """Create a filesystem node (file, device special file or named pipe)
+named filename. mode specifies both the permissions to use and the
+type of node to be created, being combined (bitwise OR) with one of
+S_IFREG, S_IFCHR, S_IFBLK, and S_IFIFO. For S_IFCHR and S_IFBLK,
+device defines the newly created device special file (probably using
+os.makedev()), otherwise it is ignored."""
+    try:
+        dispatch_filename(rposix.mknod)(space, w_filename, mode, device)
+    except OSError, e: 
+        raise wrap_oserror2(space, e, w_filename)
+mknod.unwrap_spec = [ObjSpace, W_Root, "c_int", "c_int"]
+
 def umask(space, mask):
     "Set the current numeric umask and return the previous umask."
     prevmask = os.umask(mask)
         raise wrap_oserror(space, e)
 kill.unwrap_spec = [ObjSpace, "c_int", "c_int"]
 
+def killpg(space, pgid, sig):
+    "Kill a process group with a signal."
+    try:
+        os.killpg(pgid, sig)
+    except OSError, e:
+        raise wrap_oserror(space, e)
+killpg.unwrap_spec = [ObjSpace, "c_int", "c_int"]
+
 def abort(space):
     """Abort the interpreter immediately.  This 'dumps core' or otherwise fails
 in the hardest way possible on the hosting operating system."""
     return space.w_None
 chown.unwrap_spec = [ObjSpace, str, "c_nonnegint", "c_nonnegint"]
 
+def lchown(space, path, uid, gid):
+    try:
+        os.lchown(path, uid, gid)
+    except OSError, e:
+        raise wrap_oserror(space, e, path)
+    return space.w_None
+lchown.unwrap_spec = [ObjSpace, str, "c_nonnegint", "c_nonnegint"]
+
 def getloadavg(space):
     try:
         load = os.getloadavg()
                            space.wrap(load[2])])
 getloadavg.unwrap_spec = [ObjSpace]
 
+def nice(space, inc):
+    "Decrease the priority of process by inc and return the new priority."
+    try:
+        res = os.nice(inc)
+    except OSError, e:
+        raise wrap_oserror(space, e)
+    return space.wrap(res)
+nice.unwrap_spec = [ObjSpace, "c_int"]
+
 if _WIN:
     from pypy.rlib import rwin32
 

File pypy/module/posix/test/test_posix2.py

             f.close()
             os.chown(self.path, os.getuid(), os.getgid())
 
+    if hasattr(os, 'lchown'):
+        def test_lchown(self):
+            os = self.posix
+            os.unlink(self.path)
+            raises(OSError, os.lchown, self.path, os.getuid(), os.getgid())
+            os.symlink('foobar', self.path)
+            os.lchown(self.path, os.getuid(), os.getgid())
+
+    if hasattr(os, 'mkfifo'):
+        def test_mkfifo(self):
+            os = self.posix
+            os.mkfifo(self.path2 + 'test_mkfifo', 0666)
+            st = os.lstat(self.path2 + 'test_mkfifo')
+            import stat
+            assert stat.S_ISFIFO(st.st_mode)
+
+    if hasattr(os, 'mknod'):
+        def test_mknod(self):
+            import stat
+            os = self.posix
+            # not very useful: os.mknod() without specifying 'mode'
+            os.mknod(self.path2 + 'test_mknod-1')
+            st = os.lstat(self.path2 + 'test_mknod-1')
+            assert stat.S_ISREG(st.st_mode)
+            # os.mknod() with S_IFIFO
+            os.mknod(self.path2 + 'test_mknod-2', 0600 | stat.S_IFIFO)
+            st = os.lstat(self.path2 + 'test_mknod-2')
+            assert stat.S_ISFIFO(st.st_mode)
+
+        def test_mknod_with_ifchr(self):
+            # os.mknod() with S_IFCHR
+            # -- usually requires root priviledges --
+            os = self.posix
+            if hasattr(os.lstat('.'), 'st_rdev'):
+                import stat
+                try:
+                    os.mknod(self.path2 + 'test_mknod-3', 0600 | stat.S_IFCHR,
+                             0x105)
+                except OSError, e:
+                    skip("os.mknod() with S_IFCHR: got %r" % (e,))
+                else:
+                    st = os.lstat(self.path2 + 'test_mknod-3')
+                    assert stat.S_ISCHR(st.st_mode)
+                    assert st.st_rdev == 0x105
+
+    if hasattr(os, 'nice') and hasattr(os, 'fork') and hasattr(os, 'waitpid'):
+        def test_nice(self):
+            os = self.posix
+            myprio = os.nice(0)
+            #
+            pid = os.fork()
+            if pid == 0:    # in the child
+                res = os.nice(3)
+                os._exit(res)
+            #
+            pid1, status1 = os.waitpid(pid, 0)
+            assert pid1 == pid
+            assert os.WIFEXITED(status1)
+            assert os.WEXITSTATUS(status1) == myprio + 3
+
+
 class AppTestEnvironment(object):
     def setup_class(cls): 
         cls.space = space 

File pypy/rlib/rposix.py

     else:
         return os.rmdir(path.as_bytes())
 
+@specialize.argtype(0)
+def mkfifo(path, mode):
+    if isinstance(path, str):
+        os.mkfifo(path, mode)
+    else:
+        os.mkfifo(path.as_bytes(), mode)
+
+@specialize.argtype(0)
+def mknod(path, mode, device):
+    if isinstance(path, str):
+        os.mknod(path, mode, device)
+    else:
+        os.mknod(path.as_bytes(), mode, device)
+
 if os.name == 'nt':
     import nt
     def _getfullpathname(path):

File pypy/rpython/module/ll_os.py

         return 'll_os.ll_os_w' + name
 
 def registering_str_unicode(posixfunc, condition=True):
-    if not condition:
+    if not condition or posixfunc is None:
         return registering(None, condition=False)
 
     func_name = posixfunc.__name__
         return extdef([str, int, int], None, "ll_os.ll_os_chown",
                       llimpl=os_chown_llimpl)
 
+    @registering_if(os, 'lchown')
+    def register_os_lchown(self):
+        os_lchown = self.llexternal('lchown',[rffi.CCHARP, rffi.INT, rffi.INT],
+                                    rffi.INT)
+
+        def os_lchown_llimpl(path, uid, gid):
+            res = os_lchown(path, uid, gid)
+            if res == -1:
+                raise OSError(rposix.get_errno(), "os_lchown failed")
+
+        return extdef([str, int, int], None, "ll_os.ll_os_lchown",
+                      llimpl=os_lchown_llimpl)
+
     @registering_if(os, 'readlink')
     def register_os_readlink(self):
         os_readlink = self.llexternal('readlink',
         return extdef([traits.str, traits.str], s_None, llimpl=rename_llimpl,
                       export_name=traits.ll_os_name('rename'))
 
+    @registering_str_unicode(getattr(os, 'mkfifo', None))
+    def register_os_mkfifo(self, traits):
+        os_mkfifo = self.llexternal(traits.posix_function_name('mkfifo'),
+                                    [traits.CCHARP, rffi.MODE_T], rffi.INT)
+
+        def mkfifo_llimpl(path, mode):
+            res = rffi.cast(lltype.Signed, os_mkfifo(path, mode))
+            if res < 0:
+                raise OSError(rposix.get_errno(), "os_mkfifo failed")
+
+        return extdef([traits.str, int], s_None, llimpl=mkfifo_llimpl,
+                      export_name=traits.ll_os_name('mkfifo'))
+
+    @registering_str_unicode(getattr(os, 'mknod', None))
+    def register_os_mknod(self, traits):
+        os_mknod = self.llexternal(traits.posix_function_name('mknod'),
+                                   [traits.CCHARP, rffi.MODE_T, rffi.INT],
+                                   rffi.INT)      # xxx: actually ^^^ dev_t
+
+        def mknod_llimpl(path, mode, dev):
+            res = rffi.cast(lltype.Signed, os_mknod(path, mode, dev))
+            if res < 0:
+                raise OSError(rposix.get_errno(), "os_mknod failed")
+
+        return extdef([traits.str, int, int], s_None, llimpl=mknod_llimpl,
+                      export_name=traits.ll_os_name('mknod'))
+
     @registering(os.umask)
     def register_os_umask(self):
         os_umask = self.llexternal(underscore_on_windows+'umask', [rffi.MODE_T], rffi.MODE_T)
         return extdef([int, int], s_None, llimpl=kill_llimpl,
                       export_name="ll_os.ll_os_kill")
 
+    @registering_if(os, 'killpg')
+    def register_os_killpg(self):
+        os_killpg = self.llexternal('killpg', [rffi.INT, rffi.INT],
+                                    rffi.INT)
+
+        def killpg_llimpl(pid, sig):
+            res = rffi.cast(lltype.Signed, os_killpg(rffi.cast(rffi.INT, pid),
+                                                     rffi.cast(rffi.INT, sig)))
+            if res < 0:
+                raise OSError(rposix.get_errno(), "os_killpg failed")
+
+        return extdef([int, int], s_None, llimpl=killpg_llimpl,
+                      export_name="ll_os.ll_os_killpg")
+
     @registering_if(os, 'link')
     def register_os_link(self):
         os_link = self.llexternal('link', [rffi.CCHARP, rffi.CCHARP],
         return extdef([int], s_None, llimpl=_exit_llimpl,
                       export_name="ll_os.ll_os__exit")
 
+    @registering_if(os, 'nice')
+    def register_os_nice(self):
+        os_nice = self.llexternal('nice', [rffi.INT], rffi.INT)
+
+        def nice_llimpl(inc):
+            # Assume that the system provides a standard-compliant version
+            # of nice() that returns the new priority.  Nowadays, FreeBSD
+            # might be the last major non-compliant system (xxx check me).
+            rposix.set_errno(0)
+            res = rffi.cast(lltype.Signed, os_nice(inc))
+            if res == -1:
+                err = rposix.get_errno()
+                if err != 0:
+                    raise OSError(err, "os_nice failed")
+            return res
+
+        return extdef([int], int, llimpl=nice_llimpl,
+                      export_name="ll_os.ll_os_nice")
+
 # --------------------------- os.stat & variants ---------------------------
 
     @registering(os.fstat)

File pypy/translator/c/test/test_extfunc.py

 from pypy.tool.udir import udir
 from pypy.rlib.rarithmetic import r_longlong
 from pypy.translator.c.test.test_genc import compile
+from pypy.translator.c.test.test_standalone import StandaloneTests
 posix = __import__(os.name)
 
 # note: clock synchronizes itself!
     assert os.path.exists(tmpfile2)
     assert not os.path.exists(tmpfile1)
 
+if hasattr(os, 'mkfifo'):
+    def test_os_mkfifo():
+        tmpfile = str(udir.join('test_os_mkfifo.txt'))
+        def does_stuff():
+            os.mkfifo(tmpfile, 0666)
+        f1 = compile(does_stuff, [])
+        f1()
+        import stat
+        st = os.lstat(tmpfile)
+        assert stat.S_ISFIFO(st.st_mode)
+
+if hasattr(os, 'mknod'):
+    def test_os_mknod():
+        import stat
+        tmpfile = str(udir.join('test_os_mknod.txt'))
+        def does_stuff():
+            os.mknod(tmpfile, 0600 | stat.S_IFIFO, 0)
+        f1 = compile(does_stuff, [])
+        f1()
+        st = os.lstat(tmpfile)
+        assert stat.S_ISFIFO(st.st_mode)
+
 def test_os_umask():
     def does_stuff():
         mask1 = os.umask(0660)
         # for what reason do they want us to shift by 8? See the doc
         assert status1 >> 8 == 4
 
+if hasattr(os, 'kill'):
+    def test_kill_to_send_sigusr1():
+        import signal
+        from pypy.module.signal import interp_signal
+        def does_stuff():
+            interp_signal.pypysig_setflag(signal.SIGUSR1)
+            os.kill(os.getpid(), signal.SIGUSR1)
+            interp_signal.pypysig_ignore(signal.SIGUSR1)
+            while True:
+                n = interp_signal.pypysig_poll()
+                if n < 0 or n == signal.SIGUSR1:
+                    break
+            return n
+        f1 = compile(does_stuff, [])
+        got_signal = f1()
+        assert got_signal == signal.SIGUSR1
+
+if hasattr(os, 'killpg'):
+    def test_killpg():
+        import signal
+        from pypy.module.signal import interp_signal
+        def does_stuff():
+            interp_signal.pypysig_setflag(signal.SIGUSR1)
+            os.killpg(os.getpgrp(), signal.SIGUSR1)
+            interp_signal.pypysig_ignore(signal.SIGUSR1)
+            while True:
+                n = interp_signal.pypysig_poll()
+                if n < 0 or n == signal.SIGUSR1:
+                    break
+            return n
+        f1 = compile(does_stuff, [])
+        got_signal = f1()
+        assert got_signal == signal.SIGUSR1
+
+if hasattr(os, 'chown') and hasattr(os, 'lchown'):
+    def test_os_chown_lchown():
+        path1 = udir.join('test_os_chown_lchown-1.txt')
+        path2 = udir.join('test_os_chown_lchown-2.txt')
+        path1.write('foobar')
+        path2.mksymlinkto('some-broken-symlink')
+        tmpfile1 = str(path1)
+        tmpfile2 = str(path2)
+        def does_stuff():
+            # xxx not really a test, just checks that they are callable
+            os.chown(tmpfile1, os.getuid(), os.getgid())
+            os.lchown(tmpfile1, os.getuid(), os.getgid())
+            os.lchown(tmpfile2, os.getuid(), os.getgid())
+            try:
+                os.chown(tmpfile2, os.getuid(), os.getgid())
+            except OSError:
+                pass
+            else:
+                raise AssertionError("os.chown(broken symlink) should raise")
+        f1 = compile(does_stuff, [])
+        f1()
+
 # ____________________________________________________________
 
 def _real_getenv(var):
         finally:
             os.chdir(localdir)
         assert res == True
+
+# ____________________________________________________________
+
+
+class TestExtFuncStandalone(StandaloneTests):
+
+    if hasattr(os, 'nice'):
+        def test_os_nice(self):
+            def does_stuff(argv):
+                res =  os.nice(3)
+                print 'os.nice returned', res
+                return 0
+            t, cbuilder = self.compile(does_stuff)
+            data = cbuilder.cmdexec('')
+            res = os.nice(0) + 3
+            assert data.startswith('os.nice returned %d\n' % res)