Commits

Thomas Waldmann committed f56b6d5

move ACL checks to protecting middleware

  • Participants
  • Parent commits 3c3c0f1

Comments (0)

Files changed (5)

File storage/middleware/_tests/test_indexing.py

 
 from config import NAME, SIZE, ITEMID, REVID, DATAID, HASH_ALGORITHM, CONTENT, COMMENT
 
-from ..indexing import IndexingMiddleware, AccessDenied
+from ..indexing import IndexingMiddleware
 
 from storage.backends.stores import MutableBackend
 from storage.stores.memory import BytesStore as MemoryBytesStore
         item = self.imw[item_name]
         r = item.store_revision(dict(name=item_name, acl=u'joe:read'), StringIO('public content'))
         revid_public = r.revid
-        item_name = u'secret'
-        item = self.imw[item_name]
-        r = item.store_revision(dict(name=item_name, acl=u''), StringIO('secret content'))
-        revid_secret = r.revid
         revids = [rev.revid for rev in self.imw.documents(all_revs=False)]
         assert revids == [revid_public]
 
         item = self.imw[item_name]
         r = item.store_revision(dict(name=item_name, acl=u'joe:read'), StringIO('public content'))
         revid_public = r.revid
-        item_name = u'secret'
-        item = self.imw[item_name]
-        r = item.store_revision(dict(name=item_name, acl=u'boss:read'), StringIO('secret content'))
-        revid_secret = r.revid
         # now testing:
         item_name = u'public'
         item = self.imw[item_name]
         r = item[revid_public]
         assert r.data.read() == 'public content'
-        item_name = u'secret'
-        item = self.imw[item_name]
-        with pytest.raises(AccessDenied):
-            r = item[revid_secret]
 
     def test_perf_create_only(self):
         pytest.skip("usually we do no performance tests")

File storage/middleware/_tests/test_protecting.py

+# Copyright: 2011 MoinMoin:ThomasWaldmann
+# License: GNU GPL v2 (or any later version), see LICENSE.txt for details.
+
+"""
+MoinMoin - protecting middleware tests
+"""
+
+
+from __future__ import absolute_import, division
+
+from StringIO import StringIO
+
+import pytest
+
+from config import ACL
+
+from ..protecting import ProtectingMiddleware, AccessDenied
+
+from .test_indexing import TestIndexingMiddleware
+
+
+class TestProtectingMiddleware(TestIndexingMiddleware):
+    def setup_method(self, method):
+        super(TestProtectingMiddleware, self).setup_method(method)
+        self.imw = ProtectingMiddleware(self.imw, user_name=u'joe')
+
+    def teardown_method(self, method):
+        self.imw = self.imw.indexer
+        super(TestProtectingMiddleware, self).teardown_method(method)
+
+    def _dummy(self):
+        # replacement for tests that use unsupported methods / attributes
+        pass
+
+    test_index_rebuild = _dummy
+    test_index_update = _dummy
+    test_indexed_content = _dummy
+
+    def test_documents(self):
+        item_name = u'public'
+        item = self.imw[item_name]
+        r = item.store_revision(dict(name=item_name, acl=u'joe:read'), StringIO('public content'))
+        revid_public = r.revid
+        item_name = u'secret'
+        item = self.imw[item_name]
+        r = item.store_revision(dict(name=item_name, acl=u''), StringIO('secret content'))
+        revid_secret = r.revid
+        revids = [rev.revid for rev in self.imw.documents(all_revs=False)]
+        assert revids == [revid_public]  # without revid_secret!
+
+    def test_getitem(self):
+        item_name = u'public'
+        item = self.imw[item_name]
+        r = item.store_revision(dict(name=item_name, acl=u'joe:read'), StringIO('public content'))
+        revid_public = r.revid
+        item_name = u'secret'
+        item = self.imw[item_name]
+        r = item.store_revision(dict(name=item_name, acl=u'boss:read'), StringIO('secret content'))
+        revid_secret = r.revid
+        # now testing:
+        item_name = u'public'
+        item = self.imw[item_name]
+        r = item[revid_public]
+        assert r.data.read() == 'public content'
+        item_name = u'secret'
+        item = self.imw[item_name]
+        with pytest.raises(AccessDenied):
+            r = item[revid_secret]
+

File storage/middleware/_tests/test_serialization.py

 
 from StringIO import StringIO
 
-from ..indexing import IndexingMiddleware, AccessDenied
+from ..indexing import IndexingMiddleware
 from ..serialization import serialize, deserialize
 
 from storage.backends.stores import MutableBackend

File storage/middleware/indexing.py

 * selecting
 * listing
 
-We also check ACLs here. Index has ALL content, so we must be careful not
-to show data from index to a user that is not allowed to read that data.
-
 Using Whoosh (a fast pure-Python indexing and search library), we build,
 maintain and use 2 indexes:
 
 Many methods provided by the indexing middleware will be fast, because they
 will not access the layers below (like the backend), but just the index files,
 usually it is even just the small and thus quick latest-revs index.
