Commits

almad  committed c01abbc

Refactored digestor and authenticator, preparing for model authentication.

  • Participants
  • Parent commits b20f50c

Comments (0)

Files changed (9)

File djangohttpdigest/__init__.py

 __version__ = (0, 1, 0, 'pre')
 __versionstr__ = '0.1.0-pre'
 
-from decorators import (
-    protect_digest
-)

File djangohttpdigest/authentication.py

 
 """
 Various authentication cases, used by decorators.
-"""
+"""
+
+__all__ = ("SimpleHardcodedAuthenticator")
+
+class Authenticator(object):
+    """ Authenticator """
+    
+    def __init__(self):
+        object.__init__(self)
+        
+        self.server_secret = None
+        self.a1 = None
+        self.a2 = None
+    
+    def get_a1(self, *args, **kwargs):
+        """ Get server's a1 hash for later comparison """
+        raise NotImplementedError("This (sub)class does not support this method, see another one.")
+    
+    def secret_passed(self, digestor):
+        """ Compare computed secret to secret from authentication backend.
+        If get_a1 was not called before, it's called with digestor as an argument.
+        Return bool whether it matches.
+        """
+        if not self.a1:
+            self.get_a1(digestor=digestor)
+        
+        assert self.a1 is not None
+        
+        client_secret = digestor.get_client_secret()
+        server_secret = digestor.get_server_secret(a1=self.a1)
+        
+        return client_secret == server_secret
+
+def check_hardcoded_authentication(parsed_header, method, path, params, realm, username, password):
+    """ Do information sent in header authenticates against given credentials? """
+    
+    return parsed_header['response'] == result_secret 
+
+class SimpleHardcodedAuthenticator(Authenticator):
+    """ Compares secret to explicitly given credentials """
+    
+    def __init__(self, server_realm, server_username, server_password):
+        Authenticator.__init__(self)
+        
+        self.server_realm = server_realm
+        self.server_username = server_username
+        self.server_password = server_password
+
+    def get_a1(self, digestor):
+        self.a1 = digestor.get_a1(realm=self.server_realm, username=self.server_username, password=self.server_password)
+        return self.a1 
+    
+
+class ModelAuthenticator(Authenticator):
+    def __init__(self, model, realm, realm_field, username_field, secret_field):
+         self.model = model
+         self.realm = realm
+         self.realm_field = realm_field
+         self.username_field = username_field
+         self.secret_field = secret_field
+    
+    
+    def get_a1(self, digestor):
+        try:
+            inst = model.objects.get(**{
+                self.realm_field : self.realm,
+                self.username_field : digestor.get_client_username()
+            })
+            self.a1 = getattr(inst, self.secret_field)
+            return self.a1
+        
+        except model.DoesNotExist:
+            raise ValueError()
+    

File djangohttpdigest/decorators.py

 from django.http import HttpResponseBadRequest
 
 from http import HttpResponseNotAuthorized
-from digest import Digestor, parse_authorization_header, check_hardcoded_authentication
+from digest import Digestor, parse_authorization_header
+from authentication import SimpleHardcodedAuthenticator
+
+__all__ = ("protect_digest", "protect_digest_model")
 
 def protect_digest(realm, username, password):
     def _innerDecorator(function):
         def _wrapper(request, *args, **kwargs):
             
+            digestor = Digestor(method=request.method, path=request.path, realm=realm)
+            
+            if request.META.has_key('HTTP_AUTHORIZATION'):
+                # successfull auth
+                if request.META['AUTH_TYPE'].lower() != 'digest':
+                    raise NotImplementedError("Only digest supported")
+                
+                try:
+                    parsed_header = digestor.parse_authorization_header(request.META['HTTP_AUTHORIZATION'])
+                except ValueError, err:
+                    return HttpResponseBadRequest(err)
+
+                authenticator = SimpleHardcodedAuthenticator(server_realm=realm, server_username=username, server_password=password)
+                
+                if authenticator.secret_passed(digestor):
+                    return function(request, *args, **kwargs)
+                
+            # nothing received, return challenge
+            response = HttpResponseNotAuthorized("Not Authorized")
+            response['www-authenticate'] = digestor.get_digest_challenge()
+            return response
+        return _wrapper
+    return _innerDecorator
+
+def protect_digest_model(model, realm_field='realm', username_field='username', secret_field='secret_field'):
+    def _innerDecorator(function):
+        def _wrapper(request, *args, **kwargs):
+            
             digestor = Digestor(realm=realm)
             
             if request.META.has_key('HTTP_AUTHORIZATION'):
                 # successfull auth
                 if request.META['AUTH_TYPE'].lower() != 'digest':
                     raise NotImplementedError("Only digest supported")
+                
                 try:
-                    parsed_header = parse_authorization_header(request.META['HTTP_AUTHORIZATION'])
+                    parsed_header = digestor.parse_authorization_header(request.META['HTTP_AUTHORIZATION'])
                 except ValueError, err:
                     return HttpResponseBadRequest(err)
