Commits

Amaury Forgeot d'Arc committed 33df72c

Unicode version of os.rename()

  • Participants
  • Parent commits 8b9d847
  • Branches unicode_filename-2

Comments (0)

Files changed (4)

File pypy/module/posix/interp_posix.py

         self.space = space
         self.w_obj = w_obj
 
-    def encode(self):
+    def as_bytes(self):
         return self.space.path_w(self.w_obj)
 
-    def gettext(self):
+    def as_unicode(self):
         return self.space.unicode_w(self.w_obj)
 
+class FileDecoder:
+    def __init__(self, space, w_obj):
+        self.space = space
+        self.w_obj = w_obj
+
+    def as_bytes(self):
+        return self.space.path_w(self.w_obj)
+
+    def as_unicode(self):
+        space = self.space
+        filesystemencoding = space.sys.filesystemencoding
+        w_unicode = space.call_method(self.w_obj, 'decode',
+                                      space.wrap(filesystemencoding))
+        return space.unicode_w(w_unicode)
+
 @specialize.memo()
 def dispatch_filename(func):
     def dispatch(space, w_fname, *args):
             return func(fname, *args)
     return dispatch
 
+@specialize.memo()
+def dispatch_filename_2(func):
+    def dispatch(space, w_fname1, w_fname2, *args):
+        if space.isinstance_w(w_fname1, space.w_unicode):
+            fname1 = FileEncoder(space, w_fname1)
+            if space.isinstance_w(w_fname2, space.w_unicode):
+                fname2 = FileEncoder(space, w_fname2)
+                return func(fname1, fname2, *args)
+            else:
+                fname2 = FileDecoder(space, w_fname2)
+                return func(fname1, fname2, *args)
+        else:
+            fname1 = FileDecoder(space, w_fname1)
+            if space.isinstance_w(w_fname2, space.w_unicode):
+                fname2 = FileEncoder(space, w_fname2)
+                return func(fname1, fname2, *args)
+            else:
+                fname2 = FileDecoder(space, w_fname2)
+                return func(fname1, fname2, *args)
+    return dispatch
+
 def open(space, w_fname, flag, mode=0777):
     """Open a file (for low level IO).
 Return a file descriptor (a small integer)."""
         raise wrap_oserror2(space, e, w_path)
 chmod.unwrap_spec = [ObjSpace, W_Root, "c_int"]
 
-def rename(space, old, new):
+def rename(space, w_old, w_new):
     "Rename a file or directory."
-    try: 
-        os.rename(old, new)
-    except OSError, e: 
+    try:
+        dispatch_filename_2(rposix.rename)(space, w_old, w_new)
+    except OSError, e:
         raise wrap_oserror(space, e) 
-rename.unwrap_spec = [ObjSpace, str, str]
+rename.unwrap_spec = [ObjSpace, W_Root, W_Root]
 
 def umask(space, mask):
     "Set the current numeric umask and return the previous umask."

File pypy/rlib/rposix.py

     if isinstance(path, str):
         return os.open(path, flags, mode)
     else:
-        return os.open(path.encode(), flags, mode)
+        return os.open(path.as_bytes(), flags, mode)
 
 @specialize.argtype(0)
 def stat(path):
     if isinstance(path, str):
         return os.stat(path)
     else:
-        return os.stat(path.encode())
+        return os.stat(path.as_bytes())
 
 @specialize.argtype(0)
 def lstat(path):
     if isinstance(path, str):
         return os.lstat(path)
     else:
-        return os.lstat(path.encode())
+        return os.lstat(path.as_bytes())
 
 @specialize.argtype(0)
 def unlink(path):
     if isinstance(path, str):
         return os.unlink(path)
     else:
-        return os.unlink(path.encode())
+        return os.unlink(path.as_bytes())
+
+@specialize.argtype(0, 1)
+def rename(path1, path2):
+    return os.rename(path1.as_bytes(), path2.as_bytes())
 
 @specialize.argtype(0)
 def listdir(dirname):
     if isinstance(dirname, str):
         return os.listdir(dirname)
     else:
-        return os.listdir(dirname.encode())
+        return os.listdir(dirname.as_bytes())
 
 @specialize.argtype(0)
 def access(path, mode):
     if isinstance(path, str):
         return os.access(path, mode)
     else:
