Commits

Colin Copeland committed c370bfe

basic secure functionality

Comments (0)

Files changed (4)

+from django.contrib import admin
+
+from perms import models as perms
+
+
+class ObjectPermissionAdmin(admin.ModelAdmin):
+    raw_id_fields = ('user',)
+    list_display = ('user', 'action', 'content_type', 'object_id')
+    list_filter = ('action', 'content_type')
+
+admin.site.register(perms.ObjectPermission, ObjectPermissionAdmin)
+from django.db import models
+from django.contrib.contenttypes.models import ContentType
+
+
+class SecureQuerySet(models.query.QuerySet):
+    def secure(self, user, perm):
+        clone = self._clone()
+        perms_class = models.get_model('perms', 'ObjectPermission')
+        content_type = ContentType.objects.get_for_model(self.model)
+        data = {
+            'obj_table': clone.model._meta.db_table,
+            'obj_column': clone.model._meta.pk.column,
+            'perm_table': perms_class._meta.db_table,
+            'content_type_id': content_type.pk,
+        }
+        where = [
+            # user constraint
+            '%(perm_table)s.user_id = ' % data + '%d',
+            # content_type constraint
+            "%(perm_table)s.content_type_id = %(content_type_id)s" % data,
+            # object id constraint
+            "%(obj_table)s.%(obj_column)s = %(perm_table)s.object_id" % data,
+        ]
+        return clone.extra(tables=[data['perm_table']], where=where,
+                           params=[user.pk])
+
+
+class SecureManager(models.Manager):
+    def secure(self, user, perm):
+        return SecureQuerySet(model=self.model).secure(user, perm)
 from django.contrib.contenttypes.models import ContentType
 from django.core.exceptions import ValidationError
 
+from perms.managers import SecureManager
+
 
 PERMISSIONS = {
     '__all__': ('view', 'change', 'delete'),
     'decision': ('mark_as_finished',)
 }
 
+class ObjectPermission(models.Model):
 
-
-class ObjectPermission(models.Model):
-  
     user = models.ForeignKey(User)
-    action = models.CharField(max_length=32,)
+    action = models.CharField(max_length=32)
     content_type = models.ForeignKey(ContentType)
     object_id = models.PositiveIntegerField()
-    
+
     def clean(self):
         """
         Test that Perms exist in the PERMISSIONS dictionary for content_type
         """
         object_model =  self.content_type.model
-        if PERMISSIONS.has_key(object_model):
-            perms = PERMISSIONS['__all__'] + PERMISSIONS['decision']
+        if object_model in PERMISSIONS:
+            perms = []
+            for model, actions in PERMISSIONS.iteritems():
+                perms.extend(actions)
         else:
             perms = PERMISSIONS['__all__']
         if not self.action in perms:
-            raise ValidationError(
-                'This permission is not avialable for content type.'
-            )
+            raise ValidationError('%s is not an avialable action for %s.' % (self.action, object_model))
         """
         Test the existance of the object id
         """
         try:
             obj_model.objects.get(pk=self.object_id)
         except obj_model.DoesNotExist:
-            raise ValidationError(
-            'No object with this id exists for the model: %s' % object_model
-            )
-        
-        
+            raise ValidationError('No object with this id exists for the model: %s' % object_model)
+
+    def __unicode__(self):
+        return "%s %s (%d) for %s" % (self.content_type, self.action,
+                                      self.object_id, self.user)
+
+
+class TestMessage(models.Model):
+    body = models.TextField()
+    objects = SecureManager()
 from django.test import TestCase
 from django.contrib.auth.models import User
 from django.contrib.contenttypes.models import ContentType
+from django.core.exceptions import ValidationError
 
-from perms.models import ObjectPermission
-from django.core.exceptions import ValidationError
+from perms.models import ObjectPermission, TestMessage
+
 
 class SimpleTest(TestCase):
     def setUp(self):
-        self.user = User.objects.create_user('test', 'test@abc.com', 'abc')
+        self.user = User.objects.create_user('user', 'test@abc.com', 'abc')
         self.ct = ContentType.objects.get_for_model(self.user)
     
-    def add_perm(self, name):
+    def add_perm(self, name, obj=None):
+        if obj:
+            content_type = ContentType.objects.get_for_model(obj)
+            object_id = obj.pk
+        else:
+            content_type = self.ct
+            object_id = self.user.pk
         op = ObjectPermission(
             user=self.user,
             action=name,
-            content_type=self.ct,
-            object_id=self.user.id,
+            content_type=content_type,
+            object_id=object_id,
         )
         op.clean()
         return op.save()
-    
+
     def test_basic_addition(self):
         self.assertFalse(self.user.has_perm('view', self.user))
         self.add_perm('view')
         self.assertTrue(self.user.has_perm('view', self.user))
-        
+
     def test_bad_perm_addition(self):
         self.assertFalse(self.user.has_perm('no_go', self.user))
         try:
         except ValidationError:
             pass
         self.assertFalse(self.user.has_perm('no_go', self.user))
-        
+
     def test_bad_objectid_addition(self):
         op = ObjectPermission(
             user=self.user,
         except ValidationError:
             pass
         self.assertFalse(self.user.has_perm('view', self.user))
+
+    def test_secure(self):
+        message = TestMessage.objects.create(body='test')
+        # no access
+        result = list(TestMessage.objects.secure(self.user, 'view'))
+        self.assertEqual(len(result), 0)
+        # access
+        self.add_perm('view', message)
+        perm = ObjectPermission.objects.all()[0]
+        result = list(TestMessage.objects.secure(self.user, 'view'))
+        self.assertEqual(len(result), 1)