Commits

Amaury Forgeot d'Arc committed 830a302

Add RPython support for 'replace' and 'ignore' error handlers.

Comments (0)

Files changed (2)

pypy/rlib/runicode.py

     ORD = ord
 
 
-def raise_unicode_exception_decode(errors, encoding, msg, s,
-                                   startingpos, endingpos):
+def default_unicode_error_decode(errors, encoding, msg, s,
+                                 startingpos, endingpos):
+    if errors == 'replace':
+        return u'\ufffd', endingpos
+    if errors == 'ignore':
+        return u'', endingpos
     raise UnicodeDecodeError(encoding, s, startingpos, endingpos, msg)
 
-def raise_unicode_exception_encode(errors, encoding, msg, u,
-                                   startingpos, endingpos):
+def default_unicode_error_encode(errors, encoding, msg, u,
+                                 startingpos, endingpos):
+    if errors == 'replace':
+        return u'?', endingpos
+    if errors == 'ignore':
+        return u'', endingpos
     raise UnicodeEncodeError(encoding, u, startingpos, endingpos, msg)
 
 # ____________________________________________________________
 def str_decode_utf_8(s, size, errors, final=False,
                      errorhandler=None, allow_surrogates=False):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     return str_decode_utf_8_impl(s, size, errors, final, errorhandler,
                                  allow_surrogates=allow_surrogates)
 
 def unicode_encode_utf_8(s, size, errors, errorhandler=None,
                          allow_surrogates=False):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
     return unicode_encode_utf_8_impl(s, size, errors, errorhandler,
                                      allow_surrogates=allow_surrogates)
 
                              errorhandler=None,
                              byteorder="native"):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     bo = 0
 
     if BYTEORDER == 'little':
                              errorhandler=None,
                              byteorder="native"):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     bo = 0
 
     if BYTEORDER == 'little':
 def str_decode_utf_7(s, size, errors, final=False,
                      errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
 def str_decode_ascii(s, size, errors, final=False,
                      errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     # ASCII is equivalent to the first 128 ordinals in Unicode.
     result = UnicodeBuilder(size)
     pos = 0
 def unicode_encode_ucs1_helper(p, size, errors,
                                errorhandler=None, limit=256):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
     if limit == 256:
         reason = "ordinal not in range(256)"
         encoding = "latin-1"
         return str_decode_latin_1(s, size, errors, final=final,
                                   errorhandler=errorhandler)
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
                                       errorhandler=errorhandler)
 
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
 
     if size == 0:
         return ''
                               errorhandler=False,
                               unicodedata_handler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
 
     if size == 0:
         return u'', 0
 def str_decode_raw_unicode_escape(s, size, errors, final=False,
                                   errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
 def str_decode_unicode_internal(s, size, errors, final=False,
                                 errorhandler=None):
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_decode
+        errorhandler = default_unicode_error_decode
     if size == 0:
         return u'', 0
 
             return u"", 0
 
         if errorhandler is None:
-            errorhandler = raise_unicode_exception_decode
+            errorhandler = default_unicode_error_decode
 
         # Skip trailing lead-byte unless 'final' is set
         if not final and is_dbcs_lead_byte(s[size-1]):
     are treated as errors. This includes embedded NULL bytes.
     """
     if errorhandler is None:
-        errorhandler = raise_unicode_exception_encode
+        errorhandler = default_unicode_error_encode
     if size == 0:
         return ''
     result = StringBuilder(size)

pypy/rlib/test/test_runicode.py

     def test_ascii_error(self):
         self.checkdecodeerror("abc\xFF\xFF\xFFcde", "ascii", 3, 4)
 
+    def test_decode_replace(self):
+        decoder = self.getdecoder('utf-8')
+        assert decoder('caf\xe9', 4, 'replace', True) == (u'caf\ufffd', 4)
+
     def test_utf16_errors(self):
         # truncated BOM
         for s in ["\xff", "\xfe"]:
     def __init__(self):
         self.decoder = self.getdecoder('utf-8')
 
-    def replace_handler(self, errors, codec, message, input, start, end):
-        return u'\ufffd', end
-
-    def ignore_handler(self, errors, codec, message, input, start, end):
-        return u'', end
-
     def to_bytestring(self, bytes):
         return ''.join(chr(int(c, 16)) for c in bytes.split())
 
             raises(UnicodeDecodeError, self.decoder, byte, 1, None, final=True)
             self.checkdecodeerror(byte, 'utf-8', 0, 1, addstuff=False,
                                   msg='invalid start byte')
-            assert self.decoder(byte, 1, None, final=True,
-                       errorhandler=self.replace_handler) == (FFFD, 1)
-            assert (self.decoder('aaaa' + byte + 'bbbb', 9, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(byte, 1, 'replace', final=True) == (FFFD, 1)
+            assert (self.decoder('aaaa' + byte + 'bbbb', 9, 'replace',
+                        final=True) ==
                         (u'aaaa'+ FFFD + u'bbbb', 9))
-            assert self.decoder(byte, 1, None, final=True,
-                           errorhandler=self.ignore_handler) == (u'', 1)
-            assert (self.decoder('aaaa' + byte + 'bbbb', 9, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaabbbb', 9))
+            assert self.decoder(byte, 1, 'ignore', final=True) == (u'', 1)
+            assert (self.decoder('aaaa' + byte + 'bbbb', 9, 'ignore',
+                        final=True) == (u'aaaabbbb', 9))
 
     def test_unexpected_end_of_data(self):
         """
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, len(seq), addstuff=False,
                                   msg='unexpected end of data')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (FFFD, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (FFFD, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'replace', final=True) ==
                         (u'aaaa'+ FFFD + u'bbbb', len(seq) + 8))
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (u'', len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaabbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (u'', len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 'ignore',
+                        final=True) == (u'aaaabbbb', len(seq) + 8))
 
     def test_invalid_cb_for_2bytes_seq(self):
         """
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, 1, addstuff=False,
                                   msg='invalid continuation byte')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'replace', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
             res = res.replace(FFFD, u'')
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'ignore', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
 
     def test_invalid_cb_for_3bytes_seq(self):
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, len(seq)-1, addstuff=False,
                                   msg='invalid continuation byte')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8,
+                                 'replace', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
             res = res.replace(FFFD, u'')
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 'ignore',
+                        final=True) == (u'aaaa' + res + u'bbbb', len(seq) + 8))
 
     def test_invalid_cb_for_4bytes_seq(self):
         """
                    None, final=True)
             self.checkdecodeerror(seq, 'utf-8', 0, len(seq)-1, addstuff=False,
                                   msg='invalid continuation byte')
