# Source
#
# pythonhk / src / pythonhk / models.py
#
# Full commit
import calendar
import os.path
import os
from base64 import urlsafe_b64decode, urlsafe_b64encode
from datetime import datetime, timedelta, date, time
from hashlib import sha256
from urlparse import urlparse, urlunparse

from sqlalchemy import ForeignKey, Table, Column, Integer, Unicode, \
    UnicodeText, Date, DateTime, Time, String, BigInteger, Enum, SmallInteger, \
    Boolean, func, event
from sqlalchemy.orm import relationship, deferred, backref
from sqlalchemy.orm.interfaces import PropComparator
from sqlalchemy.orm import synonym
from sqlalchemy.ext.declarative import synonym_for, declarative_base, \
    comparable_using
from sqlalchemy.schema import AddConstraint, UniqueConstraint

# Public API of this module: only the three model classes are exported by
# ``from ... import *``; ``Base`` and ``metadata`` must be imported by name.
__all__ = ["User", "Group", "Permission"]


# Declarative base class that every model in this module inherits from.
Base = declarative_base()
# Shared MetaData object; the association tables below are registered on it
# so they live in the same schema collection as the declarative models.
metadata = Base.metadata


# Association table for the many-to-many link between groups and permissions.
# Both columns together form the composite primary key, and each foreign key
# cascades updates and deletes from the row it references.
group_permission_table = Table(
    "group_permission",
    metadata,
    Column(
        "group_id",
        Integer,
        ForeignKey("group.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True
    ),
    Column(
        "permission_id",
        Integer,
        ForeignKey("permission.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True
    )
)


# Association table for the many-to-many link between users and groups.
# Both columns together form the composite primary key, and each foreign key
# cascades updates and deletes from the row it references.
user_group_table = Table(
    "user_group",
    metadata,
    Column(
        "user_id",
        Integer,
        ForeignKey("user.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True
    ),
    Column(
        "group_id",
        Integer,
        ForeignKey("group.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True
    )
)


class _AgeComparator(PropComparator):
    """
    Comparator that lets ``User.age`` be used in query expressions.

    At the class level the age is expressed in SQL as
    ``date_part('year', age(birthday))``.
    NOTE(review): ``age()`` / ``date_part()`` look like PostgreSQL functions;
    confirm the target database supports them.
    """

    def __clause_element__(self):
        """Return the SQL expression giving the age in whole years."""
        # The mapped column is named ``birthday``; there is no
        # ``date_of_birth`` column on the table.
        return func.date_part("year", func.age(self.mapper.c.birthday))

    def operate(self, op, *args, **kwargs):
        """Apply ``op`` with the age expression as the left operand."""
        return op(self.__clause_element__(), *args, **kwargs)

    def reverse_operate(self, op, other, **kwargs):
        """Apply ``op`` with the age expression as the right operand.

        Invoked for reflected operators (e.g. ``5 - User.age``), so ``other``
        must come first; otherwise non-commutative operators are inverted.
        """
        return op(other, self.__clause_element__(), **kwargs)


class User(Base):
    """
    This class represents a user. Only minimal demographic information
    is stored here.

    Passwords are never stored in clear text: the ``password`` column holds
    the hex digest of ``SHA-256(password + salt)`` and the ``salt`` column
    holds the URL-safe Base64 encoding of 8 random bytes.
    """

    __tablename__ = "user"

    def __init__(self, **kwargs):
        super(User, self).__init__(**kwargs)

    id = Column(Integer, autoincrement=True, primary_key=True)
    """SERIAL PRIMARY KEY"""
#    fbid = Column(BigInteger, unique=True, index=True)
#    """BIGINT Facebook ID, indexed"""
#    username = Column(Unicode(80), unique=True, index=True)
#    """VARCHAR(80) UNIQUE, indexed"""
    displayname = Column(Unicode(80), nullable=False)
    """VARCHAR(80) NOT NULL"""
    email = Column(Unicode(80), unique=True, nullable=False, index=True)
    """VARCHAR(80) UNIQUE NOT NULL, indexed"""

    # 12 characters: 8 random bytes encode to 12 URL-safe Base64 characters.
    _salt = Column("salt", String(12))

    @synonym_for("_salt")
    @property
    def salt(self):
        """Return the Base64 encoded salt, creating one for unsaved users.

        When the user has no primary key yet and no salt, a
        cryptographically random 8-byte salt is generated, stored Base64
        encoded in the salt column and returned. Otherwise the stored value
        is returned unchanged (which may be ``None``).
        """
        if not self.id and not self._salt:
            self._salt = urlsafe_b64encode(os.urandom(8))

        return self._salt

    # 64 is the length of a hex-encoded SHA-256 digest.
    _password = Column("password", Unicode(64))

    def __set_password(self, password):
        """Hash the clear text ``password`` and store the digest."""
        if not self._salt:
            # The ``salt`` property only creates a salt for unsaved users;
            # an already persisted row without one would otherwise make
            # urlsafe_b64decode(None) raise a TypeError here.
            self._salt = urlsafe_b64encode(os.urandom(8))
        self._password = self.__encrypt_password(password,
                                                 urlsafe_b64decode(self._salt))

    def __get_password(self):
        """Return the stored password digest (never the clear text)."""
        return self._password

    password = synonym("_password", descriptor=property(__get_password,
                                                        __set_password))
    """VARCHAR(64) because len(sha256(password + salt).hexdigest()) == 64"""

    def __encrypt_password(self, password, salt):
        """
        Hashes the password with the given salt using SHA-256. The salt must
        be cryptographically random bytes.

        NOTE(review): a single salted SHA-256 round is cheap to brute-force;
        moving to a dedicated password hashing scheme would require migrating
        the stored digests, so it is only flagged here.

        :param password: the clear text password provided by the user.
                         Unicode input is encoded as UTF-8 before hashing.
        :type password: basestring

        :param salt: the salt is used to strengthen the supplied password
                     against dictionary attacks.
        :type salt: an 8-byte long cryptographically random byte string

        :returns: the 64 character hexadecimal SHA-256 digest of the
                  password bytes followed by the salt.
        :rtype: unicode
        """

        if isinstance(password, unicode):
            password_bytes = password.encode("UTF-8")
        else:
            password_bytes = password

        hasher = sha256()
        hasher.update(password_bytes)
        hasher.update(salt)
        hashed_password = hasher.hexdigest()

        if not isinstance(hashed_password, unicode):
            hashed_password = hashed_password.decode("UTF-8")

        return hashed_password

    def validate_password(self, password):
        """Check the password against existing credentials.

        Returns ``False`` when the user has no stored password or salt
        instead of failing while decoding a missing salt.

        :type password: unicode
        :param password: clear text password
        :rtype: bool
        """
        if not self._password or not self._salt:
            return False
        return self.password == self.__encrypt_password(
            password, urlsafe_b64decode(self._salt))

    sex = Column(Enum("m", "f", name="sex"))
    """ENUM('m','f')"""
    birthday = Column(Date)
    """DATE"""
#    bio = deferred(Column(UnicodeText(65536)))
#    """TEXT"""

    @comparable_using(_AgeComparator)
    @property
    def age(self):
        """Age in whole years calculated from :attr:`User.birthday`.

        The current date is taken in UTC, shifted by the user's ``timezone``
        hour offset when that attribute exists (its column is currently
        commented out, so the offset defaults to 0).

        :returns: the number of completed years since the birthday, or -1
                  when no birthday is set.
        :rtype: int
        """
        birthday = self.birthday
        if not birthday:
            return -1
        if isinstance(birthday, datetime):
            birthday = birthday.date()

        offset_hours = getattr(self, "timezone", None) or 0
        today = (datetime.utcnow() + timedelta(hours=offset_hours)).date()

        # Count calendar years and take one off if this year's birthday has
        # not happened yet. Dividing a day count by 365 (minus leap days) is
        # off by one around leap years, e.g. 2000-03-01 -> 2001-03-01.
        years = today.year - birthday.year
        if (today.month, today.day) < (birthday.month, birthday.day):
            years -= 1
        return years

#    locale = Column(String(10), nullable=False)
#    """VARCHAR(10) NOT NULL len(CLDR lang tag with script) == 10"""
#    timezone = Column(SmallInteger, nullable=False)
#    """SMALLINT NOT NULL values range: [-12, 0, 11], timezone offset same as Facebook"""
    created = Column(DateTime, default=datetime.utcnow, nullable=False)
    """TIMESTAMP NOT NULL, defaults to the current UTC time on insert"""
    # NOTE(review): no ``onupdate`` is configured, so this value only changes
    # when application code assigns it - confirm that is intended.
    lastmodified = Column(DateTime, default=datetime.utcnow, nullable=False)
    """TIMESTAMP NOT NULL, defaults to the current UTC time on insert"""
    lastaccessed = Column(DateTime, default=datetime.utcnow, nullable=False)
    """TIMESTAMP NOT NULL, defaults to the current UTC time on insert"""


class Group(Base):
    """
    A named collection of users.

    A group may hold any number of users, including none. Each user gets a
    matching ``groups`` set through the backref declared below.
    """

    __tablename__ = "group"

    # Auto-incrementing integer primary key.
    id = Column(Integer, autoincrement=True, primary_key=True)

    # Unique, mandatory group name; ``group_name`` is a synonym for it.
    name = Column(Unicode(40), unique=True, nullable=False)
    group_name = synonym(
        "name",
        descriptor=property(lambda group: group.name)
    )

    # Optional free-text description.
    description = Column(Unicode(255))
    # Creation time, filled with the current UTC time by default.
    created = Column(DateTime, default=datetime.utcnow)

    # Many-to-many link to users via ``user_group_table``; both sides of the
    # relationship are exposed as sets.
    users = relationship(
        User,
        secondary=user_group_table,
        collection_class=set,
        backref=backref("groups", collection_class=set)
    )


class Permission(Base):
    """
    A named permission that can be granted to groups.

    Each group gets a matching ``permissions`` set through the backref
    declared below.
    """

    __tablename__ = "permission"

    # Auto-incrementing integer primary key.
    id = Column(Integer, autoincrement=True, primary_key=True)

    # Unique, mandatory permission name; ``permission_name`` is a synonym.
    name = Column(Unicode(40), unique=True, nullable=False)
    permission_name = synonym(
        "name",
        descriptor=property(lambda permission: permission.name)
    )

    # Many-to-many link to groups via ``group_permission_table``; both sides
    # of the relationship are exposed as sets.
    groups = relationship(
        Group,
        secondary=group_permission_table,
        collection_class=set,
        backref=backref("permissions", collection_class=set)
    )

    # Optional free-text description.
    description = Column(Unicode(255))