incubator-heraldry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ket...@apache.org
Subject svn commit: r463060 [4/6] - in /incubator/heraldry/libraries/python/openid/trunk: ./ admin/ examples/ openid/ openid/consumer/ openid/server/ openid/store/ openid/test/ openid/test/data/ openid/yadis/
Date Wed, 11 Oct 2006 23:22:36 GMT
Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_fetchers.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_fetchers.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_fetchers.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_fetchers.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,276 @@
+import unittest
+import sys
+import urllib2
+
+import helper
+from openid import fetchers
+
+# XXX: make these separate test cases
+
+def failUnlessResponseExpected(expected, actual):
+    assert expected.final_url == actual.final_url
+    assert expected.status == actual.status
+    assert expected.body == actual.body
+    got_headers = dict(actual.headers)
+    del got_headers['date']
+    del got_headers['server']
+    assert expected.headers == got_headers
+
+def test_fetcher(fetcher, exc, server):
+    """Check fetcher against server; exc is False for wrapping fetchers."""
+    def geturl(path):
+        port = server.socket.getsockname()[1]
+        return 'http://%s:%s%s' % (server.server_name, port, path)
+
+    expected_headers = {'content-type':'text/plain'}
+
+    def plain(path, code):
+        path = '/' + path
+        expected = fetchers.HTTPResponse(
+            geturl(path), code, expected_headers, path)
+        return (path, expected)
+
+    expect_success = fetchers.HTTPResponse(
+        geturl('/success'), 200, expected_headers, '/success')
+    cases = [
+        ('/success', expect_success),
+        ('/301redirect', expect_success),
+        ('/302redirect', expect_success),
+        ('/303redirect', expect_success),
+        ('/307redirect', expect_success),
+        plain('notfound', 404),
+        plain('badreq', 400),
+        plain('forbidden', 403),
+        plain('error', 500),
+        plain('server_error', 503),
+        ]
+    # Every case must yield a response; any exception here fails the test.
+    for path, expected in cases:
+        fetch_url = geturl(path)
+        try:
+            actual = fetcher.fetch(fetch_url)
+        except (SystemExit, KeyboardInterrupt):
+            raise  # never swallow Ctrl-C / exit; same policy as the loop below
+        except:
+            print fetcher, fetch_url
+            raise
+        else:
+            failUnlessResponseExpected(expected, actual)
+
+    for err_url in [geturl('/closed'),
+                    'http://invalid.janrain.com/',
+                    'not:a/url',
+                    'ftp://janrain.com/pub/']:
+        try:
+            result = fetcher.fetch(err_url)
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except fetchers.HTTPError:
+            # This is raised by the Curl fetcher for bad cases
+            # detected by the fetchers module, but it's a subclass of
+            # HTTPFetchingError, so we have to catch it explicitly.
+            assert exc
+        except fetchers.HTTPFetchingError:
+            assert not exc, (fetcher, exc, server)
+        except:
+            assert exc
+        else:
+            assert result is None, (fetcher, result)
+
+def run_fetcher_tests(server):
+    exc_fetchers = [fetchers.Urllib2Fetcher(),]
+    try:
+        exc_fetchers.append(fetchers.CurlHTTPFetcher())
+    except RuntimeError, why:
+        if why[0] == 'Cannot find pycurl library':
+            try:
+                import pycurl
+            except ImportError:
+                pass
+            else:
+                assert False, 'curl present but not detected'
+        else:
+            raise
+
+    non_exc_fetchers = []
+    for f in exc_fetchers:
+        non_exc_fetchers.append(fetchers.ExceptionWrappingFetcher(f))
+
+    for f in exc_fetchers:
+        test_fetcher(f, True, server)
+
+    for f in non_exc_fetchers:
+        test_fetcher(f, False, server)
+
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+
+class FetcherTestHandler(BaseHTTPRequestHandler):
+    cases = {
+        '/success':(200, None),
+        '/301redirect':(301, '/success'),
+        '/302redirect':(302, '/success'),
+        '/303redirect':(303, '/success'),
+        '/307redirect':(307, '/success'),
+        '/notfound':(404, None),
+        '/badreq':(400, None),
+        '/forbidden':(403, None),
+        '/error':(500, None),
+        '/server_error':(503, None),
+        }
+
+    def log_request(self, *args):
+        pass
+
+    def do_GET(self):
+        if self.path == '/closed':
+            self.wfile.close()
+        else:
+            try:
+                http_code, location = self.cases[self.path]
+            except KeyError:
+                self.errorResponse('Bad path')
+            else:
+                extra_headers = [('Content-type', 'text/plain')]
+                if location is not None:
+                    base = ('http://%s:%s' % self.server.server_address)
+                    location = base + location
+                    extra_headers.append(('Location', location))
+                self._respond(http_code, extra_headers, self.path)
+
+    def do_POST(self):
+        """Echo the request body with the status configured for self.path."""
+        try:
+            http_code, _location = self.cases[self.path]
+        except KeyError:
+            return self.errorResponse('Bad path')
+        if http_code in [301, 302, 303, 307]:
+            return self.errorResponse()  # POST to a redirect path gets a 400
+        # cases maps to (status, redirect target); build the header list here.
+        content_type = self.headers.get('content-type', 'text/plain')
+        extra_headers = [('Content-type', content_type)]
+        content_length = int(self.headers.get('Content-length', '-1'))
+        body = self.rfile.read(content_length)
+        self._respond(http_code, extra_headers, body)
+
+    def errorResponse(self, message=None):
+        req = [
+            ('HTTP method', self.command),
+            ('path', self.path),
+            ]
+        if message:
+            req.append(('message', message))
+
+        body_parts = ['Bad request:\r\n']
+        for k, v in req:
+            body_parts.append(' %s: %s\r\n' % (k, v))
+        body = ''.join(body_parts)
+        self._respond(400, [('Content-type', 'text/plain')], body)
+
+    def _respond(self, http_code, extra_headers, body):
+        self.send_response(http_code)
+        for k, v in extra_headers:
+            self.send_header(k, v)
+        self.end_headers()
+        self.wfile.write(body)
+        self.wfile.close()
+
+    def finish(self):
+        if not self.wfile.closed:
+            self.wfile.flush()
+        self.wfile.close()
+        self.rfile.close()
+
+def test():
+    import socket
+    host = socket.getfqdn('127.0.0.1')
+    # When I use port 0 here, it works for the first fetch and the
+    # next one gets connection refused.  Bummer.  So instead, pick a
+    # port that's *probably* not in use.
+    import os
+    port = (os.getpid() % 31000) + 1024
+
+    server = HTTPServer((host, port), FetcherTestHandler)
+
+    import threading
+    server_thread = threading.Thread(target=server.serve_forever)
+    server_thread.setDaemon(True)
+    server_thread.start()
+
+    run_fetcher_tests(server)
+
+class FakeFetcher(object):
+    sentinel = object()
+
+    def fetch(self, *args, **kwargs):
+        return self.sentinel
+
+class DefaultFetcherTest(unittest.TestCase):
+    def setUp(self):
+        """reset the default fetcher to None"""
+        fetchers.setDefaultFetcher(None)
+
+    def tearDown(self):
+        """reset the default fetcher to None"""
+        fetchers.setDefaultFetcher(None)
+
+    def test_getDefaultNotNone(self):
+        """Make sure that None is never returned as a default fetcher"""
+        self.failUnless(fetchers.getDefaultFetcher() is not None)
+        fetchers.setDefaultFetcher(None)
+        self.failUnless(fetchers.getDefaultFetcher() is not None)
+
+    def test_setDefault(self):
+        """Make sure the getDefaultFetcher returns the object set for
+        setDefaultFetcher"""
+        sentinel = object()
+        fetchers.setDefaultFetcher(sentinel, wrap_exceptions=False)
+        self.failUnless(fetchers.getDefaultFetcher() is sentinel)
+
+    def test_callFetch(self):
+        """Make sure that fetchers.fetch() uses the default fetcher
+        instance that was set."""
+        fetchers.setDefaultFetcher(FakeFetcher())
+        actual = fetchers.fetch('bad://url')
+        self.failUnless(actual is FakeFetcher.sentinel)
+
+    def test_wrappedByDefault(self):
+        """Make sure that the default fetcher instance wraps
+        exceptions by default"""
+        default_fetcher = fetchers.getDefaultFetcher()
+        self.failUnless(isinstance(default_fetcher,
+                                   fetchers.ExceptionWrappingFetcher),
+                        default_fetcher)
+
+        self.failUnlessRaises(fetchers.HTTPFetchingError,
+                              fetchers.fetch, 'http://invalid.janrain.com/')
+
+    def test_notWrapped(self):
+        """Make sure that if we set a non-wrapped fetcher as default,
+        it will not wrap exceptions."""
+        # A fetcher that will raise an exception when it encounters a
+        # host that will not resolve
+        fetcher = fetchers.Urllib2Fetcher()
+        fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
+
+        self.failIf(isinstance(fetchers.getDefaultFetcher(),
+                               fetchers.ExceptionWrappingFetcher))
+
+        try:
+            fetchers.fetch('http://invalid.janrain.com/')
+        except fetchers.HTTPFetchingError:
+            self.fail('Should not be wrapping exception')
+        except:
+            exc = sys.exc_info()[1]
+            self.failUnless(isinstance(exc, urllib2.URLError), exc)
+            pass
+        else:
+            self.fail('Should have raised an exception')
+
+def pyUnitTests():
+    case1 = unittest.FunctionTestCase(test)
+    loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
+    case2 = loadTests(DefaultFetcherTest)
+    return unittest.TestSuite([case1, case2])
+
+if __name__ == '__main__':
+    helper.runAsMain()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_message.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_message.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_message.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_message.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,640 @@
+from openid import message
+
+import urllib
+import cgi
+import unittest
+import datadriven
+
+class EmptyMessageTest(unittest.TestCase):
+    def setUp(self):
+        self.msg = message.Message()
+
+    def test_toPostArgs(self):
+        self.failUnlessEqual(self.msg.toPostArgs(), {})
+
+    def test_toArgs(self):
+        self.failUnlessEqual(self.msg.toArgs(), {})
+
+    def test_toKVForm(self):
+        self.failUnlessEqual(self.msg.toKVForm(), '')
+
+    def test_toURLEncoded(self):
+        self.failUnlessEqual(self.msg.toURLEncoded(), '')
+
+    def test_toURL(self):
+        base_url = 'http://base.url/'
+        self.failUnlessEqual(self.msg.toURL(base_url), base_url)
+
+    def test_getOpenID(self):
+        self.failUnlessEqual(self.msg.getOpenIDNamespace(), None)
+
+    def test_getKeyOpenID(self):
+        # Could reasonably return None instead of raising an
+        # exception. I'm not sure which one is more right, since this
+        # case should only happen when you're building a message from
+        # scratch and so have no default namespace.
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.getKey, message.OPENID_NS, 'foo')
+
+    def test_getKeyBARE(self):
+        self.failUnlessEqual(self.msg.getKey(message.BARE_NS, 'foo'), 'foo')
+
+    def test_getKeyNS1(self):
+        self.failUnlessEqual(self.msg.getKey(message.OPENID1_NS, 'foo'), None)
+
+    def test_getKeyNS2(self):
+        self.failUnlessEqual(self.msg.getKey(message.OPENID2_NS, 'foo'), None)
+
+    def test_getKeyNS3(self):
+        self.failUnlessEqual(self.msg.getKey('urn:nothing-significant', 'foo'),
+                             None)
+
+    def test_hasKey(self):
+        # Could reasonably return False instead of raising an
+        # exception. I'm not sure which one is more right, since this
+        # case should only happen when you're building a message from
+        # scratch and so have no default namespace.
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.hasKey, message.OPENID_NS, 'foo')
+
+    def test_hasKeyBARE(self):
+        self.failUnlessEqual(self.msg.hasKey(message.BARE_NS, 'foo'), False)
+
+    def test_hasKeyNS1(self):
+        self.failUnlessEqual(self.msg.hasKey(message.OPENID1_NS, 'foo'), False)
+
+    def test_hasKeyNS2(self):
+        self.failUnlessEqual(self.msg.hasKey(message.OPENID2_NS, 'foo'), False)
+
+    def test_hasKeyNS3(self):
+        self.failUnlessEqual(self.msg.hasKey('urn:nothing-significant', 'foo'),
+                             False)
+
+    def test_getArg(self):
+        # Could reasonably return None instead of raising an
+        # exception. I'm not sure which one is more right, since this
+        # case should only happen when you're building a message from
+        # scratch and so have no default namespace.
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.getArg, message.OPENID_NS, 'foo')
+
+    def test_getArgBARE(self):
+        self.failUnlessEqual(self.msg.getArg(message.BARE_NS, 'foo'), None)
+
+    def test_getArgNS1(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID1_NS, 'foo'), None)
+
+    def test_getArgNS2(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID2_NS, 'foo'), None)
+
+    def test_getArgNS3(self):
+        self.failUnlessEqual(self.msg.getArg('urn:nothing-significant', 'foo'),
+                             None)
+
+    def test_getArgs(self):
+        # Could reasonably return {} instead of raising an
+        # exception. I'm not sure which one is more right, since this
+        # case should only happen when you're building a message from
+        # scratch and so have no default namespace.
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.getArgs, message.OPENID_NS)
+
+    def test_getArgsBARE(self):
+        self.failUnlessEqual(self.msg.getArgs(message.BARE_NS), {})
+
+    def test_getArgsNS1(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID1_NS), {})
+
+    def test_getArgsNS2(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID2_NS), {})
+
+    def test_getArgsNS3(self):
+        self.failUnlessEqual(self.msg.getArgs('urn:nothing-significant'), {})
+
+    def test_updateArgs(self):
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.updateArgs, message.OPENID_NS,
+                              {'does not':'matter'})
+
+    def _test_updateArgsNS(self, ns):
+        update_args = {
+            'Camper van Beethoven':'David Lowery',
+            'Magnolia Electric Co.':'Jason Molina',
+            }
+
+        self.failUnlessEqual(self.msg.getArgs(ns), {})
+        self.msg.updateArgs(ns, update_args)
+        self.failUnlessEqual(self.msg.getArgs(ns), update_args)
+
+    def test_updateArgsBARE(self):
+        self._test_updateArgsNS(message.BARE_NS)
+
+    def test_updateArgsNS1(self):
+        self._test_updateArgsNS(message.OPENID1_NS)
+
+    def test_updateArgsNS2(self):
+        self._test_updateArgsNS(message.OPENID2_NS)
+
+    def test_updateArgsNS3(self):
+        self._test_updateArgsNS('urn:nothing-significant')
+
+    def test_setArg(self):
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.setArg, message.OPENID_NS,
+                              'does not', 'matter')
+
+    def _test_setArgNS(self, ns):
+        key = 'Camper van Beethoven'
+        value = 'David Lowery'
+        self.failUnlessEqual(self.msg.getArg(ns, key), None)
+        self.msg.setArg(ns, key, value)
+        self.failUnlessEqual(self.msg.getArg(ns, key), value)
+
+    def test_setArgBARE(self):
+        self._test_setArgNS(message.BARE_NS)
+
+    def test_setArgNS1(self):
+        self._test_setArgNS(message.OPENID1_NS)
+
+    def test_setArgNS2(self):
+        self._test_setArgNS(message.OPENID2_NS)
+
+    def test_setArgNS3(self):
+        self._test_setArgNS('urn:nothing-significant')
+
+    def test_delArg(self):
+        # Could reasonably raise KeyError instead of raising
+        # UndefinedOpenIDNamespace. I'm not sure which one is more
+        # right, since this case should only happen when you're
+        # building a message from scratch and so have no default
+        # namespace.
+        self.failUnlessRaises(message.UndefinedOpenIDNamespace,
+                              self.msg.delArg, message.OPENID_NS,
+                              'does not matter')
+
+    def _test_delArgNS(self, ns):
+        key = 'Camper van Beethoven'
+        self.failUnlessRaises(KeyError, self.msg.delArg, ns, key)
+
+    def test_delArgBARE(self):
+        self._test_delArgNS(message.BARE_NS)
+
+    def test_delArgNS1(self):
+        self._test_delArgNS(message.OPENID1_NS)
+
+    def test_delArgNS2(self):
+        self._test_delArgNS(message.OPENID2_NS)
+
+    def test_delArgNS3(self):
+        self._test_delArgNS('urn:nothing-significant')
+
+
+class OpenID1MessageTest(unittest.TestCase):
+    def setUp(self):
+        self.msg = message.Message.fromPostArgs({'openid.mode':'error',
+                                                 'openid.error':'unit test'})
+
+    def test_toPostArgs(self):
+        self.failUnlessEqual(self.msg.toPostArgs(),
+                             {'openid.mode':'error',
+                              'openid.error':'unit test'})
+
+    def test_toArgs(self):
+        self.failUnlessEqual(self.msg.toArgs(), {'mode':'error',
+                                                 'error':'unit test'})
+
+    def test_toKVForm(self):
+        self.failUnlessEqual(self.msg.toKVForm(),
+                             'error:unit test\nmode:error\n')
+
+    def test_toURLEncoded(self):
+        self.failUnlessEqual(self.msg.toURLEncoded(),
+                             'openid.error=unit+test&openid.mode=error')
+
+    def test_toURL(self):
+        base_url = 'http://base.url/'
+        actual = self.msg.toURL(base_url)
+        actual_base = actual[:len(base_url)]
+        self.failUnlessEqual(actual_base, base_url)
+        self.failUnlessEqual(actual[len(base_url)], '?')
+        query = actual[len(base_url) + 1:]
+        parsed = cgi.parse_qs(query)
+        self.failUnlessEqual(parsed, {'openid.mode':['error'],
+                                      'openid.error':['unit test']})
+
+    def test_getOpenID(self):
+        self.failUnlessEqual(self.msg.getOpenIDNamespace(), message.OPENID1_NS)
+
+    def test_getKeyOpenID(self):
+        self.failUnlessEqual(self.msg.getKey(message.OPENID_NS, 'mode'),
+                             'openid.mode')
+
+    def test_getKeyBARE(self):
+        self.failUnlessEqual(self.msg.getKey(message.BARE_NS, 'mode'), 'mode')
+
+    def test_getKeyNS1(self):
+        self.failUnlessEqual(
+            self.msg.getKey(message.OPENID1_NS, 'mode'), 'openid.mode')
+
+    def test_getKeyNS2(self):
+        self.failUnlessEqual(self.msg.getKey(message.OPENID2_NS, 'mode'), None)
+
+    def test_getKeyNS3(self):
+        self.failUnlessEqual(
+            self.msg.getKey('urn:nothing-significant', 'mode'), None)
+
+    def test_hasKey(self):
+        self.failUnlessEqual(self.msg.hasKey(message.OPENID_NS, 'mode'), True)
+
+    def test_hasKeyBARE(self):
+        self.failUnlessEqual(self.msg.hasKey(message.BARE_NS, 'mode'), False)
+
+    def test_hasKeyNS1(self):
+        self.failUnlessEqual(self.msg.hasKey(message.OPENID1_NS, 'mode'), True)
+
+    def test_hasKeyNS2(self):
+        self.failUnlessEqual(
+            self.msg.hasKey(message.OPENID2_NS, 'mode'), False)
+
+    def test_hasKeyNS3(self):
+        self.failUnlessEqual(
+            self.msg.hasKey('urn:nothing-significant', 'mode'), False)
+
+    def test_getArg(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID_NS, 'mode'),
+                             'error')
+
+    def test_getArgBARE(self):
+        self.failUnlessEqual(self.msg.getArg(message.BARE_NS, 'mode'), None)
+
+    def test_getArgNS1(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID1_NS, 'mode'),
+                             'error')
+
+    def test_getArgNS2(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID2_NS, 'mode'), None)
+
+    def test_getArgNS3(self):
+        self.failUnlessEqual(
+            self.msg.getArg('urn:nothing-significant', 'mode'), None)
+
+    def test_getArgs(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID_NS),
+                             {'mode':'error',
+                              'error':'unit test',
+                              })
+
+    def test_getArgsBARE(self):
+        self.failUnlessEqual(self.msg.getArgs(message.BARE_NS), {})
+
+    def test_getArgsNS1(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID1_NS),
+                             {'mode':'error',
+                              'error':'unit test',
+                              })
+
+    def test_getArgsNS2(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID2_NS), {})
+
+    def test_getArgsNS3(self):
+        self.failUnlessEqual(self.msg.getArgs('urn:nothing-significant'), {})
+
+    def _test_updateArgsNS(self, ns, before=None):
+        if before is None:
+            before = {}
+        update_args = {
+            'Camper van Beethoven':'David Lowery',
+            'Magnolia Electric Co.':'Jason Molina',
+            }
+
+        self.failUnlessEqual(self.msg.getArgs(ns), before)
+        self.msg.updateArgs(ns, update_args)
+        after = dict(before)
+        after.update(update_args)
+        self.failUnlessEqual(self.msg.getArgs(ns), after)
+
+    def test_updateArgs(self):
+        self._test_updateArgsNS(message.OPENID_NS,
+                                before={'mode':'error', 'error':'unit test'})
+
+    def test_updateArgsBARE(self):
+        self._test_updateArgsNS(message.BARE_NS)
+
+    def test_updateArgsNS1(self):
+        self._test_updateArgsNS(message.OPENID1_NS,
+                                before={'mode':'error', 'error':'unit test'})
+
+    def test_updateArgsNS2(self):
+        self._test_updateArgsNS(message.OPENID2_NS)
+
+    def test_updateArgsNS3(self):
+        self._test_updateArgsNS('urn:nothing-significant')
+
+    def _test_setArgNS(self, ns):
+        key = 'Camper van Beethoven'
+        value = 'David Lowery'
+        self.failUnlessEqual(self.msg.getArg(ns, key), None)
+        self.msg.setArg(ns, key, value)
+        self.failUnlessEqual(self.msg.getArg(ns, key), value)
+
+    def test_setArg(self):
+        self._test_setArgNS(message.OPENID_NS)
+
+    def test_setArgBARE(self):
+        self._test_setArgNS(message.BARE_NS)
+
+    def test_setArgNS1(self):
+        self._test_setArgNS(message.OPENID1_NS)
+
+    def test_setArgNS2(self):
+        self._test_setArgNS(message.OPENID2_NS)
+
+    def test_setArgNS3(self):
+        self._test_setArgNS('urn:nothing-significant')
+
+    def _test_delArgNS(self, ns):
+        key = 'Camper van Beethoven'
+        value = 'David Lowery'
+
+        self.failUnlessRaises(KeyError, self.msg.delArg, ns, key)
+        self.msg.setArg(ns, key, value)
+        self.failUnlessEqual(self.msg.getArg(ns, key), value)
+        self.msg.delArg(ns, key)
+        self.failUnlessEqual(self.msg.getArg(ns, key), None)
+
+    def test_delArg(self):
+        self._test_delArgNS(message.OPENID_NS)
+
+    def test_delArgBARE(self):
+        self._test_delArgNS(message.BARE_NS)
+
+    def test_delArgNS1(self):
+        self._test_delArgNS(message.OPENID1_NS)
+
+    def test_delArgNS2(self):
+        self._test_delArgNS(message.OPENID2_NS)
+
+    def test_delArgNS3(self):
+        self._test_delArgNS('urn:nothing-significant')
+
+
+class OpenID1ExplicitMessageTest(OpenID1MessageTest):
+    def setUp(self):
+        self.msg = message.Message.fromPostArgs({'openid.mode':'error',
+                                                 'openid.error':'unit test',
+                                                 'openid.ns':message.OPENID1_NS
+                                                 })
+
+
+class OpenID2MessageTest(unittest.TestCase):
+    def setUp(self):
+        self.msg = message.Message.fromPostArgs({'openid.mode':'error',
+                                                 'openid.error':'unit test',
+                                                 'openid.ns':message.OPENID2_NS
+                                                 })
+        self.msg.setArg(message.BARE_NS, "xey", "value")
+
+    def test_toPostArgs(self):
+        self.failUnlessEqual(self.msg.toPostArgs(),
+                             {'openid.mode':'error',
+                              'openid.error':'unit test',
+                              'openid.ns':message.OPENID2_NS,
+                              'xey': 'value',
+                              })
+
+    def test_toArgs(self):
+        # This method can't tolerate BARE_NS.
+        self.msg.delArg(message.BARE_NS, "xey")
+        self.failUnlessEqual(self.msg.toArgs(), {'mode':'error',
+                                                 'error':'unit test',
+                                                 'ns':message.OPENID2_NS,
+                                                 })
+
+    def test_toKVForm(self):
+        # Can't tolerate BARE_NS in kvform
+        self.msg.delArg(message.BARE_NS, "xey")
+        self.failUnlessEqual(self.msg.toKVForm(),
+                             'error:unit test\nmode:error\nns:%s\n' %
+                             (message.OPENID2_NS,))
+
+    def _test_urlencoded(self, s):
+        expected = ('openid.error=unit+test&openid.mode=error&'
+                    'openid.ns=%s&xey=value' % (
+            urllib.quote(message.OPENID2_NS, ''),))
+        self.failUnlessEqual(s, expected)
+        
+
+    def test_toURLEncoded(self):
+        self._test_urlencoded(self.msg.toURLEncoded())
+
+    def test_toURL(self):
+        base_url = 'http://base.url/'
+        actual = self.msg.toURL(base_url)
+        actual_base = actual[:len(base_url)]
+        self.failUnlessEqual(actual_base, base_url)
+        self.failUnlessEqual(actual[len(base_url)], '?')
+        query = actual[len(base_url) + 1:]
+        self._test_urlencoded(query)
+
+    def test_getOpenID(self):
+        self.failUnlessEqual(self.msg.getOpenIDNamespace(), message.OPENID2_NS)
+
+    def test_getKeyOpenID(self):
+        self.failUnlessEqual(self.msg.getKey(message.OPENID_NS, 'mode'),
+                             'openid.mode')
+
+    def test_getKeyBARE(self):
+        self.failUnlessEqual(self.msg.getKey(message.BARE_NS, 'mode'), 'mode')
+
+    def test_getKeyNS1(self):
+        self.failUnlessEqual(
+            self.msg.getKey(message.OPENID1_NS, 'mode'), None)
+
+    def test_getKeyNS2(self):
+        self.failUnlessEqual(
+            self.msg.getKey(message.OPENID2_NS, 'mode'), 'openid.mode')
+
+    def test_getKeyNS3(self):
+        self.failUnlessEqual(
+            self.msg.getKey('urn:nothing-significant', 'mode'), None)
+
+    def test_hasKeyOpenID(self):
+        self.failUnlessEqual(self.msg.hasKey(message.OPENID_NS, 'mode'), True)
+
+    def test_hasKeyBARE(self):
+        self.failUnlessEqual(self.msg.hasKey(message.BARE_NS, 'mode'), False)
+
+    def test_hasKeyNS1(self):
+        self.failUnlessEqual(
+            self.msg.hasKey(message.OPENID1_NS, 'mode'), False)
+
+    def test_hasKeyNS2(self):
+        self.failUnlessEqual(
+            self.msg.hasKey(message.OPENID2_NS, 'mode'), True)
+
+    def test_hasKeyNS3(self):
+        self.failUnlessEqual(
+            self.msg.hasKey('urn:nothing-significant', 'mode'), False)
+
+    def test_getArgOpenID(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID_NS, 'mode'),
+                             'error')
+
+    def test_getArgBARE(self):
+        self.failUnlessEqual(self.msg.getArg(message.BARE_NS, 'mode'), None)
+
+    def test_getArgNS1(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID1_NS, 'mode'), None)
+
+    def test_getArgNS2(self):
+        self.failUnlessEqual(self.msg.getArg(message.OPENID2_NS, 'mode'),
+                             'error')
+
+    def test_getArgNS3(self):
+        self.failUnlessEqual(
+            self.msg.getArg('urn:nothing-significant', 'mode'), None)
+
+    def test_getArgsOpenID(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID_NS),
+                             {'mode':'error',
+                              'error':'unit test',
+                              })
+
+    def test_getArgsBARE(self):
+        self.failUnlessEqual(self.msg.getArgs(message.BARE_NS),
+                             {'xey': 'value'})
+
+    def test_getArgsNS1(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID1_NS), {})
+
+    def test_getArgsNS2(self):
+        self.failUnlessEqual(self.msg.getArgs(message.OPENID2_NS),
+                             {'mode':'error',
+                              'error':'unit test',
+                              })
+
+    def test_getArgsNS3(self):
+        self.failUnlessEqual(self.msg.getArgs('urn:nothing-significant'), {})
+
+    def _test_updateArgsNS(self, ns, before=None):
+        if before is None:
+            before = {}
+        update_args = {
+            'Camper van Beethoven':'David Lowery',
+            'Magnolia Electric Co.':'Jason Molina',
+            }
+
+        self.failUnlessEqual(self.msg.getArgs(ns), before)
+        self.msg.updateArgs(ns, update_args)
+        after = dict(before)
+        after.update(update_args)
+        self.failUnlessEqual(self.msg.getArgs(ns), after)
+
+    def test_updateArgsOpenID(self):
+        self._test_updateArgsNS(message.OPENID_NS,
+                                before={'mode':'error', 'error':'unit test'})
+
+    def test_updateArgsBARE(self):
+        self._test_updateArgsNS(message.BARE_NS,
+                                before={'xey':'value'})
+
+    def test_updateArgsNS1(self):
+        self._test_updateArgsNS(message.OPENID1_NS)
+
+    def test_updateArgsNS2(self):
+        self._test_updateArgsNS(message.OPENID2_NS,
+                                before={'mode':'error', 'error':'unit test'})
+
+    def test_updateArgsNS3(self):
+        self._test_updateArgsNS('urn:nothing-significant')
+
+    def _test_setArgNS(self, ns):
+        key = 'Camper van Beethoven'
+        value = 'David Lowery'
+        self.failUnlessEqual(self.msg.getArg(ns, key), None)
+        self.msg.setArg(ns, key, value)
+        self.failUnlessEqual(self.msg.getArg(ns, key), value)
+
+    def test_setArgOpenID(self):
+        self._test_setArgNS(message.OPENID_NS)
+
+    def test_setArgBARE(self):
+        self._test_setArgNS(message.BARE_NS)
+
+    def test_setArgNS1(self):
+        self._test_setArgNS(message.OPENID1_NS)
+
+    def test_setArgNS2(self):
+        self._test_setArgNS(message.OPENID2_NS)
+
+    def test_setArgNS3(self):
+        self._test_setArgNS('urn:nothing-significant')
+
+    def _test_delArgNS(self, ns):
+        key = 'Camper van Beethoven'
+        value = 'David Lowery'
+
+        self.failUnlessRaises(KeyError, self.msg.delArg, ns, key)
+        self.msg.setArg(ns, key, value)
+        self.failUnlessEqual(self.msg.getArg(ns, key), value)
+        self.msg.delArg(ns, key)
+        self.failUnlessEqual(self.msg.getArg(ns, key), None)
+
+    def test_delArgOpenID(self):
+        self._test_delArgNS(message.OPENID_NS)
+
+    def test_delArgBARE(self):
+        self._test_delArgNS(message.BARE_NS)
+
+    def test_delArgNS1(self):
+        self._test_delArgNS(message.OPENID1_NS)
+
+    def test_delArgNS2(self):
+        self._test_delArgNS(message.OPENID2_NS)
+
+    def test_delArgNS3(self):
+        self._test_delArgNS('urn:nothing-significant')
+
+class NamespaceMapTest(unittest.TestCase):
+    def test_onealias(self):
+        nsm = message.NamespaceMap()
+        uri = 'http://example.com/foo'
+        alias = "foo"
+        nsm.addAlias(uri, alias)
+        self.failUnless(nsm.getNamespaceURI(alias) == uri)
+        self.failUnless(nsm.getAlias(uri) == alias)
+
+    def test_iteration(self):
+        nsm = message.NamespaceMap()
+        uripat = 'http://example.com/foo%r'
+        
+        nsm.add(uripat%0)
+        for n in range(1,23):
+            self.failUnless(uripat%(n-1) in nsm)
+            self.failUnless(nsm.isDefined(uripat%(n-1)))
+            nsm.add(uripat%n)
+
+        for (uri, alias) in nsm.iteritems():
+            self.failUnless(uri[22:]==alias)
+
+        i=0
+        it = nsm.iterAliases() 
+        try:
+            while True:
+                it.next()
+                i += 1
+        except StopIteration:
+            self.failUnless(i == 23)
+
+        i=0
+        it = nsm.iterNamespaceURIs() 
+        try:
+            while True:
+                it.next()
+                i += 1
+        except StopIteration:
+            self.failUnless(i == 23)
+
+            
+if __name__ == '__main__':
+    unittest.main()

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_nonce.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_nonce.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_nonce.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_nonce.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,104 @@
+from openid.test import datadriven
+import time
+import unittest
+import re
+
+from openid.store.nonce import \
+     mkNonce, \
+     split as splitNonce, \
+     checkTimestamp
+
+nonce_re = re.compile(r'\A\d{4}-\d\d-\d\dT\d\d:\d\d:\d\dZ')
+
+class NonceTest(unittest.TestCase):
+    def test_mkNonce(self):
+        nonce = mkNonce()
+        self.failUnless(nonce_re.match(nonce))
+        self.failUnless(len(nonce) == 26)
+
+    def test_mkNonce_when(self):
+        nonce = mkNonce(0)
+        self.failUnless(nonce_re.match(nonce))
+        self.failUnless(nonce.startswith('1970-01-01T00:00:00Z'))
+        self.failUnless(len(nonce) == 26)
+
+    def test_splitNonce(self):
+        s = '1970-01-01T00:00:00Z'
+        expected_t = 0
+        expected_salt = ''
+        actual_t, actual_salt = splitNonce(s)
+        self.failUnlessEqual(expected_t, actual_t)
+        self.failUnlessEqual(expected_salt, actual_salt)
+
+    def test_mkSplit(self):
+        t = 42
+        nonce_str = mkNonce(t)
+        self.failUnless(nonce_re.match(nonce_str))
+        et, salt = splitNonce(nonce_str)
+        self.failUnlessEqual(len(salt), 6)
+        self.failUnlessEqual(et, t)
+
+class BadSplitTest(datadriven.DataDrivenTestCase):
+    cases = [
+        '',
+        '1970-01-01T00:00:00+1:00',
+        '1969-01-01T00:00:00Z',
+        '1970-00-01T00:00:00Z',
+        '1970.01-01T00:00:00Z',
+        'Thu Sep  7 13:29:31 PDT 2006',
+        'monkeys',
+        ]
+
+    def __init__(self, nonce_str):
+        datadriven.DataDrivenTestCase.__init__(self, nonce_str)
+        self.nonce_str = nonce_str
+
+    def runOneTest(self):
+        self.failUnlessRaises(ValueError, splitNonce, self.nonce_str)
+
+class CheckTimestampTest(datadriven.DataDrivenTestCase):
+    cases = [
+        # exact, no allowed skew
+        ('1970-01-01T00:00:00Z', 0, 0, True),
+
+        # exact, large skew
+        ('1970-01-01T00:00:00Z', 1000, 0, True),
+
+        # no allowed skew, one second old
+        ('1970-01-01T00:00:00Z', 0, 1, False),
+
+        # many seconds old, outside of skew
+        ('1970-01-01T00:00:00Z', 10, 50, False),
+
+        # one second old, one second skew allowed
+        ('1970-01-01T00:00:00Z', 1, 1, True),
+
+        # One second in the future, one second skew allowed
+        ('1970-01-01T00:00:02Z', 1, 1, True),
+
+        # two seconds in the future, one second skew allowed
+        ('1970-01-01T00:00:02Z', 1, 0, False),
+
+        # malformed nonce string
+        ('monkeys', 0, 0, False),
+        ]
+
+    def __init__(self, nonce_string, allowed_skew, now, expected):
+        datadriven.DataDrivenTestCase.__init__(
+            self, repr((nonce_string, allowed_skew, now)))
+        self.nonce_string = nonce_string
+        self.allowed_skew = allowed_skew
+        self.now = now
+        self.expected = expected
+
+    def runOneTest(self):
+        actual = checkTimestamp(self.nonce_string, self.allowed_skew, self.now)
+        self.failUnlessEqual(bool(self.expected), bool(actual))
+
+def pyUnitTests():
+    return datadriven.loadTests(__name__)
+
+if __name__ == '__main__':
+    suite = pyUnitTests()
+    runner = unittest.TextTestRunner()
+    runner.run(suite)

