Commits

Cmed Technology  committed 46ca428 Draft
  • Participants
  • Parent commits 2b68b3c

Comments (0)

Files changed (4)

File restapiblueprint/blueprints/private.py

 blueprint = Blueprint(__name__, __name__)
 
 
-@blueprint.route('', methods=['GET'])
+def dumb_get_consumer_secret(key):
+    """Return secret for given key or raise KeyError."""
+    key_to_secret = {'akey': 'asecret'}
+    return key_to_secret[key]
+
+
+def dumb_check_have_capability(key, capabilities):
+    """Return True if key has one of the given capabilities."""
+    key_to_capabilities = {'akey': ['something']}
+    capabilities_for_key = key_to_capabilities.get(key, [])
+    for capability in capabilities_for_key:
+        if capability in capabilities:
+            return True
+    return False
+
+
+def access(*capabilities):
+    """Return a function to check authentication and capability."""
+    capabilities = set(capabilities)
+
+    def check_access_with_capabilities(*args):
+        key = oauth.verify(
+            dumb_get_consumer_secret,
+            request.url, request.method, request.headers)
+        if key is None:
+            return make_error(
+                'Failed to authenticate', 401,
+                additional_headers={'WWW-Authenticate': 'OAuth'})
+        else:
+            if not dumb_check_have_capability(key, capabilities):
+                return make_error('Insufficient privileges to access', 403)
+    return check_access_with_capabilities
+
+
+@blueprint.route('', methods=['GET', 'POST'])
 @http_method_dispatcher
 class Private(object):
 
-    def dumb_get_consumer_secret(self, key):
-        if key == 'akey':
-            return 'asecret'
-        raise KeyError
-
-    def access(self):
-        if not oauth.verify(
-                self.dumb_get_consumer_secret,
-                request.url, request.method, request.headers):
-            return make_error(
-                'Unauthorized', 401,
-                additional_headers={'WWW-Authenticate': 'OAuth'})
-
-    @check(access)
+    @check(access('something', 'somethingelse'))
     def get(self):
         return make_ok()
+
+    @check(access())
+    def post(self):
+        return make_ok()

File restapiblueprint/features/oauth.feature

     Given I use header OAuth with key="wrong_akey" and secret="asecret"
     When I send a GET request to "private"
     Then the response status should be "401"
+
+  Scenario: Cannot access a restricted URL with right key but wrong capabilities
+    Given I use header OAuth with key="akey" and secret="asecret"
+    When I send a POST request to "private"
+    Then the response status should be "403"

File restapiblueprint/lib/oauth.py

     The oauth parameters can be in the "Authorization" header (found in headers)
     or in the URL parameters.
 
+    Return the consumer secret key if verified ok, otherwise returns None. This
+    is because the Python caller is very likely to go on to check that the
+    verified HTTP caller is allowed to perform the action.
+
     """
     # We will build a set of all parameters...
     parameters = {}
         secret = get_secret(key)
         consumer = oauth2.Consumer(key, secret)
     except KeyError:
-        return False  # Missing consumer key or secret
+        return None  # Missing consumer key or secret
 
     # Create the oauth Request.
     (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)
     # Verify signature.
     try:
         oauth_server.verify_request(req, consumer, None)
-        return True
+        return key
     except oauth2.Error:
-        return False
+        return None
 
 
 def get_HMAC_SHA1_signature_as_url(key, secret, url, http_method):

File restapiblueprint/lib/oauth_test.py

     def test_get_HMAC_SHA1_signature_as_url(self):
         url = oauth.get_HMAC_SHA1_signature_as_url(
             KEY, SECRET, 'http://a.b.c', 'GET')
-        self.assertTrue(oauth.verify(_test_get_secret, url, 'GET', {}))
+        self.assertEqual(oauth.verify(_test_get_secret, url, 'GET', {}), KEY)
+
+    def test_get_HMAC_SHA1_signature_as_url_fail(self):
+        url = oauth.get_HMAC_SHA1_signature_as_url(
+            KEY, SECRET, 'http://a.b.c', 'GET')
+        self.assertEqual(
+            oauth.verify(_test_get_secret, url, 'POST', {}), None)
 
     def test_get_HMAC_SHA1_signature_as_url_with_query(self):
         url = oauth.get_HMAC_SHA1_signature_as_url(
             KEY, SECRET, 'http://a.b.c?name=tim', 'GET')
-        self.assertTrue(oauth.verify(_test_get_secret, url, 'GET', {}))
+        self.assertEqual(oauth.verify(_test_get_secret, url, 'GET', {}), KEY)
 
     def test_get_HMAC_SHA1_signature_as_header(self):
         url = 'http://a.b.c'
         header = oauth.get_HMAC_SHA1_signature_as_header(
             KEY, SECRET, url, 'GET')
-        self.assertTrue(oauth.verify(_test_get_secret, url, 'GET', header))
+        self.assertEqual(
+            oauth.verify(_test_get_secret, url, 'GET', header), KEY)
+
+    def test_get_HMAC_SHA1_signature_as_header_fail(self):
+        url = 'http://a.b.c'
+        header = oauth.get_HMAC_SHA1_signature_as_header(
+            KEY, SECRET, url, 'GET')
+        self.assertEqual(
+            oauth.verify(_test_get_secret, 'http://x.y.z', 'GET', header), None)
 
     def test_get_HMAC_SHA1_signature_as_header_with_query(self):
         url = 'http://a.b.c?name=tim'
         header = oauth.get_HMAC_SHA1_signature_as_header(
             KEY, SECRET, url, 'GET')
-        self.assertTrue(oauth.verify(_test_get_secret, url, 'GET', header))
+        self.assertEqual(
+            oauth.verify(_test_get_secret, url, 'GET', header), KEY)
 
     def test_against_google_example(self):
         # Confirm that we create the same signature as provided in Appendix A of