Ginés Martínez Sánchez committed e594d51 (Draft)

Reviewing Python 3 compatibility and tests


Files changed (17)

ginsfsm/c_connex.py

         self.send_event(self.gsock, 'EV_CONNECT')
 
     def is_closed(self):
+        # TODO: avoid direct access to gsock
         return not self.gsock.socket
 
     def is_disconnected(self):
+        # TODO: avoid direct access to gsock
         return not self.gsock.connected
 
     def get_next_dst(self):
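
Both TODOs point at the same refactor: GConnex should ask its child GSock for connection state instead of reaching into gsock.socket and gsock.connected. A minimal sketch of what such accessors could look like on GSock (hypothetical names, not part of this commit):

    # Hypothetical GSock helpers; GConnex would then call
    # self.gsock.is_closed() / self.gsock.is_connected() instead of
    # touching self.gsock.socket and self.gsock.connected directly.
    def is_closed(self):
        return not self.socket

    def is_connected(self):
        return bool(self.connected)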

ginsfsm/c_sock.py

 from ginsfsm.gobj import GObj
 from ginsfsm.gconfig import GConfig
 from ginsfsm.utils import hexdump
+from ginsfsm.compat import tobytes
 from ginsfsm.buffers import (
     ReadOnlyFileBasedBuffer,
     OverflowableBuffer,
     # references to the underlying socket object.
     def __getattr__(self, attr):
         try:
-            retattr = getattr(self.socket, attr)
+            #retattr = getattr(self.socket, attr)
+            retattr = socket.__getattr__(self.socket, attr)
+
         except AttributeError:
             raise AttributeError("%s instance has no attribute '%s'"
                                  % (self.__class__.__name__, attr))
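
For reference, the portable cross-version idiom for this kind of proxying is a plain getattr() on the wrapped instance; whether the socket.__getattr__ form above resolves depends on what name socket is bound to in c_sock.py. A small, self-contained sketch of the delegation pattern (illustrative only, not this module's code):

    import socket

    class SockProxy(object):
        """Forwards unknown attribute lookups to the wrapped socket."""
        def __init__(self, sock):
            self.socket = sock

        def __getattr__(self, attr):
            # Only invoked when normal lookup fails, so attributes defined
            # on the proxy itself never reach this method.
            try:
                return getattr(self.socket, attr)
            except AttributeError:
                raise AttributeError("%s instance has no attribute '%s'"
                                     % (self.__class__.__name__, attr))

    proxy = SockProxy(socket.socket())
    proxy.setblocking(0)   # resolved on the underlying socket
    proxy.close()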

ginsfsm/compat.py

     urlparse = parse
     from urllib.parse import quote as url_quote
     from urllib.parse import quote_plus as url_quote_plus
+    from urllib.parse import unquote_plus as url_unquote_plus
     from urllib.parse import unquote as url_unquote
     from urllib.parse import urlencode as url_encode
     from urllib.request import urlopen as url_open
     import urlparse
     from urllib import quote as url_quote
     from urllib import quote_plus as url_quote_plus
+    from urllib import unquote_plus as url_unquote_plus
     from urllib import unquote as url_unquote
     from urllib import urlencode as url_encode
     from urllib2 import urlopen as url_open
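
With both branches exporting the same name, callers import url_unquote_plus from ginsfsm.compat and get the right implementation on either interpreter (see the c_transport_jsonp.py hunk below). A quick sketch of the intended usage:

    from ginsfsm.compat import url_quote_plus, url_unquote_plus

    payload = 'd={"msg": "hola caracola"}'
    encoded = url_quote_plus(payload)      # 'd%3D%7B%22msg%22...'
    assert url_unquote_plus(encoded) == payload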

ginsfsm/escape.py

 have crept in over time.
 """
 
-from __future__ import absolute_import, division, with_statement
+from __future__ import absolute_import, division, print_function, with_statement
 
-import htmlentitydefs
 import re
 import sys
-import urllib
 
-# Python3 compatibility:  On python2.5, introduce the bytes alias from 2.6
-try:
-    bytes
-except Exception:
-    bytes = str
+#from tornado.util import bytes_type, unicode_type, basestring_type, u
+
+# Fake unicode literal support:  Python 3.2 doesn't have the u'' marker for
+# literal strings, and alternative solutions like "from __future__ import
+# unicode_literals" have other problems (see PEP 414).  u() can be applied
+# to ascii strings that include \u escapes (but they must not contain
+# literal non-ascii characters).
+if type('') is not type(b''):
+    def u(s):
+        return s
+    bytes_type = bytes
+    unicode_type = str
+    basestring_type = str
+else:
+    def u(s):
+        return s.decode('unicode_escape')
+    bytes_type = str
+    unicode_type = unicode
+    basestring_type = basestring
+
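
In practice u() yields the same text object on both interpreters as long as its argument is ASCII with \u escapes, which is exactly the constraint stated in the comment above. A quick illustration of the shims (assumes the module has been imported; utf8() is defined further down in this file):

    s = u('caf\u00e9')                    # 'café' on both Python 2 and 3
    assert isinstance(s, unicode_type)
    assert utf8(s) == b'caf\xc3\xa9'      # bytes_type on both interpreters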
 
 try:
-    from urlparse import parse_qs  # Python 2.6+
+    from urllib.parse import parse_qs as _parse_qs  # py3
 except ImportError:
-    from cgi import parse_qs
+    from urlparse import parse_qs as _parse_qs  # Python 2.6+
 
-# json module is in the standard library as of python 2.6; fall back to
-# simplejson if present for older versions.
 try:
-    import json
-    assert hasattr(json, "loads") and hasattr(json, "dumps")
-    _json_decode = json.loads
-    _json_encode = json.dumps
-except Exception:
-    try:
-        import simplejson
-        _json_decode = lambda s: simplejson.loads(_unicode(s))
-        _json_encode = lambda v: simplejson.dumps(v)
-    except ImportError:
-        try:
-            # For Google AppEngine
-            from django.utils import simplejson
-            _json_decode = lambda s: simplejson.loads(_unicode(s))
-            _json_encode = lambda v: simplejson.dumps(v)
-        except ImportError:
-            def _json_decode(s):
-                raise NotImplementedError(
-                    "A JSON parser is required, e.g., simplejson at "
-                    "http://pypi.python.org/pypi/simplejson/")
-            _json_encode = _json_decode
+    import htmlentitydefs  # py2
+except ImportError:
+    import html.entities as htmlentitydefs  # py3
 
+try:
+    import urllib.parse as urllib_parse  # py3
+except ImportError:
+    import urllib as urllib_parse  # py2
+
+import json
+
+try:
+    unichr
+except NameError:
+    unichr = chr
 
 _XHTML_ESCAPE_RE = re.compile('[&<>"]')
 _XHTML_ESCAPE_DICT = {'&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;'}
 
 
 def xhtml_escape(value):
-    """Escapes a string so it is valid within XML or XHTML."""
-    return _XHTML_ESCAPE_RE.sub(
-        lambda match: _XHTML_ESCAPE_DICT[match.group(0)], to_basestring(value))
+    """Escapes a string so it is valid within HTML or XML."""
+    return _XHTML_ESCAPE_RE.sub(lambda match: _XHTML_ESCAPE_DICT[match.group(0)],
+                                to_basestring(value))
 
 
 def xhtml_unescape(value):
     # the javascript.  Some json libraries do this escaping by default,
     # although python's standard library does not, so we do it here.
     # http://stackoverflow.com/questions/1580647/json-why-are-forward-slashes-escaped
-    return _json_encode(recursive_unicode(value)).replace("</", "<\\/")
+    return json.dumps(value).replace("</", "<\\/")
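
The replace() keeps a literal "</" out of the emitted JSON so the result can be embedded inside a <script> block without the HTML parser seeing a premature closing tag. For example, with the json_encode() shown above:

    # json.dumps('</script>') alone would produce '"</script>"'.
    assert json_encode("</script>") == '"<\\/script>"'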
 
 
 def json_decode(value):
     """Returns Python objects for the given JSON string."""
-    return _json_decode(to_basestring(value))
+    return json.loads(to_basestring(value))
 
 
 def squeeze(value):
 
 
 def url_escape(value):
-    """Returns a valid URL-encoded version of the given value."""
-    return urllib.quote_plus(utf8(value))
+    """Returns a URL-encoded version of the given value."""
+    return urllib_parse.quote_plus(utf8(value))
 
 # python 3 changed things around enough that we need two separate
 # implementations of url_unescape.  We also need our own implementation
         the result is a unicode string in the specified encoding.
         """
         if encoding is None:
-            return urllib.unquote_plus(utf8(value))
+            return urllib_parse.unquote_plus(utf8(value))
         else:
-            return unicode(urllib.unquote_plus(utf8(value)), encoding)
+            return unicode_type(urllib_parse.unquote_plus(utf8(value)), encoding)
 
-    parse_qs_bytes = parse_qs
+    parse_qs_bytes = _parse_qs
 else:
     def url_unescape(value, encoding='utf-8'):
         """Decodes the given value from a URL.
         the result is a unicode string in the specified encoding.
         """
         if encoding is None:
-            return urllib.parse.unquote_to_bytes(value)
+            return urllib_parse.unquote_to_bytes(value)
         else:
-            return urllib.unquote_plus(to_basestring(value), encoding=encoding)
+            return urllib_parse.unquote_plus(to_basestring(value), encoding=encoding)
 
     def parse_qs_bytes(qs, keep_blank_values=False, strict_parsing=False):
         """Parses a query string like urlparse.parse_qs, but returns the
         """
         # This is gross, but python3 doesn't give us another way.
         # Latin1 is the universal donor of character encodings.
-        result = parse_qs(qs, keep_blank_values, strict_parsing,
-                          encoding='latin1', errors='strict')
+        result = _parse_qs(qs, keep_blank_values, strict_parsing,
+                           encoding='latin1', errors='strict')
         encoded = {}
-        for k, v in result.iteritems():
+        for k, v in result.items():
             encoded[k] = [i.encode('latin1') for i in v]
         return encoded
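
The latin1 round trip works because latin1 maps every byte 0x00-0xFF to the code point of the same value, so decode-then-encode is lossless for arbitrary byte data even when it is not valid UTF-8. On Python 3, for instance:

    # '%E9' is not valid UTF-8 on its own, but survives the latin1 round trip.
    assert parse_qs_bytes('a=%E9&b=1') == {'a': [b'\xe9'], 'b': [b'1']}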
 
 
-_UTF8_TYPES = (bytes, type(None))
+_UTF8_TYPES = (bytes_type, type(None))
 
 
 def utf8(value):
     """
     if isinstance(value, _UTF8_TYPES):
         return value
-    assert isinstance(value, unicode)
+    assert isinstance(value, unicode_type)
     return value.encode("utf-8")
 
-_TO_UNICODE_TYPES = (unicode, type(None))
+_TO_UNICODE_TYPES = (unicode_type, type(None))
 
 
 def to_unicode(value):
     """
     if isinstance(value, _TO_UNICODE_TYPES):
         return value
-    assert isinstance(value, bytes)
+    assert isinstance(value, bytes_type)
     return value.decode("utf-8")
 
 # to_unicode was previously named _unicode not because it was private,
 
 # When dealing with the standard library across python 2 and 3 it is
 # sometimes useful to have a direct conversion to the native string type
-if str is unicode:
+if str is unicode_type:
     native_str = to_unicode
 else:
     native_str = utf8
 
-_BASESTRING_TYPES = (basestring, type(None))
+_BASESTRING_TYPES = (basestring_type, type(None))
 
 
 def to_basestring(value):
     """
     if isinstance(value, _BASESTRING_TYPES):
         return value
-    assert isinstance(value, bytes)
+    assert isinstance(value, bytes_type)
     return value.decode("utf-8")
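
Together these helpers give the rest of the package one conversion in each direction: utf8() for data going to the wire, to_unicode() for text processing, and native_str for APIs that expect whichever str type is native to the running interpreter. A short illustration:

    data = utf8(u('ni\u00f1o'))     # always bytes_type: b'ni\xc3\xb1o'
    text = to_unicode(data)         # always unicode_type
    # native_str is to_unicode on Python 3 and utf8 on Python 2, so it
    # returns the interpreter's own str type in both cases.
    assert isinstance(native_str(data), str)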
 
 
     Supports lists, tuples, and dictionaries.
     """
     if isinstance(obj, dict):
-        return dict(
-            (recursive_unicode(k), recursive_unicode(v)) \
-                for (k, v) in obj.iteritems()
-        )
+        return dict((recursive_unicode(k), recursive_unicode(v)) for (k, v) in obj.items())
     elif isinstance(obj, list):
         return list(recursive_unicode(i) for i in obj)
     elif isinstance(obj, tuple):
         return tuple(recursive_unicode(i) for i in obj)
-    elif isinstance(obj, bytes):
+    elif isinstance(obj, bytes_type):
         return to_unicode(obj)
     else:
         return obj
 # but it gets all exponential on certain patterns (such as too many trailing
 # dots), causing the regex matcher to never return.
 # This regex should avoid those problems.
-PY3 = sys.version_info[0] == 3
-if PY3:
-    _URL_RE = re.compile(
-        """\b((?:([\w-]+):(/{1,3})|www[.])(?:(?:(?:[^\s&()]|&amp;|&quot;)*(?:[^!"#$%&'()*+,.:;<=>?@\[\]^`{|}~\s]))|(?:\((?:[^\s&()]|&amp;|&quot;)*\)))+)"""
-    )
-else:
-    _URL_RE = re.compile(
-        ur"""\b((?:([\w-]+):(/{1,3})|www[.])(?:(?:(?:[^\s&()]|&amp;|&quot;)*(?:[^!"#$%&'()*+,.:;<=>?@\[\]^`{|}~\s]))|(?:\((?:[^\s&()]|&amp;|&quot;)*\)))+)"""
-    )
+# Use to_unicode instead of tornado.util.u - we don't want backslashes getting
+# processed as escapes.
+_URL_RE = re.compile(to_unicode(r"""\b((?:([\w-]+):(/{1,3})|www[.])(?:(?:(?:[^\s&()]|&amp;|&quot;)*(?:[^!"#$%&'()*+,.:;<=>?@\[\]^`{|}~\s]))|(?:\((?:[^\s&()]|&amp;|&quot;)*\)))+)"""))
 
 
 def linkify(text, shorten=False, extra_params="",
 
     Parameters:
 
-    shorten: Long urls will be shortened for display.
+    * ``shorten``: Long urls will be shortened for display.
 
-    extra_params: Extra text to include in the link tag, or a callable
+    * ``extra_params``: Extra text to include in the link tag, or a callable
         taking the link as an argument and returning the extra text
         e.g. ``linkify(text, extra_params='rel="nofollow" class="external"')``,
         or::
                     return 'class="external" rel="nofollow"'
             linkify(text, extra_params=extra_params_cb)
 
-    require_protocol: Only linkify urls which include a protocol. If this is
-        False, urls such as www.facebook.com will also be linkified.
+    * ``require_protocol``: Only linkify urls which include a protocol. If
+        this is False, urls such as www.facebook.com will also be linkified.
 
-    permitted_protocols: List (or set) of protocols which should be linkified,
-        e.g. linkify(text, permitted_protocols=["http", "ftp", "mailto"]).
-        It is very unsafe to include protocols such as "javascript".
+    * ``permitted_protocols``: List (or set) of protocols which should be
+        linkified, e.g. ``linkify(text, permitted_protocols=["http", "ftp",
+        "mailto"])``. It is very unsafe to include protocols such as
+        ``javascript``.
     """
     if extra_params and not callable(extra_params):
         extra_params = " " + extra_params.strip()
                 # (no more slug, etc), so it really just provides a little
                 # extra indication of shortening.
                 url = url[:proto_len] + parts[0] + "/" + \
-                        parts[1][:8].split('?')[0].split('.')[0]
+                    parts[1][:8].split('?')[0].split('.')[0]
 
             if len(url) > max_len * 1.5:  # still too long
                 url = url[:max_len]
                     # have a status bar, such as Safari by default)
                     params += ' title="%s"' % href
 
-        return u'<a href="%s"%s>%s</a>' % (href, params, url)
+        return u('<a href="%s"%s>%s</a>') % (href, params, url)
 
     # First HTML-escape so that our strings are all safe.
     # The regex is modified to avoid character entities other than &amp; so
 
 def _build_unicode_map():
     unicode_map = {}
-    for name, value in htmlentitydefs.name2codepoint.iteritems():
+    for name, value in htmlentitydefs.name2codepoint.items():
         unicode_map[name] = unichr(value)
     return unicode_map
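
As a quick check of the reworked docstring and the single _URL_RE above, linkify() can be exercised like this (URL and output are illustrative; attribute order follows the implementation):

    html = linkify(u('docs at http://ginsfsm.org/intro'),
                   extra_params='rel="nofollow"')
    # roughly: docs at <a href="http://ginsfsm.org/intro"
    #          rel="nofollow">http://ginsfsm.org/intro</a>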
 

ginsfsm/examples/essential_gobjs/ongconnex.py

 from ginsfsm.c_connex import GConnex
 from ginsfsm.utils import hexdump
 
-QUERY = "GET / HTTP/1.1\r\n" + \
-    "Host: www\r\n" + \
-    "\r\n"
+QUERY = b"GET / HTTP/1.1\r\n" + \
+    b"Host: www\r\n" + \
+    b"\r\n"
 
 
 def ac_rx_data(self, event):

ginsfsm/examples/essential_gobjs/ongsock.py

 from ginsfsm.c_timer import GTimer
 from ginsfsm.c_sock import GSock
 from ginsfsm.utils import hexdump
+from ginsfsm.compat import bytes_
 
 import logging
 logging.basicConfig(level=logging.DEBUG)
 
 
 def ac_transmit(self, event):
-    data = QUERY % self.config.url
+    data = bytes_(QUERY % self.config.url)
     if self.config.verbose:
         print('Send %s' % data)
     self.send_event(self.gsock, 'EV_SEND_DATA', data=data)

ginsfsm/examples/essential_gobjs/ontimer.py

     :members:
 
 """
-import envoy
+import os
 
 from ginsfsm.gobj import GObj
 from ginsfsm.gaplic import GAplic
     if self.config.verbose:
         print('Executing %s...' % self.config.command)
 
-    response = envoy.run(self.config.command)
-
-    if self.config.verbose:
-        print(response.std_out)
+    os.system(self.config.command)
 
     self.send_event(self.timer, 'EV_SET_TIMER', seconds=self.config.seconds)
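
os.system() runs the command but discards its output, which is what the removed response.std_out print used to show. If capturing output is still wanted, one alternative (not part of this commit) is subprocess.check_output, available on Python 2.7 and 3:

    import subprocess

    # Mirrors the os.system() call above but keeps stdout; note that,
    # unlike os.system(), it raises CalledProcessError on a non-zero exit.
    output = subprocess.check_output(self.config.command, shell=True)
    if self.config.verbose:
        print(output.decode('utf-8', 'replace'))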
 

ginsfsm/examples/essential_gobjs/stress_client_echo.py

 
 def ac_client_timeout(self, event):
     if self.connex is None:
-        self.connex = range(self.config.connections)
+        self.connex = list(range(self.config.connections))
 
     if self.n_clients < self.config.connections:
-        for i in range(self.n_clients, self.config.connections):
+        for i in list(range(self.n_clients, self.config.connections)):
             self.n_clients += 1
             self.connex[i] = self.create_gobj(
                 'client-%d' % i,
             #ret = self.connex[i].mt_connect(host='127.0.0.1', port=8000) #8082
             #if not ret:
             #    break
-    print "conectados: %d" % self.n_connected_clients
+    print("conectados: %d" % self.n_connected_clients)
     if self.n_connected_clients == self.config.connections:
         n_echoes = 0
         n_total = 0
         diff = datetime.timedelta(seconds=0)
         for cli in self.connex:
-            #print 'cli %d txed_msgs %d, rxed_msgs %d' % (cli.idx, cli.gsock.txed_msgs, cli.gsock.rxed_msgs)
+            #print('cli %d txed_msgs %d, rxed_msgs %d' % (cli.idx, cli.gsock.txed_msgs, cli.gsock.rxed_msgs))
             if cli.gsock.rxed_msgs == cli.gsock.txed_msgs:
                 n_total += 1
             if cli.sended_msgs == cli.received_msgs:
                 n_echoes += 1
             diff += self.diff
-        print "Echoes OK: %d of %d, taverage %s" % (n_echoes, self.config.connections, diff/self.config.connections)
+        print("Echoes OK: %d of %d, taverage %s" % (n_echoes, self.config.connections, diff/self.config.connections))
 
         for cli in self.connex:
             if cli.gsock.connected:
     if not event.source[-1].conectado:
         self.n_connected_clients += 1
         event.source[-1].conectado = 1
-    print "C: conectados: %d" % self.n_connected_clients
+    print("C: conectados: %d" % self.n_connected_clients)
 
 
 def ac_client_disconnected(self, event):
     if event.source[-1].conectado:
         self.n_connected_clients -= 1
         event.source[-1].conectado = 0
-    print "D: conectados: %d" % self.n_connected_clients
+    print("D: conectados: %d" % self.n_connected_clients)
 
 
 def ac_client_rx_data(self, event):
     if diff > self.max_response_time:
         self.max_response_time = diff
     self.diff = diff
-    #print 'recibo:', event.data
-    #print 'diff %s, min %s, max %s' % (diff, self.min_response_time, self.max_response_time)
+    #print('recibo:', event.data)
+    #print('diff %s, min %s, max %s' % (diff, self.min_response_time, self.max_response_time))
 
 
 CLIENT_FSM = {

ginsfsm/examples/essential_gobjs/stress_connections.py

 
 import time
 import logging
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 from ginsfsm.gobj import GObj
 from ginsfsm.gaplic import GAplic
 
 def ac_client_timeout(self, event):
     if self.connex is None:
-        self.connex = range(self.config.connections)
+        self.connex = list(range(self.config.connections))
         for i in self.connex:
-            print i
             self.connex[i] = self.create_gobj(
                 'client-%02d' % i,
                 GConnex,

ginsfsm/examples/essential_gobjs/stress_server_echo.py

 
 def ac_clisrv_timeout(self, event):
     self.set_timeout(10)
-    print "Server's clients: %d, connected %d" % (
-        len(self.dl_childs), n_connected_clisrv)
+    print("Server's clients: %d, connected %d" % (
+        len(self.dl_childs), n_connected_clisrv))
 
 
 def ac_clisrv_connected(self, event):

ginsfsm/examples/router/test_router.py

 #                   GAplic TITI
 #===============================================================
 def ac_message(self, event):
-    print "ORDENNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN %s %s" % (
+    print("ORDENNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN %s %s" % (
         self.counter,
         event.__subscription_reference__,
-    )
+    ))
     list_globals()
     self.counter += 1
     if self.counter > 4:
             __subscription_reference__=event.__subscription_reference__,
         )
         if not ret:
-            print "Eiiiiiiiiiiiiiiiiiiiiii something wrong!"
+            print("Eiiiiiiiiiiiiiiiiiiiiii something wrong!")
 
     # causes broadcast EV_MESSAGE again
     self.gaplic.send_event_to_external_gaplic(
             self,           # subscriber_gobj
         )
         if not ret:
-            print "Eiiiiiiiiiiiiiiiiiiiiii something wrong!"
+            print("Eiiiiiiiiiiiiiiiiiiiiii something wrong!")
 
         self.gaplic.send_event_to_external_gaplic(
             'TOTO',             # gaplic_name
 
 
 def ac_timeout(self, event):
-    print "Broadcasting EV_MESSAGE"
+    print("Broadcasting EV_MESSAGE")
     self.broadcast_event('EV_MESSAGE')
 
 

ginsfsm/gconfig.py

     iteritems_,
     string_types,
     integer_types,
+    binary_type,
+    bytes_,
 )
 
 import logging  # use root logger
 
-accepted_types = (str, int, bool, list, tuple, dict)
+accepted_types = (str, int, bool, list, tuple, dict, bytes)
 
 
 class GConfigTemplateError(Exception):
         elif issubclass(type_, string_types):
             if value is not None:
                 value = str(value)
+        elif issubclass(type_, binary_type):
+            if value is not None:
+                value = bytes_(value)
         elif issubclass(type_, bool):  # first bool: it's a int type too!!
             value = asbool(value)
         elif issubclass(type_, integer_types):

ginsfsm/protocols/http/server/c_http_server.py

 
 def ac_timeout(self, event):
     self.set_timeout(10)
-    print "Server's clients: %d, connected %d" % (
-        len(self.dl_childs), self._n_channel)
+    #print("Server's clients: %d, connected %d" % (
+    #    len(self.dl_childs), self._n_channel))
 
 
 GHTTPSERVER_FSM = {

ginsfsm/protocols/sockjs/server/c_static.py

 
 import random
 import hashlib
-from sys import maxint
-
+from sys import maxsize
 from pyramid.view import view_config
 
 from ginsfsm.protocols.sockjs.server.proto import json
                 'websocket' not in self.context.sockjs_server.config.disabled_transports,
             'cookie_needed': self.context.sockjs_server.cookie_needed,
             'origins': ['*:*'],
-            'entropy': random.randint(0, maxint),
+            'entropy': random.randint(0, maxsize),
         }
         response.body = json.dumps(info)
         return response

ginsfsm/protocols/sockjs/server/c_transport_jsonp.py

     JSONP transport implementation.
 """
 import logging
-from urllib import unquote_plus
 
 from pyramid.view import view_config
 from pyramid.httpexceptions import HTTPNotFound, HTTPServerError
 
 from ginsfsm.gobj import GObj
 from ginsfsm.gconfig import GConfig
+from ginsfsm.compat import url_unquote_plus
 from ginsfsm.protocols.sockjs.server import proto
 from ginsfsm.protocols.sockjs.server.basehandler import SessionHandler
 
             if not data.startswith('d='):
                 return HTTPServerError("Payload expected.")
 
-            data = unquote_plus(data[2:])
+            data = url_unquote_plus(data[2:])
 
         if not data:
             logging.debug('ERROR jsonp_send: Payload expected.')

ginsfsm/tests/test_circular_fifo.py

         count = self.circular.free_space
         self.assertEqual(count, 5)
 
-        count = self.circular.putdata("123")
+        count = self.circular.putdata(b"123")
         self.assertEqual(count, 3)
         count = self.circular.free_space
         self.assertEqual(count, 2)
 
-        count = self.circular.putdata("ab")
+        count = self.circular.putdata(b"ab")
         self.assertEqual(count, 2)
         count = self.circular.free_space
         self.assertEqual(count, 0)
 
         self.assertRaises(CircularFullBufferError,
             self.circular.putdata,
-            "XX")
+            b"XX")
 
         data = self.circular.getdata(5)
         self.assertEqual(data, b"123ab")
         data = self.circular.getdata(1)
         self.assertEqual(data, b'')
 
-        count = self.circular.putdata("qw")
+        count = self.circular.putdata(b"qw")
         self.assertEqual(count, 2)
         data = self.circular.getdata()
         self.assertEqual(data, b'qw')
 
     def test_getdata_putdata_with_overwrite(self):
-        count = self.circular.putdata("123")
+        count = self.circular.putdata(b"123")
         self.assertEqual(count, 3)
         count = self.circular.free_space
         self.assertEqual(count, 2)
 
         self.assertRaises(CircularFullBufferError,
             self.circular.putdata,
-            "abc")
+            b"abc")
 
         data = self.circular.getdata(2)
         self.assertEqual(data, b"12")
         count = self.circular.busy_space
         self.assertEqual(count, 1)
 
-        count = self.circular.putdata("abcd")
+        count = self.circular.putdata(b"abcd")
         self.assertEqual(count, 4)
 
         data = self.circular.getdata(0)
         count = self.circular.free_space
         self.assertEqual(count, 5)
 
-        count = self.circular.putdata("zxcvb")
+        count = self.circular.putdata(b"zxcvb")
         self.assertEqual(count, 5)
         count = self.circular.free_space
         self.assertEqual(count, 0)
         self.assertEqual(count, 5)
 
     def test_getdata_putdata_with_ln(self):
-        s = bytearray(['1', '2', '3'])
+        s = bytearray(b'123')
         count = self.circular.putdata(s, 2)
         self.assertEqual(count, 2)
         count = self.circular.free_space
         self.assertEqual(count, 3)
 
-        s = "abc"
+        s = b"abc"
         count = self.circular.putdata(s, 5)
         self.assertEqual(count, 3)
         count = self.circular.free_space
 if __name__ == "__main__":
     circular = CircularFIFO(5)
 
-    circular.putdata("123")
+    circular.putdata(b"123")
     print(circular)
     circular.getdata(2)
     print(circular)
 
-    circular.putdata("abcd")
+    circular.putdata(b"abcd")
     print(circular)
 
     circular.getdata(0)

ginsfsm/utils.py

     return i.hexdigest()
 
 
-def string_to_bytearray(s):
+def string_to_bytearray(s, encoding='utf-8', errors='strict'):
     if isinstance(s, text_type):
-        s = bytearray(s, encoding="utf-8", errors="strict")
-    else:
-        s = bytes(s)
+        s = bytearray(s, encoding, errors)
+    elif not isinstance(s, bytearray):
+        s = bytearray(s)
     return s
 
 
-def string_to_pack(s):
-    if isinstance(s, text_type):
-        s = bytearray(s, encoding="utf-8", errors="strict")
-    s = bytes(s)
-    return s
+def hexdump(prefix, byts, length=16):
+    ''' hexdump
+    Dump a block of bytes in hex, with pretty formatting of the hex values
+    and their ASCII representation (if printable).
 
-
-HEX_FILTER = ''.join(
-    [(len(repr(chr(x))) == 3) and chr(x) or '.' for x in range(256)]
-)
-
-
-def hexdump(prefix, src, length=16):
-    N = 0
+    byts: incoming data (text, bytes or bytearray).
+    length: how many bytes to display on each line.
+    '''
+    byts = string_to_bytearray(byts)
+    n = 0
     result = ''
-    src = bytes(src)
-    while src:
-        s, src = src[:length], src[length:]
-        hexa = ' '.join(["%02X" % ord(x) for x in s])
-        s = s.translate(HEX_FILTER)
-        result += "%s %04X: %-*s  %s\n" % (prefix, N, length*3, hexa, s)
-        N += length
+    while byts:
+        b_work = byts[:length]
+        byts = byts[length:]
+        hexa = ' '.join(["%02X" % x for x in b_work])
+        asc = ''.join([((((x < 32) or (x > 0x7e)) and '.') or chr(x)) for x in b_work])
+        result += "%s %04X %-*s %s\n" % (prefix, n, length*3, hexa, asc)
+        n += length
     return result
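
A short usage check of the rewritten helper: thanks to string_to_bytearray() it now accepts text, bytes or bytearray, and it formats 16 bytes per line by default:

    print(hexdump('<==', b'GET / HTTP/1.1\r\nHost: www\r\n\r\n'))
    # each line looks like:
    # <== 0000 47 45 54 20 2F 20 48 54 54 50 2F 31 2E 31 0D 0A  GET / HTTP/1.1..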
 
 