Commits

Robert Brewer committed dd0ca7f

cptools.session_auth was growing an unwieldy number of parameters, so I moved it from a functional approach to an object. Also moved _cptools.setargs to Tool._setargs.

Comments (0)

Files changed (3)

cherrypy/_cptools.py

 import cherrypy
 
 
-def setargs(obj, func):
-    """Copy func parameter names to obj attributes."""
-    try:
-        import inspect
-        for arg in inspect.getargspec(func)[0]:
-            setattr(obj, arg, None)
-    except (ImportError, AttributeError):
-        pass
-
-
 class Tool(object):
     """A registered function for use with CherryPy request-processing hooks.
     
         self.callable = callable
         self._name = name
         self.__doc__ = self.callable.__doc__
-        setargs(self, callable)
+        self._setargs()
+    
+    def _setargs(self):
+        """Copy func parameter names to obj attributes."""
+        try:
+            import inspect
+            for arg in inspect.getargspec(self.callable)[0]:
+                setattr(self, arg, None)
+        except (ImportError, AttributeError):
+            pass
     
     def _merged_args(self, d=None):
         tm = cherrypy.request.toolmap
         MainTool._setup(self)
 
 
+class SessionAuthTool(MainTool):
+    
+    def _setargs(self):
+        for name in dir(cptools.SessionAuth):
+            if not name.startswith("__"):
+                setattr(self, name, None)
+
+
 class CachingTool(Tool):
     """Caching Tool for CherryPy."""
     
-    def __init__(self):
-        self._point = 'before_main'
-        self.callable = _caching.get
-        self._name = 'caching'
-        self.__doc__ = self.callable.__doc__
-        setargs(self, self.callable)
-    
     def _wrapper(self, **kwargs):
         request = cherrypy.request
         if _caching.get(**kwargs):
 
 
 default_toolbox = Toolbox()
-default_toolbox.session_auth = MainTool(cptools.session_auth)
+default_toolbox.session_auth = SessionAuthTool(cptools.session_auth)
 default_toolbox.proxy = Tool('before_request_body', cptools.proxy)
 default_toolbox.response_headers = Tool('on_start_resource', cptools.response_headers)
 # We can't call virtual_host in on_start_resource,
 default_toolbox.sessions = SessionTool('before_request_body', _sessions.init)
 default_toolbox.xmlrpc = XMLRPCTool()
 default_toolbox.wsgiapp = WSGIAppTool(_wsgiapp.run)
-default_toolbox.caching = CachingTool()
+default_toolbox.caching = CachingTool('before_main', _caching.get, 'caching')
 default_toolbox.expires = Tool('before_finalize', _caching.expires)
 default_toolbox.tidy = Tool('before_finalize', tidy.tidy)
 default_toolbox.nsgmls = Tool('before_finalize', tidy.nsgmls)

cherrypy/lib/cptools.py

 response_headers.failsafe = True
 
 
-_login_screen = """<html><body>
+class SessionAuth(object):
+    """Assert that the user is logged in."""
+    
+    session_key = "username"
+    
+    def check_login_and_password(self, login, password):
+        pass
+    
+    def anonymous(self):
+        """Provide a temporary user name for anonymous users."""
+        pass
+    
+    def load_user_by_username(self, username):
+        pass
+    
+    def on_login(self, login):
+        pass
+    
+    def on_logout(self, login):
+        pass
+    
+    def login_screen(self, from_page='..', login='', error_msg=''):
+        return """<html><body>
 Message: %(error_msg)s
 <form method="post" action="do_login">
     Login: <input type="text" name="login" value="%(login)s" size="10" /><br />
     <input type="hidden" name="from_page" value="%(from_page)s" /><br />
     <input type="submit" />
 </form>
-</body></html>"""
-
-def session_auth(check_login_and_password=None, not_logged_in=None,
-                 load_user_by_username=None, session_key='username',
-                 on_login=None, on_logout=None, login_screen=None):
-    """Assert that the user is logged in."""
+</body></html>""" % {'from_page': from_page, 'login': login,
+                     'error_msg': error_msg}
     
-    if login_screen is None:
-        login_screen = _login_screen
+    def do_login(self, login, password, from_page='..'):
+        """Login. May raise redirect, or return True if request handled."""
+        error_msg = self.check_login_and_password(login, password)
+        if error_msg:
+            body = self.login_screen(from_page, login, error_msg)
+            cherrypy.response.body = body
+            return True
+        else:
+            cherrypy.session[self.session_key] = login
+            self.on_login(login)
+            raise cherrypy.HTTPRedirect(from_page or "/")
     
-    request = cherrypy.request
-    tdata = cherrypy.thread_data
-    sess = cherrypy.session
-    request.user = None
-    tdata.user = None
+    def do_logout(self, from_page='..'):
+        """Logout. May raise redirect, or return True if request handled."""
+        sess = cherrypy.session
+        login = sess.get(self.session_key)
+        sess[self.session_key] = None
+        if login:
+            self.on_logout(login)
+        raise cherrypy.HTTPRedirect(from_page)
     
-    if request.path.endswith('login_screen'):
-        return False
-    elif request.path.endswith('do_logout'):
-        login = sess.get(session_key)
-        sess[session_key] = None
-        request.user = None
-        tdata.user = None
-        if login and on_logout:
-            on_logout(login)
-        from_page = request.params.get('from_page', '..')
-        raise cherrypy.HTTPRedirect(from_page)
-    elif request.path.endswith('do_login'):
-        from_page = request.params.get('from_page', '..')
-        login = request.params['login']
-        password = request.params['password']
-        error_msg = check_login_and_password(login, password)
-        if error_msg:
-            kw = {"from_page": from_page,
-                  "login": login, "error_msg": error_msg}
-            cherrypy.response.body = login_screen % kw
+    def check_user(self):
+        """Assert username. May raise redirect, or return True if request handled."""
+        sess = cherrypy.session
+        request = cherrypy.request
+        
+        username = sess.get(self.session_key)
+        if not username:
+            username = self.anonymous()
+        if not username:
+            cherrypy.response.body = self.login_screen(request.browser_url)
             return True
         
-        sess[session_key] = login
-        if on_login:
-            on_login(login)
-        raise cherrypy.HTTPRedirect(from_page or "/")
+        # Everything is OK: user is logged in
+        tdata = cherrypy.thread_data
+        if not tdata.user:
+            tdata.user = request.user = self.load_user_by_username(username)
     
-    # Check if user is logged in
-    temp_user = None
-    if (not sess.get(session_key)) and not_logged_in:
-        # Call not_logged_in so that applications where anonymous user
-        #   is OK can handle it
-        temp_user = not_logged_in()
-    if (not sess.get(session_key)) and not temp_user:
-        kw = {"from_page": request.browser_url, "login": "", "error_msg": ""}
-        cherrypy.response.body = login_screen % kw
-        return True
-    
-    # Everything is OK: user is logged in
-    if load_user_by_username and not tdata.user:
-        username = temp_user or sess[session_key]
-        request.user = load_user_by_username(username)
-        tdata.user = request.user
-    
-    return False
+    def run(self):
+        request = cherrypy.request
+        request.user = None
+        cherrypy.thread_data.user = None
+        
+        path = request.path
+        if path.endswith('login_screen'):
+            # pass and let the normal handler work
+            return self.login_screen()
+        elif path.endswith('do_login'):
+            return self.do_login(**request.params)
+        elif path.endswith('do_logout'):
+            return self.do_logout(**request.params)
+        else:
+            return self.check_user()
+
+
+def session_auth(**kwargs):
+    sa = SessionAuth()
+    for k, v in kwargs.iteritems():
+        setattr(sa, k, v)
+    return sa.run()
+
 
 def virtual_host(use_x_forwarded_host=True, **domains):
     """Redirect internally based on the Host header.

cherrypy/test/test_sessionauthenticate.py

 
 from cherrypy.test import helper
 
+
 class SessionAuthenticateTest(helper.CPWebCase):
     
     def testSessionAuthenticate(self):
         # request a page and check for login form
         self.getPage('/')
         self.assertInBody('<form method="post" action="do_login">')
-
+        
         # setup credentials
         login_body = 'login=login&password=password&from_page=/'
-
+        
         # attempt a login
         self.getPage('/do_login', method='POST', body=login_body)
-        self.assert_(self.status in ('302 Found', '303 See Other'))
-
+        self.assertStatus((302, 303))
+        
         # get the page now that we are logged in
         self.getPage('/', self.cookies)
         self.assertBody('Hi, you are logged in')
-
+        
         # do a logout
         self.getPage('/do_logout', self.cookies)
-        self.assert_(self.status in ('302 Found', '303 See Other'))
-
+        self.assertStatus((302, 303))
+        
         # verify we are logged out
         self.getPage('/', self.cookies)
         self.assertInBody('<form method="post" action="do_login">')
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.