Source

django-publicauth / publicauth / backends / oauth2.py

from __future__ import absolute_import 

import urllib
import urlparse
import exceptions

from django.conf import settings as global_settings
from django.utils.datastructures import MultiValueDictKeyError
from django.core.urlresolvers import reverse
from django.utils import simplejson
from django.contrib import messages

from annoying.exceptions import Redirect

from publicauth.backends import BaseBackend
from publicauth import lang

def _extend_url(url, params):
    """Return ``url`` with ``params`` merged into its query string.

    The existing query is parsed into a dict (blank values are kept), so a
    key that appears more than once keeps only its last value, and entries
    in ``params`` replace existing keys of the same name. The fragment of
    ``url`` is not carried over: the result always has an empty fragment.
    """
    parts = urlparse.urlsplit(url)
    scheme, netloc, path = parts[:3]

    query = dict(urlparse.parse_qsl(parts.query, keep_blank_values=True))
    query.update(params)
    encoded_query = urllib.urlencode(query)

    # Fifth component is the fragment, always emitted empty.
    return urlparse.urlunsplit((scheme, netloc, path, encoded_query, ""))

class OAuth2Backend(BaseBackend):
    """Common OAuth2 flow shared by the provider backends in this module.

    Provider configuration is looked up on the Django settings object under
    names prefixed with the upper-cased ``self.provider`` (for example
    ``<PROVIDER>_CLIENT_ID``). ``self.provider`` is not assigned in this
    class -- presumably supplied by ``BaseBackend``; confirm there.

    Subclasses must implement ``validate`` and are expected to set
    ``self._identifier`` there, since ``get_extra_data`` interpolates it
    into ``API_URL``.
    """

    # Each property resolves lazily, so a missing setting raises
    # AttributeError on first access rather than at import time.
    CLIENT_ID = property(lambda self: getattr(global_settings, "%s_CLIENT_ID" % self.provider.upper()))
    CLIENT_SECRET = property(lambda self: getattr(global_settings, "%s_CLIENT_SECRET" % self.provider.upper()))
    AUTHORIZE_URL = property(lambda self: getattr(global_settings, "%s_AUTHORIZE_URL" % self.provider.upper()))
    ACCESS_TOKEN_URL = property(lambda self: getattr(global_settings, "%s_ACCESS_TOKEN_URL" % self.provider.upper()))
    API_URL = property(lambda self: getattr(global_settings, "%s_API_URL" % self.provider.upper()))

    def begin(self, request, data):
        """Send the user to the provider's authorization page.

        Builds the authorization URL from ``AUTHORIZE_URL`` plus this
        application's client id, ``display=popup`` and the absolute
        ``publicauth-complete`` callback URL. ``data`` is not used here.

        Raises:
            Redirect: always, pointing at the authorization URL.
        """
        callback = request.build_absolute_uri(reverse('publicauth-complete', args=[self.provider]))
        authorize_url = _extend_url(self.AUTHORIZE_URL, {
            "client_id": self.CLIENT_ID,
            "display": "popup",
            "redirect_uri": callback,
        })

        raise Redirect(authorize_url)

    def validate(self, request, data):
        """Validate the provider callback; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def complete(self, request, response):
        """Store the fetched profile in the session and go to the extra form.

        Writes three session keys: ``next_url`` (the ``next`` GET parameter,
        falling back to ``LOGIN_REDIRECT_URL``), ``extra`` (the result of
        ``fill_extra_fields`` applied to ``get_extra_data``) and ``identity``
        (the ``"id"`` entry of that result).

        Raises:
            Redirect: always, to ``publicauth-extra`` for this provider.
        """
        request.session['next_url'] = request.GET.get("next") or global_settings.LOGIN_REDIRECT_URL
        # fill_extra_fields is not defined in this class -- presumably
        # inherited from BaseBackend.
        data = self.fill_extra_fields(request, self.get_extra_data(response))
        request.session['extra'] = data
        request.session['identity'] = data["id"]
        raise Redirect('publicauth-extra', self.provider)

    def get_extra_data(self, response):
        """Fetch ``API_URL % self._identifier`` and return the decoded JSON.

        ``response`` is accepted for interface compatibility but not used.
        """
        url = self.API_URL % self._identifier
        return simplejson.loads(urllib.urlopen(url).read())

    def extract_data(self, extra, backend_field, form_field):
        """Map one provider field onto one form field.

        Returns a single-item dict ``{form_field: value}`` where ``value`` is
        ``extra[backend_field]``, or ``''`` when that key is absent.
        """
        return {form_field: extra.get(backend_field, '')}

    def authorize_token(self, request, data, method="get"):
        """Exchange the authorization code in ``data`` for a token response.

        Args:
            request: current request; used to build the callback URL and to
                attach an error message when the code is missing.
            data: mapping expected to contain the ``code`` returned by the
                provider.
            method: ``"get"`` to send the parameters in the query string, or
                ``"post"`` to send them as the request body.

        Returns:
            The raw body of the provider's token response.

        Raises:
            Redirect: to ``publicauth-login`` when ``data`` has no ``code``.
            ValueError: when ``method`` is neither ``"get"`` nor ``"post"``.
        """
        try:
            oauth2_code = data['code']
        except MultiValueDictKeyError:
            messages.error(request, lang.BACKEND_ERROR)
            raise Redirect('publicauth-login')

        callback = request.build_absolute_uri(reverse('publicauth-complete', args=[self.provider]))
        oauth2_request = _extend_url(self.ACCESS_TOKEN_URL, {
            "client_id": self.CLIENT_ID,
            "client_secret": self.CLIENT_SECRET,
            "code": oauth2_code,
            "redirect_uri": callback,
        })

        if method == "get":
            return urllib.urlopen(oauth2_request).read()

        if method == "post":
            # Move the query string out of the URL and into the POST body.
            url = urlparse.urlsplit(oauth2_request)
            endpoint = urlparse.urlunsplit(url[:3] + ('', ''))
            return urllib.urlopen(endpoint, url.query).read()

        # Fail with a clear message instead of an unbound-local error.
        raise ValueError('Unsupported method %r; expected "get" or "post"' % (method,))



class VkontakteBackend(OAuth2Backend):
    """OAuth2 backend whose token response carries the user id directly."""

    def validate(self, request, data):
        """Exchange the code for a token and record the provider user id.

        The token response is decoded as JSON and its ``"user_id"`` entry is
        stored both as ``self._identifier`` (interpolated into ``API_URL``
        by ``get_extra_data``) and as ``self.identity``.

        Returns:
            The raw token response body.
        """
        response = self.authorize_token(request, data)
        # Decode once: both attributes take the same "user_id" value.
        user_id = simplejson.loads(response)["user_id"]
        self._identifier = user_id
        self.identity = user_id
        return response

    def complete(self, request, response):
        """Store the first profile record in the session and redirect.

        The API reply is expected to hold a list under ``"response"``; its
        first element is passed through ``fill_extra_fields`` and saved as
        ``session['extra']``. ``session['identity']`` is the id recorded by
        ``validate``.

        Raises:
            Redirect: always, to ``publicauth-extra`` for this provider.
        """
        # NOTE(review): unlike OAuth2Backend.complete, this override does not
        # set session['next_url'] -- confirm that is intended.
        data = self.fill_extra_fields(request, self.get_extra_data(response)["response"][0])
        request.session['extra'] = data
        request.session['identity'] = self.identity
        raise Redirect('publicauth-extra', self.provider)


class GoogleBackend(OAuth2Backend):
    """OAuth2 backend that performs the token exchange as a POST."""

    def validate(self, request, data):
        """Obtain an access token, then look up the account id with it.

        The token response is decoded as JSON; its ``"access_token"`` is kept
        in ``self._identifier``. The profile at ``API_URL % access_token`` is
        then fetched and its ``"id"`` stored in ``self.identity``.

        Returns:
            The raw token response body.
        """
        raw_token = self.authorize_token(request, data, "post")

        token_payload = simplejson.loads(raw_token)
        self._identifier = token_payload["access_token"]

        profile_url = self.API_URL % self._identifier
        profile_body = urllib.urlopen(profile_url).read()
        self.identity = simplejson.loads(profile_body)["id"]
        return raw_token

class FacebookBackend(OAuth2Backend):
    """OAuth2 backend whose token response is parsed as a query string."""

    def validate(self, request, data):
        """Obtain an access token, then look up the account id with it.

        The token response is parsed as URL-encoded ``key=value`` pairs; its
        ``access_token`` is kept in ``self._identifier``. The profile at
        ``API_URL % access_token`` is then fetched, decoded as JSON, and its
        ``"id"`` stored in ``self.identity``.

        Returns:
            The raw token response body.
        """
        raw_token = self.authorize_token(request, data)

        token_fields = dict(urlparse.parse_qsl(raw_token))
        self._identifier = token_fields["access_token"]

        profile_url = self.API_URL % self._identifier
        profile_body = urllib.urlopen(profile_url).read()
        self.identity = simplejson.loads(profile_body)["id"]
        return raw_token
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.