incubator-heraldry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ket...@apache.org
Subject svn commit: r463060 [5/6] - in /incubator/heraldry/libraries/python/openid/trunk: ./ admin/ examples/ openid/ openid/consumer/ openid/server/ openid/store/ openid/test/ openid/test/data/ openid/yadis/
Date Wed, 11 Oct 2006 23:22:36 GMT
Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_server.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_server.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_server.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_server.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,1337 @@
+"""Tests for openid.server.
+"""
+from openid.server import server
+from openid import association, cryptutil, oidutil
+from openid.message import Message, OPENID_NS, OPENID2_NS, OPENID1_NS, \
+     IDENTIFIER_SELECT
+import _memstore
+import cgi
+
+import unittest
+import warnings
+
+from urlparse import urlparse
+
+# In general, if you edit or add tests here, try to move in the direction
+# of testing smaller units.  For testing the external interfaces, we'll be
+# developing an implementation-agnostic testing suite.
+
+# for more, see /etc/ssh/moduli
+
+ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7
+ALT_GEN = 5
+
+class CatchLogs(object):
+    def setUp(self):
+        self.old_logger = oidutil.log
+        oidutil.log = self.gotLogMessage
+        self.messages = []
+
+    def gotLogMessage(self, message):
+        self.messages.append(message)
+
+    def tearDown(self):
+        oidutil.log = self.old_logger
+
+class TestProtocolError(unittest.TestCase):
+    def test_browserWithReturnTo(self):
+        return_to = "http://rp.unittest/consumer"
+        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
+        args = Message.fromPostArgs({
+            'openid.mode': 'monkeydance',
+            'openid.identity': 'http://wagu.unittest/',
+            'openid.return_to': return_to,
+            })
+        e = server.ProtocolError(args, "plucky")
+        self.failUnless(e.hasReturnTo())
+        expected_args = {
+            'openid.mode': ['error'],
+            'openid.error': ['plucky'],
+            }
+
+        rt_base, result_args = e.encodeToURL().split('?', 1)
+        result_args = cgi.parse_qs(result_args)
+        self.failUnlessEqual(result_args, expected_args)
+
+    def test_noReturnTo(self):
+        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
+        args = Message.fromPostArgs({
+            'openid.mode': 'zebradance',
+            'openid.identity': 'http://wagu.unittest/',
+            })
+        e = server.ProtocolError(args, "waffles")
+        self.failIf(e.hasReturnTo())
+        expected = """error:waffles
+mode:error
+"""
+        self.failUnlessEqual(e.encodeToKVForm(), expected)
+
+
+
+class TestDecode(unittest.TestCase):
+    def setUp(self):
+        self.id_url = "http://decoder.am.unittest/"
+        self.rt_url = "http://rp.unittest/foobot/?qux=zam"
+        self.tr_url = "http://rp.unittest/"
+        self.assoc_handle = "{assoc}{handle}"
+        self.decode = server.Decoder().decode
+
+    def test_none(self):
+        args = {}
+        r = self.decode(args)
+        self.failUnlessEqual(r, None)
+
+    def test_irrelevant(self):
+        args = {
+            'pony': 'spotted',
+            'sreg.mutant_power': 'decaffinator',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+    def test_bad(self):
+        args = {
+            'openid.mode': 'twos-compliment',
+            'openid.pants': 'zippered',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+    def test_dictOfLists(self):
+        args = {
+            'openid.mode': ['checkid_setup'],
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            }
+        try:
+            result = self.decode(args)
+        except TypeError, err:
+            self.failUnless(str(err).find('values') != -1, err)
+        else:
+            self.fail("Expected TypeError, but got result %s" % (result,))
+
+    def test_checkidImmediate(self):
+        args = {
+            'openid.mode': 'checkid_immediate',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            # should be ignored
+            'openid.some.extension': 'junk',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckIDRequest))
+        self.failUnlessEqual(r.mode, "checkid_immediate")
+        self.failUnlessEqual(r.immediate, True)
+        self.failUnlessEqual(r.identity, self.id_url)
+        self.failUnlessEqual(r.trust_root, self.tr_url)
+        self.failUnlessEqual(r.return_to, self.rt_url)
+        self.failUnlessEqual(r.assoc_handle, self.assoc_handle)
+
+    def test_checkidSetup(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckIDRequest))
+        self.failUnlessEqual(r.mode, "checkid_setup")
+        self.failUnlessEqual(r.immediate, False)
+        self.failUnlessEqual(r.identity, self.id_url)
+        self.failUnlessEqual(r.trust_root, self.tr_url)
+        self.failUnlessEqual(r.return_to, self.rt_url)
+
+    def test_checkidSetupNoIdentity(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckIDRequest))
+        self.failUnlessEqual(r.mode, "checkid_setup")
+        self.failUnlessEqual(r.immediate, False)
+        self.failUnlessEqual(r.identity, None)
+        self.failUnlessEqual(r.trust_root, self.tr_url)
+        self.failUnlessEqual(r.return_to, self.rt_url)
+
+    def test_checkidSetupNoReturn(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.trust_root': self.tr_url,
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+    def test_checkidSetupBadReturn(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': 'not a url',
+            }
+        try:
+            result = self.decode(args)
+        except server.ProtocolError, err:
+            self.failUnless(err.openid_message)
+        else:
+            self.fail("Expected ProtocolError, instead returned with %s" %
+                      (result,))
+
+    def test_checkidSetupUntrustedReturn(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': 'http://not-the-return-place.unittest/',
+            }
+        try:
+            result = self.decode(args)
+        except server.UntrustedReturnURL, err:
+            self.failUnless(err.openid_message)
+        else:
+            self.fail("Expected UntrustedReturnURL, instead returned with %s" %
+                      (result,))
+
+    def test_checkAuth(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.sig': 'sigblob',
+            'openid.signed': 'identity,return_to,nonce,mode',
+            'openid.identity': 'signedval1',
+            'openid.return_to': 'signedval2',
+            'openid.nonce': 'signedval3',
+            'openid.baz': 'unsigned',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckAuthRequest))
+        self.failUnlessEqual(r.mode, 'check_authentication')
+        self.failUnlessEqual(r.sig, 'sigblob')
+
+
+    def test_checkAuthSignAll(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb-signall}{handle}',
+            'openid.sig': 'sigblob',
+            'openid.identity': 'signedval1',
+            'openid.return_to': 'signedval2',
+            'baz': 'also-signed',
+            }
+        r = self.decode(args)        
+        self.failUnless(isinstance(r, server.CheckAuthRequest))
+        expected_message = Message.fromPostArgs(args)
+        expected_message.setArg(OPENID_NS, "mode", "id_res")
+        self.failUnlessEqual(r.signed, expected_message)
+
+
+    def test_checkAuthMissingSignature(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.signed': 'foo,bar,mode',
+            'openid.foo': 'signedval1',
+            'openid.bar': 'signedval2',
+            'openid.baz': 'unsigned',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_checkAuthAndInvalidate(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.invalidate_handle': '[[SMART_handle]]',
+            'openid.sig': 'sigblob',
+            'openid.signed': 'identity,return_to,nonce,mode',
+            'openid.identity': 'signedval1',
+            'openid.return_to': 'signedval2',
+            'openid.nonce': 'signedval3',
+            'openid.baz': 'unsigned',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckAuthRequest))
+        self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]')
+
+
+    def test_associateDH(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.AssociateRequest))
+        self.failUnlessEqual(r.mode, "associate")
+        self.failUnlessEqual(r.session.session_type, "DH-SHA1")
+        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
+        self.failUnless(r.session.consumer_pubkey)
+
+    def test_associateDHMissingKey(self):
+        """Trying DH assoc w/o public key"""
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            }
+        # Using DH-SHA1 without supplying dh_consumer_public is an error.
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associateDHpubKeyNotB64(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "donkeydonkeydonkey",
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associateDHModGen(self):
+        # test dh with non-default but valid values for dh_modulus and dh_gen
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS),
+            'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) ,
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.AssociateRequest))
+        self.failUnlessEqual(r.mode, "associate")
+        self.failUnlessEqual(r.session.session_type, "DH-SHA1")
+        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
+        self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS)
+        self.failUnlessEqual(r.session.dh.generator, ALT_GEN)
+        self.failUnless(r.session.consumer_pubkey)
+
+
+    def test_associateDHCorruptModGen(self):
+        # test dh with non-default but valid values for dh_modulus and dh_gen
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            'openid.dh_modulus': 'pizza',
+            'openid.dh_gen': 'gnocchi',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associateDHMissingModGen(self):
+        # test dh with non-default but valid values for dh_modulus and dh_gen
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            'openid.dh_modulus': 'pizza',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+#     def test_associateDHInvalidModGen(self):
+#         # test dh with properly encoded values that are not a valid
+#         #   modulus/generator combination.
+#         args = {
+#             'openid.mode': 'associate',
+#             'openid.session_type': 'DH-SHA1',
+#             'openid.dh_consumer_public': "Rzup9265tw==",
+#             'openid.dh_modulus': cryptutil.longToBase64(9),
+#             'openid.dh_gen': cryptutil.longToBase64(27) ,
+#             }
+#         self.failUnlessRaises(server.ProtocolError, self.decode, args)
+#     test_associateDHInvalidModGen.todo = "low-priority feature"
+
+
+    def test_associateWeirdSession(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'FLCL6',
+            'openid.dh_consumer_public': "YQ==\n",
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associatePlain(self):
+        args = {
+            'openid.mode': 'associate',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.AssociateRequest))
+        self.failUnlessEqual(r.mode, "associate")
+        self.failUnlessEqual(r.session.session_type, "no-encryption")
+        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
+
+    def test_nomode(self):
+        args = {
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "my public keeey",
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+
+class TestEncode(unittest.TestCase):
+    def setUp(self):
+        self.encoder = server.Encoder()
+        self.encode = self.encoder.encode
+
+    def test_id_res(self):
+        request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'mode': 'id_res',
+            'identity': request.identity,
+            'return_to': request.return_to,
+            })
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+        location = webresponse.headers['location']
+        self.failUnless(location.startswith(request.return_to),
+                        "%s does not start with %s" % (location,
+                                                       request.return_to))
+        # argh.
+        q2 = dict(cgi.parse_qsl(urlparse(location)[4]))
+        expected = response.fields.toPostArgs()
+        self.failUnlessEqual(q2, expected)
+
+    def test_cancel(self):
+        request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'mode': 'cancel',
+            })
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+    def test_assocReply(self):
+        request = server.AssociateRequest.fromMessage(Message(OPENID2_NS))
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromPostArgs(
+            {'openid.assoc_handle': "every-zig"})
+        webresponse = self.encode(response)
+        body = """assoc_handle:every-zig
+"""
+        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+    def test_checkauthReply(self):
+        request = server.CheckAuthRequest('a_sock_monkey',
+                                          'siggggg',
+                                          [])
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'is_valid': 'true',
+            'invalidate_handle': 'xXxX:xXXx'
+            })
+        body = """invalidate_handle:xXxX:xXXx
+is_valid:true
+"""
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+    def test_unencodableError(self):
+        args = Message.fromPostArgs({
+            'openid.identity': 'http://limu.unittest/',
+            })
+        e = server.ProtocolError(args, "wet paint")
+        self.failUnlessRaises(server.EncodingError, self.encode, e)
+
+    def test_encodableError(self):
+        args = Message.fromPostArgs({
+            'openid.mode': 'associate',
+            'openid.identity': 'http://limu.unittest/',
+            })
+        body="error:snoot\nmode:error\n"
+        webresponse = self.encode(server.ProtocolError(args, "snoot"))
+        self.failUnlessEqual(webresponse.code, server.HTTP_ERROR)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+
+
+class TestSigningEncode(unittest.TestCase):
+    def setUp(self):
+        self._dumb_key = server.Signatory._dumb_key
+        self._normal_key = server.Signatory._normal_key
+        self.store = _memstore.MemoryStore()
+        self.request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        self.response = server.OpenIDResponse(self.request)
+        self.response.fields = Message.fromOpenIDArgs({
+            'mode': 'id_res',
+            'identity': self.request.identity,
+            'return_to': self.request.return_to,
+            })
+        self.signatory = server.Signatory(self.store)
+        self.encoder = server.SigningEncoder(self.signatory)
+        self.encode = self.encoder.encode
+
+    def test_idres(self):
+        assoc_handle = '{bicycle}{shed}'
+        self.store.storeAssociation(
+            self._normal_key,
+            association.Association.fromExpiresIn(60, assoc_handle,
+                                                  'sekrit', 'HMAC-SHA1'))
+        self.request.assoc_handle = assoc_handle
+        webresponse = self.encode(self.response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+        location = webresponse.headers['location']
+        query = cgi.parse_qs(urlparse(location)[4])
+        self.failUnless('openid.sig' in query)
+        self.failUnless('openid.assoc_handle' in query)
+        self.failUnless('openid.signed' in query)
+
+    def test_idresDumb(self):
+        webresponse = self.encode(self.response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+        location = webresponse.headers['location']
+        query = cgi.parse_qs(urlparse(location)[4])
+        self.failUnless('openid.sig' in query)
+        self.failUnless('openid.assoc_handle' in query)
+        self.failUnless('openid.signed' in query)
+
+    def test_forgotStore(self):
+        self.encoder.signatory = None
+        self.failUnlessRaises(ValueError, self.encode, self.response)
+
+    def test_cancel(self):
+        request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        response = server.OpenIDResponse(request)
+        response.fields.setArg(OPENID_NS, 'mode', 'cancel')
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+        location = webresponse.headers['location']
+        query = cgi.parse_qs(urlparse(location)[4])
+        self.failIf('openid.sig' in query, response.fields.toPostArgs())
+
+    def test_assocReply(self):
+        request = server.AssociateRequest.fromMessage(Message(OPENID2_NS))
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({'assoc_handle': "every-zig"})
+        webresponse = self.encode(response)
+        body = """assoc_handle:every-zig
+"""
+        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+    def test_alreadySigned(self):
+        self.response.fields.setArg(OPENID_NS, 'sig', 'priorSig==')
+        self.failUnlessRaises(server.AlreadySigned, self.encode, self.response)
+
+
+
+class TestCheckID(unittest.TestCase):
+    def setUp(self):
+        self.request = server.CheckIDRequest(
+            identity = 'http://bambam.unittest/',
+            trust_root = 'http://bar.unittest/',
+            return_to = 'http://bar.unittest/999',
+            immediate = False,
+            )
+
+    def test_trustRootInvalid(self):
+        self.request.trust_root = "http://foo.unittest/17"
+        self.request.return_to = "http://foo.unittest/39"
+        self.failIf(self.request.trustRootValid())
+
+    def test_trustRootValid(self):
+        self.request.trust_root = "http://foo.unittest/"
+        self.request.return_to = "http://foo.unittest/39"
+        self.failUnless(self.request.trustRootValid())
+
+    def _expectAnswer(self, answer, identity=None):
+        expected_list = [('mode', 'id_res'), ('return_to', self.request.return_to)]
+        if identity:
+            expected_list.append(('identity', identity))
+
+        for k, expected in expected_list:
+            actual = answer.fields.getArg(OPENID_NS, k)
+            self.failUnlessEqual(actual, expected, "%s: expected %s, got %s" % (k, expected, actual))
+
+        self.failUnless(answer.fields.hasKey(OPENID_NS, 'nonce'))
+        self.failUnless(answer.fields.getOpenIDNamespace() == OPENID2_NS)
+
+        # One for nonce, one for ns
+        self.failUnlessEqual(len(answer.fields.toPostArgs()),
+                             len(expected_list) + 2, answer.fields.toPostArgs())
+        
+
+    def test_answerAllow(self):
+        answer = self.request.answer(True)
+        self.failUnlessEqual(answer.request, self.request)
+        self._expectAnswer(answer, self.request.identity)
+
+    def test_answerAllowWithoutIdentityReally(self):
+        self.request.identity = None
+        answer = self.request.answer(True)
+        self.failUnlessEqual(answer.request, self.request)
+        self._expectAnswer(answer)
+
+    def test_answerAllowAnonymousFail(self):
+        self.request.identity = None
+        self.failUnlessRaises(
+            ValueError, self.request.answer, True, identity="=V")
+
+    def test_answerAllowWithIdentity(self):
+        self.request.identity = IDENTIFIER_SELECT
+        answer = self.request.answer(True, identity='=V')
+        self._expectAnswer(answer, '=V')
+
+    def test_answerAllowWithAnotherIdentity(self):
+        self.failUnlessRaises(ValueError, self.request.answer, True,
+                              identity="http://pebbles.unittest/")
+
+    def test_answerAllowNoTrustRoot(self):
+        self.request.trust_root = None
+        answer = self.request.answer(True)
+        self.failUnlessEqual(answer.request, self.request)
+        self._expectAnswer(answer, self.request.identity)
+
+    def test_answerImmediateDeny(self):
+        self.request.mode = 'checkid_immediate'
+        self.request.immediate = True
+        server_url = "http://setup-url.unittest/"
+        # crappiting setup_url, you dirty my interface with your presence!
+        answer = self.request.answer(False, server_url=server_url)
+        self.failUnlessEqual(answer.request, self.request)
+        self.failUnlessEqual(len(answer.fields.toPostArgs()), 3, answer.fields)
+        self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID2_NS)
+        self.failUnlessEqual(answer.fields.getArg(OPENID_NS, 'mode'), 'id_res')
+        self.failUnless(answer.fields.getArg(
+            OPENID_NS, 'user_setup_url', '').startswith(server_url))
+
+    def test_answerSetupDeny(self):
+        answer = self.request.answer(False)
+        self.failUnlessEqual(answer.fields.getArgs(OPENID_NS), {
+            'mode': 'cancel',
+            })
+
+    def test_encodeToURL(self):
+        server_url = 'http://openid-server.unittest/'
+        result = self.request.encodeToURL(server_url)
+
+        # How to check?  How about a round-trip test.
+        base, result_args = result.split('?', 1)
+        result_args = dict(cgi.parse_qsl(result_args))
+        message = Message.fromPostArgs(result_args)
+        rebuilt_request = server.CheckIDRequest.fromMessage(message)
+        # argh, lousy hack
+        self.request.message = message
+        self.failUnlessEqual(rebuilt_request.__dict__, self.request.__dict__)
+
+    def test_getCancelURL(self):
+        url = self.request.getCancelURL()
+        rt, query_string = url.split('?')
+        self.failUnlessEqual(self.request.return_to, rt)
+        query = dict(cgi.parse_qsl(query_string))
+        self.failUnlessEqual(query, {'openid.mode':'cancel',
+                                     'openid.ns':OPENID2_NS})
+
+    def test_getCancelURLimmed(self):
+        self.request.mode = 'checkid_immediate'
+        self.request.immediate = True
+        self.failUnlessRaises(ValueError, self.request.getCancelURL)
+
+
+
+class TestCheckIDExtension(unittest.TestCase):
+
+    def setUp(self):
+        self.request = server.CheckIDRequest(
+            identity = 'http://bambam.unittest/',
+            trust_root = 'http://bar.unittest/',
+            return_to = 'http://bar.unittest/999',
+            immediate = False,
+            )
+        self.response = server.OpenIDResponse(self.request)
+        self.response.fields.setArg(OPENID_NS, 'mode', 'id_res')
+        self.response.fields.setArg(OPENID_NS, 'blue', 'star')
+
+
+    def test_addField(self):
+        namespace = 'something:'
+        self.response.fields.setArg(namespace, 'bright', 'potato')
+        self.failUnlessEqual(self.response.fields.getArgs(OPENID_NS),
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              })
+        
+        self.failUnlessEqual(self.response.fields.getArgs(namespace),
+                             {'bright':'potato'})
+
+
+    def test_addFields(self):
+        namespace = 'mi5:'
+        args =  {'tangy': 'suspenders',
+                 'bravo': 'inclusion'}
+        self.response.fields.updateArgs(namespace, args)
+        self.failUnlessEqual(self.response.fields.getArgs(OPENID_NS),
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              })
+        self.failUnlessEqual(self.response.fields.getArgs(namespace), args)
+
+
+
+class MockSignatory(object):
+    isValid = True
+
+    def __init__(self, assoc):
+        self.assocs = [assoc]
+
+    def verify(self, assoc_handle, message):
+        assert message.hasKey(OPENID_NS, "sig")
+        if (True, assoc_handle) in self.assocs:
+            return self.isValid
+        else:
+            return False
+
+    def getAssociation(self, assoc_handle, dumb):
+        if (dumb, assoc_handle) in self.assocs:
+            # This isn't a valid implementation for many uses of this
+            # function, mind you.
+            return True
+        else:
+            return None
+
+    def invalidate(self, assoc_handle, dumb):
+        if (dumb, assoc_handle) in self.assocs:
+            self.assocs.remove((dumb, assoc_handle))
+
+
+class TestCheckAuth(unittest.TestCase):
+    def setUp(self):
+        self.assoc_handle = 'mooooooooo'
+        self.message = Message.fromPostArgs({
+            'openid.sig': 'signarture',
+            'one': 'alpha',
+            'two': 'beta',
+            })
+        self.request = server.CheckAuthRequest(
+            self.assoc_handle, self.message)
+
+        self.signatory = MockSignatory((True, self.assoc_handle))
+
+    def test_valid(self):
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields.getArgs(OPENID_NS), {'is_valid': 'true'})
+        self.failUnlessEqual(r.request, self.request)
+
+    def test_invalid(self):
+        self.signatory.isValid = False
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields.getArgs(OPENID_NS),
+                             {'is_valid': 'false'})
+
+    def test_replay(self):
+        r = self.request.answer(self.signatory)
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields.getArgs(OPENID_NS),
+                             {'is_valid': 'false'})
+
+    def test_invalidatehandle(self):
+        self.request.invalidate_handle = "bogusHandle"
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields.getArgs(OPENID_NS),
+                             {'is_valid': 'true',
+                              'invalidate_handle': "bogusHandle"})
+        self.failUnlessEqual(r.request, self.request)
+
+    def test_invalidatehandleNo(self):
+        assoc_handle = 'goodhandle'
+        self.signatory.assocs.append((False, 'goodhandle'))
+        self.request.invalidate_handle = assoc_handle
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields.getArgs(OPENID_NS), {'is_valid': 'true'})
+
+
+class TestAssociate(unittest.TestCase):
+    # TODO: test DH with non-default values for modulus and gen.
+    # (important to do because we actually had it broken for a while.)
+
+    def setUp(self):
+        self.request = server.AssociateRequest.fromMessage(
+            Message.fromPostArgs({}))
+        self.store = _memstore.MemoryStore()
+        self.signatory = server.Signatory(self.store)
+
+    def test_dhSHA1(self):
+        self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA1')
+        from openid.dh import DiffieHellman
+        from openid.server.server import DiffieHellmanSHA1ServerSession
+        consumer_dh = DiffieHellman.fromDefaults()
+        cpub = consumer_dh.public
+        server_dh = DiffieHellman.fromDefaults()
+        session = DiffieHellmanSHA1ServerSession(server_dh, cpub)
+        self.request = server.AssociateRequest(session, 'HMAC-SHA1')
+        response = self.request.answer(self.assoc)
+        rfg = lambda f: response.fields.getArg(OPENID_NS, f)
+        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
+        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
+        self.failIf(rfg("mac_key"))
+        self.failUnlessEqual(rfg("session_type"), "DH-SHA1")
+        self.failUnless(rfg("enc_mac_key"))
+        self.failUnless(rfg("dh_server_public"))
+
+        enc_key = rfg("enc_mac_key").decode('base64')
+        spub = cryptutil.base64ToLong(rfg("dh_server_public"))
+        secret = consumer_dh.xorSecret(spub, enc_key, cryptutil.sha1)
+        self.failUnlessEqual(secret, self.assoc.secret)
+
+
+    try:
+        cryptutil.sha256('')
+    except NotImplementedError:
+        warnings.warn("Not running SHA256 tests.")
+    else:
+        def test_dhSHA256(self):
+            self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA256-SIGNALL')
+            from openid.dh import DiffieHellman
+            from openid.server.server import DiffieHellmanSHA256ServerSession
+            consumer_dh = DiffieHellman.fromDefaults()
+            cpub = consumer_dh.public
+            server_dh = DiffieHellman.fromDefaults()
+            session = DiffieHellmanSHA256ServerSession(server_dh, cpub)
+            self.request = server.AssociateRequest(session, 'HMAC-SHA256-SIGNALL')
+            response = self.request.answer(self.assoc)
+            rfg = lambda f: response.fields.getArg(OPENID_NS, f)
+            self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA256-SIGNALL")
+            self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
+            self.failIf(rfg("mac_key"))
+            self.failUnlessEqual(rfg("session_type"), "DH-SHA256")
+            self.failUnless(rfg("enc_mac_key"))
+            self.failUnless(rfg("dh_server_public"))
+
+            enc_key = rfg("enc_mac_key").decode('base64')
+            spub = cryptutil.base64ToLong(rfg("dh_server_public"))
+            secret = consumer_dh.xorSecret(spub, enc_key, cryptutil.sha256)
+            self.failUnlessEqual(secret, self.assoc.secret)
+
+        def test_protoError256(self):
+            from openid.consumer.consumer import \
+                 DiffieHellmanSHA256ConsumerSession
+
+            s256_session = DiffieHellmanSHA256ConsumerSession()
+
+            invalid_s256 = {'openid.assoc_type':'HMAC-SHA1',
+                            'openid.session_type':'DH-SHA256',}
+            invalid_s256.update(s256_session.getRequest())
+
+            invalid_s256_2 = {'openid.assoc_type':'MONKEY-PIRATE',
+                              'openid.session_type':'DH-SHA256',}
+            invalid_s256_2.update(s256_session.getRequest())
+
+            bad_request_argss = [
+                invalid_s256,
+                invalid_s256_2,
+                ]
+
+            for request_args in bad_request_argss:
+                message = Message.fromPostArgs(request_args)
+                self.failUnlessRaises(server.ProtocolError,
+                                      server.AssociateRequest.fromMessage,
+                                      message)
+
+    def test_protoError(self):
+        from openid.consumer.consumer import DiffieHellmanSHA1ConsumerSession
+            
+        s1_session = DiffieHellmanSHA1ConsumerSession()
+
+        invalid_s1 = {'openid.assoc_type':'HMAC-SHA256-SIGNALL',
+                      'openid.session_type':'DH-SHA1',}
+        invalid_s1.update(s1_session.getRequest())
+
+        invalid_s1_2 = {'openid.assoc_type':'ROBOT-NINJA',
+                      'openid.session_type':'DH-SHA1',}
+        invalid_s1_2.update(s1_session.getRequest())
+        
+        bad_request_argss = [
+            {'openid.assoc_type':'Wha?'},
+            invalid_s1,
+            invalid_s1_2,
+            ]
+            
+        for request_args in bad_request_argss:
+            message = Message.fromPostArgs(request_args)
+            self.failUnlessRaises(server.ProtocolError,
+                                  server.AssociateRequest.fromMessage,
+                                  message)
+
+    def test_plaintext(self):
+        self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA1')
+        response = self.request.answer(self.assoc)
+        rfg = lambda f: response.fields.getArg(OPENID_NS, f)
+
+        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
+        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
+
+        self.failUnlessEqual(
+            rfg("expires_in"), "%d" % (self.signatory.SECRET_LIFETIME,))
+        self.failUnlessEqual(
+            rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
+        self.failIf(rfg("session_type"))
+        self.failIf(rfg("enc_mac_key"))
+        self.failIf(rfg("dh_server_public"))
+
+    def test_plaintext256(self):
+        self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA256-SIGNALL')
+        response = self.request.answer(self.assoc)
+        rfg = lambda f: response.fields.getArg(OPENID_NS, f)
+
+        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
+        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
+
+        self.failUnlessEqual(
+            rfg("expires_in"), "%d" % (self.signatory.SECRET_LIFETIME,))
+        self.failUnlessEqual(
+            rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
+        self.failIf(rfg("session_type"))
+        self.failIf(rfg("enc_mac_key"))
+        self.failIf(rfg("dh_server_public"))
+
+    def test_unsupportedPrefer(self):
+        allowed_assoc = 'COLD-PET-RAT'
+        allowed_sess = 'FROG-BONES'
+        message = 'This is a unit test'
+        response = self.request.answerUnsupported(
+            message=message,
+            preferred_session_type=allowed_sess,
+            preferred_association_type=allowed_assoc,
+            )
+        rfg = lambda f: response.fields.getArg(OPENID_NS, f)
+        self.failUnlessEqual(rfg('error_code'), 'unsupported-type')
+        self.failUnlessEqual(rfg('assoc_type'), allowed_assoc)
+        self.failUnlessEqual(rfg('error'), message)
+        self.failUnlessEqual(rfg('session_type'), allowed_sess)
+
+    def test_unsupported(self):
+        response = self.request.answerUnsupported()
+        rfg = lambda f: response.fields.getArg(OPENID_NS, f)
+        self.failUnlessEqual(rfg('error_code'), 'unsupported-type')
+        self.failUnlessEqual(rfg('assoc_type'), None)
+        self.failUnlessEqual(rfg('error'), None)
+        self.failUnlessEqual(rfg('session_type'), None)
+
+class Counter(object):
+    def __init__(self):
+        self.count = 0
+
+    def inc(self):
+        self.count += 1
+
+class TestServer(unittest.TestCase, CatchLogs):
+    def setUp(self):
+        self.store = _memstore.MemoryStore()
+        self.server = server.Server(self.store)
+        CatchLogs.setUp(self)
+
+    def test_dispatch(self):
+        monkeycalled = Counter()
+        def monkeyDo(request):
+            monkeycalled.inc()
+            r = server.OpenIDResponse(request)
+            return r
+        self.server.openid_monkeymode = monkeyDo
+        request = server.OpenIDRequest()
+        request.mode = "monkeymode"
+        request.namespace = OPENID1_NS
+        webresult = self.server.handleRequest(request)
+        self.failUnlessEqual(monkeycalled.count, 1)
+
+    def test_associate(self):
+        request = server.AssociateRequest.fromMessage(Message.fromPostArgs({}))
+        response = self.server.openid_associate(request)
+        self.failUnless(response.fields.hasKey(OPENID_NS, "assoc_handle"),
+                        "No assoc_handle here: %s" % (response.fields,))
+
+    def test_associate2(self):
+        """Associate when the server has no allowed association types
+
+        Gives back an error with error_code and no fallback session or
+        assoc types."""
+        self.server.negotiator.setAllowedTypes([])
+        request = server.AssociateRequest.fromMessage(Message.fromPostArgs({}))
+        response = self.server.openid_associate(request)
+        self.failUnless(response.fields.hasKey(OPENID_NS, "error"))
+        self.failUnless(response.fields.hasKey(OPENID_NS, "error_code"))
+        self.failIf(response.fields.hasKey(OPENID_NS, "assoc_handle"))
+        self.failIf(response.fields.hasKey(OPENID_NS, "assoc_type"))
+        self.failIf(response.fields.hasKey(OPENID_NS, "session_type"))
+
+    def test_associate3(self):
+        """Request an assoc type that is not supported when there are
+        supported types.
+
+        Should give back an error message with a fallback type.
+        """
+        self.server.negotiator.setAllowedTypes([('HMAC-SHA256-SIGNALL', 'DH-SHA256')])
+        request = server.AssociateRequest.fromMessage(Message.fromPostArgs({}))
+        response = self.server.openid_associate(request)
+        self.failUnless(response.fields.hasKey(OPENID_NS, "error"))
+        self.failUnless(response.fields.hasKey(OPENID_NS, "error_code"))
+        self.failIf(response.fields.hasKey(OPENID_NS, "assoc_handle"))
+        self.failUnlessEqual(response.fields.getArg(OPENID_NS, "assoc_type"),
+                             'HMAC-SHA256-SIGNALL')
+        self.failUnlessEqual(response.fields.getArg(OPENID_NS, "session_type"),
+                             'DH-SHA256')
+
+    try:
+        cryptutil.sha256('')
+    except NotImplementedError:
+        warnings.warn("Not running SHA256 tests.")
+    else:
+        def test_associate4(self):
+            """DH-SHA256 association session"""
+            self.server.negotiator.setAllowedTypes(
+                [('HMAC-SHA256-SIGNALL', 'DH-SHA256')])
+            query = {
+                'openid.dh_consumer_public':
+                'ALZgnx8N5Lgd7pCj8K86T/DDMFjJXSss1SKoLmxE72kJTzOtG6I2PaYrHX'
+                'xku4jMQWSsGfLJxwCZ6280uYjUST/9NWmuAfcrBfmDHIBc3H8xh6RBnlXJ'
+                '1WxJY3jHd5k1/ZReyRZOxZTKdF/dnIqwF8ZXUwI6peV0TyS/K1fOfF/s',
+                'openid.assoc_type': 'HMAC-SHA256-SIGNALL',
+                'openid.session_type': 'DH-SHA256',
+                }
+            message = Message.fromPostArgs(query)
+            request = server.AssociateRequest.fromMessage(message)
+            response = self.server.openid_associate(request)
+            self.failUnless(response.fields.hasKey(OPENID_NS, "assoc_handle"))
+
+    def test_checkAuth(self):
+        request = server.CheckAuthRequest('arrrrrf', '0x3999', [])
+        response = self.server.openid_check_authentication(request)
+        self.failUnless(response.fields.hasKey(OPENID_NS, "is_valid"))
+
+class TestSignatory(unittest.TestCase, CatchLogs):
+    def setUp(self):
+        self.store = _memstore.MemoryStore()
+        self.signatory = server.Signatory(self.store)
+        self._dumb_key = self.signatory._dumb_key
+        self._normal_key = self.signatory._normal_key
+        CatchLogs.setUp(self)
+
+    def test_sign(self):
+        request = server.OpenIDRequest()
+        assoc_handle = '{assoc}{lookatme}'
+        self.store.storeAssociation(
+            self._normal_key,
+            association.Association.fromExpiresIn(60, assoc_handle,
+                                                  'sekrit', 'HMAC-SHA1'))
+        request.assoc_handle = assoc_handle
+        request.namespace = OPENID1_NS
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            })
+        sresponse = self.signatory.sign(response)
+        self.failUnlessEqual(
+            sresponse.fields.getArg(OPENID_NS, 'assoc_handle'),
+            assoc_handle)
+        self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
+                             'assoc_handle,azu,bar,foo,signed')
+        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
+        self.failIf(self.messages, self.messages)
+
+    def test_signDumb(self):
+        request = server.OpenIDRequest()
+        request.assoc_handle = None
+        request.namespace = OPENID2_NS
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            'ns':OPENID2_NS,
+            })
+        sresponse = self.signatory.sign(response)
+        assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
+        self.failUnless(assoc_handle)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failUnless(assoc)
+        self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
+                             'assoc_handle,azu,bar,foo,ns,signed')
+        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
+        self.failIf(self.messages, self.messages)
+
+    def test_signExpired(self):
+        request = server.OpenIDRequest()
+        request.namespace = OPENID2_NS
+        assoc_handle = '{assoc}{lookatme}'
+        self.store.storeAssociation(
+            self._normal_key,
+            association.Association.fromExpiresIn(-10, assoc_handle,
+                                                  'sekrit', 'HMAC-SHA1'))
+        self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle))
+
+        request.assoc_handle = assoc_handle
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            })
+        sresponse = self.signatory.sign(response)
+
+        new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
+        self.failUnless(new_assoc_handle)
+        self.failIfEqual(new_assoc_handle, assoc_handle)
+
+        self.failUnlessEqual(
+            sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
+            assoc_handle)
+
+        self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
+                             'assoc_handle,azu,bar,foo,invalidate_handle,signed')
+        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
+
+        # make sure the expired association is gone
+        self.failIf(self.store.getAssociation(self._normal_key, assoc_handle),
+                    "expired association is still retrievable.")
+
+        # make sure the new key is a dumb mode association
+        self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
+        self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
+        self.failUnless(self.messages)
+
+    def test_signExpiredSignAll(self):
+        request = server.OpenIDRequest()
+        request.namespace = OPENID2_NS
+        assoc_handle = '{assoc}{lookatme}'
+
+        expired_assoc = association.Association.fromExpiresIn(
+            -10, assoc_handle, 'sekrit', 'HMAC-SHA1-SIGNALL')
+
+        self.failUnless(expired_assoc.sign_all)
+        self.store.storeAssociation(self._normal_key, expired_assoc)
+        self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle))
+
+        request.assoc_handle = assoc_handle
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            })
+        sresponse = self.signatory.sign(response)
+
+        new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
+        self.failUnless(new_assoc_handle)
+        self.failIfEqual(new_assoc_handle, assoc_handle)
+
+        self.failUnlessEqual(
+            sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
+            assoc_handle)
+
+        # make sure the new key is a dumb mode association
+        new_assoc = self.store.getAssociation(self._dumb_key, new_assoc_handle)
+        self.failUnless(new_assoc)
+        self.failIf(self.store.getAssociation(self._normal_key,
+                                              new_assoc_handle))
+        # and the new key is a sign-all key like the old one was.
+        self.failUnless(new_assoc.sign_all)
+
+        # make sure the expired association is gone
+        self.failIf(self.store.getAssociation(self._normal_key, assoc_handle))
+
+
+        self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed', None),
+                             None)
+        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
+
+
+    def test_signInvalidHandle(self):
+        request = server.OpenIDRequest()
+        request.namespace = OPENID2_NS
+        assoc_handle = '{bogus-assoc}{notvalid}'
+
+        request.assoc_handle = assoc_handle
+        response = server.OpenIDResponse(request)
+        response.fields = Message.fromOpenIDArgs({
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            })
+        sresponse = self.signatory.sign(response)
+
+        new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
+        self.failUnless(new_assoc_handle)
+        self.failIfEqual(new_assoc_handle, assoc_handle)
+
+        self.failUnlessEqual(
+            sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
+            assoc_handle)
+
+        self.failUnlessEqual(
+            sresponse.fields.getArg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,invalidate_handle,signed')
+        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
+
+        # make sure the new key is a dumb mode association
+        self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
+        self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
+        self.failIf(self.messages, self.messages)
+
+
+    def test_verify(self):
+        assoc_handle = '{vroom}{zoom}'
+        assoc = association.Association.fromExpiresIn(
+            60, assoc_handle, 'sekrit', 'HMAC-SHA1-SIGNALL')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+
+        signed = Message.fromPostArgs({
+            'foo': 'bar',
+            'apple': 'orange',
+            'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=",
+            })
+
+        verified = self.signatory.verify(assoc_handle, signed)
+        self.failIf(self.messages, self.messages)
+        self.failUnless(verified)
+
+
+    def test_verifyBadSig(self):
+        assoc_handle = '{vroom}{zoom}'
+        assoc = association.Association.fromExpiresIn(
+            60, assoc_handle, 'sekrit', 'HMAC-SHA1-SIGNALL')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+
+        signed = Message.fromPostArgs({
+            'foo': 'bar',
+            'apple': 'orange',
+            'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=".encode('rot13'),
+            })
+
+        verified = self.signatory.verify(assoc_handle, signed)
+        self.failIf(verified)
+        self.failIf(self.messages, self.messages)
+
+    def test_verifyBadHandle(self):
+        assoc_handle = '{vroom}{zoom}'
+        signed = Message.fromPostArgs({
+            'foo': 'bar',
+            'apple': 'orange',
+            'openid.sig': "Ylu0KcIR7PvNegB/K41KpnRgJl0=",
+            })
+
+        verified = self.signatory.verify(assoc_handle, signed)
+        self.failIf(verified)
+        self.failUnless(self.messages)
+
+
+    def test_verifyAssocMismatch(self):
+        """Attempt to validate sign-all message with a signed-list assoc."""
+        assoc_handle = '{vroom}{zoom}'
+        assoc = association.Association.fromExpiresIn(
+            60, assoc_handle, 'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+
+        signed = Message.fromPostArgs({
+            'foo': 'bar',
+            'apple': 'orange',
+            'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=",
+            })
+
+        verified = self.signatory.verify(assoc_handle, signed)
+        self.failIf(verified)
+        self.failUnless(self.messages)
+
+    def test_getAssoc(self):
+        assoc_handle = self.makeAssoc(dumb=True)
+        assoc = self.signatory.getAssociation(assoc_handle, True)
+        self.failUnless(assoc)
+        self.failUnlessEqual(assoc.handle, assoc_handle)
+        self.failIf(self.messages, self.messages)
+
+    def test_getAssocExpired(self):
+        assoc_handle = self.makeAssoc(dumb=True, lifetime=-10)
+        assoc = self.signatory.getAssociation(assoc_handle, True)
+        self.failIf(assoc, assoc)
+        self.failUnless(self.messages)
+
+    def test_getAssocInvalid(self):
+        ah = 'no-such-handle'
+        self.failUnlessEqual(
+            self.signatory.getAssociation(ah, dumb=False), None)
+        self.failIf(self.messages, self.messages)
+
+    def test_getAssocDumbVsNormal(self):
+        assoc_handle = self.makeAssoc(dumb=True)
+        self.failUnlessEqual(
+            self.signatory.getAssociation(assoc_handle, dumb=False), None)
+        self.failIf(self.messages, self.messages)
+
+    def test_createAssociation(self):
+        assoc = self.signatory.createAssociation(dumb=False)
+        self.failUnless(self.signatory.getAssociation(assoc.handle, dumb=False))
+        self.failIf(self.messages, self.messages)
+
+    def makeAssoc(self, dumb, lifetime=60):
+        assoc_handle = '{bling}'
+        assoc = association.Association.fromExpiresIn(lifetime, assoc_handle,
+                                                      'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation((dumb and self._dumb_key) or self._normal_key, assoc)
+        return assoc_handle
+
+    def test_invalidate(self):
+        assoc_handle = '-squash-'
+        assoc = association.Association.fromExpiresIn(60, assoc_handle,
+                                                      'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failUnless(assoc)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failUnless(assoc)
+        self.signatory.invalidate(assoc_handle, dumb=True)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failIf(assoc)
+        self.failIf(self.messages, self.messages)
+
+
+
+if __name__ == '__main__':
+    unittest.main()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_symbol.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_symbol.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_symbol.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_symbol.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,35 @@
+import unittest
+
+from openid import oidutil
+
+class SymbolTest(unittest.TestCase):
+    def test_selfEquality(self):
+        s = oidutil.Symbol('xxx')
+        self.failUnlessEqual(s, s)
+
+    def test_otherEquality(self):
+        x = oidutil.Symbol('xxx')
+        y = oidutil.Symbol('xxx')
+        self.failUnlessEqual(x, y)
+
+    def test_inequality(self):
+        x = oidutil.Symbol('xxx')
+        y = oidutil.Symbol('yyy')
+        self.failIfEqual(x, y)
+
+    def test_selfInequality(self):
+        x = oidutil.Symbol('xxx')
+        self.failIf(x != x)
+
+    def test_otherInequality(self):
+        x = oidutil.Symbol('xxx')
+        y = oidutil.Symbol('xxx')
+        self.failIf(x != y)
+
+    def test_ne_inequality(self):
+        x = oidutil.Symbol('xxx')
+        y = oidutil.Symbol('yyy')
+        self.failUnless(x != y)
+
+if __name__ == '__main__':
+    unittest.main()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_urinorm.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_urinorm.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_urinorm.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_urinorm.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,56 @@
+import os
+import unittest
+import openid.urinorm
+import helper
+
+class UrinormTest(unittest.TestCase):
+    def __init__(self, desc, case, expected):
+        unittest.TestCase.__init__(self)
+        self.desc = desc
+        self.case = case
+        self.expected = expected
+
+    def shortDescription(self):
+        return self.desc
+
+    def runTest(self):
+        try:
+            actual = openid.urinorm.urinorm(self.case)
+        except ValueError:
+            self.assertEqual(self.expected, 'fail')
+        else:
+            self.assertEqual(actual, self.expected)
+
+    def parse(cls, full_case):
+        desc, case, expected = full_case.split('\n')
+        case = unicode(case, 'utf-8')
+        
+        return cls(desc, case, expected)
+
+    parse = classmethod(parse)
+
+
+def parseTests(test_data):
+    result = []
+
+    cases = test_data.split('\n\n')
+    for case in cases:
+        case = case.strip()
+
+        if case:
+            result.append(UrinormTest.parse(case))
+
+    return result
+
+def pyUnitTests():
+    here = os.path.dirname(os.path.abspath(__file__))
+    test_data_file_name = os.path.join(here, 'urinorm.txt')
+    test_data_file = file(test_data_file_name)
+    test_data = test_data_file.read()
+    test_data_file.close()
+
+    tests = parseTests(test_data)
+    return unittest.TestSuite(tests)
+
+if __name__ == '__main__':
+    helper.runAsMain()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrd.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrd.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrd.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrd.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,98 @@
+import unittest
+from openid.yadis import xrd
+from openid.yadis.servicetypes import openid
+import os.path
+
+def sibpath(one, other, make_absolute=True):
+    if os.path.isabs(other):
+        return other
+    p = os.path.join(os.path.dirname(one), other)
+    if make_absolute:
+        p = os.path.abspath(p)
+    return p
+
+
+XRD_FILE = sibpath(__file__, os.path.join("data", "test1-xrd.xml"))
+NOXRDS_FILE = sibpath(__file__, os.path.join("data", "not-xrds.xml"))
+NOXRD_FILE = sibpath(__file__, os.path.join("data", "no-xrd.xml"))
+
+# None of the namespaces or service URIs below are official (or even
+# sanctioned by the owners of that piece of URL-space)
+
+LID_2_0 = "http://lid.netmesh.org/sso/2.0b5"
+TYPEKEY_1_0 = "http://typekey.com/services/1.0"
+
+class TestServiceParser(unittest.TestCase):
+    def setUp(self):
+        self.xmldoc = file(XRD_FILE).read()
+        self.sp = xrd.ServiceParser()
+
+    def testParse(self):
+        services = self.sp.parse(self.xmldoc)
+
+    def testParseOpenID(self):
+        self.sp = xrd.ServiceParser([openid.OpenIDParser()])
+        services = self.sp.parse(self.xmldoc)
+        oidservices = services.getServices(openid.OPENID_1_0)
+
+        expectedServices = [
+            ("http://www.myopenid.com/server", "http://josh.myopenid.com/"),
+            ("http://www.schtuff.com/openid", "http://users.schtuff.com/josh"),
+            ("http://www.livejournal.com/openid/server.bml",
+             "http://www.livejournal.com/users/nedthealpaca/"),
+            ]
+
+        count = 0
+        for expected, service in zip(expectedServices, oidservices):
+            self.failUnlessEqual(service.uri, expected[0])
+            self.failUnlessEqual(service.delegate, expected[1])
+            # Fine test, but zip truncates to the length of the shortest
+            # sequence.
+            count = count + 1
+        self.failUnlessEqual(count, len(expectedServices))
+
+    def testGetSeveral(self):
+        services = self.sp.parse(self.xmldoc)
+        oidservices = services.getServices(LID_2_0, TYPEKEY_1_0)
+        expectedServices = [
+            # type, URL
+            (TYPEKEY_1_0, None),
+            (LID_2_0, "http://mylid.net/josh"),
+            ]
+
+        count = 0
+        for expected, service in zip(expectedServices, oidservices):
+            self.failUnlessEqual(service.type, expected[0])
+            self.failUnlessEqual(service.uri, expected[1])
+            count = count + 1
+        self.failUnlessEqual(count, len(expectedServices))
+
+    def testGetSeveralForOne(self):
+        """Getting services for one Service with several Type elements."""
+        services = self.sp.parse(self.xmldoc)
+        oidservices = services.getServices(
+            'http://lid.netmesh.org/sso/2.0b5',
+            'http://lid.netmesh.org/2.0b5')
+
+        expectedServices = [
+            ('http://lid.netmesh.org/sso/2.0b5', "http://mylid.net/josh"),
+            ('http://lid.netmesh.org/2.0b5', "http://mylid.net/josh"),
+            ]
+
+        count = 0
+        for expected, service in zip(expectedServices, oidservices):
+            self.failUnlessEqual(service.type, expected[0])
+            self.failUnlessEqual(service.uri, expected[1])
+            count = count + 1
+        self.failUnlessEqual(count, len(expectedServices))
+
+    def testNoXRDS(self):
+        self.xmldoc = file(NOXRDS_FILE).read()
+        self.failUnlessRaises(xrd.XrdsError, self.sp.parse, self.xmldoc)
+
+    def testNoXRD(self):
+        self.xmldoc = file(NOXRD_FILE).read()
+        self.failUnlessRaises(xrd.XrdsError, self.sp.parse, self.xmldoc)
+
+if __name__ == '__main__':
+    unittest.main()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xri.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xri.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xri.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xri.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,117 @@
+
+from unittest import TestCase
+from openid.yadis import xri
+
+class XriDiscoveryTestCase(TestCase):
+    def test_isXRI(self):
+        i = xri.identifierScheme
+        self.failUnlessEqual(i('=john.smith'), 'XRI')
+        self.failUnlessEqual(i('@smiths/john'), 'XRI')
+        self.failUnlessEqual(i('smoker.myopenid.com'), 'URI')
+        self.failUnlessEqual(i('xri://=john'), 'XRI')
+
+
+class XriEscapingTestCase(TestCase):
+    def test_escaping_percents(self):
+        self.failUnlessEqual(xri.escapeForIRI('@example/abc%2Fd/ef'),
+                             '@example/abc%252Fd/ef')
+
+
+    def test_escaping_xref(self):
+        # no escapes
+        esc = xri.escapeForIRI
+        self.failUnlessEqual('@example/foo/(@bar)', esc('@example/foo/(@bar)'))
+        # escape slashes
+        self.failUnlessEqual('@example/foo/(@bar%2Fbaz)',
+                             esc('@example/foo/(@bar/baz)'))
+        self.failUnlessEqual('@example/foo/(@bar%2Fbaz)/(+a%2Fb)',
+                             esc('@example/foo/(@bar/baz)/(+a/b)'))
+        # escape query ? and fragment #
+        self.failUnlessEqual('@example/foo/(@baz%3Fp=q%23r)?i=j#k',
+                             esc('@example/foo/(@baz?p=q#r)?i=j#k'))
+
+
+
+class XriTransformationTestCase(TestCase):
+    def test_to_iri_normal(self):
+        self.failUnlessEqual(xri.toIRINormal('@example'), 'xri://@example')
+
+    try:
+        unichr(0x10000)
+    except ValueError:
+        # bleh narrow python build
+        def test_iri_to_url(self):
+            s = u'l\xa1m'
+            expected = 'l%C2%A1m'
+            self.failUnlessEqual(xri.iriToURI(s), expected)
+    else:
+        def test_iri_to_url(self):
+            s = u'l\xa1m\U00101010n'
+            expected = 'l%C2%A1m%F4%81%80%90n'
+            self.failUnlessEqual(xri.iriToURI(s), expected)
+
+
+
+class CanonicalIDTest(TestCase):
+    providerIsAuthoritativeCases = [
+        # provider, canonicalID, isAuthoritative
+        ('=', '=!698.74D1.A1F2.86C7', True),
+        ('@!1234', '@!1234!ABCD', True),
+        ('@!1234!5678', '@!1234!5678!ABCD', True),
+        ('@!1234', '=!1234!ABCD', False),
+        ('@!1234', '@!1234!ABCD!9765', False),
+        ('@!1234!ABCD', '=!1234', False),
+        ('=!BABE', '=!D00D', False),
+        ]
+
+
+    def test_providerIsAuthoritative(self):
+        """Checking whether this provider is authoratitve for
+        the given XRI."""
+        # XXX: Should perhaps be more like the other data-driven tests?
+        for providerID, canonicalID, isAuthoritative in \
+                self.providerIsAuthoritativeCases:
+            self._providerIsAuthoritative(providerID, canonicalID,
+                                          isAuthoritative)
+
+
+    def _providerIsAuthoritative(self, providerID, canonicalID, expected):
+        result = xri.providerIsAuthoritative(providerID, canonicalID)
+        self.failUnlessEqual(expected, result,
+                             "%s providing %s, expected %s" % (providerID,
+                                                               canonicalID,
+                                                               expected))
+
+
+
+class TestGetRootAuthority(TestCase):
+    xris = [
+        ("@foo", "@"),
+        ("@foo*bar", "@"),
+        ("@*foo*bar", "@"),
+        ("@foo/bar", "@"),
+        ("!!990!991", "!"),
+        ("!1001!02", "!"),
+        ("=foo*bar", "="),
+        ("(example.com)/foo", "(example.com)"),
+        ("(example.com)*bar/foo", "(example.com)"),
+        ("baz.example.com/foo", "baz.example.com"),
+        ("baz.example.com:8080/foo", "baz.example.com:8080"),
+        # Looking at the ABNF in XRI Syntax 2.0, I don't think you can
+        # have example.com*bar.  You can do (example.com)*bar, but that
+        # would mean something else.
+        ##("example.com*bar/(=baz)", "example.com*bar"),
+        ##("baz.example.com!01/foo", "baz.example.com!01"),
+        ]
+
+
+    def test_getRootAuthority(self):
+        for thexri, expected_root in self.xris:
+            self.failUnlessEqual(xri.rootAuthority(thexri),
+                                 xri.XRI(expected_root))
+
+
+
+if __name__ == '__main__':
+    import unittest
+    unittest.main()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrires.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrires.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrires.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_xrires.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,40 @@
+
+from unittest import TestCase
+from openid.yadis import xrires
+
+class ProxyQueryTestCase(TestCase):
+    def setUp(self):
+        self.proxy_url = 'http://xri.example.com/'
+        self.proxy = xrires.ProxyResolver(self.proxy_url)
+        self.servicetype = 'xri://+i-service*(+forwarding)*($v*1.0)'
+        self.servicetype_enc = 'xri%3A%2F%2F%2Bi-service%2A%28%2Bforwarding%29%2A%28%24v%2A1.0%29'
+
+
+    def test_proxy_url(self):
+        st = self.servicetype
+        ste = self.servicetype_enc
+        args_esc = "_xrd_r=application%2Fxrds%2Bxml&_xrd_t=" + ste
+        pqu = self.proxy.queryURL
+        h = self.proxy_url
+        self.failUnlessEqual(h + '=foo?' + args_esc, pqu('=foo', st))
+        self.failUnlessEqual(h + '=foo/bar?baz&' + args_esc,
+                             pqu('=foo/bar?baz', st))
+        self.failUnlessEqual(h + '=foo/bar?baz=quux&' + args_esc,
+                             pqu('=foo/bar?baz=quux', st))
+        self.failUnlessEqual(h + '=foo/bar?mi=fa&so=la&' + args_esc,
+                             pqu('=foo/bar?mi=fa&so=la', st))
+
+        # With no service endpoint selection.
+        args_esc = "_xrd_r=application%2Fxrds%2Bxml%3Bsep%3Dfalse"
+        self.failUnlessEqual(h + '=foo?' + args_esc, pqu('=foo', None))
+
+
+    def test_proxy_url_qmarks(self):
+        st = self.servicetype
+        ste = self.servicetype_enc
+        args_esc = "_xrd_r=application%2Fxrds%2Bxml&_xrd_t=" + ste
+        pqu = self.proxy.queryURL
+        h = self.proxy_url
+        self.failUnlessEqual(h + '=foo/bar??' + args_esc, pqu('=foo/bar?', st))
+        self.failUnlessEqual(h + '=foo/bar????' + args_esc,
+                             pqu('=foo/bar???', st))

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_yadis_discover.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_yadis_discover.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_yadis_discover.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_yadis_discover.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+
+"""Tests for yadis.discover.
+
+@todo: Now that yadis.discover uses urljr.fetchers, we should be able to do
+   tests with a mock fetcher instead of spawning threads with BaseHTTPServer.
+"""
+
+import unittest
+import urlparse
+import re
+
+from openid.yadis.discover import discover, DiscoveryFailure
+
+from openid import fetchers
+
+import discoverdata
+
+status_header_re = re.compile(r'Status: (\d+) .*?$', re.MULTILINE)
+
+four04_pat = """\
+Content-Type: text/plain
+
+No such file %s
+"""
+
+class QuitServer(Exception): pass
+
+def mkResponse(data):
+    status_mo = status_header_re.match(data)
+    headers_str, body = data.split('\n\n', 1)
+    headers = {}
+    for line in headers_str.split('\n'):
+        k, v = line.split(':', 1)
+        k = k.strip().lower()
+        v = v.strip()
+        headers[k] = v
+    status = int(status_mo.group(1))
+    return fetchers.HTTPResponse(status=status,
+                                 headers=headers,
+                                 body=body)
+
+class TestFetcher(object):
+    def __init__(self, base_url):
+        self.base_url = base_url
+
+    def fetch(self, url, headers, body):
+        current_url = url
+        while True:
+            parsed = urlparse.urlparse(current_url)
+            path = parsed[2][1:]
+            try:
+                data = discoverdata.generateSample(path, self.base_url)
+            except KeyError:
+                return fetchers.HTTPResponse(status=404,
+                                             final_url=current_url,
+                                             headers={},
+                                             body='')
+
+            response = mkResponse(data)
+            if response.status in [301, 302, 303, 307]:
+                current_url = response.headers['location']
+            else:
+                response.final_url = current_url
+                return response
+
+class TestSecondGet(unittest.TestCase):
+    class MockFetcher(object):
+        def __init__(self):
+            self.count = 0
+        def fetch(self, uri, headers=None, body=None):
+            self.count += 1
+            if self.count == 1:
+                headers = {
+                    'X-XRDS-Location'.lower(): 'http://unittest/404',
+                    }
+                return fetchers.HTTPResponse(uri, 200, headers, '')
+            else:
+                return fetchers.HTTPResponse(uri, 404)
+
+    def setUp(self):
+        self.oldfetcher = fetchers.getDefaultFetcher()
+        fetchers.setDefaultFetcher(self.MockFetcher())
+
+    def tearDown(self):
+        fetchers.setDefaultFetcher(self.oldfetcher)
+
+    def test_404(self):
+        uri = "http://something.unittest/"
+        self.failUnlessRaises(DiscoveryFailure, discover, uri)
+
+
+class _TestCase(unittest.TestCase):
+    base_url = 'http://invalid.unittest/'
+
+    def __init__(self, input_name, id_name, result_name, success):
+        self.input_name = input_name
+        self.id_name = id_name
+        self.result_name = result_name
+        self.success = success
+        # Still not quite sure how to best construct these custom tests.
+        # Between python2.3 and python2.4, a patch attached to pyunit.sf.net
+        # bug #469444 got applied which breaks loadTestsFromModule on this
+        # class if it has test_ or runTest methods.  So, kludge to change
+        # the method name.
+        unittest.TestCase.__init__(self, methodName='runCustomTest')
+
+    def setUp(self):
+        fetchers.setDefaultFetcher(TestFetcher(self.base_url),
+                                   wrap_exceptions=False)
+
+        self.input_url, self.expected = discoverdata.generateResult(
+            self.base_url,
+            self.input_name,
+            self.id_name,
+            self.result_name,
+            self.success)
+
+    def tearDown(self):
+        fetchers.setDefaultFetcher(None)
+
+    def runCustomTest(self):
+        if self.expected is DiscoveryFailure:
+            self.failUnlessRaises(DiscoveryFailure,
+                                  discover, self.input_url)
+        else:
+            result = discover(self.input_url)
+            self.failUnlessEqual(self.input_url, result.request_uri)
+
+            msg = 'Identity URL mismatch: actual = %r, expected = %r' % (
+                result.normalized_uri, self.expected.normalized_uri)
+            self.failUnlessEqual(
+                self.expected.normalized_uri, result.normalized_uri, msg)
+
+            msg = 'Content mismatch: actual = %r, expected = %r' % (
+                result.response_text, self.expected.response_text)
+            self.failUnlessEqual(
+                self.expected.response_text, result.response_text, msg)
+
+            expected_keys = self.expected.__dict__.keys()
+            expected_keys.sort()
+            actual_keys = result.__dict__.keys()
+            actual_keys.sort()
+            self.failUnlessEqual(actual_keys, expected_keys)
+
+            for k, exp_v in self.expected.__dict__.items():
+                act_v = result.__dict__.get(k)
+                assert act_v == exp_v, (k, exp_v, act_v)
+
+    def shortDescription(self):
+        try:
+            n = self.input_url
+        except AttributeError:
+            # run before setUp, or if setUp did not complete successfully.
+            n = self.input_name
+        return "%s (%s)" % (
+            n,
+            self.__class__.__module__)
+
+def pyUnitTests():
+    s = unittest.TestSuite()
+    for success, input_name, id_name, result_name in discoverdata.testlist:
+        test = _TestCase(input_name, id_name, result_name, success)
+        s.addTest(test)
+
+    return s
+
+def test():
+    runner = unittest.TextTestRunner()
+    return runner.run(loadTests())
+
+if __name__ == '__main__':
+    test()

Propchange: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_yadis_discover.py
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/urinorm.txt
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/urinorm.txt?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/urinorm.txt (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/urinorm.txt Wed Oct 11 16:22:33 2006
@@ -0,0 +1,79 @@
+Already normal form
+http://example.com/
+http://example.com/
+
+Add a trailing slash
+http://example.com
+http://example.com/
+
+Remove an empty port segment
+http://example.com:/
+http://example.com/
+
+Remove a default port segment
+http://example.com:80/
+http://example.com/
+
+Capitalization in host names
+http://wWw.exaMPLE.COm/
+http://www.example.com/
+
+Capitalization in scheme names
+htTP://example.com/
+http://example.com/
+
+Capitalization in percent-escaped reserved characters
+http://example.com/foo%2cbar
+http://example.com/foo%2Cbar
+
+Unescape percent-encoded unreserved characters
+http://example.com/foo%2Dbar%2dbaz
+http://example.com/foo-bar-baz
+
+remove_dot_segments example 1
+http://example.com/a/b/c/./../../g
+http://example.com/a/g
+
+remove_dot_segments example 2
+http://example.com/mid/content=5/../6
+http://example.com/mid/6
+
+remove_dot_segments: single-dot
+http://example.com/a/./b
+http://example.com/a/b
+
+remove_dot_segments: double-dot
+http://example.com/a/../b
+http://example.com/b
+
+remove_dot_segments: leading double-dot
+http://example.com/../b
+http://example.com/b
+
+remove_dot_segments: trailing single-dot
+http://example.com/a/.
+http://example.com/a/
+
+remove_dot_segments: trailing double-dot
+http://example.com/a/..
+http://example.com/
+
+remove_dot_segments: trailing single-dot-slash
+http://example.com/a/./
+http://example.com/a/
+
+remove_dot_segments: trailing double-dot-slash
+http://example.com/a/../
+http://example.com/
+
+Test of all kinds of syntax-based normalization
+hTTPS://a/./b/../b/%63/%7bfoo%7d
+https://a/b/c/%7Bfoo%7D
+
+Unsupported scheme
+ftp://example.com/
+fail
+
+Non-absolute URI
+http:/foo
+fail
\ No newline at end of file

Added: incubator/heraldry/libraries/python/openid/trunk/openid/urinorm.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/urinorm.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/urinorm.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/urinorm.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,188 @@
+import re
+
+# from appendix B of rfc 3986 (http://www.ietf.org/rfc/rfc3986.txt)
+uri_pattern = r'^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?'
+uri_re = re.compile(uri_pattern)
+
+
+authority_pattern = r'^([^@]*@)?([^:]*)(:.*)?'
+authority_re = re.compile(authority_pattern)
+
+
+pct_encoded_pattern = r'%([0-9A-Fa-f]{2})'
+pct_encoded_re = re.compile(pct_encoded_pattern)
+
+try:
+    unichr(0x10000)
+except ValueError:
+    # narrow python build
+    UCSCHAR = [
+        (0xA0, 0xD7FF),
+        (0xF900, 0xFDCF),
+        (0xFDF0, 0xFFEF),
+        ]
+
+    IPRIVATE = [
+        (0xE000, 0xF8FF),
+        ]
+else:
+    UCSCHAR = [
+        (0xA0, 0xD7FF),
+        (0xF900, 0xFDCF),
+        (0xFDF0, 0xFFEF),
+        (0x10000, 0x1FFFD),
+        (0x20000, 0x2FFFD),
+        (0x30000, 0x3FFFD),
+        (0x40000, 0x4FFFD),
+        (0x50000, 0x5FFFD),
+        (0x60000, 0x6FFFD),
+        (0x70000, 0x7FFFD),
+        (0x80000, 0x8FFFD),
+        (0x90000, 0x9FFFD),
+        (0xA0000, 0xAFFFD),
+        (0xB0000, 0xBFFFD),
+        (0xC0000, 0xCFFFD),
+        (0xD0000, 0xDFFFD),
+        (0xE1000, 0xEFFFD),
+        ]
+
+    IPRIVATE = [
+        (0xE000, 0xF8FF),
+        (0xF0000, 0xFFFFD),
+        (0x100000, 0x10FFFD),
+        ]
+
+
+_unreserved = [False] * 256
+for _ in range(ord('A'), ord('Z') + 1): _unreserved[_] = True
+for _ in range(ord('0'), ord('9') + 1): _unreserved[_] = True
+for _ in range(ord('a'), ord('z') + 1): _unreserved[_] = True
+_unreserved[ord('-')] = True
+_unreserved[ord('.')] = True
+_unreserved[ord('_')] = True
+_unreserved[ord('~')] = True
+
+
+_escapeme_re = re.compile('[%s]' % (''.join(
+    map(lambda (m, n): u'%s-%s' % (unichr(m), unichr(n)),
+        UCSCHAR + IPRIVATE)),))
+
+
+def _pct_escape_unicode(char_match):
+    c = char_match.group()
+    return ''.join(['%%%X' % (ord(octet),) for octet in c.encode('utf-8')])
+
+
+def _pct_encoded_replace_unreserved(mo):
+    try:
+        i = int(mo.group(1), 16)
+        if _unreserved[i]:
+            return chr(i)
+        else:
+            return mo.group().upper()
+
+    except ValueError:
+        return mo.group()
+
+
+def _pct_encoded_replace(mo):
+    try:
+        return chr(int(mo.group(1), 16))
+    except ValueError:
+        return mo.group()
+
+
+def remove_dot_segments(path):
+    result_segments = []
+    
+    while path:
+        if path.startswith('../'):
+            path = path[3:]
+        elif path.startswith('./'):
+            path = path[2:]
+        elif path.startswith('/./'):
+            path = path[2:]
+        elif path == '/.':
+            path = '/'
+        elif path.startswith('/../'):
+            path = path[3:]
+            if result_segments:
+                result_segments.pop()
+        elif path == '/..':
+            path = '/'
+            if result_segments:
+                result_segments.pop()
+        elif path == '..' or path == '.':
+            path = ''
+        else:
+            i = 0
+            if path[0] == '/':
+                i = 1
+            i = path.find('/', i)
+            if i == -1:
+                i = len(path)
+            result_segments.append(path[:i])
+            path = path[i:]
+            
+    return ''.join(result_segments)
+
+
+def urinorm(uri):
+    if isinstance(uri, unicode):
+        uri = _escapeme_re.sub(_pct_escape_unicode, uri).encode('ascii')
+
+    uri_mo = uri_re.match(uri)
+
+    scheme = uri_mo.group(2)
+    if scheme is None:
+        raise ValueError('No scheme specified')
+
+    scheme = scheme.lower()
+    if scheme not in ('http', 'https'):
+        raise ValueError('Not an absolute HTTP or HTTPS URI: %r' % (uri,))
+
+    authority = uri_mo.group(4)
+    if authority is None:
+        raise ValueError('Not an absolute URI: %r' % (uri,))
+
+    authority_mo = authority_re.match(authority)
+    if authority_mo is None:
+        raise ValueError('URI does not have a valid authority: %r' % (uri,))
+
+    userinfo, host, port = authority_mo.groups()
+
+    if userinfo is None:
+        userinfo = ''
+
+    if '%' in host:
+        host = host.lower()
+        host = pct_encoded_re.sub(_pct_encoded_replace, host)
+        host = unicode(host, 'utf-8').encode('idna')
+    else:
+        host = host.lower()
+
+    if port:
+        if (port == ':' or
+            (scheme == 'http' and port == ':80') or
+            (scheme == 'https' and port == ':443')):
+            port = ''
+    else:
+        port = ''
+
+    authority = userinfo + host + port
+
+    path = uri_mo.group(5)
+    path = pct_encoded_re.sub(_pct_encoded_replace_unreserved, path)
+    path = remove_dot_segments(path)
+    if not path:
+        path = '/'
+
+    query = uri_mo.group(6)
+    if query is None:
+        query = ''
+
+    fragment = uri_mo.group(8)
+    if fragment is None:
+        fragment = ''
+
+    return scheme + '://' + authority + path + query + fragment

Added: incubator/heraldry/libraries/python/openid/trunk/openid/yadis/__init__.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/yadis/__init__.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/yadis/__init__.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/yadis/__init__.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,17 @@
+"""Yadis.
+
+@see: U{http://www.openidenabled.com/yadis}
+"""
+
+__version__ = '[library version:1.1.0-rc1]'[17:-1]
+
+# Parse the version info
+try:
+    version_info = map(int, __version__.split('.'))
+except ValueError:
+    version_info = (None, None, None)
+else:
+    if len(version_info) != 3:
+        version_info = (None, None, None)
+    else:
+        version_info = tuple(version_info)

Added: incubator/heraldry/libraries/python/openid/trunk/openid/yadis/accept.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/yadis/accept.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/yadis/accept.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/yadis/accept.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,133 @@
+"""Functions for generating and parsing HTTP Accept: headers for
+supporting server-directed content negotiation.
+"""
+
+def generateAcceptHeader(*elements):
+    """Generate an accept header value
+
+    [str or (str, float)] -> str
+    """
+    parts = []
+    for element in elements:
+        if type(element) is str:
+            qs = "1.0"
+            mtype = element
+        else:
+            mtype, q = element
+            q = float(q)
+            if q > 1 or q <= 0:
+                raise ValueError('Invalid preference factor: %r' % q)
+
+            qs = '%0.1f' % (q,)
+
+        parts.append((qs, mtype))
+
+    parts.sort()
+    chunks = []
+    for q, mtype in parts:
+        if q == '1.0':
+            chunks.append(mtype)
+        else:
+            chunks.append('%s; q=%s' % (mtype, q))
+
+    return ', '.join(chunks)
+
+def parseAcceptHeader(value):
+    """Parse an accept header, ignoring any accept-extensions
+
+    returns a list of tuples containing main MIME type, MIME subtype,
+    and quality markdown.
+
+    str -> [(str, str, float)]
+    """
+    chunks = [chunk.strip() for chunk in value.split(',')]
+    accept = []
+    for chunk in chunks:
+        parts = [s.strip() for s in chunk.split(';')]
+
+        mtype = parts.pop(0)
+        if '/' not in mtype:
+            # This is not a MIME type, so ignore the bad data
+            continue
+
+        main, sub = mtype.split('/', 1)
+
+        for ext in parts:
+            if '=' in ext:
+                k, v = ext.split('=', 1)
+                if k == 'q':
+                    try:
+                        q = float(v)
+                        break
+                    except ValueError:
+                        # Ignore poorly formed q-values
+                        pass
+        else:
+            q = 1.0
+
+        accept.append((q, main, sub))
+
+    accept.sort()
+    accept.reverse()
+    return [(main, sub, q) for (q, main, sub) in accept]
+
+def matchTypes(accept_types, have_types):
+    """Given the result of parsing an Accept: header, and the
+    available MIME types, return the acceptable types with their
+    quality markdowns.
+
+    For example:
+
+    >>> acceptable = parseAcceptHeader('text/html, text/plain; q=0.5')
+    >>> matchTypes(acceptable, ['text/plain', 'text/html', 'image/jpeg'])
+    [('text/html', 1.0), ('text/plain', 0.5)]
+
+
+    Type signature: ([(str, str, float)], [str]) -> [(str, float)]
+    """
+    if not accept_types:
+        # Accept all of them
+        default = 1
+    else:
+        default = 0
+
+    match_main = {}
+    match_sub = {}
+    for (main, sub, q) in accept_types:
+        if main == '*':
+            default = max(default, q)
+            continue
+        elif sub == '*':
+            match_main[main] = max(match_main.get(main, 0), q)
+        else:
+            match_sub[(main, sub)] = max(match_sub.get((main, sub), 0), q)
+
+    accepted_list = []
+    order_maintainer = 0
+    for mtype in have_types:
+        main, sub = mtype.split('/')
+        if (main, sub) in match_sub:
+            q = match_sub[(main, sub)]
+        else:
+            q = match_main.get(main, default)
+
+        if q:
+            accepted_list.append((1 - q, order_maintainer, q, mtype))
+            order_maintainer += 1
+
+    accepted_list.sort()
+    return [(mtype, q) for (_, _, q, mtype) in accepted_list]
+
+def getAcceptable(accept_header, have_types):
+    """Parse the accept header and return a list of available types in
+    preferred order. If a type is unacceptable, it will not be in the
+    resulting list.
+
+    This is a convenience wrapper around matchTypes and
+    parseAcceptHeader.
+
+    (str, [str]) -> [str]
+    """
+    accepted = parseAcceptHeader(accept_header)
+    preferred = matchTypes(accepted, have_types)
+    return [mtype for (mtype, _) in preferred]

Added: incubator/heraldry/libraries/python/openid/trunk/openid/yadis/constants.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/yadis/constants.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/yadis/constants.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/yadis/constants.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,13 @@
+__all__ = ['YADIS_HEADER_NAME', 'YADIS_CONTENT_TYPE', 'YADIS_ACCEPT_HEADER']
+from openid.yadis.accept import generateAcceptHeader
+
+YADIS_HEADER_NAME = 'X-XRDS-Location'
+YADIS_CONTENT_TYPE = 'application/xrds+xml'
+
+# A value suitable for using as an accept header when performing YADIS
+# discovery, unless the application has special requirements
+YADIS_ACCEPT_HEADER = generateAcceptHeader(
+    ('text/html', 0.3),
+    ('application/xhtml+xml', 0.5),
+    (YADIS_CONTENT_TYPE, 1.0),
+    )



Mime
View raw message