Thomas Johansson avatar Thomas Johansson committed f156b38

Add support for two-legged OAuth.

Comments (0)

Files changed (6)

piston/authentication/oauth/__init__.py

 
 
 class OAuthAuthentication(object):
-    def __init__(self, realm='API'):
+    def __init__(self, realm='API', two_legged=False):
         self.realm = realm
+        self.two_legged = two_legged
 
     def is_authenticated(self, request):
         oauth_request = get_oauth_request(request)
 
+        if self.two_legged:
+            return self._authenticate_two_legged(request, oauth_request)
+        else:
+            return self._authenticate_three_legged(request, oauth_request)
+
+    def _authenticate_two_legged(self, request, oauth_request):
+        missing_params = require_params(oauth_request)
+        if missing_params is not None:
+            return False
+
+        try:
+            consumer = store.get_consumer(request, oauth_request, oauth_request['oauth_consumer_key'])
+        except InvalidConsumerError:
+            return False
+
+        if not verify_oauth_request(request, oauth_request, consumer):
+            return False
+
+        request.user = store.get_user_for_consumer(request, oauth_request, consumer)
+        request.consumer = consumer
+        request.throttle_extra = consumer.key
+
+        return True
+        
+    def _authenticate_three_legged(self, request, oauth_request):
         missing_params = require_params(oauth_request, ('oauth_token',))
         if missing_params is not None:
             return False

piston/authentication/oauth/store/__init__.py

         """
         raise NotImplementedError
 
+    def get_user_for_consumer(self, request, oauth_request, consumer):
+        """
+        Return the associated User for `consumer`.
+        
+        `request`: The Django request object.
+        `oauth_request`: The `oauth2.Request` object.
+        `consumer`: The Consumer that made the request.
+        """
+        raise NotImplementedError
+
     def check_nonce(self, request, oauth_request, nonce):
         """
         Return `True` if the nonce has not yet been used, `False` otherwise.

piston/authentication/oauth/store/db.py

         try:
             return Consumer.objects.get(key=consumer_key)
         except Consumer.DoesNotExist:
-            raise InvalidConsumer()
+            raise InvalidConsumerError()
 
     def get_consumer_for_request_token(self, request, oauth_request, request_token):
         return request_token.consumer
     def get_user_for_access_token(self, request, oauth_request, access_token):
         return access_token.user
 
+    def get_user_for_consumer(self, request, oauth_request, consumer):
+        return consumer.user
+
     def check_nonce(self, request, oauth_request, nonce):
         nonce, created = Nonce.objects.get_or_create(
             consumer_key=oauth_request['oauth_consumer_key'],

piston/authentication/oauth/utils.py

     return True
 
 
-def require_params(oauth_request, parameters):
+def require_params(oauth_request, parameters=[]):
     """ Ensures that the request contains all required parameters. """
     params = [
         'oauth_consumer_key',

tests/test_project/apps/testapp/tests.py

     request_token_url = 'http://testserver/api/oauth/get_request_token'
     authorize_url = 'http://testserver/api/oauth/authorize_request_token'
     access_token_url = 'http://testserver/api/oauth/get_access_token'
-    api_access_url = 'http://testserver/api/oauth/api_access'
+    two_legged_api_url = 'http://testserver/api/oauth/two_legged_api'
+    three_legged_api_url = 'http://testserver/api/oauth/three_legged_api'
 
     def setUp(self):
         super(OAuthTests, self).setUp()
         params = dict(urlparse.parse_qsl(response.content))
         return oauth.Token(params['oauth_token'], params['oauth_token_secret'])
 
-    def test_api_access(self):
+    def test_two_legged_api(self):
+        request = oauth.Request.from_consumer_and_token(self.consumer, None, 'GET', self.two_legged_api_url, {'msg': 'expected response'})
+        request.sign_request(self.signature_method, self.consumer, None)
+
+        response = self.client.get(self.two_legged_api_url, request)
+        self.assertEquals(response.status_code, 200)
+        self.assert_('expected response' in response.content)
+
+    def test_three_legged_api(self):
         access_token = self.test_get_access_token()
 
-        request = oauth.Request.from_consumer_and_token(self.consumer, access_token, 'GET', self.api_access_url, {'msg': 'expected response'})
+        request = oauth.Request.from_consumer_and_token(self.consumer, access_token, 'GET', self.three_legged_api_url, {'msg': 'expected response'})
         request.sign_request(self.signature_method, self.consumer, access_token)
 
-        response = self.client.get(self.api_access_url, request)
+        response = self.client.get(self.three_legged_api_url, request)
         self.assertEquals(response.status_code, 200)
         self.assert_('expected response' in response.content)
 

tests/test_project/apps/testapp/urls.py

 popo = Resource(handler=PlainOldObjectHandler)
 list_fields = Resource(handler=ListFieldsHandler)
 issue58 = Resource(handler=Issue58Handler)
-ouath_access = Resource(handler=EchoHandler, authentication=OAuthAuthentication(realm='TestApplication'))
 
 AUTHENTICATORS = [auth,]
 SIMPLE_USERS = (('admin', 'secr3t'),
 multiauth = Resource(handler=PlainOldObjectHandler, 
                         authentication=AUTHENTICATORS)
 
+ouath_two_legged_api = Resource(handler=EchoHandler, authentication=OAuthAuthentication(realm='TestApplication', two_legged=True))
+ouath_three_legged_api = Resource(handler=EchoHandler, authentication=OAuthAuthentication(realm='TestApplication'))
+
 urlpatterns = patterns('',
     url(r'^entries/$', entries),
     url(r'^entries/(?P<pk>.+)/$', entries),
 
     # OAuth
     url(r'^oauth/', include('piston.authentication.oauth.urls')),
-    url(r'^oauth/api_access$', ouath_access),
+    url(r'^oauth/two_legged_api$', ouath_two_legged_api),
+    url(r'^oauth/three_legged_api$', ouath_three_legged_api),
 
     url(r'^list_fields$', list_fields),
     url(r'^list_fields/(?P<id>.+)$', list_fields),
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.