Commits

Empanado  committed 4bcb9a8

SqlAlchemy user class

  • Participants
  • Parent commits ca25c86

Comments (0)

Files changed (5)

File examples/sqlalchemy_model.py

-import datetime
 from flask import Flask, request, redirect, url_for
 from flaskext.sqlalchemy import SQLAlchemy
-from flaskext.auth import Auth, AuthUser, login_required, logout
+from flaskext.auth import Auth, login_required, logout
+from flaskext.auth.models.sa import get_user_class
 
 app = Flask(__name__)
 app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
 db = SQLAlchemy(app)
 auth = Auth(app, login_url_name='index')
 
-def now():
-    return datetime.datetime.utcnow()
-
-class User(db.Model, AuthUser):
-    """
-    Implementation of User for SQLAlchemy.
-    """
-    id = db.Column(db.Integer, primary_key=True)
-    username = db.Column(db.String(80), unique=True, nullable=False)
-    password = db.Column(db.String(120), nullable=False)
-    salt = db.Column(db.String(80))
-    role = db.Column(db.String(80))
-    created = db.Column(db.DateTime(), default=now)
-    modified = db.Column(db.DateTime())
-
-    def __init__(self, *args, **kwargs):
-        super(User, self).__init__(*args, **kwargs)
-        password = kwargs.get('password')
-        if password is not None and not self.id:
-            self.created = datetime.datetime.utcnow()
-            # Initialize and encrypt password before first save.
-            self.set_and_encrypt_password(password)
+User = get_user_class(db.Model)
 
 @login_required()
 def admin():

File flaskext/auth/models/__init__.py

Empty file added.

File flaskext/auth/models/sa.py

+import datetime
+from sqlalchemy import Column, Integer, String, DateTime
+from flaskext.auth import AuthUser
+
+class User(object):
+    pass
+
+def get_user_class(declarative_base):
+    """
+    Factory function to create the SQLAlchemy model. It can either be called
+    with the db.Model declarative base from the Flask-SQLAlchemy extension or
+    with your own declarative base.
+    """
+    class User(declarative_base, AuthUser):
+        """
+        Implementation of User for SQLAlchemy.
+        """
+        id = Column(Integer, primary_key=True)
+        username = Column(String(80), unique=True, nullable=False)
+        password = Column(String(120), nullable=False)
+        salt = Column(String(80))
+        role = Column(String(80))
+        created = Column(DateTime, default=datetime.datetime.utcnow)
+        modified = Column(DateTime)
+
+        def __init__(self, *args, **kwargs):
+            super(User, self).__init__(*args, **kwargs)
+            password = kwargs.get('password')
+            if password is not None and not self.id:
+                self.created = datetime.datetime.utcnow()
+                # Initialize and encrypt password before first save.
+                self.set_and_encrypt_password(password)
+
+            def __getstate__(self):
+                global User
+                User = self.__class__
+                return (self.__dict__, declarative_base)
+
+            def __setstate__(self, state):
+                self.__class__ = get_user_class(state[1])
+                self.__dict__.update(state[0])
+
+    return User
     long_description=__doc__,
     packages=[
         'flaskext',
-        'flaskext.auth'
+        'flaskext.auth',
     ],
     namespace_packages=['flaskext'],
     zip_safe=False,
         'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
         'Topic :: Software Development :: Libraries :: Python Modules'
     ],
-    test_suite="test_auth.suite"
+    test_suite='test_auth.suite',
 )

File test_auth.py