-
-Indexing Middleware also checks ACLs, so a user will not see items in search
-results that he is not allowed to read. Also, trying to access a revision
-without read permission will give an AccessDenied exception.
 """
 
 
                 doc = hit.fields()
                 latest_doc = not all_revs and doc or None
                 item = Item(self, user_name=self.user_name, latest_doc=latest_doc, itemid=doc[ITEMID])
-                if item.allows('read'):
-                    yield item[doc[REVID]]
+                yield item[doc[REVID]]
 
     def search_page(self, q, all_revs=False, pagenum=1, pagelen=10, **kw):
         """
                 doc = hit.fields()
                 latest_doc = not all_revs and doc or None
                 item = Item(self, user_name=self.user_name, latest_doc=latest_doc, itemid=doc[ITEMID])
-                if item.allows('read'):
-                    yield item[doc[REVID]]
+                yield item[doc[REVID]]
 
     def documents(self, all_revs=False, **kw):
         """
         for doc in self._documents(all_revs, **kw):
             latest_doc = not all_revs and doc or None
             item = Item(self, user_name=self.user_name, latest_doc=latest_doc, itemid=doc[ITEMID])
-            if item.allows('read'):
-                yield item[doc[REVID]]
+            yield item[doc[REVID]]
 
     def _documents(self, all_revs=False, **kw):
         """
-        Yield documents matching the kw args (internal use only, no ACL checks).
+        Yield documents matching the kw args (internal use only).
         """
         with self.get_index(all_revs).searcher() as searcher:
             # Note: callers must consume everything we yield, so the for loop
         if doc:
             latest_doc = not all_revs and doc or None
             item = Item(self, user_name=self.user_name, latest_doc=latest_doc, itemid=doc[ITEMID])
-            if item.allows('read'):
-                return item[doc[REVID]]
+            return item[doc[REVID]]
 
     def _document(self, all_revs=False, **kw):
         """
-        Return a document matching the kw args (internal use only, no ACL checks).
+        Return a document matching the kw args (internal use only).
         """
         with self.get_index(all_revs).searcher() as searcher:
             return searcher.document(**kw)
         return Item.existing(self, user_name=self.user_name, **query)
 
 
-class AccessDenied(Exception):
-    """
-    raised when a user is denied access to an Item or Revision by ACL.
-    """
-
-
 class Item(object):
     def __init__(self, indexer, user_name=None, latest_doc=None, **query):
         """
         self._current[ITEMID] = value
     itemid = property(_get_itemid, _set_itemid)
 
+    @property
+    def acl(self):
+        return self._current.get(ACL)
+
     @classmethod
     def create(cls, indexer, user_name=None, **query):
         """
         """
         return self.itemid is not None
 
-    def allows(self, capability):
-        # TODO: this is just a temporary hack to be able to test this without real ACL code,
-        # replace it by a sane one later.
-        # e.g. acl = "joe:read"  --> user joe may read
-        if not self.indexer.acl_support:
-            return True
-        acl = self._current.get(ACL)
-        user_name = self.user_name
-        if acl is None or user_name is None:
-            allow = True
-        else:
-            allow = "%s:%s" % (user_name, capability) in acl
-        #print "item allows user '%s' to '%s' (acl: %s): %s" % (user_name, capability, acl, ["no", "yes"][allow])
-        return allow
-
-    def require(self, capability):
-        if not self.allows(capability):
-            raise AccessDenied("item does not allow user '%r' to '%r'" % (self.user_name, capability))
-
     def iter_revs(self):
         """
         Iterate over Revisions belonging to this item.
         """
         Get Revision with revision id <revid>.
         """
-        self.require('read')
         rev = Revision(self, revid)
         rev.data # XXX trigger KeyError if rev does not exist
         return rev
         :param overwrite: if True, allow overwriting of existing revs.
         :returns: a Revision instance of the just created revision
         """
-        self.require('write')
         if self.itemid is None:
-            self.require('create')
             self.itemid = make_uuid()
         backend = self.backend
-        if overwrite:
-            self.require('overwrite')
-        else:
+        if not overwrite:
             revid = meta.get(REVID)
             if revid is not None and revid in backend:
-                raise AccessDenied('need overwrite flag to overwrite existing revisions')
+                raise ValueError('need overwrite=True to overwrite existing revisions')
         meta[ITEMID] = self.itemid
         revid = backend.store(meta, data)
         data.seek(0)  # rewind file
         """
         Destroy revision <revid>.
         """
-        self.require('destroy')
         self.backend.remove(revid)
         self.indexer.remove_revision(revid)
 

File storage/middleware/protecting.py

