# openid2rp / openid2rp / testapp.py

#!/usr/bin/env python
################ Test Server #################################
import BaseHTTPServer, cgi, Cookie, socket, collections
from openid2rp import *
from time import time

# Providers offered as one-click logins on the start page.  Each entry is
# (display name, icon URL, URL that discovery is performed on).
providers = (
    (
        'Google',
        'http://www.google.com/favicon.ico',
        'https://www.google.com/accounts/o8/id',
    ),
    (
        'Yahoo',
        'http://www.yahoo.com/favicon.ico',
        'http://yahoo.com/',
    ),
    (
        'Verisign',
        'http://pip.verisignlabs.com/favicon.ico',
        'http://pip.verisignlabs.com',
    ),
    (
        'myOpenID',
        'https://www.myopenid.com/favicon.ico',
        'https://www.myopenid.com/',
    ),
    (
        'Launchpad',
        'https://login.launchpad.net/favicon.ico',
        'https://login.launchpad.net/',
    ),
)

# Claimed Identifier -> (OP Endpoint URL, OP-Local Identifier).
#
# An entry is written every time a login is initiated (the end user enters a
# User-Supplied Identifier) and is read back when the assertion is verified,
# so that discovery does not have to be repeated.  During verification the OP
# Endpoint URL selects a stored association if there is one; otherwise it is
# the target of Direct Verification.  The OP-Local Identifier is kept so that
# a malicious user cannot impersonate Claimed Identifiers that the OP is
# authorized to make assertions about, but that the user doesn't control.
disco = dict()

# OP Endpoint URL -> list of association responses, oldest first (the most
# recent association is the last element).
sessions = collections.defaultdict(list)

class _Expired(Exception):
    """Module-private signal that a stored association session has expired."""

