]> git.openstreetmap.org Git - osqa.git/blobdiff - forum_modules/openidauth/consumer.py
OSQA-747, applying patch that resolves potential problem with the OpenID Auth consume...
[osqa.git] / forum_modules / openidauth / consumer.py
old mode 100755 (executable)
new mode 100644 (file)
index 17d1378..ee25f39
-from django.utils.html import escape\r
-from django.http import get_host\r
-\r
-from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication\r
-import settings\r
-\r
-from openid.yadis import xri\r
-from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED\r
-from openid.consumer.discover import DiscoveryFailure\r
-from openid.extensions.sreg import SRegRequest, SRegResponse\r
-from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse\r
-from django.utils.translation import ugettext as _\r
-\r
-from store import OsqaOpenIDStore\r
-\r
-class OpenIdAbstractAuthConsumer(AuthenticationConsumer):\r
-\r
-    def get_user_url(self, request):\r
-        try:\r
-            return request.POST['openid_identifier']\r
-        except:\r
-            raise NotImplementedError()\r
-\r
-    def prepare_authentication_request(self, request, redirect_to):\r
-        if not redirect_to.startswith('http://') or redirect_to.startswith('https://'):\r
-                   redirect_to =  get_url_host(request) + redirect_to\r
-\r
-        user_url = self.get_user_url(request)\r
-\r
-        if xri.identifierScheme(user_url) == 'XRI' and getattr(\r
-            settings, 'OPENID_DISALLOW_INAMES', False\r
-        ):\r
-            raise InvalidAuthentication('i-names are not supported')\r
-\r
-        consumer = Consumer(request.session, OsqaOpenIDStore())\r
-\r
-        try:\r
-            auth_request = consumer.begin(user_url)\r
-        except DiscoveryFailure:\r
-            raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))\r
-\r
-        #sreg = getattr(settings, 'OPENID_SREG', False)\r
-\r
-        #if sreg:\r
-        #    s = SRegRequest()\r
-        #    for sarg in sreg:\r
-        #        if sarg.lower().lstrip() == "policy_url":\r
-        #            s.policy_url = sreg[sarg]\r
-        #        else:\r
-        #            for v in sreg[sarg].split(','):\r
-        #                s.requestField(field_name=v.lower().lstrip(), required=(sarg.lower().lstrip() == "required"))\r
-        #    auth_request.addExtension(s)\r
-\r
-        #auth_request.addExtension(SRegRequest(required=['email']))\r
-\r
-        if request.session.get('force_email_request', True):\r
-            axr = AXFetchRequest()\r
-            axr.add(AttrInfo("http://axschema.org/contact/email", 1, True, "email"))\r
-            auth_request.addExtension(axr)\r
-\r
-        trust_root = getattr(\r
-            settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'\r
-        )\r
-\r
-\r
-        return auth_request.redirectURL(trust_root, redirect_to)\r
-\r
-    def process_authentication_request(self, request):\r
-        consumer = Consumer(request.session, OsqaOpenIDStore())\r
-\r
-        query_dict = dict([\r
-            (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()\r
-        ])\r
-\r
-        #for i in query_dict.items():\r
-        #    print "%s : %s" % i\r
-\r
-        url = get_url_host(request) + request.path\r
-        openid_response = consumer.complete(query_dict, url)\r
-\r
-        if openid_response.status == SUCCESS:\r
-            if request.session.get('force_email_request', True):\r
-                try:\r
-                    ax = AXFetchResponse.fromSuccessResponse(openid_response)\r
-                    email = ax.getExtensionArgs()['value.ext0.1']\r
-                    request.session['auth_email_request'] = email\r
-                except Exception, e:\r
-                    pass\r
-\r
-            return request.GET['openid.identity']\r
-        elif openid_response.status == CANCEL:\r
-            raise InvalidAuthentication(_('The OpenId authentication request was canceled'))\r
-        elif openid_response.status == FAILURE:\r
-            raise InvalidAuthentication(_('The OpenId authentication failed: ') + openid_response.message)\r
-        elif openid_response.status == SETUP_NEEDED:\r
-            raise InvalidAuthentication(_('Setup needed'))\r
-        else:\r
-            raise InvalidAuthentication(_('The OpenId authentication failed with an unknown status: ') + openid_response.status)\r
-\r
-    def get_user_data(self, key):\r
-        return {}\r
-\r
-def get_url_host(request):\r
-    if request.is_secure():\r
-        protocol = 'https'\r
-    else:\r
-        protocol = 'http'\r
-    host = escape(get_host(request))\r
-    return '%s://%s' % (protocol, host)\r
-\r
-def get_full_url(request):\r
+import re
+
+from django.utils.html import escape
+from django.http import get_host
+
+from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
+import settings
+
+from openid.yadis import xri
+from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
+from openid.consumer.discover import DiscoveryFailure
+from openid.extensions.sreg import SRegRequest, SRegResponse
+from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse
+from django.utils.translation import ugettext as _
+
+from store import OsqaOpenIDStore
+
+class OpenIdAbstractAuthConsumer(AuthenticationConsumer):
+
+    dataype2ax_schema = {
+        'username': 'http://axschema.org/namePerson/friendly',
+        'email': 'http://axschema.org/contact/email',
+        #'web': 'http://axschema.org/contact/web/default',
+        #'firstname': 'http://axschema.org/namePerson/first',
+        #'lastname': 'http://axschema.org/namePerson/last',
+        #'birthdate': 'http://axschema.org/birthDate',
+    }
+
+    sreg_attributes = {
+        "required": {
+            "email": "email",
+            "nickname": "username"
+        }
+    }
+
+    def get_user_url(self, request):
+        try:
+            return request.POST['openid_identifier']
+        except:
+            raise NotImplementedError()
+
+    def prepare_authentication_request(self, request, redirect_to):
+        if not redirect_to.startswith('http://') or redirect_to.startswith('https://'):
+                   redirect_to =  get_url_host(request) + redirect_to
+
+        user_url = self.get_user_url(request)
+
+        if xri.identifierScheme(user_url) == 'XRI' and getattr(
+            settings, 'OPENID_DISALLOW_INAMES', False
+        ):
+            raise InvalidAuthentication('i-names are not supported')
+
+        consumer = Consumer(request.session, OsqaOpenIDStore())
+
+        try:
+            auth_request = consumer.begin(user_url)
+        except DiscoveryFailure:
+            raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))
+
+        sreg = getattr(self, 'sreg_attributes', False)
+
+        if sreg:
+            s = SRegRequest()
+
+            for k, attr_dic in sreg.items():
+                if k == "policy_url":
+                    s.policy_url = attr_dic
+                    continue
+
+                for attr_name in attr_dic.keys():
+                    s.requestField(field_name=attr_name, required=(k == "required"))
+
+            auth_request.addExtension(s)
+
+        ax_schema = getattr(self, 'dataype2ax_schema', False)
+
+        if ax_schema and request.session.get('force_email_request', True):
+            axr = AXFetchRequest()
+            for data_type, schema in ax_schema.items():
+                if isinstance(schema, tuple):
+                    axr.add(AttrInfo(schema[0], 1, True, schema[1]))
+                else:
+                    axr.add(AttrInfo(schema, 1, True, data_type))
+
+            auth_request.addExtension(axr)
+
+        trust_root = getattr(
+            settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
+        )
+
+        return auth_request.redirectURL(trust_root, redirect_to)
+
+    def process_authentication_request(self, request):
+        consumer = Consumer(request.session, OsqaOpenIDStore())
+
+        query_dict = dict([
+            (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()
+        ])
+
+        #for i in query_dict.items():
+            #print "%s : %s" % i
+
+        url = get_url_host(request) + request.path
+        openid_response = consumer.complete(query_dict, url)
+
+        if openid_response.status == SUCCESS:
+
+            consumer_data = {}
+
+            sreg_attrs = getattr(self, 'sreg_attributes', False)
+
+            if sreg_attrs:
+                sreg_response = SRegResponse.fromSuccessResponse(openid_response)
+
+                if sreg_response:
+                    all_attrs = {}
+                    [all_attrs.update(d) for k,d in sreg_attrs.items() if k != "policy_url"]
+
+                    for attr_name, local_name in all_attrs.items():
+                        if attr_name in sreg_response:
+                            consumer_data[local_name] = sreg_response[attr_name]
+
+            ax_schema = getattr(self, 'dataype2ax_schema', False)
+
+            if ax_schema:
+                ax = AXFetchResponse.fromSuccessResponse(openid_response, False)
+
+                if ax:
+                    axargs = ax.getExtensionArgs()
+
+                    ax_schema2data_type = dict([(s, t) for t, s in ax_schema.items()])
+
+                    available_types = dict([
+                        (ax_schema2data_type[s], re.sub('^type\.', '', n))
+                        for n, s in axargs.items() if s in ax_schema2data_type
+                    ])
+
+                    for t, s in available_types.items():
+                        if not t in consumer_data:
+                            if axargs.get("value.%s.1" % s, None):
+                                consumer_data[t] = axargs["value.%s.1" % s]
+                    
+            request.session['auth_consumer_data'] = consumer_data
+
+
+            return request.GET['openid.identity']
+        elif openid_response.status == CANCEL:
+            raise InvalidAuthentication(_('The OpenId authentication request was canceled'))
+        elif openid_response.status == FAILURE:
+            raise InvalidAuthentication(_('The OpenId authentication failed: ') + openid_response.message)
+        elif openid_response.status == SETUP_NEEDED:
+            raise InvalidAuthentication(_('Setup needed'))
+        else:
+            raise InvalidAuthentication(_('The OpenId authentication failed with an unknown status: ') + openid_response.status)
+
+    def get_user_data(self, key):
+        return {}
+
+def get_url_host(request):
+    if request.is_secure():
+        protocol = 'https'
+    else:
+        protocol = 'http'
+    host = escape(get_host(request))
+    return '%s://%s' % (protocol, host)
+
+def get_full_url(request):
     return get_url_host(request) + request.get_full_path()
\ No newline at end of file