]> git.openstreetmap.org Git - osqa.git/blob - forum_modules/openidauth/consumer.py
OSQA-828, Support fullname to real_name mapping from OpenID providers
[osqa.git] / forum_modules / openidauth / consumer.py
1 # -*- coding: utf-8 -*-
2
3 import re
4
5 from django.utils.encoding import smart_unicode
6 from django.utils.html import escape
7 from django.http import get_host
8
9 from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
10 import settings
11
12 from openid.yadis import xri
13 from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
14 from openid.consumer.discover import DiscoveryFailure
15 from openid.extensions.sreg import SRegRequest, SRegResponse
16 from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse
17 from django.utils.translation import ugettext as _
18
19 from store import OsqaOpenIDStore
20
21 class OpenIdAbstractAuthConsumer(AuthenticationConsumer):
22
23     dataype2ax_schema = {
24         'username': 'http://axschema.org/namePerson/friendly',
25         'email': 'http://axschema.org/contact/email',
26         #'web': 'http://axschema.org/contact/web/default',
27         #'firstname': 'http://axschema.org/namePerson/first',
28         #'lastname': 'http://axschema.org/namePerson/last',
29         #'birthdate': 'http://axschema.org/birthDate',
30     }
31
32     sreg_attributes = {
33         "required": {
34             "email": "email",
35             "nickname": "username",
36             "fullname": "real_name"
37         }
38     }
39
40     def get_user_url(self, request):
41         try:
42             return request.POST['openid_identifier']
43         except:
44             raise NotImplementedError()
45
46     def prepare_authentication_request(self, request, redirect_to):
47         if not redirect_to.startswith('http://') or redirect_to.startswith('https://'):
48             redirect_to =  get_url_host(request) + redirect_to
49
50         user_url = self.get_user_url(request)
51
52         if xri.identifierScheme(user_url) == 'XRI' and getattr(
53             settings, 'OPENID_DISALLOW_INAMES', False
54         ):
55             raise InvalidAuthentication('i-names are not supported')
56
57         consumer = Consumer(request.session, OsqaOpenIDStore())
58
59         try:
60             auth_request = consumer.begin(user_url)
61         except DiscoveryFailure:
62             raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))
63
64         sreg = getattr(self, 'sreg_attributes', False)
65
66         if sreg:
67             s = SRegRequest()
68
69             for k, attr_dic in sreg.items():
70                 if k == "policy_url":
71                     s.policy_url = attr_dic
72                     continue
73
74                 for attr_name in attr_dic.keys():
75                     s.requestField(field_name=attr_name, required=(k == "required"))
76
77             auth_request.addExtension(s)
78
79         ax_schema = getattr(self, 'dataype2ax_schema', False)
80
81         if ax_schema and request.session.get('force_email_request', True):
82             axr = AXFetchRequest()
83             for data_type, schema in ax_schema.items():
84                 if isinstance(schema, tuple):
85                     axr.add(AttrInfo(schema[0], required=True, alias=schema[1]))
86                 else:
87                     axr.add(AttrInfo(schema, required=True, alias=data_type))
88
89             auth_request.addExtension(axr)
90
91         trust_root = getattr(
92             settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
93         )
94
95         return auth_request.redirectURL(trust_root, redirect_to)
96
97     def process_authentication_request(self, request):
98         consumer = Consumer(request.session, OsqaOpenIDStore())
99
100         query_dict = dict([
101             (smart_unicode(k), smart_unicode(v)) for k, v in request.GET.items()
102         ])
103
104         #for i in query_dict.items():
105             #print "%s : %s" % i
106
107         url = get_url_host(request) + request.path
108         openid_response = consumer.complete(query_dict, url)
109
110         if openid_response.status == SUCCESS:
111
112             consumer_data = {}
113
114             sreg_attrs = getattr(self, 'sreg_attributes', False)
115
116             if sreg_attrs:
117                 sreg_response = SRegResponse.fromSuccessResponse(openid_response)
118
119                 if sreg_response:
120                     all_attrs = {}
121                     [all_attrs.update(d) for k,d in sreg_attrs.items() if k != "policy_url"]
122
123                     for attr_name, local_name in all_attrs.items():
124                         if attr_name in sreg_response:
125                             consumer_data[local_name] = sreg_response[attr_name]
126
127             ax_schema = getattr(self, 'dataype2ax_schema', False)
128
129             if ax_schema:
130                 ax = AXFetchResponse.fromSuccessResponse(openid_response, False)
131
132                 if ax:
133                     axargs = ax.getExtensionArgs()
134
135                     ax_schema2data_type = dict([(s, t) for t, s in ax_schema.items()])
136
137                     available_types = dict([
138                         (ax_schema2data_type[s], re.sub('^type\.', '', n))
139                         for n, s in axargs.items() if s in ax_schema2data_type
140                     ])
141
142                     for t, s in available_types.items():
143                         if not t in consumer_data:
144                             if axargs.get("value.%s.1" % s, None):
145                                 consumer_data[t] = axargs["value.%s.1" % s]
146                     
147             request.session['auth_consumer_data'] = consumer_data
148
149
150             return request.GET['openid.identity']
151         elif openid_response.status == CANCEL:
152             raise InvalidAuthentication(_('The OpenId authentication request was canceled'))
153         elif openid_response.status == FAILURE:
154             raise InvalidAuthentication(_('The OpenId authentication failed: ') + openid_response.message)
155         elif openid_response.status == SETUP_NEEDED:
156             raise InvalidAuthentication(_('Setup needed'))
157         else:
158             raise InvalidAuthentication(_('The OpenId authentication failed with an unknown status: ') + openid_response.status)
159
160     def get_user_data(self, key):
161         return {}
162
163 def get_url_host(request):
164     if request.is_secure():
165         protocol = 'https'
166     else:
167         protocol = 'http'
168     host = escape(get_host(request))
169     return '%s://%s' % (protocol, host)
170
171 def get_full_url(request):
172     return get_url_host(request) + request.get_full_path()