incubator-heraldry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ket...@apache.org
Subject svn commit: r463041 [6/7] - in /incubator/heraldry/libraries/python/openid/trunk: ./ admin/ doc/ examples/ openid/ openid/consumer/ openid/server/ openid/store/ openid/test/
Date Wed, 11 Oct 2006 23:11:50 GMT
Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/oidutil.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/oidutil.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/oidutil.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/oidutil.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,207 @@
+import unittest
+import codecs
+import string
+import random
+from openid import oidutil
+
+def test_base64():
+    allowed_s = string.ascii_letters + string.digits + '+/='
+    allowed_d = {}
+    for c in allowed_s:
+        allowed_d[c] = None
+    isAllowed = allowed_d.has_key
+
+    def checkEncoded(s):
+        for c in s:
+            assert isAllowed(c), s
+
+    cases = [
+        '',
+        'x',
+        '\x00',
+        '\x01',
+        '\x00' * 100,
+        ''.join(map(chr, range(256))),
+        ]
+
+    for s in cases:
+        b64 = oidutil.toBase64(s)
+        checkEncoded(b64)
+        s_prime = oidutil.fromBase64(b64)
+        assert s_prime == s, (s, b64, s_prime)
+
+    # Randomized test
+    for _ in xrange(50):
+        n = random.randrange(2048)
+        s = ''.join(map(chr, map(lambda _: random.randrange(256), range(n))))
+        b64 = oidutil.toBase64(s)
+        checkEncoded(b64)
+        s_prime = oidutil.fromBase64(b64)
+        assert s_prime == s, (s, b64, s_prime)
+
+def test_normalizeUrl():
+    n = oidutil.normalizeUrl
+
+    assert 'http://foo.com/' == n('foo.com')
+
+    assert 'http://foo.com/' == n('http://foo.com')
+    assert 'https://foo.com/' == n('https://foo.com')
+    assert 'http://foo.com/bar' == n('foo.com/bar')
+    assert 'http://foo.com/bar' == n('http://foo.com/bar')
+
+    assert 'http://foo.com/' == n('http://foo.com/')
+    assert 'https://foo.com/' == n('https://foo.com/')
+    assert 'https://foo.com/bar'  == n('https://foo.com/bar')
+
+    assert 'http://foo.com/%E8%8D%89' == n(u'foo.com/\u8349')
+    assert 'http://foo.com/%E8%8D%89' == n(u'http://foo.com/\u8349')
+
+    non_ascii_domain_cases = [
+        ('http://xn--vl1a.com/', u'\u8349.com'),
+        ('http://xn--vl1a.com/', u'http://\u8349.com'),
+        ('http://xn--vl1a.com/', u'\u8349.com/'),
+        ('http://xn--vl1a.com/', u'http://\u8349.com/'),
+        ('http://xn--vl1a.com/%E8%8D%89', u'\u8349.com/\u8349'),
+        ('http://xn--vl1a.com/%E8%8D%89', u'http://\u8349.com/\u8349'),
+        ]
+
+    try:
+        codecs.getencoder('idna')
+    except LookupError:
+        # If there is no idna codec, these cases with
+        # non-ascii-representable domain names should fail.
+        should_raise = True
+    else:
+        should_raise = False
+
+    for expected, case in non_ascii_domain_cases:
+        try:
+            actual = n(case)
+        except UnicodeError:
+            assert should_raise
+        else:
+            assert not should_raise and actual == expected, case
+
+    assert n(None) is None
+    assert n('') is None
+    assert n('http://') is None
+
+class AppendArgsTest(unittest.TestCase):
+    def __init__(self, desc, args, expected):
+        unittest.TestCase.__init__(self)
+        self.desc = desc
+        self.args = args
+        self.expected = expected
+
+    def runTest(self):
+        result = oidutil.appendArgs(*self.args)
+        self.assertEqual(self.expected, result, self.args)
+
+    def shortDescription(self):
+        return self.desc
+
+def buildAppendTests():
+    simple = 'http://www.example.com/'
+    cases = [
+        ('empty list',
+         (simple, []),
+         simple),
+
+        ('empty dict',
+         (simple, {}),
+         simple),
+
+        ('one list',
+         (simple, [('a', 'b')]),
+         simple + '?a=b'),
+
+        ('one dict',
+         (simple, {'a':'b'}),
+         simple + '?a=b'),
+
+        ('two list (same)',
+         (simple, [('a', 'b'), ('a', 'c')]),
+         simple + '?a=b&a=c'),
+
+        ('two list',
+         (simple, [('a', 'b'), ('b', 'c')]),
+         simple + '?a=b&b=c'),
+
+        ('two list (order)',
+         (simple, [('b', 'c'), ('a', 'b')]),
+         simple + '?b=c&a=b'),
+
+        ('two dict (order)',
+         (simple, {'b':'c', 'a':'b'}),
+         simple + '?a=b&b=c'),
+
+        ('escape',
+         (simple, [('=', '=')]),
+         simple + '?%3D=%3D'),
+
+        ('escape (URL)',
+         (simple, [('this_url', simple)]),
+         simple + '?this_url=http%3A%2F%2Fwww.example.com%2F'),
+
+        ('use dots',
+         (simple, [('openid.stuff', 'bother')]),
+         simple + '?openid.stuff=bother'),
+
+        ('args exist (empty)',
+         (simple + '?stuff=bother', []),
+         simple + '?stuff=bother'),
+
+        ('args exist',
+         (simple + '?stuff=bother', [('ack', 'ack')]),
+         simple + '?stuff=bother&ack=ack'),
+
+        ('args exist',
+         (simple + '?stuff=bother', [('ack', 'ack')]),
+         simple + '?stuff=bother&ack=ack'),
+
+        ('args exist (dict)',
+         (simple + '?stuff=bother', {'ack': 'ack'}),
+         simple + '?stuff=bother&ack=ack'),
+
+        ('args exist (dict 2)',
+         (simple + '?stuff=bother', {'ack': 'ack', 'zebra':'lion'}),
+         simple + '?stuff=bother&ack=ack&zebra=lion'),
+
+        ('three args (dict)',
+         (simple, {'stuff': 'bother', 'ack': 'ack', 'zebra':'lion'}),
+         simple + '?ack=ack&stuff=bother&zebra=lion'),
+
+        ('three args (list)',
+         (simple, [('stuff', 'bother'), ('ack', 'ack'), ('zebra', 'lion')]),
+         simple + '?stuff=bother&ack=ack&zebra=lion'),
+        ]
+
+    tests = []
+
+    for name, args, expected in cases:
+        test = AppendArgsTest(name, args, expected)
+        tests.append(test)
+
+    return unittest.TestSuite(tests)
+
+def pyUnitTests():
+    return buildAppendTests()
+
+def test_appendArgs():
+    suite = buildAppendTests()
+    runner = unittest.TextTestRunner()
+    result = runner.run(suite)
+    assert result.wasSuccessful()
+
+# XXX: there are more functions that could benefit from being better
+# specified and tested in oidutil.py These include, but are not
+# limited to appendArgs
+
+def test(skipPyUnit=True):
+    test_base64()
+    test_normalizeUrl()
+    if not skipPyUnit:
+        test_appendArgs()
+
+if __name__ == '__main__':
+    test(skipPyUnit=False)

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/server.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/server.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/server.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/server.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,1118 @@
+# -*- test-case-name: openid.test.server -*-
+"""Tests for openid.server.
+"""
+from openid.server import server
+from openid import association, cryptutil, oidutil
+import _memstore
+import cgi
+
+import unittest
+
+from urlparse import urlparse
+
+# In general, if you edit or add tests here, try to move in the direction
+# of testing smaller units.  For testing the external interfaces, we'll be
+# developing an implementation-agnostic testing suite.
+
+# for more, see /etc/ssh/moduli
+
+ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7
+ALT_GEN = 5
+
+class CatchLogs(object):
+    def setUp(self):
+        self.old_logger = oidutil.log
+        oidutil.log = self.gotLogMessage
+        self.messages = []
+
+    def gotLogMessage(self, message):
+        self.messages.append(message)
+
+    def tearDown(self):
+        oidutil.log = self.old_logger
+
+class TestProtocolError(unittest.TestCase):
+    def test_browserWithReturnTo(self):
+        return_to = "http://rp.unittest/consumer"
+        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
+        args = {
+            'openid.mode': 'monkeydance',
+            'openid.identity': 'http://wagu.unittest/',
+            'openid.return_to': return_to,
+            }
+        e = server.ProtocolError(args, "plucky")
+        self.failUnless(e.hasReturnTo())
+        expected_args = {
+            'openid.mode': ['error'],
+            'openid.error': ['plucky'],
+            }
+
+        rt_base, result_args = e.encodeToURL().split('?', 1)
+        result_args = cgi.parse_qs(result_args)
+        self.failUnlessEqual(result_args, expected_args)
+
+    def test_noReturnTo(self):
+        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
+        args = {
+            'openid.mode': 'zebradance',
+            'openid.identity': 'http://wagu.unittest/',
+            }
+        e = server.ProtocolError(args, "waffles")
+        self.failIf(e.hasReturnTo())
+        expected = """error:waffles
+mode:error
+"""
+        self.failUnlessEqual(e.encodeToKVForm(), expected)
+
+
+
+class TestDecode(unittest.TestCase):
+    def setUp(self):
+        self.id_url = "http://decoder.am.unittest/"
+        self.rt_url = "http://rp.unittest/foobot/?qux=zam"
+        self.tr_url = "http://rp.unittest/"
+        self.assoc_handle = "{assoc}{handle}"
+        self.decode = server.Decoder().decode
+
+    def test_none(self):
+        args = {}
+        r = self.decode(args)
+        self.failUnlessEqual(r, None)
+
+    def test_irrelevant(self):
+        args = {
+            'pony': 'spotted',
+            'sreg.mutant_power': 'decaffinator',
+            }
+        r = self.decode(args)
+        self.failUnlessEqual(r, None)
+
+    def test_bad(self):
+        args = {
+            'openid.mode': 'twos-compliment',
+            'openid.pants': 'zippered',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+    def test_dictOfLists(self):
+        args = {
+            'openid.mode': ['checkid_setup'],
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            }
+        try:
+            result = self.decode(args)
+        except TypeError, err:
+            self.failUnless(str(err).find('values') != -1, err)
+        else:
+            self.fail("Expected TypeError, but got result %s" % (result,))
+
+    def test_checkidImmediate(self):
+        args = {
+            'openid.mode': 'checkid_immediate',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            # should be ignored
+            'openid.some.extension': 'junk',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckIDRequest))
+        self.failUnlessEqual(r.mode, "checkid_immediate")
+        self.failUnlessEqual(r.immediate, True)
+        self.failUnlessEqual(r.identity, self.id_url)
+        self.failUnlessEqual(r.trust_root, self.tr_url)
+        self.failUnlessEqual(r.return_to, self.rt_url)
+        self.failUnlessEqual(r.assoc_handle, self.assoc_handle)
+
+    def test_checkidSetup(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckIDRequest))
+        self.failUnlessEqual(r.mode, "checkid_setup")
+        self.failUnlessEqual(r.immediate, False)
+        self.failUnlessEqual(r.identity, self.id_url)
+        self.failUnlessEqual(r.trust_root, self.tr_url)
+        self.failUnlessEqual(r.return_to, self.rt_url)
+
+    def test_checkidSetupNoIdentity(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': self.tr_url,
+            }
+        try:
+            result = self.decode(args)
+        except server.ProtocolError, err:
+            self.failUnless(err.query)
+        else:
+            self.fail("Expected ProtocolError, instead returned with %s" %
+                      (result,))
+
+    def test_checkidSetupNoReturn(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.trust_root': self.tr_url,
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+    def test_checkidSetupBadReturn(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': 'not a url',
+            }
+        try:
+            result = self.decode(args)
+        except server.ProtocolError, err:
+            self.failUnless(err.query)
+        else:
+            self.fail("Expected ProtocolError, instead returned with %s" %
+                      (result,))
+
+    def test_checkidSetupUntrustedReturn(self):
+        args = {
+            'openid.mode': 'checkid_setup',
+            'openid.identity': self.id_url,
+            'openid.assoc_handle': self.assoc_handle,
+            'openid.return_to': self.rt_url,
+            'openid.trust_root': 'http://not-the-return-place.unittest/',
+            }
+        try:
+            result = self.decode(args)
+        except server.UntrustedReturnURL, err:
+            self.failUnless(err.query)
+        else:
+            self.fail("Expected UntrustedReturnURL, instead returned with %s" %
+                      (result,))
+
+    def test_checkAuth(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.sig': 'sigblob',
+            'openid.signed': 'foo,bar,mode',
+            'openid.foo': 'signedval1',
+            'openid.bar': 'signedval2',
+            'openid.baz': 'unsigned',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckAuthRequest))
+        self.failUnlessEqual(r.mode, 'check_authentication')
+        self.failUnlessEqual(r.sig, 'sigblob')
+        self.failUnlessEqual(r.signed, [
+            ('foo', 'signedval1'),
+            ('bar', 'signedval2'),
+            ('mode', 'id_res'),
+            ])
+        # XXX: test error cases (i.e. missing required fields)
+
+
+    def test_checkAuthMissingSignedField(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.sig': 'sigblob',
+            'openid.signed': 'foo,bar,mode',
+            'openid.foo': 'signedval1',
+            'openid.baz': 'unsigned',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_checkAuthMissingSignature(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.signed': 'foo,bar,mode',
+            'openid.foo': 'signedval1',
+            'openid.bar': 'signedval2',
+            'openid.baz': 'unsigned',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_checkAuthAndInvalidate(self):
+        args = {
+            'openid.mode': 'check_authentication',
+            'openid.assoc_handle': '{dumb}{handle}',
+            'openid.invalidate_handle': '[[SMART_handle]]',
+            'openid.sig': 'sigblob',
+            'openid.signed': 'foo,bar,mode',
+            'openid.foo': 'signedval1',
+            'openid.bar': 'signedval2',
+            'openid.baz': 'unsigned',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.CheckAuthRequest))
+        self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]')
+
+
+    def test_associateDH(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.AssociateRequest))
+        self.failUnlessEqual(r.mode, "associate")
+        self.failUnlessEqual(r.session.session_type, "DH-SHA1")
+        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
+        self.failUnless(r.session.consumer_pubkey)
+
+    def test_associateDHMissingKey(self):
+        """Trying DH assoc w/o public key"""
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            }
+        # Using DH-SHA1 without supplying dh_consumer_public is an error.
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associateDHpubKeyNotB64(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "donkeydonkeydonkey",
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associateDHModGen(self):
+        # test dh with non-default but valid values for dh_modulus and dh_gen
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS),
+            'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) ,
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.AssociateRequest))
+        self.failUnlessEqual(r.mode, "associate")
+        self.failUnlessEqual(r.session.session_type, "DH-SHA1")
+        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
+        self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS)
+        self.failUnlessEqual(r.session.dh.generator, ALT_GEN)
+        self.failUnless(r.session.consumer_pubkey)
+
+
+    def test_associateDHCorruptModGen(self):
+        # test dh with non-default but valid values for dh_modulus and dh_gen
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            'openid.dh_modulus': 'pizza',
+            'openid.dh_gen': 'gnocchi',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associateDHMissingModGen(self):
+        # test dh with non-default but valid values for dh_modulus and dh_gen
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "Rzup9265tw==",
+            'openid.dh_modulus': 'pizza',
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+#     def test_associateDHInvalidModGen(self):
+#         # test dh with properly encoded values that are not a valid
+#         #   modulus/generator combination.
+#         args = {
+#             'openid.mode': 'associate',
+#             'openid.session_type': 'DH-SHA1',
+#             'openid.dh_consumer_public': "Rzup9265tw==",
+#             'openid.dh_modulus': cryptutil.longToBase64(9),
+#             'openid.dh_gen': cryptutil.longToBase64(27) ,
+#             }
+#         self.failUnlessRaises(server.ProtocolError, self.decode, args)
+#     test_associateDHInvalidModGen.todo = "low-priority feature"
+
+
+    def test_associateWeirdSession(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.session_type': 'FLCL6',
+            'openid.dh_consumer_public': "YQ==\n",
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+    def test_associatePlain(self):
+        args = {
+            'openid.mode': 'associate',
+            }
+        r = self.decode(args)
+        self.failUnless(isinstance(r, server.AssociateRequest))
+        self.failUnlessEqual(r.mode, "associate")
+        self.failUnlessEqual(r.session.session_type, "plaintext")
+        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
+
+    def test_nomode(self):
+        args = {
+            'openid.session_type': 'DH-SHA1',
+            'openid.dh_consumer_public': "my public keeey",
+            }
+        self.failUnlessRaises(server.ProtocolError, self.decode, args)
+
+
+
+class TestEncode(unittest.TestCase):
+    def setUp(self):
+        self.encoder = server.Encoder()
+        self.encode = self.encoder.encode
+
+    def test_id_res(self):
+        request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'mode': 'id_res',
+            'identity': request.identity,
+            'return_to': request.return_to,
+            }
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+        location = webresponse.headers['location']
+        self.failUnless(location.startswith(request.return_to),
+                        "%s does not start with %s" % (location,
+                                                       request.return_to))
+        query = cgi.parse_qs(urlparse(location)[4])
+        # argh.
+        q2 = dict([(k, v[0]) for k, v in query.iteritems()])
+        expected = dict(
+            [('openid.' + k, v) for k, v in response.fields.iteritems()])
+        self.failUnlessEqual(q2, expected)
+
+    def test_cancel(self):
+        request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'mode': 'cancel',
+            }
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+    def test_assocReply(self):
+        request = server.AssociateRequest.fromQuery({})
+        response = server.OpenIDResponse(request)
+        response.fields = {'assoc_handle': "every-zig"}
+        webresponse = self.encode(response)
+        body = """assoc_handle:every-zig
+"""
+        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+    def test_checkauthReply(self):
+        request = server.CheckAuthRequest('a_sock_monkey',
+                                          'siggggg',
+                                          [])
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'is_valid': 'true',
+            'invalidate_handle': 'xXxX:xXXx'
+            }
+        body = """invalidate_handle:xXxX:xXXx
+is_valid:true
+"""
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+    def test_unencodableError(self):
+        args = {
+            'openid.identity': 'http://limu.unittest/',
+            }
+        e = server.ProtocolError(args, "wet paint")
+        self.failUnlessRaises(server.EncodingError, self.encode, e)
+
+    def test_encodableError(self):
+        args = {
+            'openid.mode': 'associate',
+            'openid.identity': 'http://limu.unittest/',
+            }
+        body="""error:snoot
+mode:error
+"""
+        webresponse = self.encode(server.ProtocolError(args, "snoot"))
+        self.failUnlessEqual(webresponse.code, server.HTTP_ERROR)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+
+
+class TestSigningEncode(unittest.TestCase):
+    def setUp(self):
+        self._dumb_key = server.Signatory._dumb_key
+        self._normal_key = server.Signatory._normal_key
+        self.store = _memstore.MemoryStore()
+        self.request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        self.response = server.OpenIDResponse(self.request)
+        self.response.fields = {
+            'mode': 'id_res',
+            'identity': self.request.identity,
+            'return_to': self.request.return_to,
+            }
+        self.response.signed.extend(['mode','identity','return_to'])
+        self.signatory = server.Signatory(self.store)
+        self.encoder = server.SigningEncoder(self.signatory)
+        self.encode = self.encoder.encode
+
+    def test_idres(self):
+        assoc_handle = '{bicycle}{shed}'
+        self.store.storeAssociation(
+            self._normal_key,
+            association.Association.fromExpiresIn(60, assoc_handle,
+                                                  'sekrit', 'HMAC-SHA1'))
+        self.request.assoc_handle = assoc_handle
+        webresponse = self.encode(self.response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+        location = webresponse.headers['location']
+        query = cgi.parse_qs(urlparse(location)[4])
+        self.failUnless('openid.sig' in query)
+        self.failUnless('openid.assoc_handle' in query)
+        self.failUnless('openid.signed' in query)
+
+    def test_idresDumb(self):
+        webresponse = self.encode(self.response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+
+        location = webresponse.headers['location']
+        query = cgi.parse_qs(urlparse(location)[4])
+        self.failUnless('openid.sig' in query)
+        self.failUnless('openid.assoc_handle' in query)
+        self.failUnless('openid.signed' in query)
+
+    def test_forgotStore(self):
+        self.encoder.signatory = None
+        self.failUnlessRaises(ValueError, self.encode, self.response)
+
+    def test_cancel(self):
+        request = server.CheckIDRequest(
+            identity = 'http://bombom.unittest/',
+            trust_root = 'http://burr.unittest/',
+            return_to = 'http://burr.unittest/999',
+            immediate = False,
+            )
+        response = server.OpenIDResponse(request)
+        response.fields['mode'] = 'cancel'
+        response.signed[:] = []
+        webresponse = self.encode(response)
+        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
+        self.failUnless(webresponse.headers.has_key('location'))
+        location = webresponse.headers['location']
+        query = cgi.parse_qs(urlparse(location)[4])
+        self.failIf('openid.sig' in query, query.get('openid.sig'))
+
+    def test_assocReply(self):
+        request = server.AssociateRequest.fromQuery({})
+        response = server.OpenIDResponse(request)
+        response.fields = {'assoc_handle': "every-zig"}
+        webresponse = self.encode(response)
+        body = """assoc_handle:every-zig
+"""
+        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
+        self.failUnlessEqual(webresponse.headers, {})
+        self.failUnlessEqual(webresponse.body, body)
+
+    def test_alreadySigned(self):
+        self.response.fields['sig'] = 'priorSig=='
+        self.failUnlessRaises(server.AlreadySigned, self.encode, self.response)
+
+
+
+class TestCheckID(unittest.TestCase):
+    def setUp(self):
+        self.request = server.CheckIDRequest(
+            identity = 'http://bambam.unittest/',
+            trust_root = 'http://bar.unittest/',
+            return_to = 'http://bar.unittest/999',
+            immediate = False,
+            )
+
+    def test_trustRootInvalid(self):
+        self.request.trust_root = "http://foo.unittest/17"
+        self.request.return_to = "http://foo.unittest/39"
+        self.failIf(self.request.trustRootValid())
+
+    def test_trustRootValid(self):
+        self.request.trust_root = "http://foo.unittest/"
+        self.request.return_to = "http://foo.unittest/39"
+        self.failUnless(self.request.trustRootValid())
+
+    def test_answerAllow(self):
+        answer = self.request.answer(True)
+        self.failUnlessEqual(answer.request, self.request)
+        self.failUnlessEqual(answer.fields, {
+            'mode': 'id_res',
+            'identity': self.request.identity,
+            'return_to': self.request.return_to,
+            })
+        signed = answer.signed[:]
+        signed.sort()
+        self.failUnlessEqual(signed, ["identity", "mode", "return_to"])
+
+    def test_answerAllowNoTrustRoot(self):
+        self.request.trust_root = None
+        answer = self.request.answer(True)
+        self.failUnlessEqual(answer.request, self.request)
+        self.failUnlessEqual(answer.fields, {
+            'mode': 'id_res',
+            'identity': self.request.identity,
+            'return_to': self.request.return_to,
+            })
+        signed = answer.signed[:]
+        signed.sort()
+        self.failUnlessEqual(signed, ["identity", "mode", "return_to"])
+
+    def test_answerImmediateDeny(self):
+        self.request.mode = 'checkid_immediate'
+        self.request.immediate = True
+        server_url = "http://setup-url.unittest/"
+        # crappiting setup_url, you dirty my interface with your presence!
+        answer = self.request.answer(False, server_url=server_url)
+        self.failUnlessEqual(answer.request, self.request)
+        self.failUnlessEqual(len(answer.fields), 2, answer.fields)
+        self.failUnlessEqual(answer.fields.get('mode'), 'id_res')
+        self.failUnless(answer.fields.get('user_setup_url', '').startswith(
+            server_url))
+        self.failUnlessEqual(answer.signed, [])
+
+    def test_answerSetupDeny(self):
+        answer = self.request.answer(False)
+        self.failUnlessEqual(answer.fields, {
+            'mode': 'cancel',
+            })
+        self.failUnlessEqual(answer.signed, [])
+
+    def test_encodeToURL(self):
+        server_url = 'http://openid-server.unittest/'
+        result = self.request.encodeToURL(server_url)
+
+        # How to check?  How about a round-trip test.
+        base, result_args = result.split('?', 1)
+        result_args = cgi.parse_qs(result_args)
+        result_args = dict([(k, v[0]) for k, v in result_args.iteritems()])
+        rebuilt_request = server.CheckIDRequest.fromQuery(result_args)
+        self.failUnlessEqual(rebuilt_request.__dict__, self.request.__dict__)
+
+    def test_getCancelURL(self):
+        url = self.request.getCancelURL()
+        expected = self.request.return_to + '?openid.mode=cancel'
+        self.failUnlessEqual(url, expected)
+
+    def test_getCancelURLimmed(self):
+        self.request.mode = 'checkid_immediate'
+        self.request.immediate = True
+        self.failUnlessRaises(ValueError, self.request.getCancelURL)
+
+
+
+class TestCheckIDExtension(unittest.TestCase):
+
+    def setUp(self):
+        self.request = server.CheckIDRequest(
+            identity = 'http://bambam.unittest/',
+            trust_root = 'http://bar.unittest/',
+            return_to = 'http://bar.unittest/999',
+            immediate = False,
+            )
+        self.response = server.OpenIDResponse(self.request)
+        self.response.fields['mode'] = 'id_res'
+        self.response.fields['blue'] = 'star'
+        self.response.signed.extend(['mode','identity','return_to'])
+
+
+    def test_addField(self):
+        namespace = 'mj12'
+        self.response.addField(namespace, 'bright', 'potato')
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'mj12.bright': 'potato'})
+        self.failUnlessEqual(self.response.signed,
+                             ['mode', 'identity', 'return_to', 'mj12.bright'])
+
+
+    def test_addFieldNoNamespace(self):
+        self.response.addField('', 'dark', 'pages')
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'dark': 'pages'})
+
+    def test_addFieldUnsigned(self):
+        namespace = 'mj12'
+        self.response.addField(namespace, 'dull', 'lemon', signed=False)
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'mj12.dull': 'lemon'})
+        self.failUnlessEqual(self.response.signed,
+                             ['mode', 'identity', 'return_to'])
+
+
+    def test_addFields(self):
+        namespace = 'mi5'
+        self.response.addFields(namespace, {'tangy': 'suspenders',
+                                           'bravo': 'inclusion'})
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'mi5.tangy': 'suspenders',
+                              'mi5.bravo': 'inclusion'})
+        self.failUnlessEqual(self.response.signed,
+                             ['mode', 'identity', 'return_to',
+                              'mi5.tangy', 'mi5.bravo'])
+
+
+    def test_addFieldsUnsigned(self):
+        namespace = 'mi5'
+        self.response.addFields(namespace, {'strange': 'conditioner',
+                                           'elemental': 'blender'},
+                                signed=False)
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'mi5.strange': 'conditioner',
+                              'mi5.elemental': 'blender'})
+        self.failUnlessEqual(self.response.signed,
+                             ['mode', 'identity', 'return_to'])
+
+
+    def test_update(self):
+        eresponse = server.OpenIDResponse(None)
+        eresponse.fields.update({'shape': 'heart',
+                                 'content': 'strings,wire'})
+        eresponse.signed = ['content']
+        self.response.update('box', eresponse)
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'box.shape': 'heart',
+                              'box.content': 'strings,wire'})
+        self.failUnlessEqual(self.response.signed,
+                             ['mode', 'identity', 'return_to', 'box.content'])
+
+    def test_updateNoNamespace(self):
+        eresponse = server.OpenIDResponse(None)
+        eresponse.fields.update({'species': 'pterodactyl',
+                                 'saturation': 'day-glo'})
+        eresponse.signed = ['species']
+        self.response.update(None, eresponse)
+        self.failUnlessEqual(self.response.fields,
+                             {'blue': 'star',
+                              'mode': 'id_res',
+                              'species': 'pterodactyl',
+                              'saturation': 'day-glo'})
+        self.failUnlessEqual(self.response.signed,
+                             ['mode', 'identity', 'return_to', 'species'])
+
+
+
+class MockSignatory(object):
+    isValid = True
+
+    def __init__(self, assoc):
+        self.assocs = [assoc]
+
+    def verify(self, assoc_handle, sig, signed_pairs):
+        assert sig
+        signed_pairs[:]
+        if (True, assoc_handle) in self.assocs:
+            return self.isValid
+        else:
+            return False
+
+    def getAssociation(self, assoc_handle, dumb):
+        if (dumb, assoc_handle) in self.assocs:
+            # This isn't a valid implementation for many uses of this
+            # function, mind you.
+            return True
+        else:
+            return None
+
+    def invalidate(self, assoc_handle, dumb):
+        if (dumb, assoc_handle) in self.assocs:
+            self.assocs.remove((dumb, assoc_handle))
+
+
+class TestCheckAuth(unittest.TestCase):
+    def setUp(self):
+        self.assoc_handle = 'mooooooooo'
+        self.request = server.CheckAuthRequest(
+            self.assoc_handle, 'signarture',
+            [('one', 'alpha'), ('two', 'beta')])
+
+        self.signatory = MockSignatory((True, self.assoc_handle))
+
+    def test_valid(self):
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields, {'is_valid': 'true'})
+        self.failUnlessEqual(r.request, self.request)
+
+    def test_invalid(self):
+        self.signatory.isValid = False
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields, {'is_valid': 'false'})
+
+    def test_replay(self):
+        r = self.request.answer(self.signatory)
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields, {'is_valid': 'false'})
+
+    def test_invalidatehandle(self):
+        self.request.invalidate_handle = "bogusHandle"
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields, {'is_valid': 'true',
+                                        'invalidate_handle': "bogusHandle"})
+        self.failUnlessEqual(r.request, self.request)
+
+    def test_invalidatehandleNo(self):
+        assoc_handle = 'goodhandle'
+        self.signatory.assocs.append((False, 'goodhandle'))
+        self.request.invalidate_handle = assoc_handle
+        r = self.request.answer(self.signatory)
+        self.failUnlessEqual(r.fields, {'is_valid': 'true'})
+
+
+class TestAssociate(unittest.TestCase):
+    # TODO: test DH with non-default values for modulus and gen.
+    # (important to do because we actually had it broken for a while.)
+
+    def setUp(self):
+        self.request = server.AssociateRequest.fromQuery({})
+        self.store = _memstore.MemoryStore()
+        self.signatory = server.Signatory(self.store)
+        self.assoc = self.signatory.createAssociation(dumb=False)
+
+    def test_dh(self):
+        from openid.dh import DiffieHellman
+        from openid.server.server import DiffieHellmanServerSession
+        consumer_dh = DiffieHellman.fromDefaults()
+        cpub = consumer_dh.public
+        session = DiffieHellmanServerSession(DiffieHellman.fromDefaults(), cpub)
+        self.request = server.AssociateRequest(session)
+        response = self.request.answer(self.assoc)
+        rfg = response.fields.get
+        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
+        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
+        self.failIf(rfg("mac_key"))
+        self.failUnlessEqual(rfg("session_type"), "DH-SHA1")
+        self.failUnless(rfg("enc_mac_key"))
+        self.failUnless(rfg("dh_server_public"))
+
+        enc_key = rfg("enc_mac_key").decode('base64')
+        spub = cryptutil.base64ToLong(rfg("dh_server_public"))
+        secret = consumer_dh.xorSecret(spub, enc_key)
+        self.failUnlessEqual(secret, self.assoc.secret)
+
+
+    def test_plaintext(self):
+        response = self.request.answer(self.assoc)
+        rfg = response.fields.get
+
+        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
+        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
+
+        self.failUnlessEqual(
+            rfg("expires_in"), "%d" % (self.signatory.SECRET_LIFETIME,))
+        self.failUnlessEqual(
+            rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
+        self.failIf(rfg("session_type"))
+        self.failIf(rfg("enc_mac_key"))
+        self.failIf(rfg("dh_server_public"))
+
+class Counter(object):
+    def __init__(self):
+        self.count = 0
+
+    def inc(self):
+        self.count += 1
+
+class TestServer(unittest.TestCase, CatchLogs):
+    def setUp(self):
+        self.store = _memstore.MemoryStore()
+        self.server = server.Server(self.store)
+        CatchLogs.setUp(self)
+
+    def test_dispatch(self):
+        monkeycalled = Counter()
+        def monkeyDo(request):
+            monkeycalled.inc()
+            r = server.OpenIDResponse(request)
+            return r
+        self.server.openid_monkeymode = monkeyDo
+        request = server.OpenIDRequest()
+        request.mode = "monkeymode"
+        webresult = self.server.handleRequest(request)
+        self.failUnlessEqual(monkeycalled.count, 1)
+
+    def test_associate(self):
+        request = server.AssociateRequest.fromQuery({})
+        response = self.server.openid_associate(request)
+        self.failUnless(response.fields.has_key("assoc_handle"))
+
+    def test_checkAuth(self):
+        request = server.CheckAuthRequest('arrrrrf', '0x3999', [])
+        response = self.server.openid_check_authentication(request)
+        self.failUnless(response.fields.has_key("is_valid"))
+
+class TestSignatory(unittest.TestCase, CatchLogs):
+    def setUp(self):
+        self.store = _memstore.MemoryStore()
+        self.signatory = server.Signatory(self.store)
+        self._dumb_key = self.signatory._dumb_key
+        self._normal_key = self.signatory._normal_key
+        CatchLogs.setUp(self)
+
+    def test_sign(self):
+        request = server.OpenIDRequest()
+        assoc_handle = '{assoc}{lookatme}'
+        self.store.storeAssociation(
+            self._normal_key,
+            association.Association.fromExpiresIn(60, assoc_handle,
+                                                  'sekrit', 'HMAC-SHA1'))
+        request.assoc_handle = assoc_handle
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            }
+        response.signed = ['foo', 'azu']
+        sresponse = self.signatory.sign(response)
+        self.failUnlessEqual(sresponse.fields.get('assoc_handle'),
+                             assoc_handle)
+        self.failUnlessEqual(sresponse.fields.get('signed'),
+                             'foo,azu')
+        self.failUnless(sresponse.fields.get('sig'))
+        self.failIf(self.messages, self.messages)
+
+    def test_signDumb(self):
+        request = server.OpenIDRequest()
+        request.assoc_handle = None
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            }
+        response.signed = ['foo', 'azu']
+        sresponse = self.signatory.sign(response)
+        assoc_handle = sresponse.fields.get('assoc_handle')
+        self.failUnless(assoc_handle)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failUnless(assoc)
+        self.failUnlessEqual(sresponse.fields.get('signed'),
+                             'foo,azu')
+        self.failUnless(sresponse.fields.get('sig'))
+        self.failIf(self.messages, self.messages)
+
+    def test_signExpired(self):
+        request = server.OpenIDRequest()
+        assoc_handle = '{assoc}{lookatme}'
+        self.store.storeAssociation(
+            self._normal_key,
+            association.Association.fromExpiresIn(-10, assoc_handle,
+                                                  'sekrit', 'HMAC-SHA1'))
+        self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle))
+
+        request.assoc_handle = assoc_handle
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            }
+        response.signed = ['foo', 'azu']
+        sresponse = self.signatory.sign(response)
+
+        new_assoc_handle = sresponse.fields.get('assoc_handle')
+        self.failUnless(new_assoc_handle)
+        self.failIfEqual(new_assoc_handle, assoc_handle)
+
+        self.failUnlessEqual(sresponse.fields.get('invalidate_handle'),
+                             assoc_handle)
+
+        self.failUnlessEqual(sresponse.fields.get('signed'),
+                             'foo,azu')
+        self.failUnless(sresponse.fields.get('sig'))
+
+        # make sure the expired association is gone
+        self.failIf(self.store.getAssociation(self._normal_key, assoc_handle))
+
+        # make sure the new key is a dumb mode association
+        self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
+        self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
+        self.failUnless(self.messages)
+
+    def test_signInvalidHandle(self):
+        request = server.OpenIDRequest()
+        assoc_handle = '{bogus-assoc}{notvalid}'
+
+        request.assoc_handle = assoc_handle
+        response = server.OpenIDResponse(request)
+        response.fields = {
+            'foo': 'amsigned',
+            'bar': 'notsigned',
+            'azu': 'alsosigned',
+            }
+        response.signed = ['foo', 'azu']
+        sresponse = self.signatory.sign(response)
+
+        new_assoc_handle = sresponse.fields.get('assoc_handle')
+        self.failUnless(new_assoc_handle)
+        self.failIfEqual(new_assoc_handle, assoc_handle)
+
+        self.failUnlessEqual(sresponse.fields.get('invalidate_handle'),
+                             assoc_handle)
+
+        self.failUnlessEqual(sresponse.fields.get('signed'), 'foo,azu')
+        self.failUnless(sresponse.fields.get('sig'))
+
+        # make sure the new key is a dumb mode association
+        self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
+        self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
+        self.failIf(self.messages, self.messages)
+
+
+    def test_verify(self):
+        assoc_handle = '{vroom}{zoom}'
+        assoc = association.Association.fromExpiresIn(60, assoc_handle,
+                                                      'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+
+        signed_pairs = [('foo', 'bar'),
+                        ('apple', 'orange')]
+
+        sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0="
+        verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
+        self.failUnless(verified)
+        self.failIf(self.messages, self.messages)
+
+    def test_verifyBadSig(self):
+        assoc_handle = '{vroom}{zoom}'
+        assoc = association.Association.fromExpiresIn(60, assoc_handle,
+                                                      'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+
+        signed_pairs = [('foo', 'bar'),
+                        ('apple', 'orange')]
+
+        sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0=".encode('rot13')
+        verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
+        self.failIf(verified)
+        self.failIf(self.messages, self.messages)
+
+    def test_verifyBadHandle(self):
+        assoc_handle = '{vroom}{zoom}'
+        signed_pairs = [('foo', 'bar'),
+                        ('apple', 'orange')]
+
+        sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0="
+        verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
+        self.failIf(verified)
+        self.failUnless(self.messages)
+
+    def test_getAssoc(self):
+        assoc_handle = self.makeAssoc(dumb=True)
+        assoc = self.signatory.getAssociation(assoc_handle, True)
+        self.failUnless(assoc)
+        self.failUnlessEqual(assoc.handle, assoc_handle)
+        self.failIf(self.messages, self.messages)
+
+    def test_getAssocExpired(self):
+        assoc_handle = self.makeAssoc(dumb=True, lifetime=-10)
+        assoc = self.signatory.getAssociation(assoc_handle, True)
+        self.failIf(assoc, assoc)
+        self.failUnless(self.messages)
+
+    def test_getAssocInvalid(self):
+        ah = 'no-such-handle'
+        self.failUnlessEqual(
+            self.signatory.getAssociation(ah, dumb=False), None)
+        self.failIf(self.messages, self.messages)
+
+    def test_getAssocDumbVsNormal(self):
+        assoc_handle = self.makeAssoc(dumb=True)
+        self.failUnlessEqual(
+            self.signatory.getAssociation(assoc_handle, dumb=False), None)
+        self.failIf(self.messages, self.messages)
+
+    def test_createAssociation(self):
+        assoc = self.signatory.createAssociation(dumb=False)
+        self.failUnless(self.signatory.getAssociation(assoc.handle, dumb=False))
+        self.failIf(self.messages, self.messages)
+
+    def makeAssoc(self, dumb, lifetime=60):
+        assoc_handle = '{bling}'
+        assoc = association.Association.fromExpiresIn(lifetime, assoc_handle,
+                                                      'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation((dumb and self._dumb_key) or self._normal_key, assoc)
+        return assoc_handle
+
+    def test_invalidate(self):
+        assoc_handle = '-squash-'
+        assoc = association.Association.fromExpiresIn(60, assoc_handle,
+                                                      'sekrit', 'HMAC-SHA1')
+
+        self.store.storeAssociation(self._dumb_key, assoc)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failUnless(assoc)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failUnless(assoc)
+        self.signatory.invalidate(assoc_handle, dumb=True)
+        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
+        self.failIf(assoc)
+        self.failIf(self.messages, self.messages)
+
+
+
+if __name__ == '__main__':
+    unittest.main()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/storetest.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/storetest.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/storetest.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/storetest.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,347 @@
+from openid.association import Association
+from openid.cryptutil import randomString
+
+import string
+import time
+import socket
+import random
+import os
+
+db_host = 'dbtest'
+
+allowed_handle = []
+for c in string.printable:
+    if c not in string.whitespace:
+        allowed_handle.append(c)
+allowed_handle = ''.join(allowed_handle)
+
+def generateHandle(n):
+    return randomString(n, allowed_handle)
+
+generateSecret = randomString
+
+allowed_nonce = string.ascii_letters + string.digits
+def generateNonce():
+    return randomString(8, allowed_nonce)
+
+def getTmpDbName():
+    hostname = socket.gethostname()
+    hostname.replace('.', '_')
+    return "%s_%d_%s_openid_test" % \
+           (hostname, os.getpid(), \
+            random.randrange(1, int(time.time())))
+
+def testStore(store):
+    """Make sure a given store has a minimum of API compliance. Call
+    this function with an empty store.
+
+    Raises AssertionError if the store does not work as expected.
+
+    OpenIDStore -> NoneType
+    """
+
+    ### Association functions
+    now = int(time.time())
+
+    server_url = 'http://www.myopenid.com/openid'
+    def genAssoc(issued, lifetime=600):
+        sec = generateSecret(20)
+        hdl = generateHandle(128)
+        return Association(hdl, sec, now + issued, lifetime, 'HMAC-SHA1')
+
+    def checkRetrieve(url, handle=None, expected=None):
+        retrieved_assoc = store.getAssociation(url, handle)
+        if expected is None or store.isDumb():
+            assert retrieved_assoc is None
+        else:
+            assert retrieved_assoc == expected, (retrieved_assoc, expected)
+            if retrieved_assoc is expected:
+                print ('Unexpected: retrieved a reference to the expected '
+                       'value instead of a new object')
+            assert retrieved_assoc.handle == expected.handle
+            assert retrieved_assoc.secret == expected.secret
+
+    def checkRemove(url, handle, expected):
+        present = store.removeAssociation(url, handle)
+        expectedPresent = (not store.isDumb()) and expected
+        assert bool(expectedPresent) == bool(present)
+
+    assoc = genAssoc(issued=0)
+
+    # Make sure that a missing association returns no result
+    checkRetrieve(server_url)
+
+    # Check that after storage, getting returns the same result
+    store.storeAssociation(server_url, assoc)
+    checkRetrieve(server_url, None, assoc)
+
+    # more than once
+    checkRetrieve(server_url, None, assoc)
+
+    # Storing more than once has no ill effect
+    store.storeAssociation(server_url, assoc)
+    checkRetrieve(server_url, None, assoc)
+
+    # Removing an association that does not exist returns not present
+    checkRemove(server_url, assoc.handle + 'x', False)
+
+    # Removing an association that does not exist returns not present
+    checkRemove(server_url + 'x', assoc.handle, False)
+
+    # Removing an association that is present returns present
+    checkRemove(server_url, assoc.handle, True)
+
+    # but not present on subsequent calls
+    checkRemove(server_url, assoc.handle, False)
+
+    # Put assoc back in the store
+    store.storeAssociation(server_url, assoc)
+
+    # More recent and expires after assoc
+    assoc2 = genAssoc(issued=1)
+    store.storeAssociation(server_url, assoc2)
+
+    # After storing an association with a different handle, but the
+    # same server_url, the handle with the later issue date is returned.
+    checkRetrieve(server_url, None, assoc2)
+
+    # We can still retrieve the older association
+    checkRetrieve(server_url, assoc.handle, assoc)
+
+    # Plus we can retrieve the association with the later issue date
+    # explicitly
+    checkRetrieve(server_url, assoc2.handle, assoc2)
+
+    # More recent, and expires earlier than assoc2 or assoc. Make sure
+    # that we're picking the one with the latest issued date and not
+    # taking into account the expiration.
+    assoc3 = genAssoc(issued=2, lifetime=100)
+    store.storeAssociation(server_url, assoc3)
+
+    checkRetrieve(server_url, None, assoc3)
+    checkRetrieve(server_url, assoc.handle, assoc)
+    checkRetrieve(server_url, assoc2.handle, assoc2)
+    checkRetrieve(server_url, assoc3.handle, assoc3)
+
+    checkRemove(server_url, assoc2.handle, True)
+
+    checkRetrieve(server_url, None, assoc3)
+    checkRetrieve(server_url, assoc.handle, assoc)
+    checkRetrieve(server_url, assoc2.handle, None)
+    checkRetrieve(server_url, assoc3.handle, assoc3)
+
+    checkRemove(server_url, assoc2.handle, False)
+    checkRemove(server_url, assoc3.handle, True)
+
+    checkRetrieve(server_url, None, assoc)
+    checkRetrieve(server_url, assoc.handle, assoc)
+    checkRetrieve(server_url, assoc2.handle, None)
+    checkRetrieve(server_url, assoc3.handle, None)
+
+    checkRemove(server_url, assoc2.handle, False)
+    checkRemove(server_url, assoc.handle, True)
+    checkRemove(server_url, assoc3.handle, False)
+
+    checkRetrieve(server_url, None, None)
+    checkRetrieve(server_url, assoc.handle, None)
+    checkRetrieve(server_url, assoc2.handle, None)
+    checkRetrieve(server_url, assoc3.handle, None)
+
+    checkRemove(server_url, assoc2.handle, False)
+    checkRemove(server_url, assoc.handle, False)
+    checkRemove(server_url, assoc3.handle, False)
+
+    ### Nonce functions
+
+    def checkUseNonce(nonce, expected):
+        actual = store.useNonce(nonce)
+        expected = store.isDumb() or expected
+        assert bool(actual) == bool(expected)
+
+    # Random nonce (not in store)
+    nonce1 = generateNonce()
+
+    # A nonce is not present by default
+    checkUseNonce(nonce1, False)
+
+    # Storing once causes useNonce to return True the first, and only
+    # the first, time it is called after the store.
+    store.storeNonce(nonce1)
+    checkUseNonce(nonce1, True)
+    checkUseNonce(nonce1, False)
+
+    # Storing twice has the same effect as storing once.
+    store.storeNonce(nonce1)
+    store.storeNonce(nonce1)
+    checkUseNonce(nonce1, True)
+    checkUseNonce(nonce1, False)
+
+    ### Auth key functions
+
+    # There is no key to start with, so generate a new key and return it.
+    key = store.getAuthKey()
+
+    # The second time around should return the same as last time.
+    key2 = store.getAuthKey()
+    assert key == key2
+    assert len(key) == store.AUTH_KEY_LEN
+
+def test_filestore():
+    from openid.store import filestore
+    import tempfile
+    import shutil
+    try:
+        temp_dir = tempfile.mkdtemp()
+    except AttributeError:
+        import os
+        temp_dir = os.tmpnam()
+        os.mkdir(temp_dir)
+
+    store = filestore.FileOpenIDStore(temp_dir)
+    try:
+        testStore(store)
+    except:
+        raise
+    else:
+        shutil.rmtree(temp_dir)
+
+def test_sqlite():
+    from openid.store import sqlstore
+    try:
+        from pysqlite2 import dbapi2 as sqlite
+    except ImportError:
+        pass
+    else:
+        conn = sqlite.connect(':memory:')
+        store = sqlstore.SQLiteStore(conn)
+        store.createTables()
+        testStore(store)
+
+def test_mysql():
+    from openid.store import sqlstore
+    try:
+        import MySQLdb
+    except ImportError:
+        pass
+    else:
+        db_user = 'openid_test'
+        db_passwd = ''
+        db_name = getTmpDbName()
+
+        from MySQLdb.constants import ER
+
+        # Change this connect line to use the right user and password
+        conn = MySQLdb.connect(user=db_user, passwd=db_passwd, host = db_host)
+
+        conn.query('CREATE DATABASE %s;' % db_name)
+        try:
+            conn.query('USE %s;' % db_name)
+
+            # OK, we're in the right environment. Create store and
+            # create the tables.
+            store = sqlstore.MySQLStore(conn)
+            store.createTables()
+
+            # At last, we get to run the test.
+            testStore(store)
+        finally:
+            # Remove the database. If you want to do post-mortem on a
+            # failing test, comment out this line.
+            conn.query('DROP DATABASE %s;' % db_name)
+
+def test_postgresql():
+    """
+    Tests the PostgreSQLStore on a locally-hosted PostgreSQL database
+    cluster, version 7.4 or later.  To run this test, you must have:
+
+    - The 'psycopg' python module (version 1.1) installed
+
+    - PostgreSQL running locally
+
+    - An 'openid_test' user account in your database cluster, which
+      you can create by running 'createuser -Ad openid_test' as the
+      'postgres' user
+
+    - Trust auth for the 'openid_test' account, which you can activate
+      by adding the following line to your pg_hba.conf file:
+
+      local all openid_test trust
+
+    This test connects to the database cluster three times:
+
+    - To the 'template1' database, to create the test database
+
+    - To the test database, to run the store tests
+
+    - To the 'template1' database once more, to drop the test database
+    """
+    from openid.store import sqlstore
+    try:
+        import psycopg
+    except ImportError:
+        pass
+    else:
+        db_name = getTmpDbName()
+        db_user = 'openid_test'
+
+        # Connect once to create the database; reconnect to access the
+        # new database.
+        conn_create = psycopg.connect(database = 'template1', user = db_user,
+                                      host = db_host)
+        conn_create.autocommit()
+
+        # Create the test database.
+        cursor = conn_create.cursor()
+        cursor.execute('CREATE DATABASE %s;' % (db_name,))
+        conn_create.close()
+
+        # Connect to the test database.
+        conn_test = psycopg.connect(database = db_name, user = db_user,
+                                    host = db_host)
+
+        # OK, we're in the right environment. Create the store
+        # instance and create the tables.
+        store = sqlstore.PostgreSQLStore(conn_test)
+        store.createTables()
+
+        # At last, we get to run the test.
+        testStore(store)
+
+        # Disconnect.
+        conn_test.close()
+
+        # It takes a little time for the close() call above to take
+        # effect, so we'll wait for a second before trying to remove
+        # the database.  (Maybe this is because we're using a UNIX
+        # socket to connect to postgres rather than TCP?)
+        import time
+        time.sleep(1)
+
+        # Remove the database now that the test is over.
+        conn_remove = psycopg.connect(database = 'template1', user = db_user,
+                                      host = db_host)
+        conn_remove.autocommit()
+
+        cursor = conn_remove.cursor()
+        cursor.execute('DROP DATABASE %s;' % (db_name,))
+        conn_remove.close()
+
+def test_dumbstore():
+    from openid.store import dumbstore
+    store = dumbstore.DumbStore('bad secret; do not use')
+    testStore(store)
+
+def test_memstore():
+    import _memstore
+    testStore(_memstore.MemoryStore())
+
+def test():
+    test_filestore()
+    test_sqlite()
+    test_mysql()
+    test_postgresql()
+    test_dumbstore()
+    test_memstore()
+
+if __name__ == '__main__':
+    test()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_discover.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_discover.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_discover.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_discover.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,442 @@
+import sys
+import unittest
+import datadriven
+from urljr import fetchers
+from urljr.fetchers import HTTPResponse
+from yadis.discover import DiscoveryFailure
+from openid.consumer import discover
+from yadis import xrires
+from yadis.xri import XRI
+from urlparse import urlsplit
+
+### Tests for conditions that trigger DiscoveryFailure
+
+class SimpleMockFetcher(object):
+    def __init__(self, responses):
+        self.responses = list(responses)
+
+    def fetch(self, url, body=None, headers=None):
+        response = self.responses.pop(0)
+        assert body is None
+        assert response.final_url == url
+        return response
+
+class TestDiscoveryFailure(datadriven.DataDrivenTestCase):
+    cases = [
+        [HTTPResponse('http://network.error/', None)],
+        [HTTPResponse('http://not.found/', 404)],
+        [HTTPResponse('http://bad.request/', 400)],
+        [HTTPResponse('http://server.error/', 500)],
+        [HTTPResponse('http://header.found/', 200,
+                      headers={'x-xrds-location':'http://xrds.missing/'}),
+         HTTPResponse('http://xrds.missing/', 404)],
+        ]
+
+    def __init__(self, responses):
+        self.url = responses[0].final_url
+        datadriven.DataDrivenTestCase.__init__(self, self.url)
+        self.responses = responses
+
+    def setUp(self):
+        fetcher = SimpleMockFetcher(self.responses)
+        fetchers.setDefaultFetcher(fetcher)
+
+    def tearDown(self):
+        fetchers.setDefaultFetcher(None)
+
+    def runOneTest(self):
+        expected_status = self.responses[-1].status
+        try:
+            discover.discover(self.url)
+        except DiscoveryFailure, why:
+            self.failUnlessEqual(why.http_response.status, expected_status)
+        else:
+            self.fail('Did not raise DiscoveryFailure')
+
+
+### Tests for raising/catching exceptions from the fetcher through the
+### discover function
+
+# Python 2.5 displays a message when running this test, which is
+# testing the behaviour in the presence of string exceptions,
+# deprecated or not, so tell it no to complain when this particular
+# string exception is raised.
+import warnings
+warnings.filterwarnings('ignore', 'raising a string.*', DeprecationWarning,
+                        r'^openid\.test\.test_discover$', 74)
+
+class ErrorRaisingFetcher(object):
+    """Just raise an exception when fetch is called"""
+
+    def __init__(self, thing_to_raise):
+        self.thing_to_raise = thing_to_raise
+
+    def fetch(self, url, body=None, headers=None):
+        raise self.thing_to_raise
+
+class DidFetch(Exception):
+    """Custom exception just to make sure it's not handled differently"""
+
+class TestFetchException(datadriven.DataDrivenTestCase):
+    """Make sure exceptions get passed through discover function from
+    fetcher."""
+
+    cases = [
+        Exception(),
+        DidFetch(),
+        ValueError(),
+        RuntimeError(),
+        'oi!',
+        ]
+
+    def __init__(self, exc):
+        datadriven.DataDrivenTestCase.__init__(self, repr(exc))
+        self.exc = exc
+
+    def setUp(self):
+        fetcher = ErrorRaisingFetcher(self.exc)
+        fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
+
+    def tearDown(self):
+        fetchers.setDefaultFetcher(None)
+
+    def runOneTest(self):
+        try:
+            discover.discover('http://doesnt.matter/')
+        except:
+            exc = sys.exc_info()[1]
+            if exc is None:
+                # str exception
+                self.failUnless(self.exc is sys.exc_info()[0])
+            else:
+                self.failUnless(self.exc is exc, exc)
+        else:
+            self.fail('Expected %r', self.exc)
+
+
+### Tests for openid.consumer.discover.discover
+
+
+class DiscoveryMockFetcher(object):
+    redirect = None
+
+    def __init__(self, documents):
+        self.documents = documents
+        self.fetchlog = []
+
+    def fetch(self, url, body=None, headers=None):
+        self.fetchlog.append((url, body, headers))
+        if self.redirect:
+            final_url = self.redirect
+        else:
+            final_url = url
+
+        try:
+            ctype, body = self.documents[url]
+        except KeyError:
+            status = 404
+            ctype = 'text/plain'
+            body = ''
+        else:
+            status = 200
+
+        return HTTPResponse(final_url, status, {'content-type': ctype}, body)
+
+# from twisted.trial import unittest as trialtest
+
+class BaseTestDiscovery(unittest.TestCase):
+    id_url = "http://someuser.unittest/"
+
+    documents = {}
+    fetcherClass = DiscoveryMockFetcher
+
+    def setUp(self):
+        self.documents = self.documents.copy()
+        self.fetcher = self.fetcherClass(self.documents)
+        fetchers.setDefaultFetcher(self.fetcher)
+
+    def tearDown(self):
+        fetchers.setDefaultFetcher(None)
+
+yadis_2entries = '''<?xml version="1.0" encoding="UTF-8"?>
+<xrds:XRDS xmlns:xrds="xri://$xrds"
+           xmlns="xri://$xrd*($v*2.0)"
+           xmlns:openid="http://openid.net/xmlns/1.0"
+           >
+  <XRD>
+    <CanonicalID>=!1000</CanonicalID>
+
+    <Service priority="10">
+      <Type>http://openid.net/signon/1.0</Type>
+      <URI>http://www.myopenid.com/server</URI>
+      <openid:Delegate>http://smoker.myopenid.com/</openid:Delegate>
+    </Service>
+
+    <Service priority="20">
+      <Type>http://openid.net/signon/1.0</Type>
+      <URI>http://www.livejournal.com/openid/server.bml</URI>
+      <openid:Delegate>http://frank.livejournal.com/</openid:Delegate>
+    </Service>
+
+  </XRD>
+</xrds:XRDS>
+'''
+
+yadis_another = '''<?xml version="1.0" encoding="UTF-8"?>
+<xrds:XRDS xmlns:xrds="xri://$xrds"
+           xmlns="xri://$xrd*($v*2.0)"
+           xmlns:openid="http://openid.net/xmlns/1.0"
+           >
+  <XRD>
+
+    <Service priority="10">
+      <Type>http://openid.net/signon/1.0</Type>
+      <URI>http://vroom.unittest/server</URI>
+      <openid:Delegate>http://smoker.myopenid.com/</openid:Delegate>
+    </Service>
+  </XRD>
+</xrds:XRDS>
+'''
+
+
+yadis_0entries = '''<?xml version="1.0" encoding="UTF-8"?>
+<xrds:XRDS xmlns:xrds="xri://$xrds"
+           xmlns="xri://$xrd*($v*2.0)"
+           xmlns:openid="http://openid.net/xmlns/1.0"
+           >
+  <XRD>
+    <Service >
+      <Type>http://is-not-openid.unittest/</Type>
+      <URI>http://noffing.unittest./</URI>
+    </Service>
+  </XRD>
+</xrds:XRDS>
+'''
+
+yadis_no_delegate = '''<?xml version="1.0" encoding="UTF-8"?>
+<xrds:XRDS xmlns:xrds="xri://$xrds"
+           xmlns="xri://$xrd*($v*2.0)"
+           >
+  <XRD>
+    <Service priority="10">
+      <Type>http://openid.net/signon/1.0</Type>
+      <URI>http://www.myopenid.com/server</URI>
+    </Service>
+  </XRD>
+</xrds:XRDS>
+'''
+
+openid_html = """
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html>
+  <head>
+    <title>Identity Page for Smoker</title>
+<link rel="openid.server" href="http://www.myopenid.com/server" />
+<link rel="openid.delegate" href="http://smoker.myopenid.com/" />
+  </head><body><p>foo</p></body></html>
+"""
+
+openid_html_no_delegate = """
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html>
+  <head>
+    <title>Identity Page for Smoker</title>
+<link rel="openid.server" href="http://www.myopenid.com/server" />
+  </head><body><p>foo</p></body></html>
+"""
+
+openid_and_yadis_html = """
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html>
+  <head>
+    <title>Identity Page for Smoker</title>
+<meta http-equiv="X-XRDS-Location" content="http://someuser.unittest/xrds" />
+<link rel="openid.server" href="http://www.myopenid.com/server" />
+<link rel="openid.delegate" href="http://smoker.myopenid.com/" />
+  </head><body><p>foo</p></body></html>
+"""
+
+class TestDiscovery(BaseTestDiscovery):
+    def _usedYadis(self, service):
+        self.failUnless(service.used_yadis, "Expected to use Yadis")
+
+    def _notUsedYadis(self, service):
+        self.failIf(service.used_yadis, "Expected to use old-style discovery")
+
+    def test_404(self):
+        self.failUnlessRaises(DiscoveryFailure,
+                              discover.discover, self.id_url + '/404')
+
+    def test_noYadis(self):
+        self.documents[self.id_url] = ('text/html', openid_html)
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failUnlessEqual(len(services), 1,
+                             "More than one service in %r" % (services,))
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self.failUnlessEqual(services[0].delegate,
+                             "http://smoker.myopenid.com/")
+        self.failUnlessEqual(services[0].identity_url, self.id_url)
+        self._notUsedYadis(services[0])
+
+    def test_noOpenID(self):
+        self.fetcher.documents = {
+            self.id_url: ('text/plain', "junk"),
+        }
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failIf(len(services))
+
+    def test_yadis(self):
+        self.fetcher.documents = {
+            BaseTestDiscovery.id_url: ('application/xrds+xml', yadis_2entries),
+        }
+
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failUnlessEqual(len(services), 2,
+                             "Not 2 services in %r" % (services,))
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self._usedYadis(services[0])
+        self.failUnlessEqual(services[1].server_url,
+                             "http://www.livejournal.com/openid/server.bml")
+        self._usedYadis(services[1])
+
+    def test_redirect(self):
+        expected_final_url = "http://elsewhere.unittest/"
+        self.fetcher.redirect = expected_final_url
+        self.fetcher.documents = {
+            self.id_url: ('text/html', openid_html),
+        }
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(expected_final_url, id_url)
+        self.failUnlessEqual(len(services), 1,
+                             "More than one service in %r" % (services,))
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self.failUnlessEqual(services[0].delegate,
+                             "http://smoker.myopenid.com/")
+        self.failUnlessEqual(services[0].identity_url, expected_final_url)
+        self._notUsedYadis(services[0])
+
+    def test_emptyList(self):
+        self.fetcher.documents = {
+            self.id_url: ('application/xrds+xml', yadis_0entries),
+        }
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failIf(services)
+
+    def test_emptyListWithLegacy(self):
+        self.fetcher.documents = {
+            self.id_url: ('text/html', openid_and_yadis_html),
+            self.id_url + 'xrds': ('application/xrds+xml', yadis_0entries),
+        }
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failUnlessEqual(len(services), 1,
+                             "Not one service in %r" % (services,))
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self.failUnlessEqual(services[0].identity_url, self.id_url)
+        self._notUsedYadis(services[0])
+
+    def test_yadisNoDelegate(self):
+        self.fetcher.documents = {
+            self.id_url: ('application/xrds+xml', yadis_no_delegate),
+        }
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failUnlessEqual(len(services), 1,
+                             "Not 1 service in %r" % (services,))
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self.failUnless(services[0].delegate is None,
+                        'Delegate should be None. Got %r' %
+                        (services[0].delegate,))
+        self._usedYadis(services[0])
+
+    def test_openidNoDelegate(self):
+        self.fetcher.documents = {
+            self.id_url: ('text/html', openid_html_no_delegate),
+        }
+        id_url, services = discover.discover(self.id_url)
+        self.failUnlessEqual(self.id_url, id_url)
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self.failUnlessEqual(services[0].identity_url, self.id_url)
+        self.failUnless(services[0].delegate is None,
+                        'Delegate should be None. Got %r' %
+                        (services[0].delegate,))
+
+        self._notUsedYadis(services[0])
+
+
+class MockFetcherForXRIProxy(object):
+
+    def __init__(self, documents, proxy_url=xrires.DEFAULT_PROXY):
+        self.documents = documents
+        self.fetchlog = []
+        self.proxy_url = None
+
+
+    def fetch(self, url, body=None, headers=None):
+        self.fetchlog.append((url, body, headers))
+
+        u = urlsplit(url)
+        proxy_host = u[1]
+        xri = u[2]
+        query = u[3]
+
+        if not headers and not query:
+            raise ValueError("No headers or query; you probably didn't "
+                             "mean to do that.")
+
+        if xri.startswith('/'):
+            xri = xri[1:]
+
+        try:
+            ctype, body = self.documents[xri]
+        except KeyError:
+            status = 404
+            ctype = 'text/plain'
+            body = ''
+        else:
+            status = 200
+
+        return HTTPResponse(url, status, {'content-type': ctype}, body)
+
+
+class TestXRIDiscovery(BaseTestDiscovery):
+    fetcherClass = MockFetcherForXRIProxy
+
+    documents = {'=smoker': ('application/xrds+xml', yadis_2entries) }
+
+    def test_xri(self):
+        user_xri, services = discover.discoverXRI('=smoker')
+        self.failUnless(services)
+        self.failUnlessEqual(services[0].server_url,
+                             "http://www.myopenid.com/server")
+        self.failUnlessEqual(services[1].server_url,
+                             "http://www.livejournal.com/openid/server.bml")
+        self.failUnlessEqual(services[0].canonicalID, XRI("=!1000"))
+
+    def test_useCanonicalID(self):
+        """When there is no delegate, the CanonicalID should be used with XRI.
+        """
+        endpoint = discover.OpenIDServiceEndpoint()
+        endpoint.identity_url = "=example"
+        endpoint.canonicalID = XRI("=!1000")
+        self.failUnlessEqual(endpoint.getServerID(), XRI("=!1000"))
+
+
+
+def pyUnitTests():
+    return datadriven.loadTests(__name__)
+
+if __name__ == '__main__':
+    suite = pyUnitTests()
+    runner = unittest.TextTestRunner()
+    runner.run(suite)

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_htmldiscover.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_htmldiscover.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_htmldiscover.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_htmldiscover.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,28 @@
+from openid.consumer.discover import OpenIDServiceEndpoint
+from openid.consumer.parse import openIDDiscover as parseOpenIDLinkRel
+from openid.consumer.parse import ParseError
+import datadriven
+
+class BadLinksTestCase(datadriven.DataDrivenTestCase):
+    cases = [
+        '',
+        "http://not.in.a.link.tag/",
+        '<link rel="openid.server" href="not.in.html.or.head" />',
+        ]
+
+    def __init__(self, data):
+        datadriven.DataDrivenTestCase.__init__(self, data)
+        self.data = data
+
+    def callFunc(self):
+        return parseOpenIDLinkRel(self.data)
+
+    def runOneTest(self):
+        self.failUnlessRaises(ParseError, self.callFunc)
+
+class BadLinksThroughEndpoint(BadLinksTestCase):
+    def callFunc(self):
+        return OpenIDServiceEndpoint.fromHTML('http://unused.url/', self.data)
+
+def pyUnitTests():
+    return datadriven.loadTests(__name__)

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,164 @@
+import unittest
+from openid.consumer.discover import \
+     OpenIDServiceEndpoint, OPENID_1_2_TYPE, OPENID_1_1_TYPE, OPENID_1_0_TYPE
+
+from yadis.services import applyFilter
+
+
+XRDS_BOILERPLATE = '''\
+<?xml version="1.0" encoding="UTF-8"?>
+<xrds:XRDS xmlns:xrds="xri://$xrds"
+           xmlns="xri://$xrd*($v*2.0)"
+           xmlns:openid="http://openid.net/xmlns/1.0">
+    <XRD>
+%s\
+    </XRD>
+</xrds:XRDS>
+'''
+
+def mkXRDS(services):
+    return XRDS_BOILERPLATE % (services,)
+
+def mkService(uris=None, type_uris=None, delegate=None, dent='        '):
+    chunks = [dent, '<Service>\n']
+    dent2 = dent + '    '
+    if type_uris:
+        for type_uri in type_uris:
+            chunks.extend([dent2 + '<Type>', type_uri, '</Type>\n'])
+
+    if uris:
+        for uri in uris:
+            if type(uri) is tuple:
+                uri, prio = uri
+            else:
+                prio = None
+
+            chunks.extend([dent2, '<URI'])
+            if prio is not None:
+                chunks.extend([' priority="', str(prio), '"'])
+            chunks.extend(['>', uri, '</URI>\n'])
+
+    if delegate:
+        chunks.extend(
+            [dent2, '<openid:Delegate>', delegate, '</openid:Delegate>\n'])
+
+    chunks.extend([dent, '</Service>\n'])
+
+    return ''.join(chunks)
+
+# Different sets of server URLs for use in the URI tag
+server_url_options = [
+    [], # This case should not generate an endpoint object
+    ['http://server.url/'],
+    ['https://server.url/'],
+    ['https://server.url/', 'http://server.url/'],
+    ['https://server.url/',
+     'http://server.url/',
+     'http://example.server.url/'],
+    ]
+
+# Used for generating test data
+def subsets(l):
+    """Generate all non-empty sublists of a list"""
+    subsets_list = [[]]
+    for x in l:
+        subsets_list += [[x] + t for t in subsets_list]
+    return subsets_list
+
+# A couple of example extension type URIs. These are not at all
+# official, but are just here for testing.
+ext_types = [
+    'http://janrain.com/extension/blah',
+    'http://openid.net/sreg/1.0',
+    ]
+
+# All valid combinations of Type tags that should produce an OpenID endpoint
+type_uri_options = [
+    exts + ts
+
+    # All non-empty sublists of the valid OpenID type URIs
+    for ts in subsets([OPENID_1_0_TYPE, OPENID_1_1_TYPE, OPENID_1_2_TYPE])
+    if ts
+
+    # All combinations of extension types (including empty extenstion list)
+    for exts in subsets(ext_types)
+    ]
+
+# Range of valid Delegate tag values for generating test data
+delegate_options = [
+    None,
+    'http://vanity.domain/',
+    'https://somewhere/yadis/',
+    ]
+
+# All combinations of valid URIs, Type URIs and Delegate tags
+data = [
+    (uris, type_uris, delegate)
+    for uris in server_url_options
+    for type_uris in type_uri_options
+    for delegate in delegate_options
+    ]
+
+class OpenIDYadisTest(unittest.TestCase):
+    def __init__(self, uris, type_uris, delegate):
+        unittest.TestCase.__init__(self)
+        self.uris = uris
+        self.type_uris = type_uris
+        self.delegate = delegate
+
+    def shortDescription(self):
+        # XXX:
+        return 'Successful OpenID Yadis parsing case'
+
+    def setUp(self):
+        self.yadis_url = 'http://unit.test/'
+
+        # Create an XRDS document to parse
+        services = mkService(uris=self.uris,
+                             type_uris=self.type_uris,
+                             delegate=self.delegate)
+        self.xrds = mkXRDS(services)
+
+    def runTest(self):
+        # Parse into endpoint objects that we will check
+        endpoints = applyFilter(
+            self.yadis_url, self.xrds, OpenIDServiceEndpoint)
+
+        # make sure there are the same number of endpoints as
+        # URIs. This assumes that the type_uris contains at least one
+        # OpenID type.
+        self.failUnlessEqual(len(self.uris), len(endpoints))
+
+        # So that we can check equality on the endpoint types
+        type_uris = list(self.type_uris)
+        type_uris.sort()
+
+        seen_uris = []
+        for endpoint in endpoints:
+            seen_uris.append(endpoint.server_url)
+
+            # All endpoints will have same yadis_url
+            self.failUnlessEqual(self.yadis_url, endpoint.identity_url)
+
+            # and delegate
+            self.failUnlessEqual(self.delegate, endpoint.delegate)
+
+            # and types
+            actual_types = list(endpoint.type_uris)
+            actual_types.sort()
+            self.failUnlessEqual(actual_types, type_uris)
+
+        # So that they will compare equal, because we don't care what
+        # order they are in
+        seen_uris.sort()
+        uris = list(self.uris)
+        uris.sort()
+
+        # Make sure we saw all URIs, and saw each one once
+        self.failUnlessEqual(uris, seen_uris)
+
+def pyUnitTests():
+    cases = []
+    for args in data:
+        cases.append(OpenIDYadisTest(*args))
+    return unittest.TestSuite(cases)

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/trustroot.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/trustroot.py?view=auto&rev=463041
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/trustroot.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/trustroot.py Wed Oct 11 16:11:47 2006
@@ -0,0 +1,83 @@
+import os
+import unittest
+from openid.server.trustroot import TrustRoot
+
+class _ParseTest(unittest.TestCase):
+    def __init__(self, sanity, desc, case):
+        unittest.TestCase.__init__(self)
+        self.desc = desc + ': ' + repr(case)
+        self.case = case
+        self.sanity = sanity
+
+    def shortDescription(self):
+        return self.desc
+
+    def runTest(self):
+        tr = TrustRoot.parse(self.case)
+        if self.sanity == 'sane':
+            assert tr.isSane(), self.case
+        elif self.sanity == 'insane':
+            assert not tr.isSane(), self.case
+        else:
+            assert tr is None
+
+class _MatchTest(unittest.TestCase):
+    def __init__(self, match, desc, line):
+        unittest.TestCase.__init__(self)
+        tr, rt = line.split()
+        self.desc = desc + ': ' + repr(tr) + ' ' + repr(rt)
+        self.tr = tr
+        self.rt = rt
+        self.match = match
+
+    def shortDescription(self):
+        return self.desc
+
+    def runTest(self):
+        tr = TrustRoot.parse(self.tr)
+        match = tr.validateURL(self.rt)
+        if self.match:
+            assert match
+        else:
+            assert not match
+
+def getTests(t, grps, head, dat):
+    tests = []
+    top = head.strip()
+    gdat = map(str.strip, dat.split('-' * 40 + '\n'))
+    assert not gdat[0]
+    assert len(gdat) == (len(grps) * 2 + 1), (gdat, grps)
+    i = 1
+    for x in grps:
+        n, desc = gdat[i].split(': ')
+        cases = gdat[i + 1].split('\n')
+        assert len(cases) == int(n)
+        for case in cases:
+            tests.append(t(x, top + ' - ' + desc, case))
+        i += 2
+    return tests
+
+def parseTests(data):
+    parts = map(str.strip, data.split('=' * 40 + '\n'))
+    assert not parts[0]
+    _, ph, pdat, mh, mdat = parts
+
+    tests = []
+    tests.extend(getTests(_ParseTest, ['bad', 'insane', 'sane'], ph, pdat))
+    tests.extend(getTests(_MatchTest, [1, 0], mh, mdat))
+    return tests
+
+def pyUnitTests():
+    here = os.path.dirname(os.path.abspath(__file__))
+    test_data_file_name = os.path.join(here, 'trustroot.txt')
+    test_data_file = file(test_data_file_name)
+    test_data = test_data_file.read()
+    test_data_file.close()
+
+    tests = parseTests(test_data)
+    return unittest.TestSuite(tests)
+
+if __name__ == '__main__':
+    suite = pyUnitTests()
+    runner = unittest.TextTestRunner()
+    runner.run(suite)



Mime
View raw message