Commits

Andriy Kornatskyy  committed c0e7d68

Moved code from wheezy http.

  • Participants
  • Parent commits e036dbf

Comments (0)

Files changed (4)

File src/wheezy/security/crypto/comp.py

+
+""" ``comp`` module.
+"""
+
+import sys
+
+
+PY3 = sys.version_info[0] >= 3
+
+if PY3:  # pragma: nocover
+    from io import BytesIO
+
+    def ntob(n, encoding):
+        """ Converts native string to bytes
+        """
+        return n.encode(encoding)
+
+    def bton(b, encoding):
+        """ Converts bytes to native string
+        """
+        return b.decode(encoding)
+
+    chr = lambda i: bytes([i])
+    ord = lambda b: b
+    b = lambda s: s.encode('latin1')
+    n = lambda s: s.decode('latin1')
+
+else:  # pragma: nocover
+    from cStringIO import StringIO as BytesIO
+
+    def ntob(n, encoding):
+        """ Converts native string to bytes
+        """
+        return n
+
+    def bton(b, encoding):
+        """ Converts bytes to native string
+        """
+        return b
+
+    chr = chr
+    ord = ord
+    b = lambda s: s
+    n = lambda s: s
+
+
+# Hash functions
+try:  # pragma: nocover
+    # Python 2.5+
+    from hashlib import md5
+    from hashlib import sha1
+    digest_size = lambda d: d().digest_size
+except ImportError:  # pragma: nocover
+    import md5
+    import sha as sha1
+    digest_size = lambda d: d.digest_size
+
+
+# Encryption interface
+block_size = None
+encrypt = None
+decrypt = None
+
+# Supported cyphers
+aes128 = None
+aes192 = None
+aes256 = None
+
+# Python Cryptography Toolkit (pycrypto)
+try:  # pragma: nocover
+    from Crypto.Cipher import AES
+
+    # pycrypto interface
+    block_size = lambda c: c.block_size
+    encrypt = lambda c, v: c.encrypt(v)
+    decrypt = lambda c, v: c.decrypt(v)
+
+    # suppored cyphers
+    def aes(key, key_size=32):
+        key = key[-key_size:]
+        iv = key[-16:]
+        return lambda: AES.new(key, AES.MODE_CBC, iv)
+
+    aes128 = lambda key: aes(key, 16)
+    aes192 = lambda key: aes(key, 24)
+    aes256 = lambda key: aes(key, 32)
+except ImportError:  # pragma: nocover
+    # TODO: add fallback to other encryption providers
+    pass

File src/wheezy/security/crypto/config.py

+
+""" ``config`` module.
+"""
+
+ENCODING = 'utf-8'
+CONTENT_TYPE = 'text/html'
+
+MAX_CONTENT_LENGTH = 4 * 1024 * 1024
+
+ENVIRON_HTTPS = 'wsgi.url_scheme'
+#ENVIRON_HTTPS = 'HTTPS'
+#ENVIRON_HTTPS = 'X-FORWARDED-PROTO'
+#ENVIRON_HTTPS = 'SERVER_PORT_SECURE'
+
+ENVIRON_HTTPS_VALUE = 'https'
+#ENVIRON_HTTPS_VALUE = 'on'
+#ENVIRON_HTTPS_VALUE = '1'
+
+ENVIRON_HOST = 'HTTP_HOST'
+#ENVIRON_HOST = 'HTTP_X_FORWARDED_HOST'
+
+ENVIRON_REMOTE_ADDR = 'REMOTE_ADDR'
+#ENVIRON_REMOTE_ADDR = 'HTTP_X_FORWARDED_FOR'
+
+HTTP_COOKIE_DOMAIN = None
+HTTP_COOKIE_SECURE = False
+HTTP_COOKIE_HTTPONLY = False
+
+CRYPTO_VALIDATION_KEY = ''
+CRYPTO_ENCRYPTION_KEY = ''

File src/wheezy/security/crypto/padding.py

+
+""" ``padding`` module.
+
+    see http://www.di-mgt.com.au/cryptopad.html
+"""
+
+from wheezy.security.crypto.comp import b
+from wheezy.security.crypto.comp import chr
+from wheezy.security.crypto.comp import ord
+
+
+def pad(s, block_size):
+    """ Pad with zeroes except make the last byte equal to the
+        number of padding bytes.
+
+        The convention with this method is usually always to
+        add a padding string, even if the original plaintext was
+        already an exact multiple of 8 bytes.
+
+        ``s`` - byte string.
+
+        >>> from binascii import hexlify
+        >>> from wheezy.security.crypto.comp import n
+        >>> n(hexlify(pad(b('workbook'), 8)))
+        '776f726b626f6f6b0000000000000008'
+        >>> n(hexlify(pad(b('for'), 8)))
+        '666f720000000005'
+        >>> n(hexlify(pad(b(''), 8)))
+        '0000000000000008'
+    """
+    n = len(s) % block_size
+    if n > 0:
+        n = block_size - n
+    else:
+        n = block_size
+    return (chr(0) * (n - 1)).join((s, chr(n)))
+
+
+def unpad(s, block_size):
+    """ Strip right by the last byte number.
+
+        ``s`` - byte string.
+
+        >>> from binascii import unhexlify
+        >>> from wheezy.security.crypto.comp import n
+        >>> s = unhexlify(b('666f720000000005'))
+        >>> n(unpad(s, 8))
+        'for'
+        >>> s = unhexlify(b('776f726b626f6f6b'))
+        >>> n(unpad(s, 8))
+        'workbook'
+        >>> unpad('', 8)
+        >>> unpad('abcd', 8)
+    """
+    n = len(s)
+    if n == 0:
+        return None
+    n = n % block_size
+    if n > 0:
+        return None
+    n = ord(s[-1])
+    if n >= block_size:
+        return s
+    return s[:-n]