Modified: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py?view=diff&rev=463060&r1=463059&r2=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py (original)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_openidyadis.py Wed Oct 11 16:22:33 2006
@@ -2,7 +2,7 @@
 from openid.consumer.discover import \
      OpenIDServiceEndpoint, OPENID_1_2_TYPE, OPENID_1_1_TYPE, OPENID_1_0_TYPE
 
-from yadis.services import applyFilter
+from openid.yadis.services import applyFilter
 
 
 XRDS_BOILERPLATE = '''\

Added: incubator/heraldry/libraries/python/openid/trunk/openid/test/test_parsehtml.py
URL: http://svn.apache.org/viewvc/incubator/heraldry/libraries/python/openid/trunk/openid/test/test_parsehtml.py?view=auto&rev=463060
==============================================================================
--- incubator/heraldry/libraries/python/openid/trunk/openid/test/test_parsehtml.py (added)
+++ incubator/heraldry/libraries/python/openid/trunk/openid/test/test_parsehtml.py Wed Oct 11 16:22:33 2006
@@ -0,0 +1,82 @@
+from openid.yadis.parsehtml import YadisHTMLParser, ParseDone
+from HTMLParser import HTMLParseError
+
+import os.path, unittest, sys
+
+class _TestCase(unittest.TestCase):
+    reserved_values = ['None', 'EOF']
+
+    def __init__(self, filename, testname, expected, case):
+        self.filename = filename
+        self.testname = testname
+        self.expected = expected
+        self.case = case
+        unittest.TestCase.__init__(self)
+
+    def runTest(self):
+        p = YadisHTMLParser()
+        try:
+            p.feed(self.case)
+        except ParseDone, why:
+            found = why[0]
+
+            # make sure we protect ourselves against accidental bogus
+            # test cases
+            assert found not in self.reserved_values
+
+            # convert to a string
+            if found is None:
+                found = 'None'
+
+            msg = "%r != %r for case %s" % (found, self.expected, self.case)
+            self.failUnlessEqual(found, self.expected, msg)
+        except HTMLParseError:
+            assert self.expected == 'None'
+        else:
+            self.failUnless(self.expected == 'EOF', (self.case, self.expected))
+
+    def shortDescription(self):
+        return "%s (%s<%s>)" % (
+            self.testname,
+            self.__class__.__module__,
+            os.path.basename(self.filename))
+
+def parseCases(data):
+    cases = []
+    for chunk in data.split('\f\n'):
+        expected, case = chunk.split('\n', 1)
+        cases.append((expected, case))
+    return cases
+
+def pyUnitTests():
+    """Make a pyunit TestSuite from a file defining test cases."""
+    s = unittest.TestSuite()
+    for (filename, test_num, expected, case) in getCases():
+        s.addTest(_TestCase(filename, str(test_num), expected, case))
+    return s
+
+def test():  # run the parsehtml suite with a text runner; returns the TestResult
+    runner = unittest.TextTestRunner()
+    return runner.run(pyUnitTests())  # this module defines pyUnitTests(); no loadTests() exists here
+
+filenames = ['data/test1-parsehtml.txt']
+
+default_test_files = []
+base = os.path.dirname(__file__)
+for filename in filenames:
+    full_name = os.path.join(base, filename)
+    default_test_files.append(full_name)
+
+def getCases(test_files=default_test_files):
+    cases = []
+    for filename in test_files:
+        test_num = 0
+        data = file(filename).read()
+        for expected, case in parseCases(data):
+            test_num += 1
+            cases.append((filename, test_num, expected, case))
+    return cases
+
+
+if __name__ == '__main__':
+    sys.exit(not test().wasSuccessful())



Mime
View raw message