+# Copyright: 2011 MoinMoin:ThomasWaldmann
+# License: GNU GPL v2 (or any later version), see LICENSE.txt for details.
+
+"""
+MoinMoin - protecting middleware
+
+This checks ACLs (access control lists), so a user will not be able to do
+operations without the respective permissions.
+
+Note: for method / attribute docs, please see the same methods / attributes in
+      IndexingMiddleware class.
+"""
+
+
+from __future__ import absolute_import, division
+
+import logging
+
+from config import ACL
+
+
+class AccessDenied(Exception):
+    """
+    raised when a user is denied access to an Item or Revision by ACL.
+    """
+
+
+class ProtectingMiddleware(object):
+    def __init__(self, indexer, user_name):
+        """
+        :param indexer: indexing middleware instance
+        :param user_name: the user's name (used for checking permissions)
+        """
+        self.indexer = indexer
+        self.user_name = user_name
+
+    def search(self, q, all_revs=False, **kw):
+        for rev in self.indexer.search(q, all_revs, **kw):
+            rev = ProtectedRevision(self, rev)
+            if rev.allows('read'):
+                yield rev
+
+    def search_page(self, q, all_revs=False, pagenum=1, pagelen=10, **kw):
+        for rev in self.indexer.search_page(q, all_revs, pagenum, pagelen, **kw):
+            rev = ProtectedRevision(self, rev)
+            if rev.allows('read'):
+                yield rev
+
+    def documents(self, all_revs=False, **kw):
+        for rev in self.indexer.documents(all_revs, **kw):
+            rev = ProtectedRevision(self, rev)
+            if rev.allows('read'):
+                yield rev
+
+    def document(self, all_revs=False, **kw):
+        rev = self.indexer.document(all_revs, **kw)
+        if rev:
+            rev = ProtectedRevision(self, rev)
+            if rev.allows('read'):
+                return rev
+
+    def __getitem__(self, name):
+        item = self.indexer[name]
+        return ProtectedItem(self, item)
+
+    def get_item(self, **query):
+        item = self.indexer.get_item(**query)
+        return ProtectedItem(self, item)
+
+    def create_item(self, **query):
+        item = self.indexer.create_item(**query)
+        return ProtectedItem(self, item)
+
+    def existing_item(self, **query):
+        item = self.indexer.existing_item(**query)
+        return ProtectedItem(self, item)
+
+
+class ProtectedItem(object):
+    def __init__(self, protector, item):
+        """
+        :param protector: protector middleware
+        :param item: item to protect
+        """
+        self.protector = protector
+        self.item = item
+
+    def _get_itemid(self):
+        return self.item.itemid
+    def _set_itemid(self, value):
+        self.item.itemid = value
+    itemid = property(_get_itemid, _set_itemid)
+
+    def __nonzero__(self):
+        return bool(self.item)
+
+    def allows(self, capability):
+        """
+        check latest ACL whether capability is allowed
+        """
+        # TODO: this is just a temporary hack to be able to test this without real ACL code,
+        # replace it by a sane one later.
+        # e.g. acl = "joe:read"  --> user joe may read
+        acl = self.item.acl
+        user_name = self.protector.user_name
+        if acl is None or user_name is None:
+            allow = True
+        else:
+            allow = "%s:%s" % (user_name, capability) in acl
+        #print "item allows user '%s' to '%s' (acl: %s): %s" % (user_name, capability, acl, ["no", "yes"][allow])
+        return allow
+
+    def require(self, capability):
+        if not self.allows(capability):
+            raise AccessDenied("item does not allow user '%r' to '%r'" % (self.protector.user_name, capability))
+
+    def iter_revs(self):
+        if self:
+            for rev in self.item.iter_revs():
+                yield ProtectedRevision(self.protector, rev, p_item=self)
+
+    def __getitem__(self, revid):
+        self.require('read')
+        rev = self.item[revid]
+        return ProtectedRevision(self.protector, rev, p_item=self)
+
+    def get_revision(self, revid):
+        return self[revid]
+
+    def store_revision(self, meta, data, overwrite=False):
+        self.require('write')
+        if not self:
+            self.require('create')
+        if overwrite:
+            self.require('overwrite')
+        rev = self.item.store_revision(meta, data, overwrite=overwrite)
+        return ProtectedRevision(self.protector, rev, p_item=self)
+
+    def store_all_revisions(self, meta, data):
+        self.require('overwrite')
+        self.item.store_all_revisions(meta, data)
+
+    def destroy_revision(self, revid):
+        self.require('destroy')
+        self.item.destroy_revision(revid)
+
+    def destroy_all_revisions(self):
+        for rev in self.item.iter_revs():
+            self.destroy_revision(rev.revid)
+
+
+class ProtectedRevision(object):
+    def __init__(self, protector, rev, p_item=None):
+        """
+        :param protector: Protector middleware
+        :param rev: Revision to protect
+        :param p_item: instance of ProtectedItem for rev.item (optional)
+        """
+        self.protector = protector
+        self.rev = rev
+        self.item = p_item or ProtectedItem(protector, rev.item)
+
+    def allows(self, capability):
+        # to check allowance for a revision, we always ask the item
+        return self.item.allows(capability)
+
+    def require(self, capability):
+        if not self.allows(capability):
+            raise AccessDenied("revision does not allow '%r'" % (capability, ))
+
+    @property
+    def revid(self):
+        return self.rev.revid
+
+    @property
+    def meta(self):
+        self.require('read')
+        return self.rev.meta
+
+    @property
+    def data(self):
+        self.require('read')
+        return self.rev.data
+
+    def close(self):
+        self.rev.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self.close()
+
+    def __cmp__(self, other):
+        return cmp(self.meta, other.meta)
+