File src/wheezy/security/crypto/ticket.py

+
+""" ``crypto`` module.
+"""
+
+from base64 import b64decode
+from base64 import b64encode
+from hmac import new as hmac_new
+from os import urandom
+from struct import pack
+from struct import unpack
+from time import time
+
+from wheezy.core.config import Config
+from wheezy.security.crypto import config
+from wheezy.security.crypto.comp import aes128
+from wheezy.security.crypto.comp import b
+from wheezy.security.crypto.comp import block_size
+from wheezy.security.crypto.comp import bton
+from wheezy.security.crypto.comp import decrypt
+from wheezy.security.crypto.comp import digest_size
+from wheezy.security.crypto.comp import encrypt
+from wheezy.security.crypto.comp import ntob
+from wheezy.security.crypto.comp import sha1
+from wheezy.security.crypto.padding import pad
+from wheezy.security.crypto.padding import unpad
+
+BASE64_ALTCHARS = b('-~')
+EPOCH = 1317212745
+
+
+def ensure_strong_key(key):
+    """ Translates a given key to a computed strong key of length
+        320 bit suitable for encryption.
+
+        >>> from wheezy.security.crypto.comp import n
+        >>> k = ensure_strong_key(b(''))
+        >>> len(k)
+        40
+        >>> n(b64encode(k))
+        '+9sdGxiqbAgyS31ktx+3Y3BpDh1fA7dyIanFu+fzE/Lc5EaX+NQGsA=='
+        >>> n(b64encode(ensure_strong_key(b('abc'))))
+        'zEfjwKoMKYRFRHbQYRCMCxEBd66sLMHe/H6umZFFQMixhLcp8jfwGQ=='
+    """
+    hmac = hmac_new(key, digestmod=sha1)
+    key = hmac.digest()
+    hmac.update(key)
+    return key + hmac.digest()
+
+
+class Ticket:
+    """ Protects sensitive information (e.g. user id). Default policy
+        applies verification and encryption. Verification is provided
+        by ``hmac`` initialized with ``sha1`` digestmod. Encryption
+        is provided if available, by default it attempts to use AES
+        cypher.
+
+        >>> t = Ticket()
+        >>> x = t.encode('hello')
+        >>> t.decode(x)
+        'hello'
+
+        If cypher is not available verification is still applied.
+
+        >>> t = Ticket(cypher=None)
+        >>> x = t.encode('hello')
+        >>> t.decode(x)
+        'hello'
+    """
+
+    cypher = None
+
+    def __init__(self, max_age=900, salt='', digestmod=None,
+            cypher=aes128, options=None):
+        self.max_age = max_age
+        digestmod = digestmod or sha1
+        c = Config(options, master=config)
+        key = b(salt + c.CRYPTO_VALIDATION_KEY)
+        key = ensure_strong_key(key)
+        self.hmac = hmac_new(key, digestmod=digestmod)
+        self.digest_size = digest_size(digestmod)
+        if cypher:
+            key = b(salt + c.CRYPTO_ENCRYPTION_KEY)
+            key = ensure_strong_key(key)
+            self.cypher = cypher(key)
+            self.block_size = block_size(self.cypher())
+
+    def encode(self, value, encoding='utf-8'):
+        """ Encode ``value`` accoring to ticket policy.
+        """
+        value = ntob(value, encoding)
+        expires = pack('<i', self.timestamp())
+        noise = urandom(12)
+        value = b('').join((
+            noise[:4],
+            expires,
+            noise[4:8],
+            value,
+            noise[8:]
+        ))
+        cypher = self.cypher
+        if cypher:
+            cypher = cypher()
+            value = encrypt(cypher, pad(value, self.block_size))
+        return b64encode(self.sign(value) + value, BASE64_ALTCHARS)
+
+    def decode(self, value, encoding='utf-8'):
+        """ Decode ``value`` according to ticket policy.
+
+            The ``value`` length is at least 56.
+
+            >>> t = Ticket(cypher=None)
+            >>> t.decode('abc')
+
+            Signature is not valid
+
+            >>> value = 'cf-0eDoyN6VwP-IyZap4zTBjsHqqaZua4MkG'
+            >>> value += 'AA11HGdoZWxsbxBSjyg='
+            >>> t.decode(b(value))
+
+            Expired
+
+            >>> value = '1ZRcHGsYENF~lzezpMKFFF9~QBCQkqPlIMoG'
+            >>> value += 'AA11HGdoZWxsbxBSjyg='
+            >>> t.decode(b(value))
+        """
+        if len(value) < 56:
+            return None
+        value = b64decode(value, BASE64_ALTCHARS)
+        signature = value[:self.digest_size]
+        value = value[self.digest_size:]
+        if signature != self.sign(value):
+            return None
+        cypher = self.cypher
+        if cypher:
+            cypher = cypher()
+            value = unpad(decrypt(cypher, value), self.block_size)
+        expires, value = value[4:8], value[12:-4]
+        if unpack('<i', expires)[0] < self.timestamp():
+            return None
+        return bton(value, encoding)
+
+    def timestamp(self):
+        return int(time()) - EPOCH + self.max_age
+
+    def sign(self, value):
+        h = self.hmac.copy()
+        h.update(value)
+        return h.digest()