1. Michael Pavone
  2. pypy

Commits

Philip Jenvey  committed 66eb9aa

Encoding error handlers now return unicode or str results to encoders, so the
encoders can special-case bytes results on py3.

  • Participants
  • Parent commits 5ec61ff
  • Branches default

Comments (0)

Files changed (6)

File pypy/module/_codecs/interp_codecs.py

View file
         self.codec_search_cache = {}
         self.codec_error_registry = {}
         self.codec_need_encodings = True
-        self.decode_error_handler = self.make_errorhandler(space, True)
-        self.encode_error_handler = self.make_errorhandler(space, False)
+        self.decode_error_handler = self.make_decode_errorhandler(space)
+        self.encode_error_handler = self.make_encode_errorhandler(space)
 
         self.unicodedata_handler = None
 
-    def make_errorhandler(self, space, decode):
-        def unicode_call_errorhandler(errors,  encoding, reason, input,
-                                      startpos, endpos):
+    def _make_errorhandler(self, space, decode):
+        def call_errorhandler(errors, encoding, reason, input, startpos,
+                              endpos):
+            """Generic wrapper for calling into error handlers.
 
+            Returns (unicode_or_none, str_or_none, newpos) as error
+            handlers may return unicode or on Python 3, bytes.
+            """
             w_errorhandler = lookup_error(space, errors)
             if decode:
                 w_cls = space.w_UnicodeDecodeError
                     "position %d from error handler out of bounds", newpos)
             replace = space.unicode_w(w_replace)
             return replace, newpos
-        return unicode_call_errorhandler
+        return call_errorhandler
+
+    def make_decode_errorhandler(self, space):
+        return self._make_errorhandler(space, True)
+
+    def make_encode_errorhandler(self, space):
+        errorhandler = self._make_errorhandler(space, False)
+        def encode_call_errorhandler(errors, encoding, reason, input, startpos,
+                                     endpos):
+            replace, newpos = errorhandler(errors, encoding, reason, input,
+                                           startpos, endpos)
+            return replace, None, newpos
+        return encode_call_errorhandler
 
     def get_unicodedata_handler(self, space):
         if self.unicodedata_handler:

File pypy/module/_multibytecodec/c_codecs.py

View file
             replace = "?"
     else:
         assert errorcb
-        ret, end = errorcb(errors, namecb, reason,
-                           unicodedata, start, end)
-        codec = pypy_cjk_enc_getcodec(encodebuf)
-        replace = encode(codec, ret, "strict", errorcb, namecb)
+        retu, rets, end = errorcb(errors, namecb, reason,
+                                  unicodedata, start, end)
+        if rets is not None:
+            # py3k only
+            replace = rets
+        else:
+            codec = pypy_cjk_enc_getcodec(encodebuf)
+            replace = encode(codec, retu, "strict", errorcb, namecb)
     inbuf = rffi.get_nonmovingbuffer(replace)
     try:
         r = pypy_cjk_enc_replace_on_error(encodebuf, inbuf, len(replace), end)

File pypy/module/_multibytecodec/test/test_app_codecs.py

View file
         repl = u"\u2014"
         s = u"\uDDA1".encode("gbk", "test.multi_bad_handler")
         assert s == '\xA1\xAA'
+
+    def test_encode_custom_error_handler_type(self):
+        import codecs
+        import sys
+        codecs.register_error("test.test_encode_custom_error_handler_type",
+                              lambda e: ('\xc3', e.end))
+        raises(TypeError, u"\uDDA1".encode, "gbk",
+               "test.test_encode_custom_error_handler_type")

File pypy/module/_multibytecodec/test/test_c_codecs.py

View file
     c = getcodec('iso2022_jp')
     s = encode(c, u'\u83ca\u5730\u6642\u592b')
     assert s == '\x1b$B5FCO;~IW\x1b(B' and type(s) is str
+
+def test_encode_custom_error_handler_bytes():
+    c = getcodec("hz")
+    def errorhandler(errors, enc, msg, t, startingpos, endingpos):
+        return None, '\xc3', endingpos
+    s = encode(c, u'abc\u1234def', 'foo', errorhandler)
+    assert '\xc3' in s

File pypy/rlib/runicode.py

View file
 def default_unicode_error_encode(errors, encoding, msg, u,
                                  startingpos, endingpos):
     if errors == 'replace':
-        return u'?', endingpos
+        return u'?', None, endingpos
     if errors == 'ignore':
-        return u'', endingpos
+        return u'', None, endingpos
     raise UnicodeEncodeError(encoding, u, startingpos, endingpos, msg)
 
 # ____________________________________________________________
                             _encodeUCS4(result, ch3)
                             continue
                     if not allow_surrogates:
-                        r, pos = errorhandler(errors, 'utf-8',
-                                              'surrogates not allowed',
-                                              s, pos-1, pos)
-                        for ch in r:
+                        ru, rs, pos = errorhandler(errors, 'utf-8',
+                                                   'surrogates not allowed',
+                                                   s, pos-1, pos)
+                        if rs is not None:
+                            # py3k only
+                            result.append(rs)
+                            continue
+                        for ch in ru:
                             if ord(ch) < 0x80:
                                 result.append(chr(ord(ch)))
                             else:
             collend = pos+1
             while collend < len(p) and ord(p[collend]) >= limit:
                 collend += 1
-            r, pos = errorhandler(errors, encoding, reason, p,
-                                  collstart, collend)
-            for ch in r:
+            ru, rs, pos = errorhandler(errors, encoding, reason, p,
+                                       collstart, collend)
+            if rs is not None:
+                # py3k only
+                result.append(rs)
+                continue
+            for ch in ru:
                 if ord(ch) < limit:
                     result.append(chr(ord(ch)))
                 else:
 
         c = mapping.get(ch, '')
         if len(c) == 0:
-            res, pos = errorhandler(errors, "charmap",
-                                    "character maps to <undefined>",
-                                    s, pos, pos + 1)
-            for ch2 in res:
+            ru, rs, pos = errorhandler(errors, "charmap",
+                                       "character maps to <undefined>",
+                                       s, pos, pos + 1)
+            if rs is not None:
+                # py3k only
+                result.append(rs)
+                continue
+            for ch2 in ru:
                 c2 = mapping.get(ch2, '')
                 if len(c2) == 0:
                     errorhandler(
                 pass
             collend += 1
         msg = "invalid decimal Unicode string"
-        r, pos = errorhandler(errors, 'decimal',
-                              msg, s, collstart, collend)
-        for char in r:
+        ru, rs, pos = errorhandler(errors, 'decimal',
+                                   msg, s, collstart, collend)
+        if rs is not None:
+            # py3k only
+            errorhandler('strict', 'decimal', msg, s, collstart, collend)
+        for char in ru:
             ch = ord(char)
             if unicodedb.isspace(ch):
                 result.append(' ')

File pypy/rlib/test/test_runicode.py

View file
             assert t is s
             assert start == startingpos
             assert stop == endingpos
-            return "42424242", stop
+            return u"42424242", None, stop
         encoder = self.getencoder(encoding)
         result = encoder(s, len(s), "foo!", errorhandler)
         assert called[0]
         assert "42424242" in result
 
+        # ensure bytes results passthru
+        def errorhandler_bytes(errors, enc, msg, t, startingpos,
+                               endingpos):
+            return None, '\xc3', endingpos
+        result = encoder(s, len(s), "foo!", errorhandler_bytes)
+        assert '\xc3' in result
+
     def checkdecodeerror(self, s, encoding, start, stop,
                          addstuff=True, msg=None):
         called = [0]