
Anonymous committed b86e519

Introduced check_authority calls where needed.



kay/generics/rest.py

                 model_handler = self.get_model_handler(model_name)
                 if(not model_handler):
                     return None
+                self.check_authority(request, OP_LIST, obj=None,
+                                     model_name=model_name, prop_name=None)
                 doc = impl.createDocument(XSD_NS, XSD_SCHEMA_NAME, None)
                 doc.documentElement.attributes[XSD_ATTR_XMLNS] = XSD_NS
                 model_handler.write_xsd_metadata(doc.documentElement,
                                                  model_name)
             else:
+                self.check_authority(request, OP_LIST, obj=None,
+                                     model_name=None, prop_name=None)
                 doc = impl.createDocument(None, TYPES_EL_NAME, None)
                 types_el = doc.documentElement
                 for model_name in self.model_handlers.iterkeys():
     def prop(self, request, model_name=None, key=None, prop=None):
         model_handler = self.get_model_handler(model_name)
         model = model_handler.get(key)
+        self.check_authority(request, OP_SHOW, obj=model,
+                             model_name=model_name, prop_name=prop)
         prop_handler = model_handler.get_property_handler(prop)
         prop_value = prop_handler.get_value(model)
         content_type = request.accept_mimetypes.best
         if real_method == "GET":
             model_handler = self.get_model_handler(model_name)
             model = model_handler.get(key)
+            self.check_authority(request, OP_SHOW, obj=model,
+                                 model_name=model_name, prop_name=None)
             if model is None:
                 return NotFound()
             return self.out_to_response(
             return self.update_impl(request, model_name, key, True)
         elif real_method == "DELETE":
             model_handler = self.get_model_handler(model_name)
+            model = model_handler.get(key)
+            self.check_authority(request, OP_DELETE, obj=model,
+                                 model_name=model_name, prop_name=None)
             try:
                 db.delete(db.Key(key))
                 return Response("OK")
         if real_method == "POST":
             return self.update_impl(request, model_name, None, False)
         elif real_method == "GET":
+            self.check_authority(request, OP_LIST, obj=None,
+                                 model_name=model_name, prop_name=None)
             return self.get_all_impl(request, model_name)
 
     def get_all_impl(self, request, model_name):
             finally:
                 doc.unlink()
 
+        if not model_key:
+            self.check_authority(request, OP_CREATE, obj=None,
+                                 model_name=model_name, prop_name=None)
+        else:
+            for model in models:
+                self.check_authority(request, OP_UPDATE, obj=model,
+                                     model_name=model_name, prop_name=None)
         db.put(models)
 
         # if input was not a list, convert single element models list
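Review bot: In the DELETE branch the authorization is evaluated against `model`, which `model_handler.get(key)` may return as None, yet `db.delete(db.Key(key))` still runs on the raw key, so the entity that was authorized and the entity that is deleted are not guaranteed to be the same. Add `if model is None: return NotFound()` directly after the new `model = model_handler.get(key)` line so the check always sees a real instance (NotFound is already used by the GET branch), and in the GET branch move the existing `if model is None` test above the new check_authority call for the same reason. Every call site in this file discards check_authority's return value, so the method must signal denial by raising; a docstring on check_authority (outside these hunks) stating that, and naming the exception that yields the 403 the tests expect, would make the contract explicit. In update_impl the OP_UPDATE check receives the object deserialized from the request body rather than the stored entity, which presumably matters if authorization depends on persisted fields — worth confirming. The enclosing methods start and end outside the hunks.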

kay/tests/rest_test.py

+
+import logging
 
 from werkzeug import (
   BaseResponse, Request
 from kay.utils import url_for
 
 from kay.tests.restapp.models import RestModel
+logging.getLogger().setLevel(logging.ERROR)
 
 class RestTestCase(GAETestBase):
   KIND_NAME_UNSWAPPED = False
   CLEANUP_USED_KIND = True
 
   def setUp(self):
+    try:
+      self.original_user = os.environ['USER_EMAIL']
+      self.original_is_admin = os.environ['USER_IS_ADMIN']
+      del os.environ['USER_EMAIL']
+      del os.environ['USER_IS_ADMIN']
+    except Exception:
+      pass
     s = LazySettings(settings_module='kay.tests.rest_settings')
     app = get_application(settings=s)
     self.client = Client(app, BaseResponse)
 
   def tearDown(self):
-    pass
+    if hasattr(self, "original_user"):
+      os.environ["USER_EMAIL"] = self.original_user
+    if hasattr(self, "original_is_admin"):
+      os.environ["USER_IS_ADMIN"] = self.original_is_admin
 
   def test_rest_json(self):
 
     headers = Headers({"Accept": "application/json"})
 
     response = self.client.get('/rest/metadata', headers=headers)
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com")
+    response = self.client.get('/rest/metadata', headers=headers)
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com", is_admin="1")
+    response = self.client.get('/rest/metadata', headers=headers)
     self.assertEqual(response.status_code, 200)
 
+    self.client.test_logout()
+    response = self.client.get('/rest/metadata/RestModel', headers=headers)
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com")
+    response = self.client.get('/rest/metadata/RestModel', headers=headers)
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com", is_admin="1")
     response = self.client.get('/rest/metadata/RestModel', headers=headers)
     self.assertEqual(response.status_code, 200)
 
+
+    self.client.test_logout()
+    response = self.client.post(
+      '/rest/RestModel',
+      data='{"RestModel": {"i_prop": 12, "s_prop": "string"}}',
+      content_type="application/json; charset=utf-8")
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com")
+    response = self.client.post(
+      '/rest/RestModel',
+      data='{"RestModel": {"i_prop": 12, "s_prop": "string"}}',
+      content_type="application/json; charset=utf-8")
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com", is_admin="1")
     response = self.client.post(
       '/rest/RestModel',
       data='{"RestModel": {"i_prop": 12, "s_prop": "string"}}',
       content_type="application/json; charset=utf-8")
     self.assertEqual(response.status_code, 200)
+
     key = response.data
     elm = RestModel.get(key)
     self.assertEqual(elm.s_prop, "string")
     self.assertEqual(elm.i_prop, 12)
 
+    self.client.test_logout()
+    response = self.client.post(
+      '/rest/RestModel/%s' % key,
+      data='{"RestModel": {"i_prop": 14}}',
+      content_type="application/json; charset=utf-8")
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com")
+    response = self.client.post(
+      '/rest/RestModel/%s' % key,
+      data='{"RestModel": {"i_prop": 14}}',
+      content_type="application/json; charset=utf-8")
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com", is_admin="1")
     response = self.client.post(
       '/rest/RestModel/%s' % key,
       data='{"RestModel": {"i_prop": 14}}',
       content_type="application/json; charset=utf-8")
     self.assertEqual(response.status_code, 200)
+
     key2 = response.data
     self.assertEqual(key, key2)
     elm = RestModel.get(key)
     self.assertEqual(response.status_code, 200)
     self.assertEqual(response.data, "14")
 
+    self.client.test_logout()
+    response = self.client.delete('/rest/RestModel/%s' % key,
+                                  headers=headers)
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com")
+    response = self.client.delete('/rest/RestModel/%s' % key,
+                                  headers=headers)
+    self.assertEqual(response.status_code, 403)
+
+    self.client.test_login(email="test@example.com", is_admin="1")
     response = self.client.delete('/rest/RestModel/%s' % key,
                                   headers=headers)
     self.assertEqual(response.status_code, 200)
 
+
     response = self.client.get('/rest/RestModel/%s' % key,
                                headers=headers)
     self.assertEqual(response.status_code, 404)
 
 
   def test_rest_operations(self):
+    self.client.test_login(email="test@example.com", is_admin="1")
     response = self.client.get('/rest/metadata')
     self.assertEqual(response.status_code, 200)
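Review bot: The `except Exception: pass` in setUp masks more than a missing variable: if `USER_EMAIL` is set but `USER_IS_ADMIN` is not, the first assignment succeeds and the KeyError on the second skips both `del` calls, so the test runs with a logged-in user while asserting 403s, and a missing `import os` (none is added in this hunk) would be a NameError swallowed by the same clause. Use `os.environ.pop(name, None)` per variable and have tearDown restore or remove each one individually, which also drops the `hasattr` checks. tearDown does not clear values left by `test_login`; if that helper writes `USER_EMAIL`/`USER_IS_ADMIN` into os.environ, the admin login at the end of test_rest_json presumably leaks into test_rest_operations — the rewrite below removes any variable that was absent at setUp. The module-level `logging.getLogger().setLevel(logging.ERROR)` reconfigures the root logger for every module in the process at import time; a comment saying why that is acceptable here is wanted.
```python
  def setUp(self):
    # Start every test unauthenticated; tearDown puts back what was captured.
    self.original_user = os.environ.pop('USER_EMAIL', None)
    self.original_is_admin = os.environ.pop('USER_IS_ADMIN', None)
    # … unchanged: LazySettings / get_application / Client setup …

  def tearDown(self):
    for name, value in (("USER_EMAIL", self.original_user),
                        ("USER_IS_ADMIN", self.original_is_admin)):
      if value is None:
        os.environ.pop(name, None)
      else:
        os.environ[name] = value
```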
 

kay/tests/restapp/urls.py

 """
 
 from kay.generics.rest import RESTViewGroup
+from kay.generics import admin_required
 
 class MyRESTViewGroup(RESTViewGroup):
   models = [
     'kay.tests.restapp.models.RestModel',
   ]
+  authorize = admin_required
 
 view_groups = [
   MyRESTViewGroup(),