Wiki

Clone wiki

sqlalchemy / UsageRecipes / DatabaseCrypt

DatabaseCrypt

Two methods illustrating how to use database-provided crypt() functions to produce transparent database-side password crypt and authentication. The example here uses Postgresql pgcrypto functions.

One example uses the ORM-level Hybrid Properties feature, and the other makes use of the Core TypeDecorator construct, in conjunction with the new Operator API and SQL bind processing features introduced in SQLAlchemy 0.8.

The key feature here is comparison of a database-encrypted password to a cleartext password, making use of the database-side crypt() function to encrypt the incoming cleartext. As this requires a function that uses both values simultaneously, we use a "Comparator" which provides for full customization of the __eq__() operation. Both Hybrids and TypeDecorators make use of the same "Comparator" API, though the difference in usage results in slightly different implementations here.

Example 1 - Hybrid Properties

The Hybrid Properties feature is ORM specific and allows us to define SQL comparison and assignment behavior at the mapped class level.

from sqlalchemy import Column, Integer, String, func
from sqlalchemy.orm import deferred
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property, Comparator

Base = declarative_base()

class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)

    password_hashed = deferred(Column(String(40), nullable=False))
    """hashed password column.

    deferred() here is optional.  Using it prevents the 
    password_hashed column from being
    fetched by default when a User object is loaded back from 
    the database.

    """

    @hybrid_property
    def password(self):
        """Relying upon database-side crypt() only, so in-Python usage
        is notimplemented.

        """
        raise NotImplementedError(
                "Comparison only supported via the database")

    class CryptComparator(Comparator):
        """A Comparator which provides an __eq__() method that will run
        crypt() against both sides of the expression, to provide the 
        test password/salt pair.

        """
        def __init__(self, password_hashed):
            self.password_hashed = password_hashed

        def __eq__(self, other):
            return self.password_hashed == \
                    func.crypt(other, self.password_hashed)

    @password.comparator
    def password(cls):
        """Provide a Comparator object which calls crypt in the
        appropriate fashion.

        """
        return User.CryptComparator(cls.password_hashed)

    @password.setter
    def password(self, value):
        """assign the value of 'password', 
        using a UOW-evaluated SQL function.

        See http://www.sqlalchemy.org/docs/orm/session.html#embedding-sql-insert-update-expressions-into-a-flush
        for a description of SQL expression assignment.

        """
        self.password_hashed = func.crypt(value, func.gen_salt('md5'))


if __name__ == '__main__':
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session

    engine = create_engine("postgresql://scott:tiger@localhost/test",
                    echo=True)


    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    session = Session(engine)

    session.add_all([
        User(name='user1', password="farb%349"),
        User(name='user2', password="vy6kia%345"),
        User(name='user3', password="0sm3EF88s"),
    ])
    session.commit()


    assert session.query(User.name).\
                filter_by(password="vy6kia%345").scalar() == "user2"

    assert session.query(User).\
                filter_by(name='user3').\
                filter_by(password="0sm3EF88s").count() == 1

    assert session.query(User).\
                filter_by(name='user1').\
                filter_by(password="wrong").count() == 0

Example 2 - Custom Types

(requires 0.8 or above)

Using the TypeDecorator API, we can build the same crypt() feature entirely into a single datatype. TypeDecorator allows us to augment the behavior of a built-in type, in this case the String type. We install the crypt() function as a "bind expression", which has the effect of an incoming bound value being wrapped within the given SQL expression when a statement is constructed. We also define a Comparator to provide the crypt() function during a comparison. This Comparator is similar to that of the hybrid, but includes that we use the type_coerce() function to remove the PasswordType from the expression first, so that we don't go into an endless loop when we re-use the __eq__() operation.

from sqlalchemy import Column, Integer, String, func, TypeDecorator, type_coerce
from sqlalchemy.orm import deferred
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class PasswordType(TypeDecorator):
    impl = String(40)

    def bind_expression(self, bindvalue):
        """Apply a SQL expression to an incoming cleartext value being
        rendered as a bound parameter.

        For this example, this handler is intended only for the
        INSERT and UPDATE statements.  Comparison operations
        within a SELECT are handled below by the Comparator.

        """
        return func.crypt(bindvalue, func.gen_salt('md5'))

    class comparator_factory(String.comparator_factory):
        def __eq__(self, other):
            """Compare the local password column to an incoming cleartext
            password.

            This handler is invoked when a PasswordType column
            is used in conjunction with the == operator in a SQL 
            expression, replacing the usage of the "bind_expression()"
            handler.

            """
            # we coerce our own "expression" down to String,
            # so that invoking == doesn't cause an endless loop
            # back into __eq__() here
            local_pw = type_coerce(self.expr, String)
            return local_pw == \
                    func.crypt(other, local_pw)

class User(Base):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)

    password = deferred(Column("password_hashed", 
                                  PasswordType, nullable=False))
    """password column.

    The 'password' mapped attribute refers to a column
    called 'password_hashed'.  This illustrates a local 
    Python attribute named differently than the database column.

    deferred() here is optional.  Using it prevents the password 
    column from being fetched when a User object is loaded back 
    from the database.

    """

if __name__ == '__main__':
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session

    engine = create_engine("postgresql://scott:tiger@localhost/test",
                    echo=True)


    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    session = Session(engine)

    session.add_all([
        User(name='user1', password="farb%349"),
        User(name='user2', password="vy6kia%345"),
        User(name='user3', password="0sm3EF88s"),
    ])
    session.commit()


    assert session.query(User.name).\
                filter_by(password="vy6kia%345").scalar() == "user2"

    assert session.query(User).\
                filter_by(name='user3').\
                filter_by(password="0sm3EF88s").count() == 1

    assert session.query(User).\
                filter_by(name='user1').\
                filter_by(password="wrong").count() == 0

Updated