-        return os.access(path.encode(), mode)
+        return os.access(path.as_bytes(), mode)
 
 @specialize.argtype(0)
 def chmod(path, mode):
     if isinstance(path, str):
         return os.chmod(path, mode)
     else:
-        return os.chmod(path.encode(), mode)
+        return os.chmod(path.as_bytes(), mode)
 
 @specialize.argtype(0)
 def chdir(path):
     if isinstance(path, str):
         return os.chdir(path)
     else:
-        return os.chdir(path.encode())
+        return os.chdir(path.as_bytes())
 
 @specialize.argtype(0)
 def mkdir(path, mode=0777):
     if isinstance(path, str):
         return os.mkdir(path, mode)
     else:
-        return os.mkdir(path.encode(), mode)
+        return os.mkdir(path.as_bytes(), mode)
 
 @specialize.argtype(0)
 def rmdir(path):
     if isinstance(path, str):
         return os.rmdir(path)
     else:
-        return os.rmdir(path.encode())
+        return os.rmdir(path.as_bytes())
 
 if os.name == 'nt':
     import nt
         if isinstance(path, str):
             return nt._getfullpathname(path)
         else:
-            return nt._getfullpathname(path.encode())
+            return nt._getfullpathname(path.as_bytes())

File pypy/rlib/test/test_rposix.py

         self.unistr = unistr
 
     if sys.platform == 'win32':
-        def encode(self):
+        def as_bytes(self):
             from pypy.rlib.runicode import unicode_encode_mbcs
             return unicode_encode_mbcs(self.unistr, len(self.unistr),
                                        "strict")
     else:
-        def encode(self):
+        def as_bytes(self):
             from pypy.rlib.runicode import unicode_encode_utf_8
             return unicode_encode_utf_8(self.unistr, len(self.unistr),
                                         "strict")
 
-    def gettext(self):
+    def as_unicode(self):
         return self.unistr
 
 class TestPosixUnicode:
         f.write("test")
         f.close()
 
-        self.path = UnicodeWithEncoding(self.ufilename)
+        self.path  = UnicodeWithEncoding(self.ufilename)
+        self.path2 = UnicodeWithEncoding(self.ufilename + ".new")
 
     def test_open(self):
         def f():
         interpret(f, [])
         assert not os.path.exists(self.ufilename)
 
+    def test_rename(self):
+        def f():
+            return rposix.rename(self.path, self.path2)
+
+        interpret(f, [])
+        assert not os.path.exists(self.ufilename)
+        assert os.path.exists(self.ufilename + '.new')
+
     def test_listdir(self):
         udir = UnicodeWithEncoding(os.path.dirname(self.ufilename))
         def f():

File pypy/rpython/module/ll_os.py

     if not condition:
         return registering(None, condition=False)
 
+    func_name = func.__name__
+
+    @func_renamer(func_name + "_unicode")
     def unicodefunc(*args):
         return func(*args)
 
     arglist = ['arg%d' % (i,) for i in range(nbargs)]
     transformed_arglist = arglist[:]
     for i in argnums:
-        transformed_arglist[i] = transformed_arglist[i] + '.gettext()'
+        transformed_arglist[i] = transformed_arglist[i] + '.as_unicode()'
 
     args = ', '.join(arglist)
     transformed_args = ', '.join(transformed_arglist)
     main_arg = 'arg%d' % (argnum,)
 
-    func_name = func.__name__
     source = py.code.Source("""
     def %(func_name)s(%(args)s):
         if isinstance(%(main_arg)s, str):
         return extdef([str, str], s_None, llimpl=rename_llimpl,
                       export_name="ll_os.ll_os_rename")
 
+    @registering_unicode_version(os.rename, 2, [0, 1], sys.platform=='win32')
+    def register_os_rename_unicode(self):
+        os_wrename = self.llexternal(underscore_on_windows+'wrename',
+                                     [rffi.CWCHARP, rffi.CWCHARP], rffi.INT)
+
+        def rename_llimpl(oldpath, newpath):
+            res = rffi.cast(lltype.Signed, os_wrename(oldpath, newpath))
+            if res < 0:
+                raise OSError(rposix.get_errno(), "os_rename failed")
+
+        return extdef([unicode, unicode], s_None, llimpl=rename_llimpl,
+                      export_name="ll_os.ll_os_wrename")
+
     @registering(os.umask)
     def register_os_umask(self):
         os_umask = self.llexternal(underscore_on_windows+'umask', [rffi.MODE_T], rffi.MODE_T)