Ticket #233: THE.diff
| File THE.diff, 17.3 kB (added by http://keturn.myopenid.com/, 6 months ago) |
|---|
-
old-openid/openid/consumer/consumer.py
old new 1471 1461 self.assoc = assoc 1472 1462 self.endpoint = endpoint 1473 1463 self.return_to_args = {} 1474 self.message = Message() 1475 self.message.setOpenIDNamespace(endpoint.preferredNamespace()) 1464 self.message = Message(endpoint.preferredNamespace()) 1476 1465 self._anonymous = False 1477 1466 1478 1467 def setAnonymous(self, is_anonymous): -
old-openid/openid/message.py
old new 26 26 27 27 # The OpenID 1.X namespace URI 28 28 OPENID1_NS = 'http://openid.net/signon/1.0' 29 THE_OTHER_OPENID1_NS = 'http://openid.net/signon/1.1' 30 31 OPENID1_NAMESPACES = OPENID1_NS, THE_OTHER_OPENID1_NS 29 32 30 33 # The OpenID 2.0 namespace URI 31 34 OPENID2_NS = 'http://specs.openid.net/auth/2.0' … … 109 124 URI. 110 125 """ 111 126 112 allowed_openid_namespaces = [OPENID1_NS, OPENID2_NS]127 allowed_openid_namespaces = [OPENID1_NS, THE_OTHER_OPENID1_NS, OPENID2_NS] 113 128 114 129 def __init__(self, openid_namespace=None): 115 130 """Create an empty Message. 116 131 117 132 118 133 @raises InvalidOpenIDNamespace: if openid_namespace is not in 119 134 L{Message.allowed_openid_namespaces} 120 135 """ 121 136 self.args = {} 122 137 self.namespaces = NamespaceMap() 123 138 if openid_namespace is None: 124 139 self._openid_ns_uri = None 125 140 else: 126 self.setOpenIDNamespace(openid_namespace) 141 implicit = openid_namespace in OPENID1_NAMESPACES 142 self.setOpenIDNamespace(openid_namespace, implicit) 127 143 128 144 def fromPostArgs(cls, args): 129 145 """Construct a Message containing a set of POST arguments. """ 130 146 self = cls() 131 147 132 148 # Partition into "openid." args and bare args … … 157 184 fromOpenIDArgs = classmethod(fromOpenIDArgs) 158 185 159 186 def _fromOpenIDArgs(self, openid_args): 160 global registered_aliases161 162 187 ns_args = [] 163 188 164 189 # Resolve namespaces … … 173 198 self.namespaces.addAlias(value, ns_key) 174 199 elif ns_alias == NULL_NAMESPACE and ns_key == 'ns': 175 200 # null namespace 176 self. namespaces.addAlias(value, NULL_NAMESPACE)201 self.setOpenIDNamespace(value, False) 177 202 else: 178 203 ns_args.append((ns_alias, ns_key, value)) 179 204 180 # Ensure that there is an OpenID namespace definition 181 openid_ns_uri = self.namespaces.getNamespaceURI(NULL_NAMESPACE) 182 if openid_ns_uri is None: 183 openid_ns_uri = OPENID1_NS 184 185 self.setOpenIDNamespace(openid_ns_uri) 205 # Implicitly set an OpenID namespace definition (OpenID 1) 206 if not self.getOpenIDNamespace(): 207 self.setOpenIDNamespace(OPENID1_NS, True) 186 208 187 209 # Actually put the pairs into the appropriate namespaces 188 210 for (ns_alias, ns_key, value) in ns_args: 189 211 ns_uri = self.namespaces.getNamespaceURI(ns_alias) 190 212 if ns_uri is None: 191 213 # we found a namespaced arg without a namespace URI defined 192 214 ns_uri = self._getDefaultNamespace(ns_alias) 193 215 if ns_uri is None: 194 216 ns_uri = self.getOpenIDNamespace() 195 217 ns_key = '%s.%s' % (ns_alias, ns_key) 196 218 else: 197 219 self.namespaces.addAlias(ns_uri, ns_alias, implicit=True) 198 220 199 221 self.setArg(ns_uri, ns_key, value) 200 222 201 def setOpenIDNamespace(self, openid_ns_uri): 223 def setOpenIDNamespace(self, openid_ns_uri, implicit): 224 """Set the OpenID namespace URI used in this message. 225 226 @raises InvalidOpenIDNamespace: if the namespace is not in 227 L{Message.allowed_openid_namespaces} 228 """ 202 229 if openid_ns_uri not in self.allowed_openid_namespaces: 203 raise ValueError('Invalid null namespace: %r' % (openid_ns_uri,))230 raise InvalidOpenIDNamespace(openid_ns_uri) 204 231 205 self.namespaces.addAlias(openid_ns_uri, NULL_NAMESPACE )232 self.namespaces.addAlias(openid_ns_uri, NULL_NAMESPACE, implicit) 206 233 self._openid_ns_uri = openid_ns_uri 207 234 208 235 def getOpenIDNamespace(self): 209 236 return self._openid_ns_uri 210 237 211 238 def isOpenID1(self): 212 return self.getOpenIDNamespace() == OPENID1_NS239 return self.getOpenIDNamespace() in OPENID1_NAMESPACES 213 240 214 241 def isOpenID2(self): 215 242 return self.getOpenIDNamespace() == OPENID2_NS -
old-openid/openid/test/test_consumer.py
old new 1176 1176 assert args == { 1177 1177 'openid.mode':'check_authentication', 1178 1178 'openid.signed':'foo', 1179 'openid.ns':OPENID1_NS 1179 1180 }, args 1180 1181 return None 1181 1182 … … 1218 1219 def test_signedList(self): 1219 1220 query = Message.fromOpenIDArgs({ 1220 1221 'mode': 'id_res', 1221 'ns': OPENID2_NS,1222 1222 'sig': 'rabbits', 1223 1223 'identity': '=example', 1224 1224 'assoc_handle': 'munchkins', … … 1227 1227 'signed': 'identity,mode,ns.sreg,sreg.email', 1228 1228 'foo': 'bar', 1229 1229 }) 1230 expected = Message.fromOpenIDArgs({1231 'mode': 'check_authentication',1232 'sig': 'rabbits',1233 'assoc_handle': 'munchkins',1234 'identity': '=example',1235 'signed': 'identity,mode,ns.sreg,sreg.email',1236 'ns.sreg': 'urn:sreg',1237 'sreg.email': 'bogus@example.com',1238 })1239 1230 args = self.consumer._createCheckAuthRequest(query) 1240 self.failUnlessEqual(args.toPostArgs(), expected.toPostArgs()) 1231 self.failUnless(args.isOpenID1()) 1232 for signed_arg in query.getArg(OPENID_NS, 'signed').split(','): 1233 self.failUnless(args.getAliasedArg(signed_arg), signed_arg) 1234 1235 def test_112(self): 1236 args = {'openid.assoc_handle': 'fa1f5ff0-cde4-11dc-a183-3714bfd55ca8', 1237 'openid.claimed_id': 'http://binkley.lan/user/test01', 1238 'openid.identity': 'http://test01.binkley.lan/', 1239 'openid.mode': 'id_res', 1240 'openid.ns': 'http://specs.openid.net/auth/2.0', 1241 'openid.ns.pape': 'http://specs.openid.net/extensions/pape/1.0', 1242 'openid.op_endpoint': 'http://binkley.lan/server', 1243 'openid.pape.auth_policies': 'none', 1244 'openid.pape.auth_time': '2008-01-28T20:42:36Z', 1245 'openid.pape.nist_auth_level': '0', 1246 'openid.response_nonce': '2008-01-28T21:07:04Z99Q=', 1247 'openid.return_to': 'http://binkley.lan:8001/process?janrain_nonce=2008-01-28T21%3A07%3A02Z0tMIKx', 1248 'openid.sig': 'YJlWH4U6SroB1HoPkmEKx9AyGGg=', 1249 'openid.signed': 'assoc_handle,identity,response_nonce,return_to,claimed_id,op_endpoint,pape.auth_time,ns.pape,pape.nist_auth_level,pape.auth_policies' 1250 } 1251 self.failUnlessEqual(OPENID2_NS, args['openid.ns']) 1252 incoming = Message.fromPostArgs(args) 1253 self.failUnless(incoming.isOpenID2()) 1254 car = self.consumer._createCheckAuthRequest(incoming) 1255 expected_args = args.copy() 1256 expected_args['openid.mode'] = 'check_authentication' 1257 expected =Message.fromPostArgs(expected_args) 1258 self.failUnless(expected.isOpenID2()) 1259 self.failUnlessEqual(expected, car) 1260 self.failUnlessEqual(expected_args, car.toPostArgs()) 1241 1261 1242 1262 1243 1263 -
old-openid/openid/test/test_message.py
old new 1 1 from openid import message 2 2 from openid import oidutil 3 from openid.extensions import sreg 3 4 4 5 import urllib 5 6 import cgi … … 400 401 def test_isOpenID2(self): 401 402 self.failIf(self.msg.isOpenID2()) 402 403 403 class OpenID1ExplicitMessageTest( OpenID1MessageTest):404 class OpenID1ExplicitMessageTest(unittest.TestCase): 404 405 def setUp(self): 405 406 self.msg = message.Message.fromPostArgs({'openid.mode':'error', 406 407 'openid.error':'unit test', 407 408 'openid.ns':message.OPENID1_NS 408 409 }) 409 410 411 def test_toPostArgs(self): 412 self.failUnlessEqual(self.msg.toPostArgs(), 413 {'openid.mode':'error', 414 'openid.error':'unit test', 415 'openid.ns':message.OPENID1_NS 416 }) 417 418 def test_toArgs(self): 419 self.failUnlessEqual(self.msg.toArgs(), {'mode':'error', 420 'error':'unit test', 421 'ns':message.OPENID1_NS}) 422 423 def test_toKVForm(self): 424 self.failUnlessEqual(self.msg.toKVForm(), 425 'error:unit test\nmode:error\nns:%s\n' 426 %message.OPENID1_NS) 427 428 def test_toURLEncoded(self): 429 self.failUnlessEqual(self.msg.toURLEncoded(), 430 'openid.error=unit+test&openid.mode=error&openid.ns=http%3A%2F%2Fopenid.net%2Fsignon%2F1.0') 431 432 def test_toURL(self): 433 base_url = 'http://base.url/' 434 actual = self.msg.toURL(base_url) 435 actual_base = actual[:len(base_url)] 436 self.failUnlessEqual(actual_base, base_url) 437 self.failUnlessEqual(actual[len(base_url)], '?') 438 query = actual[len(base_url) + 1:] 439 parsed = cgi.parse_qs(query) 440 self.failUnlessEqual(parsed, {'openid.mode':['error'], 441 'openid.error':['unit test'], 442 'openid.ns':[message.OPENID1_NS] 443 }) 444 445 def test_isOpenID1(self): 446 self.failUnless(self.msg.isOpenID1()) 447 410 448 411 449 class OpenID2MessageTest(unittest.TestCase): 412 450 def setUp(self): … … 769 769 self._checkForm(html, m, self.action_url, 770 770 tag_attrs, self.submit_text) 771 771 772 773 def test_setOpenIDNamespace_invalid(self): 774 m = message.Message() 775 invalid_things = [ 776 # Empty string is not okay here. 777 '', 778 # Good guess! But wrong. 779 'http://openid.net/signon/2.0', 780 # What? 781 u'http://specs%\\\r2Eopenid.net/auth/2.0', 782 # Too much escapings! 783 'http%3A%2F%2Fspecs.openid.net%2Fauth%2F2.0', 784 # This is a Type URI, not a openid.ns value. 785 'http://specs.openid.net/auth/2.0/signon', 786 ] 787 788 for x in invalid_things: 789 self.failUnlessRaises(message.InvalidOpenIDNamespace, 790 m.setOpenIDNamespace, x, False) 791 792 793 def test_isOpenID1(self): 794 v1_namespaces = [ 795 # Yes, there are two of them. 796 'http://openid.net/signon/1.1', 797 'http://openid.net/signon/1.0', 798 ] 799 800 for ns in v1_namespaces: 801 m = message.Message(ns) 802 self.failUnless(m.isOpenID1(), "%r not recognized as OpenID 1" % 803 (ns,)) 804 self.failUnlessEqual(ns, m.getOpenIDNamespace()) 805 self.failUnless(m.namespaces.isImplicit(ns), 806 m.namespaces.getNamespaceURI(message.NULL_NAMESPACE)) 807 808 def test_isOpenID2(self): 809 ns = 'http://specs.openid.net/auth/2.0' 810 m = message.Message(ns) 811 self.failUnless(m.isOpenID2()) 812 self.failIf(m.namespaces.isImplicit(message.NULL_NAMESPACE)) 813 self.failUnlessEqual(ns, m.getOpenIDNamespace()) 814 815 def test_setOpenIDNamespace_explicit(self): 816 m = message.Message() 817 m.setOpenIDNamespace(message.THE_OTHER_OPENID1_NS, False) 818 self.failIf(m.namespaces.isImplicit(message.THE_OTHER_OPENID1_NS)) 819 820 def test_setOpenIDNamespace_implicit(self): 821 m = message.Message() 822 m.setOpenIDNamespace(message.THE_OTHER_OPENID1_NS, True) 823 self.failUnless(m.namespaces.isImplicit(message.THE_OTHER_OPENID1_NS)) 824 825 826 def test_explicitOpenID11NSSerialzation(self): 827 m = message.Message() 828 m.setOpenIDNamespace(message.THE_OTHER_OPENID1_NS, implicit=False) 829 830 post_args = m.toPostArgs() 831 self.failUnlessEqual(post_args, 832 {'openid.ns':message.THE_OTHER_OPENID1_NS}) 833 834 def test_fromPostArgs_ns11(self): 835 # An example of the stuff that some Drupal installations send us, 836 # which includes openid.ns but is 1.1. 837 query = { 838 u'openid.assoc_handle': u'', 839 u'openid.claimed_id': u'http://foobar.invalid/', 840 u'openid.identity': u'http://foobar.myopenid.com', 841 u'openid.mode': u'checkid_setup', 842 u'openid.ns': u'http://openid.net/signon/1.1', 843 u'openid.ns.sreg': u'http://openid.net/extensions/sreg/1.1', 844 u'openid.return_to': u'http://drupal.invalid/return_to', 845 u'openid.sreg.required': u'nickname,email', 846 u'openid.trust_root': u'http://drupal.invalid', 847 } 848 m = message.Message.fromPostArgs(query) 849 self.failUnless(m.isOpenID1()) 850 851 852 772 853 class NamespaceMapTest(unittest.TestCase): 773 854 def test_onealias(self): 774 855 nsm = message.NamespaceMap() -
old-openid/openid/test/test_server.py
old new 945 970 len(expected_list) + 2, 946 971 answer.fields.toPostArgs()) 947 972 948 def _expectAnswerv1(self, answer, identity=None):949 expected_list = [950 ('mode', 'id_res'),951 ('return_to', self.request.return_to),952 ('identity', identity),953 ]954 955 for k, expected in expected_list:956 actual = answer.fields.getArg(OPENID_NS, k)957 self.failUnlessEqual(actual, expected, "%s: expected %s, got %s" % (k, expected, actual))958 959 self.failUnless(answer.fields.hasKey(OPENID_NS, 'response_nonce'))960 self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID1_NS)961 962 # One for nonce963 self.failUnlessEqual(len(answer.fields.toPostArgs()),964 len(expected_list) + 1,965 answer.fields.toPostArgs())966 967 968 973 def test_answerAllow(self): 969 974 """Check the fields specified by "Positive Assertions" 970 975 … … 1110 1154 """Test .allow() with an OpenID 1.x Message on a CheckIDRequest 1111 1155 built without an op_endpoint parameter. 1112 1156 """ 1157 identity = 'http://bambam.unittest/' 1113 1158 reqmessage = Message.fromOpenIDArgs({ 1114 'identity': 'http://bambam.unittest/',1159 'identity': identity, 1115 1160 'trust_root': 'http://bar.unittest/', 1116 1161 'return_to': 'http://bar.unittest/999', 1117 1162 }) 1118 1163 self.request = server.CheckIDRequest.fromMessage(reqmessage, None) 1119 1164 answer = self.request.answer(True) 1120 self._expectAnswerv1(answer, 'http://bambam.unittest/') 1165 1166 expected_list = [ 1167 ('mode', 'id_res'), 1168 ('return_to', self.request.return_to), 1169 ('identity', identity), 1170 ] 1171 1172 for k, expected in expected_list: 1173 actual = answer.fields.getArg(OPENID_NS, k) 1174 self.failUnlessEqual( 1175 expected, actual, 1176 "%s: expected %s, got %s" % (k, expected, actual)) 1177 1178 self.failUnless(answer.fields.hasKey(OPENID_NS, 'response_nonce')) 1179 self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID1_NS) 1180 self.failUnless(answer.fields.namespaces.isImplicit(OPENID1_NS)) 1181 1182 # One for nonce (OpenID v1 namespace is implicit) 1183 self.failUnlessEqual(len(answer.fields.toPostArgs()), 1184 len(expected_list) + 1, 1185 answer.fields.toPostArgs()) 1121 1186 1122 1187 def test_answerImmediateDenyOpenID2(self): 1123 1188 """Look for mode=setup_needed in checkid_immediate negative … … 1150 1215 self.failUnlessEqual(answer.request, self.request) 1151 1216 self.failUnlessEqual(len(answer.fields.toPostArgs()), 2, answer.fields) 1152 1217 self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID1_NS) 1218 self.failUnless(answer.fields.namespaces.isImplicit(OPENID1_NS)) 1153 1219 self.failUnlessEqual(answer.fields.getArg(OPENID_NS, 'mode'), 'id_res') 1154 1220 self.failUnless(answer.fields.getArg( 1155 1221 OPENID_NS, 'user_setup_url', '').startswith(server_url)) … … 1202 1268 immediate = False, 1203 1269 op_endpoint = self.server.op_endpoint, 1204 1270 ) 1271 self.request.message = Message(OPENID2_NS) 1205 1272 self.response = server.OpenIDResponse(self.request) 1206 1273 self.response.fields.setArg(OPENID_NS, 'mode', 'id_res') 1207 1274 self.response.fields.setArg(OPENID_NS, 'blue', 'star')