+import unittest, hashlib
+from flask import Flask, session
+from flaskext.auth import Auth, AuthUser, login, logout, get_current_user, \
+        encrypt, Role, Permission, has_permission, login_required, \
+        permission_required
+from flaskext.auth.auth import SESSION_USER_KEY, SESSION_LOGIN_KEY
+
+class EncryptionTestCase(unittest.TestCase):
+    PASSWORD = 'password'
+    SALT = '123'
+    HASH_ALGORITHM = hashlib.sha1
+    PRECOMPUTED_RESULT = 'cbfdac6008f9cab4083784cbd1874f76618d2a97'
+
+    def setUp(self):
+        app = Flask(__name__)
+        auth = Auth(app)
+        self.app = app
+        auth.hash_algorithm = self.HASH_ALGORITHM
+        user = AuthUser(username='user')
+        self.user = user
+
+    def test_encryption(self):
+        assert encrypt(password=self.PASSWORD, salt=self.SALT,
+                       hash_algorithm=self.HASH_ALGORITHM) == self.PRECOMPUTED_RESULT
+
+    def test_set_and_encrypt_password(self):
+        with self.app.test_request_context():
+            self.user.set_and_encrypt_password(self.PASSWORD, self.SALT)
+        assert self.user.password == self.PRECOMPUTED_RESULT
+        assert self.user.salt == self.SALT
+
+class LoginTestCase(unittest.TestCase):
+    PASSWORD = 'password'
+    
+    def setUp(self):
+        app = Flask(__name__)
+        app.secret_key = 'N4buDSXfaHx2oO8g'
+        auth = Auth(app)
+        auth.hash_algorithm = hashlib.sha1
+        user = AuthUser(username='user')
+        with app.test_request_context():
+            user.set_and_encrypt_password(self.PASSWORD)
+        self.app = app
+        self.user = user
+
+    def tearDown(self):
+        with self.app.test_request_context():
+            logout()
+
+    def test_login(self):
+        with self.app.test_request_context():
+            login(self.user)
+            assert session[SESSION_USER_KEY] == self.user
+            assert session.get(SESSION_LOGIN_KEY)
+
+    def test_current_user(self):
+        with self.app.test_request_context():
+            login(self.user)
+            assert get_current_user() == self.user
+
+    def test_logout(self):
+        with self.app.test_request_context():
+            login(self.user)
+            user = logout()
+            assert user == self.user
+            assert session.get(SESSION_USER_KEY) is None
+            assert session.get(SESSION_LOGIN_KEY) is None
+
+    def test_user_expiration(self):
+        import time
+        with self.app.test_request_context():
+            self.app.auth.user_timeout = 0.01
+            login(self.user)
+            time.sleep(0.02)
+            assert get_current_user() is None
+
+    def test_user_expiration_override(self):
+        import time
+        with self.app.test_request_context():
+            self.app.auth.user_timeout = 0.01
+            login(self.user)
+            time.sleep(0.02)
+            assert get_current_user(apply_timeout=False) == self.user
+
+    def test_authenticate(self):
+        with self.app.test_request_context():
+            assert self.user.authenticate(self.PASSWORD) == True
+            assert self.user.is_logged_in() == True
+            assert get_current_user() == self.user
+
+    def test_authenticate_fail(self):
+        with self.app.test_request_context():
+            assert self.user.authenticate('bla') == False
+            assert self.user.is_logged_in() == False
+
+class PermissionTestCase(unittest.TestCase):
+    post_view = Permission('post', 'view')
+    post_update = Permission('post', 'update')
+    ROLES = {'testuser': Role('testuser', [post_view])}
+
+    def setUp(self):
+        app = Flask(__name__)
+        auth = Auth(app)
+        self.app = app
+        def load_role(role_name):
+            return self.ROLES.get(role_name)
+        auth.load_role = load_role
+        user = AuthUser(username='user')
+        user.role = 'testuser'
+        self.user = user
+
+    def tearDown(self):
+        pass
+
+    def test_has_permission(self):
+        with self.app.test_request_context():
+            assert has_permission(self.user, 'post', 'view') == True
+            assert has_permission(self.user, 'post', 'update') == False
+            assert has_permission(self.user, 'user', 'view') == False
+            assert has_permission(self.user, 'user', 'create') == False
+
+    def test_permission_equals(self):
+        assert self.post_view == Permission('post', 'view')
+        assert self.post_update != self.post_view
+
+class RequestTestCase(unittest.TestCase):
+
+    def setUp(self):
+        app = Flask(__name__)
+        app.secret_key = 'N4buDSXfaHx2oO8g'
+        self.app = app
+        auth = Auth(app)
+        @login_required()
+        def needs_login():
+            return 'needs_login'
+        app.add_url_rule('/needs_login/', 'needs_login', needs_login)
+
+        @permission_required(resource='post', action='view')
+        def post_view():
+            return 'needs_post_view'
+        app.add_url_rule('/post_view/', 'post_view', post_view)
+
+        @app.route('/login_view/')
+        def login_view():
+            return 'login_view'
+
+        user = AuthUser(username='user')
+        user.role = 'testuser'
+        testuser_role = Role('testuser', [Permission('post', 'view')])
+        auth.load_role = lambda _: testuser_role
+        self.user = user
+
+    def tearDown(self):
+        with self.app.test_request_context():
+            logout()
+
+    def test_default_not_logged_in_callback(self):
+        with self.app.test_request_context():
+            with self.app.test_client() as client:
+                assert client.get('/needs_login/').status_code == 401
+
+    def test_login_url_callback(self):
+        self.app.auth = Auth(self.app, login_url_name='login_view')
+        with self.app.test_request_context():
+            with self.app.test_client() as client:
+                assert client.get('/needs_login/').status_code == 302 
+                assert client.get('/needs_login/',
+                                  follow_redirects=True).data == 'login_view'
+
+    def test_permission_required_no_login(self):
+        with self.app.test_request_context():
+            with self.app.test_client() as client:
+                assert client.get('/post_view/').status_code == 401
+
+suite = unittest.TestLoader().discover(start_dir='.')
+if __name__ == '__main__':
+    unittest.TextTestRunner(verbosity=2).run(suite)