+
+                authenticator = SimpleHardcodedAuthenticator(realm=realm, username=username, password=password)
                 
-                if check_hardcoded_authentication(parsed_header, request.method, request.path, request.GET.urlencode(), realm, username, password):
+                if authenticator.secret_passed():
                     return function(request, *args, **kwargs)
-            
+                
             # nothing received, return challenge
             response = HttpResponseNotAuthorized("Not Authorized")
             response['www-authenticate'] = digestor.get_digest_challenge()

File djangohttpdigest/digest.py

 from md5 import md5
 #from sha import sha
 
-class Digestor(object):
-    """ Main class for handling digest algorithms as described in RFC 2617 """
-    
-    # map string representation of algorithm to hash function
-    algorithm_implementation_map = {
-        'md5' : md5,
-#        'sha' : sha,
-    }
-    
-    def __init__(self, realm=None, qop=None, opaque=None, algorithm=None):
-        object.__init__(self)
-        
-        self.algorithm = algorithm or 'md5'
-        self.opaque = opaque or 'ToDoMoveThisToSettings'
-        self.qop = qop or 'auth'
-        self.realm = realm or None
-        
-        assert self.algorithm in self.algorithm_implementation_map
-    
-    def get_digest_challenge(self):
-        """ Return HTTP digest challenge, which has to be placed into www-authenticate header"""
-        
-        nonce = self.algorithm_implementation_map[self.algorithm]("%s:%s" % (time(), self.realm)).hexdigest()
-        
-        return 'Digest realm="%(realm)s", qop="%(qop)s", nonce="%(nonce)s", opaque="%(opaque)s"' % {
-            'realm' : self.realm,
-            'qop' : self.qop,
-            'nonce' : nonce,
-            'opaque' : self.opaque
-        }
+__all__ = ("Digestor", "parse_authorization_header", "check_credentials", "check_hardcoded_authentication")
 
 def parse_authorization_header(header):
     """ Parse requests authorization header into list.
         if not params.has_key(field):
             raise ValueError("Required field %s not found" % field)
 
-    # check for qop companions
-    # (RFC 2617, sect. 3.2.2)
+    # check for qop companions (sect. 3.2.2)
     if params.has_key("qop") and not params.has_key("cnonce") and params.has_key("cn"):
         raise ValueError("qop sent without cnonce and cn")
 
     return params
 
-def check_credentials(request):
-    """
-    Check if request contains credentials.
-    Raise HttpResponseBadRequest if malformed header was send.
-    """
-    if request.META.has_key('AUTHORIZATION'):
-        header = parse_authorization_header(request.meta['AUTHORIZATION'])
-    else:
-        return False
+class Digestor(object):
+    """ Main class for handling digest algorithms as described in RFC 2617 """
     
-def check_hardcoded_authentication(parsed_header, method, path, params, realm, username, password):
-    """ Do information sent in header authenticates against given credentials? """
-    assert parsed_header['qop'] == 'auth'
+    # map string representation of algorithm to hash function
+    algorithm_implementation_map = {
+        'md5' : md5,
+#        'sha' : sha,
+    }
     
-    # compute A1 according to RFC 2617, section 3.2.2.2
-    a1 = md5("%s:%s:%s" % (username, realm, password)).hexdigest()
-    # A2, according to section 3.2.2.3
-    a2 = md5("%s:%s" % (method,path)).hexdigest()
+    def __init__(self, method, path, realm=None, qop=None, opaque=None, algorithm=None, username=None, password=None):
+        object.__init__(self)
+        
+        self.method = method
+        self.path = path
+        self.algorithm = algorithm or 'md5'
+        self.opaque = opaque or 'ToDoMoveThisToSettings'
+        self.qop = qop or 'auth'
 
-    request = "%s:%s:%s:%s:%s" % (
-            parsed_header["nonce"],
-            parsed_header["nc"],
-            parsed_header["cnonce"],
-            parsed_header["qop"],
-            a2
-    )
+        self.realm = realm or None
+        
+        self.parsed_header = None
+        
+        assert self.algorithm in self.algorithm_implementation_map
     
-
-    result_secret = md5("%s:%s" % (a1, request)).hexdigest()
+    def get_a1(self, realm, username, password):
+        # compute A1 according to RFC 2617, section 3.2.2.2
+        return self.algorithm_implementation_map[self.algorithm]("%s:%s:%s" % (username, realm, password)).hexdigest()
     
-    return parsed_header['response'] == result_secret 
+    def get_digest_challenge(self):
+        """ Return HTTP digest challenge, which has to be placed into www-authenticate header"""
+        
+        nonce = self.algorithm_implementation_map[self.algorithm]("%s:%s" % (time(), self.realm)).hexdigest()
+        
+        return 'Digest realm="%(realm)s", qop="%(qop)s", nonce="%(nonce)s", opaque="%(opaque)s"' % {
+            'realm' : self.realm,
+            'qop' : self.qop,
+            'nonce' : nonce,
+            'opaque' : self.opaque
+        }
+    
+    def get_client_secret(self):
+        """ Get secret as computed by client """
+        return self.parsed_header['response']
+    
+    def get_server_secret(self, a1):
+        """ Compute server secret from provided, partially computed values """
+        assert 'auth' == self.parsed_header['qop']
+        
+        # A2, according to section 3.2.2.3
+        a2 = self.algorithm_implementation_map[self.algorithm]("%s:%s" % (self.method, self.path)).hexdigest()
+    
+        request_digest = "%s:%s:%s:%s:%s" % (
+                self.parsed_header["nonce"],
+                self.parsed_header["nc"],
+                self.parsed_header["cnonce"],
+                self.parsed_header["qop"],
+                a2
+        )
+        
+    
+        return self.algorithm_implementation_map[self.algorithm]("%s:%s" % (a1, request_digest)).hexdigest()
+    
+    def parse_authorization_header(self, header):
+        """ Provide wrap around parse_authorization_header function for those who like to have
+        everything with digest.
+        This also stores parsed header in instvar, so we must not passing it around.
+        """
+        self.parsed_header = parse_authorization_header(header)
+        return self.parsed_header
+    
+    def get_client_username(self, username):
+        return self.parsed_header['username']
+    
     

File djangohttpdigest/tests/test_digest.py

 from django.http import HttpRequest
 from django.core.handlers.wsgi import WSGIRequest
 
-from djangohttpdigest.digest import Digestor, check_credentials, parse_authorization_header
+from djangohttpdigest.digest import Digestor, parse_authorization_header
 
 class TestDigestor(TestCase):
     """ Test digestor, our wrapping class for handling digests """
     
     def setUp(self):
-        self.digestor = Digestor(realm='testrealm')
+        self.digestor = Digestor(realm='testrealm', method='GET', path='/testapi/simpleprotected/')
     
     def test_get_digest_challenge(self):
         challenge = self.digestor.get_digest_challenge()
         'SERVER_PROTOCOL':   'HTTP/1.1',
     }
     
-    def test_check_credentials(self):
-        """ Manually construct requests and check parse function behave correctly """
-        request = WSGIRequest(self.__class__.environment)
-        self.assertEquals(False, check_credentials(request))
-        
-        # bad authentication content
-        request.META['AUTHENTICATION'] = ''
-        
-        self.assertEquals(False, check_credentials(request))
-    
     def test_parse_authorization_header(self):
         """ Authorization header parsing, for various inputs """
         self.assertRaises(ValueError, lambda:parse_authorization_header(''))

File djangohttpdigest/tests/test_simple_digest.py

 #        response = client.get(self.path)
 #        self.assertEquals(200, response.status_code)
         
-
-    def test_autentization_compatible(self):
-        """ Check our server-side autentization is compatible with standard (urllib2) one """
-        
+    
+    def _check_authentication_compatibility(self, uri):
         auth_handler = urllib2.HTTPDigestAuthHandler()
         auth_handler.add_password('simple', self.url, 'username', 'password')
         opener = urllib2.build_opener(auth_handler)
             raise
         self.assertEquals(200, response.code)
         response.close()
+    
+    def test_autentization_compatible(self):
+        """ Check our server-side autentizations is compatible with standard (urllib2) one """
+        self._check_authentication_compatibility(uri='/testapi/simpleprotected/')
+        self._check_authentication_compatibility(uri='/testapi/modelprotected/')
 

File testproject/testapi/models.py

+from django.db import models
+
+class ClearTextModel(models.Model):
+    realm = models.CharField(max_length=30)
+    username = models.CharField(max_length=30)
+    password = models.CharField(max_length=30)
+    
+class ModelWithDefaultRealm(models.Model):
+    username = models.CharField(max_length=30)
+    secret = models.CharField(max_length=50)
+
+class ModelWithRealmSet(models.Model):
+    realm = models.CharField(max_length=30)
+    username = models.CharField(max_length=30)
+    secret = models.CharField(max_length=50)

File testproject/testapi/urls.py

 
 urlpatterns = patterns('testapi.views',
     url('simpleprotected', 'simpleprotected'),
+    url('modelprotected', 'modelprotected'),
 )

File testproject/testapi/views.py

 from django.http import HttpResponse
-from djangohttpdigest import protect_digest
+from djangohttpdigest.decorators import protect_digest, protect_digest_model
+
+from testapi.models import ModelWithRealmSet
 
 @protect_digest(realm='simple', username='username', password='password')
 def simpleprotected(request):
     view returns 401 on failure or for challenge, 200 with empty body
     on successfull authorization. 
     """
+    return HttpResponse('')
+
+@protect_digest_model(model=ModelWithRealmSet,
+      realm_field='realm',
+      username_field='username',
+      secret_field='secret_field'
+)
+def modelprotected(request):
+    """
+    Example of model-protected site.
+    """
     return HttpResponse('')