class Handler(BaseHTTPServer.BaseHTTPRequestHandler):

    def write(self, payload, type):
        """Send *payload* as a complete 200 response.

        *type* becomes the Content-type header; Content-length is derived
        from the payload, which is written to the client as-is.
        """
        headers = (
            ("Content-type", type),
            ("Content-length", str(len(payload))),
        )
        self.send_response(200)
        for header_name, header_value in headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Serve the login page and drive the OpenID login round trip.

        A request for exactly ``/`` gets the login page.  A request for ``/``
        with a query string is dispatched on the first of these parameters
        that is present:

        * ``provider`` -- start a login at one of the built-in providers
        * ``claimed``  -- start a login for a user-supplied identifier
        * ``returned`` -- the return_to URL: RP discovery when the request
          has no ``openid.mode``, otherwise the provider's response

        Everything else is answered with 404.
        """
        if self.path == '/':
            return self.root()
        path = self.path
        querystring = ''
        query = {}
        i = path.rfind('?')
        if i >= 0:
            querystring = path[i+1:]
            query = cgi.parse_qs(querystring)
            path = path[:i]
        if path == '/':
            if 'provider' in query:
                return self._login_with_provider(query)
            if 'claimed' in query:
                return self._login_with_claimed(query)
            if 'returned' in query:
                return self._handle_return(query, querystring)
        return self.not_found()

    def _association(self, services, url):
        """Return the association to use with the OP endpoint *url*.

        The most recent stored association is reused unless it has expired;
        otherwise a new one is requested with associate() and appended to
        ``sessions[url]`` together with its expiry time.  A ValueError raised
        while establishing the association propagates to the caller.
        """
        now = time()
        try:
            session, expire = sessions[url][-1]
            if now > expire:
                raise _Expired
        except (IndexError, KeyError, _Expired):
            # sessions is a defaultdict(list): an endpoint without any
            # association yields an empty list, so [-1] raises IndexError
            # rather than KeyError
            session = associate(services, url)
            sessions[url].append((session, now+int(session['expires_in'])))
        return session

    def _redirect(self, location, cookie=None):
        """Send a 307 redirect to *location*, optionally setting *cookie*."""
        self.send_response(307) # temporary redirect - do not cache
        self.send_header("Location", location)
        if cookie is not None:
            self.send_header('Set-Cookie', cookie)
        self.end_headers()

    def _login_with_provider(self, query):
        """Start a login at the built-in provider named by ``provider``."""
        matches = [p for p in providers if p[0] == query['provider'][0]]
        if len(matches) != 1:
            return self.not_found()
        res = discover(matches[0][2])
        if not res:
            return self.error('Discovery failed')
        services, url, op_local = res

        try:
            session = self._association(services, url)
        except ValueError as e:
            return self.error(str(e))

        self._redirect(request_authentication
                       (services, url, session['assoc_handle'],
                        self.base_url+"?returned=1"))

    def _login_with_claimed(self, query):
        """Start a login for the identifier the user typed (``claimed``)."""
        kind, claimed = normalize_uri(query['claimed'][0])
        if kind == 'xri':
            res = resolve_xri(claimed)
            if res:
                # A.5: XRI resolution requires to use canonical ID
                # Original claimed ID may be preserved for display
                # purposes
                claimed = res[0]
                res = res[1:]
        else:
            res = discover(claimed)
        if not res:
            return self.error('Discovery failed')
        services, url, op_local = res

        # Avoid repeating discovery when verifying assertions
        disco[claimed] = url, op_local or claimed

        try:
            session = self._association(services, url)
        except ValueError as e:
            return self.error(str(e))

        location = request_authentication(services, url,
                                          session['assoc_handle'],
                                          self.base_url+"?returned=1",
                                          claimed, op_local)

        # 1.1 compatibility: "openid.claimed_id" is not defined by
        # OpenID Authentication 1.1.  RPs MAY send the value when
        # making requests, but MUST NOT depend on the value being
        # present in authentication responses.  When the OP-Local
        # Identifier ("openid.identity") is different from the Claimed
        # Identifier, the RP MUST keep track of what Claimed Identifier
        # was used to discover the OP-Local Identifier, for example by
        # keeping it in session state.  Although the Claimed Identifier
        # will not be present in the response, it MUST be used as the
        # identifier for the user
        self._redirect(location, 'openid.claimed_id='+claimed)

    def _handle_return(self, query, querystring):
        """Handle a request to the return_to URL (``returned``).

        Without ``openid.mode`` the request is answered with the RP
        discovery document, and a ``cancel`` response with 'Login failed'.
        Any other response is treated as an assertion: the discovered
        information and the signature are checked, then a plain-text
        greeting is sent.  Every failure is reported through error().
        """
        if 'openid.mode' not in query:
            return self.rp_discovery()
        if query['openid.mode'][0] == 'cancel':
            return self.write('Login failed', 'text/plain')

        try:
            claimed_id, = query['openid.claimed_id']
        except KeyError:
            # 1.1 compatibility: the response carries no claimed_id, so use
            # the one stored in the cookie when the login was started.  The
            # cookie may be absent too (cookies disabled, or a request that
            # never went through the login redirect)
            try:
                cookie = Cookie.SimpleCookie(self.headers.get('Cookie', ''))
                no_fragment = claimed_id = cookie['openid.claimed_id'].value
            except KeyError:
                return self.error('No Claimed Identifier in the assertion or in the cookie')
        else:

            # If the Claimed Identifier in the assertion is a URL and
            # contains a fragment, the fragment part and the fragment
            # delimiter character "#" MUST NOT be used for the purposes
            # of verifying the discovered information
            try:
                no_fragment = claimed_id[:claimed_id.index('#')]
            except ValueError:
                no_fragment = claimed_id

        # If the Claimed Identifier is included in the assertion, it
        # MUST have been discovered by the RP and the information in
        # the assertion MUST be present in the discovered information.
        # The Claimed Identifier MUST NOT be an OP Identifier
        #
        # If the Claimed Identifier was not previously discovered by
        # the RP (the "openid.identity" in the request was
        # "http://specs.openid.net/auth/2.0/identifier_select" or a
        # different Identifier, or if the OP is sending an unsolicited
        # positive assertion), the RP MUST perform discovery on the
        # Claimed Identifier in the response to make sure that the OP
        # is authorized to make assertions about the Claimed Identifier
        try:
            disco_endpoint, disco_identity = disco[no_fragment]
        except KeyError:
            res = discover(no_fragment)
            if not res:
                return self.error('Discovery failed')
            _, disco_endpoint, disco_identity = res
            if not disco_identity:
                disco_identity = no_fragment

        # Responses such as "error" or "setup_needed" carry neither field;
        # report that instead of failing with an unhandled exception
        try:
            identity, = query['openid.identity']
            assoc_handle, = query['openid.assoc_handle']
        except (KeyError, ValueError):
            return self.error('Malformed assertion: openid.identity and openid.assoc_handle must each be present exactly once')

        # Prevent a malicious user who controls an OP-Local Identifier
        # from impersonating Claimed Identifiers that the OP is
        # authorized to make assertions about, but that the user
        # doesn't control.  A user who controls an OP-Local Identifier
        # can obtain assertions from the OP about Claimed Identifiers
        # that the user doesn't control
        if disco_identity != identity:
            return self.error('OP-Local Identifier in the assertion not present in the discovered information')

        # If the RP has stored an association with the association
        # handle specified in the assertion, it MUST check the
        # signature on the assertion itself.  If it does not have an
        # association stored, it MUST request that the OP verify the
        # signature via Direct Verification
        #
        # .get() is used so that looking up an unknown endpoint does not
        # add an empty entry to the sessions defaultdict
        for session, _expire in sessions.get(disco_endpoint, ()):
            if assoc_handle == session['assoc_handle']:
                try:
                    signed = authenticate(session, querystring)
                except Exception as e:
                    return self.error('Verifying signature with an association failed: '+repr(e))
                break
        else:
            try:
                verify_signature_directly(disco_endpoint, query)
            except Exception as e:
                return self.error('Verifying signature directly failed: '+repr(e))
            signed, = query['openid.signed']
            signed = signed.split(',')

        payload = "Hello "+claimed_id+"\n"
        ax = get_ax(querystring, get_namespaces(querystring), signed)
        sreg = get_sreg(querystring, signed)
        email = get_email(querystring)
        if email:
            payload += 'Your email is '+email+"\n"
        else:
            payload += 'No email address is known\n'
        if 'nickname' in sreg:
            username = sreg['nickname']
        elif "http://axschema.org/namePerson/first" in ax:
            username = ax["http://axschema.org/namePerson/first"]
            if "http://axschema.org/namePerson/last" in ax:
                username += "." + ax["http://axschema.org/namePerson/last"]
        else:
            username = None
        if username:
            payload += 'Your nickname is '+username+'\n'
        else:
            payload += 'No nickname is known\n'
        if isinstance(payload, unicode):
            payload = payload.encode('utf-8')
        return self.write(payload, "text/plain")

    

    def debug(self, value):
        """Send ``repr(value)`` to the client as a plain-text page."""
        dump = repr(value)
        body = dump.encode('utf-8') if isinstance(dump, unicode) else dump
        self.write(body, "text/plain")

    def error(self, text):
        """Show the message *text* to the client as a plain-text page.

        Unicode messages are encoded as UTF-8 first.  The page goes out
        through write(), i.e. with status 200.
        """
        body = text.encode('utf-8') if isinstance(text, unicode) else text
        self.write(body, "text/plain")

    def root(self):
        """Send the login page.

        The page shows one icon link per entry in ``providers`` followed by
        a form for typing an arbitrary OpenID.
        """
        link = u"<p><a href='%s?provider=%s'><img src='%s' alt='%s'></a></p>\n"
        pieces = [u"<html><head><title>OpenID login</title></head><body>\n"]
        for name, icon, _endpoint in providers:
            pieces.append(link % (self.base_url, name, icon, name))
        pieces.append(u"<form>Type your OpenID:<input name='claimed'/><input type='submit'/></form>\n")
        pieces.append(u"</body></html>")
        page = u"".join(pieces)
        self.write(page.encode('utf-8'), "text/html")

    def rp_discovery(self):
        """Serve the XRDS document used for relying-party discovery.

        The document advertises a single return_to endpoint.  It is built
        from the same expression that the login redirects pass as return_to
        (``base_url + "?returned=1"``), so the advertised URI is identical
        to the return_to actually used whether or not ``base_url`` ends in a
        slash.  (Appending "/?returned=1" to a ``base_url`` that already
        ends in "/" would advertise ".../" + "/?returned=1" instead.)
        """
        payload = '''<xrds:XRDS  
                xmlns:xrds="xri://$xrds"  
                xmlns="xri://$xrd*($v*2.0)">  
                <XRD>  
                     <Service priority="1">  
                              <Type>http://specs.openid.net/auth/2.0/return_to</Type>  
                              <URI>%s</URI>  
                     </Service>  
                </XRD>  
                </xrds:XRDS>
        ''' % (self.base_url+"?returned=1")
        self.write(payload, 'application/xrds+xml')

    def not_found(self):
        """Reply with status 404 and an empty body."""
        status = 404
        self.send_response(status)
        self.end_headers()

if not hasattr(socket, 'IPV6_V6ONLY'):
    # The socket module does not expose IPV6_V6ONLY: use the stock server
    HTTPServer = BaseHTTPServer.HTTPServer
else:
    class HTTPServer(BaseHTTPServer.HTTPServer):
        """HTTP server that binds an IPv6 socket for the wildcard address.

        When the host part of the address is empty, the server switches to
        AF_INET6 and clears IPV6_V6ONLY on the socket before binding.
        """

        def __init__(self, addr, handler):
            host = addr[0]
            if not host:
                # use V6 here, set to wildcard below
                self.address_family = socket.AF_INET6
            BaseHTTPServer.HTTPServer.__init__(self, addr, handler)

        def server_bind(self):
            is_v6 = self.address_family == socket.AF_INET6
            if is_v6:
                # Switch off IPV6_V6ONLY so the socket is not restricted to
                # IPv6 traffic
                self.socket.setsockopt(socket.IPPROTO_IPV6,
                                       socket.IPV6_V6ONLY,
                                       False)
            BaseHTTPServer.HTTPServer.server_bind(self)

        
# OpenID providers often attempt relying-party discovery, which requires
# the test server to be reachable under a globally valid URL.  If Python
# cannot determine the correct base URL by itself, pass it as the first
# command line argument.
def test_server():
    """Run the test application on port 8000 and serve requests forever.

    The base URL is taken from the first command line argument when one is
    given; otherwise it is derived from the fully qualified host name.  It
    is stored on the Handler class before the server starts.
    """
    import socket, sys
    args = sys.argv[1:]
    if args:
        base_url = args[0]
    else:
        base_url = "http://" + socket.getfqdn() + ":8000/"
    print("Listening on %s" % base_url)
    Handler.base_url = base_url
    HTTPServer(('', 8000), Handler).serve_forever()

if __name__ == '__main__':
    test_server()
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.