-            assert self.decoder(seq, len(seq), None, final=True,
-                       errorhandler=self.replace_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.replace_handler) ==
+            assert self.decoder(seq, len(seq), 'replace', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 
+                                 'replace', final=True) ==
                         (u'aaaa' + res + u'bbbb', len(seq) + 8))
             res = res.replace(FFFD, u'')
-            assert self.decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
-            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
-                        final=True, errorhandler=self.ignore_handler) ==
-                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), 'ignore', final=True
+                                ) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, 'ignore',
+                        final=True) == (u'aaaa' + res + u'bbbb', len(seq) + 8))
 
     def test_utf8_errors(self):
         # unexpected end of data
         for n, (seq, res) in enumerate(sequences):
             decoder = self.getdecoder('utf-8')
             raises(UnicodeDecodeError, decoder, seq, len(seq), None, final=True)
-            assert decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.replace_handler) == (res, len(seq))
-            assert decoder(seq + 'b', len(seq) + 1, None, final=True,
-                           errorhandler=self.replace_handler) == (res + u'b',
-                                                                  len(seq) + 1)
+            assert decoder(seq, len(seq), 'replace', final=True
+                           ) == (res, len(seq))
+            assert decoder(seq + 'b', len(seq) + 1, 'replace', final=True
+                           ) == (res + u'b', len(seq) + 1)
             res = res.replace(FFFD, u'')
-            assert decoder(seq, len(seq), None, final=True,
-                           errorhandler=self.ignore_handler) == (res, len(seq))
+            assert decoder(seq, len(seq), 'ignore', final=True
+                           ) == (res, len(seq))
 
 class TestEncoding(UnicodeTests):
-    def replace_handler(self, errors, codec, message, input, start, end):
-        if errors=='strict':
-            runicode.raise_unicode_exception_encode(errors, codec, message,
-                                                    input, start, end)
-        return u'?', end
-
     def test_all_ascii(self):
         for i in range(128):
             if sys.version >= "2.7":
         encoder = self.getencoder('decimal')
         assert encoder(u' 12, 34 ', 8, None) == ' 12, 34 '
         raises(UnicodeEncodeError, encoder, u' 12, \u1234 ', 7, None)
-        assert encoder(u'u\u1234', 2, 'replace', self.replace_handler) == 'u?'
+        assert encoder(u'u\u1234', 2, 'replace') == 'u?'
 
 class TestTranslation(object):
     def setup_class(cls):