Wiki

Clone wiki

sqlalchemy / UsageRecipes / SymmetricEncryption

HybridSymmetricEncryption

This recipe uses a hybrid attribute (introduced at http://www.sqlalchemy.org/docs/orm/extensions/hybrid.html) to represent a model attribute in both an encrypted (or otherwise encoded), database-backed form and an ad-hoc decrypted form. A second example illustrates the same idea using a TypeDecorator (introduced at http://www.sqlalchemy.org/docs/core/types.html#augmenting-existing-types).

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy.ext.hybrid import hybrid_property, Comparator
from Crypto.Cipher import AES
import binascii
import uuid

# Symmetric key shared by aes_encrypt() and aes_decrypt().  uuid4().bytes is
# 16 bytes long, which AES accepts as a 128-bit key.  Because it is
# regenerated on every run, ciphertext written by one run cannot be decrypted
# by another; a real application would load a persistent secret instead.
key = uuid.uuid4().bytes
"""The encryption key.   Random for this example."""

def aes_encrypt(data):
    """Encrypt ``data`` with the module-level ``key`` and return it hex-encoded.

    The plaintext is right-padded with spaces up to the next multiple of the
    16-byte AES block size (a full extra block is added when the length is
    already a multiple of 16), encrypted, and hex-encoded so that it can be
    stored in a plain string column.

    ECB mode is selected explicitly: it is deterministic, so equal plaintexts
    always produce equal ciphertexts, which is what allows an equality
    comparison against the stored value.  The same property leaks which rows
    share a value, so this is suitable for a demonstration only.

    :param data: plaintext as text or bytes; text is encoded as UTF-8.
    :return: hexadecimal ciphertext as a native ``str`` (32 characters per
        16-byte block).
    """
    # The cipher operates on bytes; normalise text input first so the
    # padding below is computed on the encoded length.
    if not isinstance(data, bytes):
        data = data.encode("utf-8")

    # PyCryptodome requires the mode argument; older PyCrypto defaulted to
    # ECB, so passing it explicitly keeps the output identical on both.
    cipher = AES.new(key, AES.MODE_ECB)
    padded = data + b" " * (16 - len(data) % 16)
    hex_digest = binascii.hexlify(cipher.encrypt(padded))

    # hexlify() returns bytes on Python 3; hand back a native string so the
    # value binds cleanly to a String column on either major version.
    if not isinstance(hex_digest, str):
        hex_digest = hex_digest.decode("ascii")
    return hex_digest

def aes_decrypt(data):
    """Decrypt a hex-encoded ciphertext produced by ``aes_encrypt``.

    The value is hex-decoded, decrypted with the module-level ``key`` and
    stripped of trailing whitespace to remove the space padding.  Because
    the padding is indistinguishable from real trailing whitespace, any
    trailing whitespace in the original plaintext is not preserved.

    :param data: hexadecimal ciphertext (text or bytes).
    :return: the plaintext as a native ``str``.
    """
    # PyCryptodome requires the mode argument; older PyCrypto defaulted to
    # ECB, so passing it explicitly keeps behaviour identical on both.
    cipher = AES.new(key, AES.MODE_ECB)
    plaintext = cipher.decrypt(binascii.unhexlify(data)).rstrip()

    # decrypt() yields bytes on Python 3; return text there so the result
    # compares equal to the string that was originally assigned.
    if not isinstance(plaintext, str):
        plaintext = plaintext.decode("utf-8")
    return plaintext

# Declarative base class; mapped classes below derive from it and register
# their tables on Base.metadata.
Base = declarative_base()

class User(Base):
    """Mapped class that keeps its ``value`` encrypted at rest.

    The ciphertext lives in the ``encrypted_value`` column; the ``value``
    hybrid exposes the decrypted form on instances and, at the class level,
    an expression that encrypts the compared operand so queries can filter
    on the plaintext.
    """

    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    # NOTE: aes_encrypt() emits 32 hex characters per 16-byte block, so a
    # length of 40 only accommodates single-block values (plaintext of up to
    # 15 bytes).
    encrypted_value = Column(String(40), nullable=False)

    class encrypt_comparator(Comparator):
        """Comparator that encrypts the right-hand operand before comparing."""

        def operate(self, op, other, **kw):
            # Compare ciphertext to ciphertext: the column already holds the
            # encrypted form, so the operand is encrypted to match.
            column_expr = self.__clause_element__()
            encrypted_operand = aes_encrypt(other)
            return op(column_expr, encrypted_operand, **kw)

    @hybrid_property
    def value(self):
        """Decrypted view of ``encrypted_value``."""
        ciphertext = self.encrypted_value
        return aes_decrypt(ciphertext)

    @value.setter
    def value(self, plaintext):
        # Store only the encrypted form on the mapped column.
        ciphertext = aes_encrypt(plaintext)
        self.encrypted_value = ciphertext

    @value.comparator
    def value(cls):
        # Class-level access (e.g. in filter_by) goes through the encrypting
        # comparator wrapped around the ciphertext column.
        target_column = cls.encrypted_value
        return cls.encrypt_comparator(target_column)

# In-memory SQLite database; echo='debug' makes the engine log the SQL it
# emits (and result rows), so the encrypted values are visible in the output.
e = create_engine('sqlite://', echo='debug')

# Create the tables registered on Base.metadata (here: "user").
Base.metadata.create_all(e)

s = Session(e)

# attribute set: the hybrid setter encrypts the plaintext into
# encrypted_value before the row is inserted.
u1 = User(value="some value")
s.add(u1)
s.commit()

# comparison: the class-level comparator encrypts "some value", so the
# WHERE clause matches against the stored ciphertext.  The query is expected
# to hand back the very same object that was added above.
u2 = s.query(User).filter_by(value="some value").first()
assert u1 is u2

# attribute get: the hybrid getter decrypts encrypted_value on access.
assert u1.value == "some value"

For comparison, here's the same recipe using TypeDecorator:

from sqlalchemy import create_engine, Column, Integer, String, TypeDecorator
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from Crypto.Cipher import AES
import binascii
import uuid

# Symmetric key shared by aes_encrypt() and aes_decrypt().  uuid4().bytes is
# 16 bytes long, which AES accepts as a 128-bit key.  Because it is
# regenerated on every run, ciphertext written by one run cannot be decrypted
# by another; a real application would load a persistent secret instead.
key = uuid.uuid4().bytes
"""The encryption key.   Random for this example."""

def aes_encrypt(data):
    """Encrypt ``data`` with the module-level ``key`` and return it hex-encoded.

    The plaintext is right-padded with spaces up to the next multiple of the
    16-byte AES block size (a full extra block is added when the length is
    already a multiple of 16), encrypted, and hex-encoded so that it can be
    stored in a plain string column.

    ECB mode is selected explicitly: it is deterministic, so equal plaintexts
    always produce equal ciphertexts, which is what allows an equality
    comparison against the stored value.  The same property leaks which rows
    share a value, so this is suitable for a demonstration only.

    :param data: plaintext as text or bytes; text is encoded as UTF-8.
    :return: hexadecimal ciphertext as a native ``str`` (32 characters per
        16-byte block).
    """
    # The cipher operates on bytes; normalise text input first so the
    # padding below is computed on the encoded length.
    if not isinstance(data, bytes):
        data = data.encode("utf-8")

    # PyCryptodome requires the mode argument; older PyCrypto defaulted to
    # ECB, so passing it explicitly keeps the output identical on both.
    cipher = AES.new(key, AES.MODE_ECB)
    padded = data + b" " * (16 - len(data) % 16)
    hex_digest = binascii.hexlify(cipher.encrypt(padded))

    # hexlify() returns bytes on Python 3; hand back a native string so the
    # value binds cleanly to a String column on either major version.
    if not isinstance(hex_digest, str):
        hex_digest = hex_digest.decode("ascii")
    return hex_digest

def aes_decrypt(data):
    """Decrypt a hex-encoded ciphertext produced by ``aes_encrypt``.

    The value is hex-decoded, decrypted with the module-level ``key`` and
    stripped of trailing whitespace to remove the space padding.  Because
    the padding is indistinguishable from real trailing whitespace, any
    trailing whitespace in the original plaintext is not preserved.

    :param data: hexadecimal ciphertext (text or bytes).
    :return: the plaintext as a native ``str``.
    """
    # PyCryptodome requires the mode argument; older PyCrypto defaulted to
    # ECB, so passing it explicitly keeps behaviour identical on both.
    cipher = AES.new(key, AES.MODE_ECB)
    plaintext = cipher.decrypt(binascii.unhexlify(data)).rstrip()

    # decrypt() yields bytes on Python 3; return text there so the result
    # compares equal to the string that was originally assigned.
    if not isinstance(plaintext, str):
        plaintext = plaintext.decode("utf-8")
    return plaintext

class EncryptedValue(TypeDecorator):
    """String type that transparently encrypts on the way in and decrypts
    on the way out.

    Bound parameters (INSERT/UPDATE values and comparison operands) are
    passed through ``aes_encrypt``; values read from result rows are passed
    through ``aes_decrypt``.  ``None`` is passed through unchanged in both
    directions so that NULLs never reach the cipher helpers.
    """

    impl = String

    # The type carries no per-instance state that affects SQL compilation,
    # so statements using it are safe to cache (avoids the SQLAlchemy 1.4+
    # warning; ignored as a plain attribute by older versions).
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Encrypt a Python-side value before it is sent to the database."""
        if value is None:
            return None
        return aes_encrypt(value)

    def process_result_value(self, value, dialect):
        """Decrypt a database value before it is handed to Python code."""
        if value is None:
            return None
        return aes_decrypt(value)

# Declarative base class; mapped classes below derive from it and register
# their tables on Base.metadata.
Base = declarative_base()

class User(Base):
    """Mapped class whose ``value`` attribute is encrypted by its column type.

    The attribute is named ``value`` on the class but is stored in the
    ``encrypted_value`` database column; ``EncryptedValue`` performs the
    conversion in both directions.
    """

    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    value = Column(
        name="encrypted_value",
        type_=EncryptedValue(40),
        nullable=False,
    )

# In-memory SQLite database; echo='debug' makes the engine log the SQL it
# emits (and result rows), so the encrypted values are visible in the output.
e = create_engine('sqlite://', echo='debug')

# Create the tables registered on Base.metadata (here: "user").
Base.metadata.create_all(e)

s = Session(e)

# attribute set: the plaintext stays on the instance; EncryptedValue
# encrypts it when the INSERT parameters are bound.
u1 = User(value="some value")
s.add(u1)
s.commit()

# comparison: the literal is bound through EncryptedValue as well, so the
# WHERE clause compares ciphertext with ciphertext.  The query is expected
# to hand back the very same object that was added above.
u2 = s.query(User).filter_by(value="some value").first()
assert u1 is u2

# attribute get: the value loaded from the row has been decrypted by
# EncryptedValue.process_result_value.
assert u1.value == "some value"

Updated