qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [09/11] qpid-proton git commit: PROTON-1885: [python] move tests/python to python/tests
Date Fri, 06 Jul 2018 16:59:21 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/interop.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/interop.py b/python/tests/proton_tests/interop.py
new file mode 100644
index 0000000..fe62c02
--- /dev/null
+++ b/python/tests/proton_tests/interop.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import *
+from . import common
+import os
+
+
+def find_test_interop_dir():
+    """Return <root>/tests/interop (root = 4 dirname() steps up from this file); raise if missing."""
+    from os.path import dirname, join, abspath, isdir
+    top_dir = dirname(dirname(dirname(dirname(abspath(__file__)))))
+    interop_dir = join(top_dir, "tests", "interop")
+    if isdir(interop_dir):
+        return interop_dir
+    raise Exception("Cannot find tests/interop directory from "+__file__)
+
+test_interop_dir=find_test_interop_dir()
+
+class InteropTest(common.Test):
+
+    def setUp(self):
+        self.data = Data()
+        self.message = Message()
+
+    def tearDown(self):
+        self.data = None
+
+    def get_data(self, name):
+        """Return the raw bytes of <name>.amqp from the interop test directory."""
+        filename = os.path.join(test_interop_dir, name+".amqp")
+        with open(filename, "rb") as f:
+            return f.read()
+
+    def decode_data(self, encoded):
+        buffer = encoded
+        while buffer:
+            n = self.data.decode(buffer)
+            buffer = buffer[n:]
+        self.data.rewind()
+
+    def decode_data_file(self, name):
+        encoded = self.get_data(name)
+        self.decode_data(encoded)
+        encoded_size = self.data.encoded_size()
+        # Re-encode and verify pre-computed and actual encoded size match.
+        reencoded = self.data.encode()
+        assert encoded_size == len(reencoded), "%d != %d" % (encoded_size, len(reencoded))
+        # verify round trip bytes
+        assert reencoded == encoded, "Value mismatch: %s != %s" % (reencoded, encoded)
+
+    def decode_message_file(self, name):
+        self.message.decode(self.get_data(name))  # parse the whole file as one message
+        body = self.message.body
+        if str(type(body)) == "<type 'org.apache.qpid.proton.amqp.Binary'>":  # NOTE(review): presumably the Java/Jython Binary wrapper -- confirm
+            body = body.array.tostring()  # unwrap to a plain byte string before decoding
+        self.decode_data(body)  # the body is fed to self.data as encoded AMQP data
+
+    def assert_next(self, type, value):
+        next_type = self.data.next()
+        assert next_type == type, "Type mismatch: %s != %s"%(
+            Data.type_names[next_type], Data.type_names[type])
+        next_value = self.data.get_object()
+        assert next_value == value, "Value mismatch: %s != %s"%(next_value, value)
+
+    def test_message(self):
+        self.decode_message_file("message")
+        self.assert_next(Data.STRING, "hello")
+        assert self.data.next() is None
+
+    def test_primitives(self):
+        self.decode_data_file("primitives")
+        self.assert_next(Data.BOOL, True)
+        self.assert_next(Data.BOOL, False)
+        self.assert_next(Data.UBYTE, 42)
+        self.assert_next(Data.USHORT, 42)
+        self.assert_next(Data.SHORT, -42)
+        self.assert_next(Data.UINT, 12345)
+        self.assert_next(Data.INT, -12345)
+        self.assert_next(Data.ULONG, 12345)
+        self.assert_next(Data.LONG, -12345)
+        self.assert_next(Data.FLOAT, 0.125)
+        self.assert_next(Data.DOUBLE, 0.125)
+        assert self.data.next() is None
+
+    def test_strings(self):
+        self.decode_data_file("strings")
+        self.assert_next(Data.BINARY, b"abc\0defg")
+        self.assert_next(Data.STRING, "abcdefg")
+        self.assert_next(Data.SYMBOL, "abcdefg")
+        self.assert_next(Data.BINARY, b"")
+        self.assert_next(Data.STRING, "")
+        self.assert_next(Data.SYMBOL, "")
+        assert self.data.next() is None
+
+    def test_described(self):
+        self.decode_data_file("described")
+        self.assert_next(Data.DESCRIBED, Described("foo-descriptor", "foo-value"))
+        self.data.exit()
+
+        assert self.data.next() == Data.DESCRIBED
+        self.data.enter()
+        self.assert_next(Data.INT, 12)
+        self.assert_next(Data.INT, 13)
+        self.data.exit()
+
+        assert self.data.next() is None
+
+    def test_described_array(self):
+        self.decode_data_file("described_array")
+        self.assert_next(Data.ARRAY, Array("int-array", Data.INT, *range(0,10)))
+
+    def test_arrays(self):
+        self.decode_data_file("arrays")
+        self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.INT, *range(0,100)))
+        self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.STRING, *["a", "b", "c"]))
+        self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.INT))
+        assert self.data.next() is None
+
+    def test_lists(self):
+        self.decode_data_file("lists")
+        self.assert_next(Data.LIST, [32, "foo", True])
+        self.assert_next(Data.LIST, [])
+        assert self.data.next() is None
+
+    def test_maps(self):
+        self.decode_data_file("maps")
+        self.assert_next(Data.MAP, {"one":1, "two":2, "three":3 })
+        self.assert_next(Data.MAP, {1:"one", 2:"two", 3:"three"})
+        self.assert_next(Data.MAP, {})
+        assert self.data.next() is None

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/message.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/message.py b/python/tests/proton_tests/message.py
new file mode 100644
index 0000000..26a3dd2
--- /dev/null
+++ b/python/tests/proton_tests/message.py
@@ -0,0 +1,275 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from . import common
+from proton import *
+from uuid import uuid4
+
+class Test(common.Test):
+
+  def setUp(self):
+    self.msg = Message()
+
+  def tearDown(self):
+    self.msg = None
+
+
+class AccessorsTest(Test):
+
+  def _test(self, name, default, values):
+    d = getattr(self.msg, name)
+    assert d == default, (d, default)
+    for v in values:
+      setattr(self.msg, name, v)
+      gotten = getattr(self.msg, name)
+      assert gotten == v, gotten
+
+  def _test_symbol(self, name):
+    self._test(name, symbol(None), (symbol(u"abc.123.#$%"), symbol(u"hello.world")))
+
+  def _test_str(self, name):
+    self._test(name, None, (u"asdf", u"fdsa", u""))
+
+  def _test_time(self, name):
+    self._test(name, 0, (0, 123456789, 987654321))
+
+  def testId(self):
+    self._test("id", None, ("bytes", None, 123, u"string", uuid4()))
+
+  def testCorrelationId(self):
+    self._test("correlation_id", None, ("bytes", None, 123, u"string", uuid4()))
+
+  def testDurable(self):
+    self._test("durable", False, (True, False))
+
+  def testPriority(self):
+    self._test("priority", Message.DEFAULT_PRIORITY, range(0, 255))
+
+  def testTtl(self):
+    self._test("ttl", 0, range(12345, 54321))
+
+  def testFirstAcquirer(self):
+    self._test("first_acquirer", False, (True, False))
+
+  def testDeliveryCount(self):
+    self._test("delivery_count", 0, range(0, 1024))
+
+  def testUserId(self):
+    self._test("user_id", b"", (b"asdf", b"fdsa", b"asd\x00fdsa", b""))
+
+  def testAddress(self):
+    self._test_str("address")
+
+  def testSubject(self):
+    self._test_str("subject")
+
+  def testReplyTo(self):
+    self._test_str("reply_to")
+
+  def testContentType(self):
+    self._test_symbol("content_type")
+
+  def testContentEncoding(self):
+    self._test_symbol("content_encoding")
+
+  def testExpiryTime(self):
+    self._test_time("expiry_time")
+
+  def testCreationTime(self):
+    self._test_time("creation_time")
+
+  def testGroupId(self):
+    self._test_str("group_id")
+
+  def testGroupSequence(self):
+    self._test("group_sequence", 0, (0, -10, 10, 20, -20))
+
+  def testReplyToGroupId(self):
+    self._test_str("reply_to_group_id")
+
+class CodecTest(Test):
+
+  def testProperties(self):
+    self.msg.properties = {}
+    self.msg.properties['key'] = 'value'
+    data = self.msg.encode()
+
+    msg2 = Message()
+    msg2.decode(data)
+
+    assert msg2.properties['key'] == 'value', msg2.properties['key']
+
+  def testRoundTrip(self):
+    self.msg.id = "asdf"
+    self.msg.correlation_id = uuid4()
+    self.msg.ttl = 3
+    self.msg.priority = 100
+    self.msg.address = "address"
+    self.msg.subject = "subject"
+    self.msg.body = 'Hello World!'
+
+    data = self.msg.encode()
+
+    msg2 = Message()
+    msg2.decode(data)
+
+    assert self.msg.id == msg2.id, (self.msg.id, msg2.id)
+    assert self.msg.correlation_id == msg2.correlation_id, (self.msg.correlation_id, msg2.correlation_id)
+    assert self.msg.ttl == msg2.ttl, (self.msg.ttl, msg2.ttl)
+    assert self.msg.priority == msg2.priority, (self.msg.priority, msg2.priority)
+    assert self.msg.address == msg2.address, (self.msg.address, msg2.address)
+    assert self.msg.subject == msg2.subject, (self.msg.subject, msg2.subject)
+    assert self.msg.body == msg2.body, (self.msg.body, msg2.body)
+
+  def testExpiryEncodeAsNull(self):
+    """An unset expiry time must be encoded as null in the properties list."""
+    self.msg.group_id = "A" # Force creation and expiry fields to be present
+    data = self.msg.encode()
+    decoder = Data()
+
+    # Skip past the headers: decode them, then drop the bytes that were consumed
+    consumed = decoder.decode(data)
+    decoder.clear()
+    data = data[consumed:]
+
+    decoder.decode(data)
+    dproperties = decoder.get_py_described()
+    # Check we've got the correct described list (0x73 is expected for properties)
+    assert dproperties.descriptor == 0x73, (dproperties.descriptor)
+
+    properties = dproperties.value
+    assert properties[8] is None, properties[8]
+
+  def testCreationEncodeAsNull(self):
+    """An unset creation time must be encoded as null in the properties list."""
+    self.msg.group_id = "A" # Force creation and expiry fields to be present
+    data = self.msg.encode()
+    decoder = Data()
+
+    # Skip past the headers: decode them, then drop the bytes that were consumed
+    consumed = decoder.decode(data)
+    decoder.clear()
+    data = data[consumed:]
+
+    decoder.decode(data)
+    dproperties = decoder.get_py_described()
+    # Check we've got the correct described list (0x73 is expected for properties)
+    assert dproperties.descriptor == 0x73, (dproperties.descriptor)
+
+    properties = dproperties.value
+    assert properties[9] is None, properties[9]
+
+  def testGroupSequenceEncodeAsNull(self):
+    """Unset group_id and group_sequence must be encoded as null in the properties list."""
+    self.msg.reply_to_group_id = "R" # Force group_id and group_sequence fields to be present
+    data = self.msg.encode()
+    decoder = Data()
+
+    # Skip past the headers: decode them, then drop the bytes that were consumed
+    consumed = decoder.decode(data)
+    decoder.clear()
+    data = data[consumed:]
+
+    decoder.decode(data)
+    dproperties = decoder.get_py_described()
+    # Check we've got the correct described list (0x73 is expected for properties)
+    assert dproperties.descriptor == 0x73, (dproperties.descriptor)
+
+    properties = dproperties.value
+    assert properties[10] is None, properties[10]
+    assert properties[11] is None, properties[11]
+
+  def testGroupSequenceEncodeAsNonNull(self):
+    self.msg.group_id = "G"
+    self.msg.reply_to_group_id = "R" # Force group_id and group_sequence fields to be present
+    data = self.msg.encode()
+
+    decoder = Data()
+
+    # Skip past the headers
+    consumed = decoder.decode(data)
+    decoder.clear()
+    data = data[consumed:]
+
+    decoder.decode(data)
+    dproperties = decoder.get_py_described()
+    # Check we've got the correct described list
+    assert dproperties.descriptor == 0x73, (dproperties.descriptor)
+
+    properties = dproperties.value
+    assert properties[10] == 'G', properties[10]
+    assert properties[11] == 0, properties[11]
+
+  def testDefaultCreationExpiryDecode(self):
+    # Null/absent expiry and creation times must decode as 0. Case 1: everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x12\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40'
+    msg2 = Message()
+    msg2.decode(data)
+    assert msg2.expiry_time == 0, (msg2.expiry_time)
+    assert msg2.creation_time == 0, (msg2.creation_time)
+
+    # Case 2: the same message with LIST8s instead (check msg3, the message decoded here)
+    data = b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x0f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40'
+    msg3 = Message()
+    msg3.decode(data)
+    assert msg3.expiry_time == 0, (msg3.expiry_time)
+    assert msg3.creation_time == 0, (msg3.creation_time)
+
+    # Case 3: minified message with zero length HEADER and PROPERTIES lists (check msg4)
+    data = b'\x00\x53\x70\x45' b'\x00\x53\x73\x45'
+    msg4 = Message()
+    msg4.decode(data)
+    assert msg4.expiry_time == 0, (msg4.expiry_time)
+    assert msg4.creation_time == 0, (msg4.creation_time)
+
+  def testDefaultPriorityEncode(self):
+    """A priority left at its default (4) must be encoded as null in the header list."""
+    assert self.msg.priority == 4, (self.msg.priority)
+    self.msg.ttl = 0.003 # field after priority, so forces priority to be present
+    data = self.msg.encode()
+    decoder = Data()
+    decoder.decode(data)
+
+    dheaders = decoder.get_py_described()
+    # Check we've got the correct described list (0x70 is expected for the header)
+    assert dheaders.descriptor == 0x70, (dheaders.descriptor)
+
+    # Check that the priority field (second field) is encoded as null
+    headers = dheaders.value
+    assert headers[1] is None, (headers[1])
+
+  def testDefaultPriorityDecode(self):
+    # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x22\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40'
+    msg2 = Message()
+    msg2.decode(data)
+    assert msg2.priority == 4, (msg2.priority)
+
+    # The same message with LIST8s instead
+    data = b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x1f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40'
+    msg3 = Message()
+    msg3.decode(data)
+    assert msg3.priority == 4, (msg3.priority)
+
+    # Minified message with zero length HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\x45' b'\x00\x53\x73\x45'
+    msg4 = Message()
+    msg4.decode(data)
+    assert msg4.priority == 4, (msg4.priority)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
new file mode 100644
index 0000000..208baa2
--- /dev/null
+++ b/python/tests/proton_tests/reactor.py
@@ -0,0 +1,485 @@
+from __future__ import absolute_import
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+import sys
+from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
+from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
+from proton.handlers import Handshaker, MessagingHandler
+from proton import Handler, Url
+
+class Barf(Exception):
+    pass
+
+class BarfOnInit:
+
+    def on_reactor_init(self, event):
+        raise Barf()
+
+    def on_connection_init(self, event):
+        raise Barf()
+
+    def on_session_init(self, event):
+        raise Barf()
+
+    def on_link_init(self, event):
+        raise Barf()
+
+class BarfOnTask:
+
+    def on_timer_task(self, event):
+        raise Barf()
+
+class BarfOnFinal:
+    init = False
+
+    def on_reactor_init(self, event):
+        self.init = True
+
+    def on_reactor_final(self, event):
+        raise Barf()
+
+class BarfOnFinalDerived(Handshaker):
+    init = False
+
+    def on_reactor_init(self, event):
+        self.init = True
+
+    def on_reactor_final(self, event):
+        raise Barf()
+
+class ExceptionTest(Test):
+
+    def setUp(self):
+        self.reactor = Reactor()
+
+    def test_reactor_final(self):
+        self.reactor.global_handler = BarfOnFinal()
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_global_set(self):
+        self.reactor.global_handler = BarfOnInit()
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_global_add(self):
+        self.reactor.global_handler.add(BarfOnInit())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_reactor_set(self):
+        self.reactor.handler = BarfOnInit()
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_reactor_add(self):
+        self.reactor.handler.add(BarfOnInit())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_connection(self):
+        self.reactor.connection(BarfOnInit())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_connection_set(self):
+        c = self.reactor.connection()
+        c.handler = BarfOnInit()
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_connection_add(self):
+        c = self.reactor.connection()
+        c.handler = object()
+        c.handler.add(BarfOnInit())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_session_set(self):
+        c = self.reactor.connection()
+        s = c.session()
+        s.handler = BarfOnInit()
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_session_add(self):
+        c = self.reactor.connection()
+        s = c.session()
+        s.handler = object()
+        s.handler.add(BarfOnInit())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_link_set(self):
+        c = self.reactor.connection()
+        s = c.session()
+        l = s.sender("xxx")
+        l.handler = BarfOnInit()
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_link_add(self):
+        c = self.reactor.connection()
+        s = c.session()
+        l = s.sender("xxx")
+        l.handler = object()
+        l.handler.add(BarfOnInit())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_schedule(self):
+        self.reactor.schedule(0, BarfOnTask())
+        try:
+            self.reactor.run()
+            assert False, "expected to barf"
+        except Barf:
+            pass
+
+    def test_schedule_many_nothings(self):
+        class Nothing:
+            results = []
+            def on_timer_task(self, event):
+                self.results.append(None)
+        num = 12345
+        for a in range(num):
+            self.reactor.schedule(0, Nothing())
+        self.reactor.run()
+        assert len(Nothing.results) == num
+
+    def test_schedule_many_nothing_refs(self):
+        class Nothing:
+            results = []
+            def on_timer_task(self, event):
+                self.results.append(None)
+        num = 12345
+        tasks = []
+        for a in range(num):
+            tasks.append(self.reactor.schedule(0, Nothing()))
+        self.reactor.run()
+        assert len(Nothing.results) == num
+
+    def test_schedule_many_nothing_refs_cancel_before_run(self):
+        class Nothing:
+            results = []
+            def on_timer_task(self, event):
+                self.results.append(None)
+        num = 12345
+        tasks = []
+        for a in range(num):
+            tasks.append(self.reactor.schedule(0, Nothing()))
+        for task in tasks:
+            task.cancel()
+        self.reactor.run()
+        assert len(Nothing.results) == 0
+
+    def test_schedule_cancel(self):
+        barf = self.reactor.schedule(10, BarfOnTask())
+        class CancelBarf:
+            def __init__(self, barf):
+                self.barf = barf
+            def on_timer_task(self, event):
+                self.barf.cancel()
+                pass
+        self.reactor.schedule(0, CancelBarf(barf))
+        now = self.reactor.mark()
+        try:
+            self.reactor.run()
+            elapsed = self.reactor.mark() - now
+            assert elapsed < 10, "expected cancelled task to not delay the reactor by %s" % elapsed
+        except Barf:
+            assert False, "expected barf to be cancelled"
+
+    def test_schedule_cancel_many(self):
+        num = 12345
+        barfs = set()
+        for a in range(num):
+            barf = self.reactor.schedule(10*(a+1), BarfOnTask())
+            class CancelBarf:
+                def __init__(self, barf):
+                    self.barf = barf
+                def on_timer_task(self, event):
+                    self.barf.cancel()
+                    barfs.discard(self.barf)
+                    pass
+            self.reactor.schedule(0, CancelBarf(barf))
+            barfs.add(barf)
+        now = self.reactor.mark()
+        try:
+            self.reactor.run()
+            elapsed = self.reactor.mark() - now
+            assert elapsed < num, "expected cancelled task to not delay the reactor by %s" % elapsed
+            assert not barfs, "expected all barfs to be discarded"
+        except Barf:
+            assert False, "expected barf to be cancelled"
+
+
+class ApplicationEventTest(Test):
+    """Test application defined events and handlers."""
+
+    class MyTestServer(TestServer):
+        def __init__(self):
+            super(ApplicationEventTest.MyTestServer, self).__init__()
+
+    class MyHandler(Handler):
+        def __init__(self, test):
+            super(ApplicationEventTest.MyHandler, self).__init__()
+            self._test = test
+
+        def on_hello(self, event):
+            # verify PROTON-1056
+            self._test.hello_rcvd = str(event)
+
+        def on_goodbye(self, event):
+            self._test.goodbye_rcvd = str(event)
+
+    def setUp(self):
+        import os
+        if not hasattr(os, 'pipe'):
+          # KAG: seems like Jython doesn't have an os.pipe() method
+          raise SkipTest()
+        if os.name=="nt":
+          # Correct implementation on Windows is complicated
+          raise SkipTest("PROTON-1071")
+        self.server = ApplicationEventTest.MyTestServer()
+        self.server.reactor.handler.add(ApplicationEventTest.MyHandler(self))
+        self.event_injector = EventInjector()
+        self.hello_event = ApplicationEvent("hello")
+        self.goodbye_event = ApplicationEvent("goodbye")
+        self.server.reactor.selectable(self.event_injector)
+        self.hello_rcvd = None
+        self.goodbye_rcvd = None
+        self.server.start()
+
+    def tearDown(self):
+        self.server.stop()
+
+    def _wait_for(self, predicate, timeout=10.0):
+        """Poll predicate every 0.1s until it is true or timeout seconds have passed."""
+        give_up_at = time.time() + timeout
+        while time.time() < give_up_at and not predicate():
+            time.sleep(0.1)
+        # Final check: raises AssertionError when the wait timed out.
+        assert predicate()
+
+    def test_application_events(self):
+        self.event_injector.trigger(self.hello_event)
+        self._wait_for(lambda: self.hello_rcvd is not None)
+        self.event_injector.trigger(self.goodbye_event)
+        self._wait_for(lambda: self.goodbye_rcvd is not None)
+
+
+class AuthenticationTestHandler(MessagingHandler):
+    def __init__(self):
+        super(AuthenticationTestHandler, self).__init__()
+        port = free_tcp_port()
+        self.url = "localhost:%i" % port
+        self.verified = False
+
+    def on_start(self, event):
+        self.listener = event.container.listen(self.url)
+
+    def on_connection_opened(self, event):
+        event.connection.close()
+
+    def on_connection_opening(self, event):
+        assert event.connection.transport.user == "user@proton"
+        self.verified = True
+
+    def on_connection_closed(self, event):
+        event.connection.close()
+        self.listener.close()
+
+    def on_connection_error(self, event):
+        event.connection.close()
+        self.listener.close()
+
+class ContainerTest(Test):
+    """Test container subclass of reactor."""
+
+    def test_event_has_container_attribute(self):
+        ensureCanTestExtendedSASL()
+        class TestHandler(MessagingHandler):
+            def __init__(self):
+                super(TestHandler, self).__init__()
+                port = free_tcp_port()
+                self.url = "localhost:%i" % port
+
+            def on_start(self, event):
+                self.listener = event.container.listen(self.url)
+
+            def on_connection_closing(self, event):
+                event.connection.close()
+                self.listener.close()
+        test_handler = TestHandler()
+        container = Container(test_handler)
+        class ConnectionHandler(MessagingHandler):
+            def __init__(self):
+                super(ConnectionHandler, self).__init__()
+
+            def on_connection_opened(self, event):
+                event.connection.close()
+                assert event.container == event.reactor
+                assert event.container == container
+        container.connect(test_handler.url, handler=ConnectionHandler())
+        container.run()
+
+    def test_authentication_via_url(self):
+        ensureCanTestExtendedSASL()
+        test_handler = AuthenticationTestHandler()
+        container = Container(test_handler)
+        container.connect("%s:password@%s" % ("user%40proton", test_handler.url), reconnect=False)
+        container.run()
+        assert test_handler.verified
+
+    def test_authentication_via_container_attributes(self):
+        ensureCanTestExtendedSASL()
+        test_handler = AuthenticationTestHandler()
+        container = Container(test_handler)
+        container.user = "user@proton"
+        container.password = "password"
+        container.connect(test_handler.url, reconnect=False)
+        container.run()
+        assert test_handler.verified
+
+    def test_authentication_via_kwargs(self):
+        ensureCanTestExtendedSASL()
+        test_handler = AuthenticationTestHandler()
+        container = Container(test_handler)
+        container.connect(test_handler.url, user="user@proton", password="password", reconnect=False)
+        container.run()
+        assert test_handler.verified
+
+    class _ServerHandler(MessagingHandler):
+        def __init__(self, host):
+            super(ContainerTest._ServerHandler, self).__init__()
+            self.host = host
+            # Pick one free port; on_start() listens on "<host>:<port>".
+            self.port = free_tcp_port()
+            self.client_addr = None
+            self.peer_hostname = None
+
+        def on_start(self, event):
+            self.listener = event.container.listen("%s:%s" % (self.host, self.port))
+
+        def on_connection_opened(self, event):
+            self.client_addr = event.reactor.get_connection_address(event.connection)
+            self.peer_hostname = event.connection.remote_hostname
+
+        def on_connection_closing(self, event):
+            event.connection.close()
+            self.listener.close()
+
+    class _ClientHandler(MessagingHandler):
+        def __init__(self):
+            super(ContainerTest._ClientHandler, self).__init__()
+            self.server_addr = None
+
+        def on_connection_opened(self, event):
+            self.server_addr = event.reactor.get_connection_address(event.connection)
+            event.connection.close()
+
+    def test_numeric_hostname(self):
+        ensureCanTestExtendedSASL()
+        server_handler = ContainerTest._ServerHandler("127.0.0.1")
+        client_handler = ContainerTest._ClientHandler()
+        container = Container(server_handler)
+        container.connect(url=Url(host="127.0.0.1",
+                                  port=server_handler.port),
+                          handler=client_handler)
+        container.run()
+        assert server_handler.client_addr
+        assert client_handler.server_addr
+        assert server_handler.peer_hostname == "127.0.0.1", server_handler.peer_hostname
+        assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port)
+
+    def test_non_numeric_hostname(self):
+        ensureCanTestExtendedSASL()
+        server_handler = ContainerTest._ServerHandler("localhost")
+        client_handler = ContainerTest._ClientHandler()
+        container = Container(server_handler)
+        container.connect(url=Url(host="localhost",
+                                  port=server_handler.port),
+                          handler=client_handler)
+        container.run()
+        assert server_handler.client_addr
+        assert client_handler.server_addr
+        assert server_handler.peer_hostname == "localhost", server_handler.peer_hostname
+        assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port)
+
+    def test_virtual_host(self):
+        ensureCanTestExtendedSASL()
+        server_handler = ContainerTest._ServerHandler("localhost")
+        container = Container(server_handler)
+        conn = container.connect(url=Url(host="localhost",
+                                         port=server_handler.port),
+                                 handler=ContainerTest._ClientHandler(),
+                                 virtual_host="a.b.c.org")
+        container.run()
+        assert server_handler.peer_hostname == "a.b.c.org", server_handler.peer_hostname
+
+    def test_no_virtual_host(self):
+        # explicitly setting an empty virtual host should prevent the hostname
+        # field from being sent in the Open performative when using the
+        # Python Container.
+        server_handler = ContainerTest._ServerHandler("localhost")
+        container = Container(server_handler)
+        conn = container.connect(url=Url(host="localhost",
+                                         port=server_handler.port),
+                                 handler=ContainerTest._ClientHandler(),
+                                 virtual_host="")
+        container.run()
+        assert server_handler.peer_hostname is None, server_handler.peer_hostname

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/sasl.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/sasl.py b/python/tests/proton_tests/sasl.py
new file mode 100644
index 0000000..09b5f81
--- /dev/null
+++ b/python/tests/proton_tests/sasl.py
@@ -0,0 +1,587 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import absolute_import
+
+import sys, os
+from . import common
+from . import engine
+
+from proton import *
+from .common import pump, Skipped
+
+def _sslCertpath(file):
+    """Return the path of `file` inside the ssl_db directory next to this module.
+    On Windows: None for private-key files, and .pem names become .p12."""
+    if os.name=="nt":
+        if file.find("private-key")!=-1:
+            # The private key is not in a separate store
+            return None
+        # Substitute pkcs#12 equivalent for the CA/key store
+        if file.endswith(".pem"):
+            file = file[:-4] + ".p12"
+    return os.path.join(os.path.dirname(__file__),
+                        "ssl_db/%s" % file)
+
+def _testSaslMech(self, mech, clientUser='user@proton', authUser='user@proton', encrypted=False, authenticated=True):
+  self.s1.allowed_mechs(mech)
+  self.c1.open()
+  self.c2.open()
+  # Let the client (t1) and server (t2) transports exchange data via common.pump.
+  pump(self.t1, self.t2, 1024)
+  # encrypted=None means "do not check the encryption state at all".
+  if encrypted is not None:
+    assert self.t2.encrypted == encrypted, encrypted
+    assert self.t1.encrypted == encrypted, encrypted
+
+  assert self.t2.authenticated == authenticated, authenticated
+  assert self.t1.authenticated == authenticated, authenticated
+  if authenticated:
+    # Server: must report the authenticated identity, the mechanism and an open connection
+    assert self.t2.user == authUser
+    assert self.s2.user == authUser
+    assert self.s2.mech == mech.strip()
+    assert self.s2.outcome == SASL.OK, self.s2.outcome
+    assert self.c2.state & Endpoint.LOCAL_ACTIVE and self.c2.state & Endpoint.REMOTE_ACTIVE,\
+      "local_active=%s, remote_active=%s" % (self.c2.state & Endpoint.LOCAL_ACTIVE, self.c2.state & Endpoint.REMOTE_ACTIVE)
+    # Client: must report the user it presented, the mechanism and an open connection
+    assert self.t1.user == clientUser
+    assert self.s1.user == clientUser
+    assert self.s1.mech == mech.strip()
+    assert self.s1.outcome == SASL.OK, self.s1.outcome
+    assert self.c1.state & Endpoint.LOCAL_ACTIVE and self.c1.state & Endpoint.REMOTE_ACTIVE,\
+      "local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
+  else:
+    # Server: no identity may be recorded and the outcome must not be OK
+    assert self.t2.user is None
+    assert self.s2.user is None
+    assert self.s2.outcome != SASL.OK, self.s2.outcome
+    # Client: still knows the user it tried, but the outcome must not be OK
+    assert self.t1.user == clientUser
+    assert self.s1.user == clientUser
+    assert self.s1.outcome != SASL.OK, self.s1.outcome
+
+class Test(common.Test):
+  pass
+
+def consumeAllOuput(t):
+  # Drain t: pop whatever peek() reports until a pass finds nothing pending.
+  while True:
+    pending = t.peek(1024)
+    size = len(pending) if pending else 0
+    t.pop(size)
+    if size <= 0:
+      break
+
+class SaslTest(Test):
+
+  def setUp(self):
+    self.t1 = Transport()
+    self.s1 = SASL(self.t1)
+    self.t2 = Transport(Transport.SERVER)
+    self.t2.max_frame_size = 65536
+    self.s2 = SASL(self.t2)
+
+  def pump(self):
+    pump(self.t1, self.t2, 1024)
+
+  # We have to generate the client frames manually because proton does not
+  # generate pipelined SASL and AMQP frames together
+  def testIllegalProtocolLayering(self):
+    # Server
+    self.s2.allowed_mechs('ANONYMOUS')
+
+    c2 = Connection()
+    self.t2.bind(c2)
+
+    assert self.s2.outcome is None
+
+    # Push client bytes into server
+    self.t2.push(
+        # SASL
+        b'AMQP\x03\x01\x00\x00'
+        # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
+        b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
+        # SASL (again illegally)
+        b'AMQP\x03\x01\x00\x00'
+        # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
+        b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
+        # AMQP
+        b'AMQP\x00\x01\x00\x00'
+        # @open(16) [container-id="", channel-max=1234]
+        b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
+        )
+
+    consumeAllOuput(self.t2)
+
+    assert self.t2.condition
+    assert self.t2.closed
+    assert not c2.state & Endpoint.REMOTE_ACTIVE
+
+  def testPipelinedClient(self):
+    # Server
+    self.s2.allowed_mechs('ANONYMOUS')
+
+    c2 = Connection()
+    self.t2.bind(c2)
+
+    assert self.s2.outcome is None
+
+    # Push client bytes into server
+    self.t2.push(
+        # SASL
+        b'AMQP\x03\x01\x00\x00'
+        # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
+        b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
+        # AMQP
+        b'AMQP\x00\x01\x00\x00'
+        # @open(16) [container-id="", channel-max=1234]
+        b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
+        )
+
+    consumeAllOuput(self.t2)
+
+    assert not self.t2.condition
+    assert self.s2.outcome == SASL.OK
+    assert c2.state & Endpoint.REMOTE_ACTIVE
+
+  def testPipelinedServer(self):
+    # Client
+    self.s1.allowed_mechs('ANONYMOUS')
+
+    c1 = Connection()
+    self.t1.bind(c1)
+
+    assert self.s1.outcome is None
+
+    # Push server bytes into client
+    # Commented out lines in this test are where the client input processing doesn't
+    # run after output processing even though there is input waiting
+    self.t1.push(
+        # SASL
+        b'AMQP\x03\x01\x00\x00'
+        # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]]
+        b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS'
+        # @sasl-outcome(68) [code=0]
+        b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00'
+        # AMQP
+        b'AMQP\x00\x01\x00\x00'
+        # @open(16) [container-id="", channel-max=1234]
+        b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
+        )
+
+    consumeAllOuput(self.t1)
+
+    assert self.s1.outcome == SASL.OK
+    assert c1.state & Endpoint.REMOTE_ACTIVE
+
+  def testPipelined2(self):
+    out1 = self.t1.peek(1024)
+    self.t1.pop(len(out1))
+    self.t2.push(out1)
+
+    self.s2.allowed_mechs('ANONYMOUS')
+    c2 = Connection()
+    c2.open()
+    self.t2.bind(c2)
+
+    out2 = self.t2.peek(1024)
+    self.t2.pop(len(out2))
+    self.t1.push(out2)
+
+    out1 = self.t1.peek(1024)
+    assert len(out1) > 0
+
+  def testFracturedSASL(self):
+    """ PROTON-235
+    """
+    assert self.s1.outcome is None
+
+    # self.t1.trace(Transport.TRACE_FRM)
+
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    self.t1.push(b"AMQP\x03\x01\x00\x00")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    self.t1.push(b"\x00\x00\x00")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+
+    self.t1.push(b"6\x02\x01\x00\x00\x00S@\xc0\x29\x01\xe0\x26\x04\xa3\x05PLAIN\x0aDIGEST-MD5\x09ANONYMOUS\x08CRAM-MD5")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    self.t1.push(b"\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    while out:
+      out = self.t1.peek(1024)
+      self.t1.pop(len(out))
+
+    assert self.s1.outcome == SASL.OK, self.s1.outcome
+
+  def test_singleton(self):
+      """Verify that only a single instance of SASL can exist per Transport"""
+      transport = Transport()
+      attr = object()
+      sasl1 = SASL(transport)
+      sasl1.my_attribute = attr
+      sasl2 = transport.sasl()
+      sasl3 = SASL(transport)
+      assert sasl1 == sasl2
+      assert sasl1 == sasl3
+      assert sasl1.my_attribute == attr
+      assert sasl2.my_attribute == attr
+      assert sasl3.my_attribute == attr
+      transport = Transport()
+      sasl1 = transport.sasl()
+      sasl1.my_attribute = attr
+      sasl2 = SASL(transport)
+      assert sasl1 == sasl2
+      assert sasl1.my_attribute == attr
+      assert sasl2.my_attribute == attr
+
+  def testSaslSkipped(self):
+    """Verify that the server (with SASL) correctly handles a client without SASL"""
+    self.t1 = Transport()
+    self.t2.require_auth(False)
+    self.pump()
+    assert self.s2.outcome == None
+    assert self.t2.condition == None
+    assert self.t2.authenticated == False
+    assert self.s1.outcome == None
+    assert self.t1.condition == None
+    assert self.t1.authenticated == False
+
+  def testSaslSkippedFail(self):
+    """Verify that with require_auth(True) a client without SASL leaves both transports with an error condition"""
+    self.t1 = Transport()
+    self.t2.require_auth(True)
+    self.pump()
+    assert self.s2.outcome == None
+    assert self.t2.condition != None
+    assert self.s1.outcome == None
+    assert self.t1.condition != None
+
+  def testMechNotFound(self):
+    self.c1 = Connection()
+    self.c1.open()
+    self.t1.bind(self.c1)
+    self.s1.allowed_mechs('IMPOSSIBLE')
+
+    self.pump()
+
+    assert self.t2.authenticated == False
+    assert self.t1.authenticated == False
+    assert self.s1.outcome != SASL.OK
+    assert self.s2.outcome != SASL.OK
+
+class SASLMechTest(Test):
+  def setUp(self):
+    self.t1 = Transport()
+    self.s1 = SASL(self.t1)
+    self.t2 = Transport(Transport.SERVER)
+    self.s2 = SASL(self.t2)
+
+    self.c1 = Connection()
+    self.c1.user = 'user@proton'
+    self.c1.password = 'password'
+    self.c1.hostname = 'localhost'
+
+    self.c2 = Connection()
+
+  def testANON(self):
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'ANONYMOUS', authUser='anonymous')
+
+  def testCRAMMD5(self):
+    common.ensureCanTestExtendedSASL()
+
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'CRAM-MD5')
+
+  def testDIGESTMD5(self):
+    common.ensureCanTestExtendedSASL()
+
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5')
+
+  # PLAIN shouldn't work without encryption without special setting
+  def testPLAINfail(self):
+    common.ensureCanTestExtendedSASL()
+
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'PLAIN', authenticated=False)
+
+  # Client won't accept PLAIN even if offered by server without special setting
+  def testPLAINClientFail(self):
+    common.ensureCanTestExtendedSASL()
+
+    self.s2.allow_insecure_mechs = True
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'PLAIN', authenticated=False)
+
+  # PLAIN will only work if both ends are specially set up
+  def testPLAIN(self):
+    common.ensureCanTestExtendedSASL()
+
+    self.s1.allow_insecure_mechs = True
+    self.s2.allow_insecure_mechs = True
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'PLAIN')
+
+# SCRAM is not supported before Cyrus SASL 2.1.26, so it is not
+# universally available; the test below stays disabled until we
+# can check for SCRAM support before running it.
+#  def testSCRAMSHA1(self):
+#    common.ensureCanTestExtendedSASL()
+#
+#    self.t1.bind(self.c1)
+#    self.t2.bind(self.c2)
+#    _testSaslMech(self, 'SCRAM-SHA-1')
+
+def _sslConnection(domain, transport, connection):
+  transport.bind(connection)
+  ssl = SSL(transport, domain, None )
+  return connection
+
+class SSLSASLTest(Test):
+  def setUp(self):
+    if not common.isSSLPresent():
+      raise Skipped("No SSL libraries found.")
+
+    self.server_domain = SSLDomain(SSLDomain.MODE_SERVER)
+    self.client_domain = SSLDomain(SSLDomain.MODE_CLIENT)
+
+    self.t1 = Transport()
+    self.s1 = SASL(self.t1)
+    self.t2 = Transport(Transport.SERVER)
+    self.s2 = SASL(self.t2)
+
+    self.c1 = Connection()
+    self.c2 = Connection()
+
+  def testSSLPlainSimple(self):
+    if not SASL.extended():
+      raise Skipped("Simple SASL server does not support PLAIN")
+    common.ensureCanTestExtendedSASL()
+
+    clientUser = 'user@proton'
+    mech = 'PLAIN'
+
+    self.c1.user = clientUser
+    self.c1.password = 'password'
+    self.c1.hostname = 'localhost'
+
+    ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+    ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+    _testSaslMech(self, mech, encrypted=True)
+
+  def testSSLPlainSimpleFail(self):
+    if not SASL.extended():
+      raise Skipped("Simple SASL server does not support PLAIN")
+    common.ensureCanTestExtendedSASL()
+
+    clientUser = 'usr@proton'
+    mech = 'PLAIN'
+
+    self.c1.user = clientUser
+    self.c1.password = 'password'
+    self.c1.hostname = 'localhost'
+
+    ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+    ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+    _testSaslMech(self, mech, clientUser='usr@proton', encrypted=True, authenticated=False)
+
+  def testSSLExternalSimple(self):
+    if os.name=="nt":
+      extUser = 'O=Client, CN=127.0.0.1'
+    else:
+      extUser = 'O=Client,CN=127.0.0.1'
+    mech = 'EXTERNAL'
+
+    self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
+                                       _sslCertpath("server-private-key.pem"),
+                                       "server-password")
+    self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+    self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
+                                               _sslCertpath("ca-certificate.pem") )
+    self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"),
+                                       _sslCertpath("client-private-key.pem"),
+                                       "client-password")
+    self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+    self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
+
+    ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+    ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+    _testSaslMech(self, mech, clientUser=None, authUser=extUser, encrypted=True)
+
+  def testSSLExternalSimpleFail(self):
+    mech = 'EXTERNAL'
+
+    self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
+                                       _sslCertpath("server-private-key.pem"),
+                                       "server-password")
+    self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+    self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
+                                               _sslCertpath("ca-certificate.pem") )
+    self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+    self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
+
+    ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+    ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+    _testSaslMech(self, mech, clientUser=None, authUser=None, encrypted=None, authenticated=False)
+
+class SASLEventTest(engine.CollectorTest):
+  def setUp(self):
+    engine.CollectorTest.setUp(self)
+    self.t1 = Transport()
+    self.s1 = SASL(self.t1)
+    self.t2 = Transport(Transport.SERVER)
+    self.s2 = SASL(self.t2)
+
+    self.c1 = Connection()
+    self.c1.user = 'user@proton'
+    self.c1.password = 'password'
+    self.c1.hostname = 'localhost'
+
+    self.c2 = Connection()
+
+    self.collector = Collector()
+
+  def testNormalAuthenticationClient(self):
+    common.ensureCanTestExtendedSASL()
+    self.c1.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5')
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.CONNECTION_REMOTE_OPEN)
+
+  def testNormalAuthenticationServer(self):
+    common.ensureCanTestExtendedSASL()
+    self.c2.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5')
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.CONNECTION_REMOTE_OPEN)
+
+  def testFailedAuthenticationClient(self):
+    common.ensureCanTestExtendedSASL()
+    clientUser = "usr@proton"
+    self.c1.user = clientUser
+    self.c1.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_ERROR,
+                Event.TRANSPORT_TAIL_CLOSED,
+                Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testFailedAuthenticationServer(self):
+    common.ensureCanTestExtendedSASL()
+    clientUser = "usr@proton"
+    self.c1.user = clientUser
+    self.c2.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_ERROR,
+                Event.TRANSPORT_TAIL_CLOSED,
+                Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testNoMechClient(self):
+    common.ensureCanTestExtendedSASL()
+    self.c1.collect(self.collector)
+    self.s2.allowed_mechs('IMPOSSIBLE')
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_ERROR,
+                Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testNoMechServer(self):
+    common.ensureCanTestExtendedSASL()
+    self.c2.collect(self.collector)
+    self.s2.allowed_mechs('IMPOSSIBLE')
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_TAIL_CLOSED,
+                Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testDisallowedMechClient(self):
+    self.c1.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_ERROR,
+                Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testDisallowedMechServer(self):
+    self.c2.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_TAIL_CLOSED,
+                Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testDisallowedPlainClient(self):
+    self.c1.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'PLAIN', authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_ERROR,
+                Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testDisallowedPlainServer(self):
+    self.c2.collect(self.collector)
+    self.t1.bind(self.c1)
+    self.t2.bind(self.c2)
+    _testSaslMech(self, 'PLAIN', authenticated=False)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_TAIL_CLOSED,
+                Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/soak.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/soak.py b/python/tests/proton_tests/soak.py
new file mode 100644
index 0000000..b7b521c
--- /dev/null
+++ b/python/tests/proton_tests/soak.py
@@ -0,0 +1,334 @@
+from __future__ import absolute_import
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os
+import sys
+from .common import Test, Skipped, free_tcp_ports, \
+    MessengerReceiverC, MessengerSenderC, \
+    MessengerReceiverValgrind, MessengerSenderValgrind, \
+    ReactorReceiverC, ReactorSenderC, \
+    ReactorReceiverValgrind, ReactorSenderValgrind, \
+    isSSLPresent
+from proton import *
+
+#
+# Tests that run the apps
+#
+
+class AppTests(Test):
+
+    def __init__(self, *args):
+        Test.__init__(self, *args)
+        self.is_valgrind = False
+
+    def default(self, name, value, **kwargs):
+        if self.is_valgrind:
+            default = kwargs.get("valgrind", value)
+        else:
+            default = value
+        return Test.default(self, name, default, **kwargs)
+
+    @property
+    def iterations(self):
+        return int(self.default("iterations", 2, fast=1, valgrind=2))
+
+    @property
+    def send_count(self):
+        return int(self.default("send_count", 17, fast=1, valgrind=2))
+
+    @property
+    def target_count(self):
+        return int(self.default("target_count", 5, fast=1, valgrind=2))
+
+    @property
+    def send_batch(self):
+        return int(self.default("send_batch", 7, fast=1, valgrind=2))
+
+    @property
+    def forward_count(self):
+        return int(self.default("forward_count", 5, fast=1, valgrind=2))
+
+    @property
+    def port_count(self):
+        return int(self.default("port_count", 3, fast=1, valgrind=2))
+
+    @property
+    def sender_count(self):
+        return int(self.default("sender_count", 3, fast=1, valgrind=2))
+
+    def valgrind_test(self):
+        self.is_valgrind = True
+
+    def setUp(self):
+        self.senders = []
+        self.receivers = []
+
+    def tearDown(self):
+        pass
+
+    def _do_test(self, iterations=1):
+        verbose = self.verbose
+
+        for R in self.receivers:
+            R.start( verbose )
+
+        for j in range(iterations):
+            for S in self.senders:
+                S.start( verbose )
+
+            for S in self.senders:
+                S.wait()
+                #print("SENDER OUTPUT:")
+                #print( S.stdout() )
+                assert S.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
+                                         % (str(S.cmdline()),
+                                            S.status(),
+                                            S.stdout(),
+                                            S.stderr()))
+
+        for R in self.receivers:
+            R.wait()
+            #print("RECEIVER OUTPUT")
+            #print( R.stdout() )
+            assert R.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
+                                     % (str(R.cmdline()),
+                                        R.status(),
+                                        R.stdout(),
+                                        R.stderr()))
+
+#
+# Traffic passing tests based on the Messenger apps
+#
+
+class MessengerTests(AppTests):
+
+    _timeout = 60
+
+    def _ssl_check(self):
+        if not isSSLPresent():
+            raise Skipped("No SSL libraries found.")
+        if os.name=="nt":
+            raise Skipped("Windows SChannel lacks anonymous cipher support.")
+
+    def __init__(self, *args):
+        AppTests.__init__(self, *args)
+
+    def _do_oneway_test(self, receiver, sender, domain="amqp"):
+        """ Send N messages to a receiver.
+        Parameters:
+        iterations - repeat the senders this many times
+        target_count = # of targets to send to.
+        send_count = # messages sent to each target
+        """
+        iterations = self.iterations
+        send_count = self.send_count
+        target_count = self.target_count
+
+        send_total = send_count * target_count
+        receive_total = send_total * iterations
+
+        port = free_tcp_ports()[0]
+
+        receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
+        receiver.receive_count = receive_total
+        receiver.timeout = MessengerTests._timeout
+        self.receivers.append( receiver )
+
+        sender.targets = ["%s://0.0.0.0:%s/X%d" % (domain, port, j) for j in range(target_count)]
+        sender.send_count = send_total
+        sender.timeout = MessengerTests._timeout
+        self.senders.append( sender )
+
+        self._do_test(iterations)
+
+    def _do_echo_test(self, receiver, sender, domain="amqp"):
+        """ Send N messages to a receiver, which responds to each.
+        Parameters:
+        iterations - repeat the senders this many times
+        target_count - # targets to send to
+        send_count = # messages sent to each target
+        send_batch - wait for replies after this many messages sent
+        """
+        iterations = self.iterations
+        send_count = self.send_count
+        target_count = self.target_count
+        send_batch = self.send_batch
+
+        send_total = send_count * target_count
+        receive_total = send_total * iterations
+
+        port = free_tcp_ports()[0]
+
+        receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
+        receiver.receive_count = receive_total
+        receiver.send_reply = True
+        receiver.timeout = MessengerTests._timeout
+        self.receivers.append( receiver )
+
+        sender.targets = ["%s://0.0.0.0:%s/%dY" % (domain, port, j) for j in range(target_count)]
+        sender.send_count = send_total
+        sender.get_reply = True
+        sender.send_batch = send_batch
+        sender.timeout = MessengerTests._timeout
+        self.senders.append( sender )
+
+        self._do_test(iterations)
+
+    def _do_relay_test(self, receiver, relay, sender, domain="amqp"):
+        """ Send N messages to a receiver, which replies to each and forwards
+        each of them to a different receiver (the relay).
+        Parameters:
+        iterations - repeat the senders this many times
+        target_count - # targets to send to
+        send_count = # messages sent to each target
+        send_batch - wait for replies after this many messages sent
+        forward_count - forward to this many targets
+        """
+        iterations = self.iterations
+        send_count = self.send_count
+        target_count = self.target_count
+        send_batch = self.send_batch  # NOTE(review): never applied to sender here (unlike _do_echo_test) - confirm
+        forward_count = self.forward_count  # only referenced by the disabled forwards line below
+
+        send_total = send_count * target_count
+        receive_total = send_total * iterations
+
+        port = free_tcp_ports()[0]
+
+        receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
+        receiver.receive_count = receive_total
+        receiver.send_reply = True
+        # forward to 'relay' - uses two links
+        # ## THIS FAILS:
+        # receiver.forwards = ["amqp://Relay/%d" % j for j in range(forward_count)]
+        receiver.forwards = ["%s://Relay" % domain]
+        receiver.timeout = MessengerTests._timeout
+        self.receivers.append( receiver )
+
+        relay.subscriptions = ["%s://0.0.0.0:%s" % (domain, port)]
+        relay.name = "Relay"
+        relay.receive_count = receive_total
+        relay.timeout = MessengerTests._timeout
+        self.receivers.append( relay )
+
+        # send to 'receiver'
+        sender.targets = ["%s://0.0.0.0:%s/X%dY" % (domain, port, j) for j in range(target_count)]
+        sender.send_count = send_total
+        sender.get_reply = True
+        sender.timeout = MessengerTests._timeout
+        self.senders.append( sender )
+
+        self._do_test(iterations)
+
+
+    def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"):
+        """
+        A star-like topology, with a central receiver at the hub, and senders at
+        the spokes.  Each sender will connect to each of the ports the receiver is
+        listening on.  Each sender will then create N links per each connection.
+        Each sender will send X messages per link, waiting for a response.
+        Parameters:
+        iterations - repeat the senders this many times
+        port_count - # of ports the receiver will listen on.  Each sender connects
+                     to all ports.
+        sender_count - # of senders
+        target_count - # of targets per connection
+        send_count - # of messages sent to each target
+        send_batch - # of messages to send before waiting for response
+        """
+        iterations = self.iterations
+        port_count = self.port_count
+        sender_count = self.sender_count
+        target_count = self.target_count
+        send_count = self.send_count
+        send_batch = self.send_batch
+
+        send_total = port_count * target_count * send_count
+        receive_total = send_total * sender_count * iterations
+
+        ports = free_tcp_ports(port_count)
+
+        receiver = r_factory()
+        receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port) for port in ports]
+        receiver.receive_count = receive_total
+        receiver.send_reply = True
+        receiver.timeout = MessengerTests._timeout
+        self.receivers.append( receiver )
+
+        for i in range(sender_count):
+            sender = s_factory()
+            sender.targets = ["%s://0.0.0.0:%s/%d" % (domain, port, j) for port in ports for j in range(target_count)]
+            sender.send_count = send_total
+            sender.send_batch = send_batch
+            sender.get_reply = True
+            sender.timeout = MessengerTests._timeout
+            self.senders.append( sender )
+
+        self._do_test(iterations)
+
+    def test_oneway_C(self):
+        self._do_oneway_test(MessengerReceiverC(), MessengerSenderC())
+
+    def test_oneway_C_SSL(self):
+        self._ssl_check()
+        self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
+
+    def test_oneway_valgrind(self):
+        self.valgrind_test()
+        self._do_oneway_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
+
+    def test_echo_C(self):
+        self._do_echo_test(MessengerReceiverC(), MessengerSenderC())
+
+    def test_echo_C_SSL(self):
+        self._ssl_check()
+        self._do_echo_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
+
+    def test_echo_valgrind(self):
+        self.valgrind_test()
+        self._do_echo_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
+
+    def test_relay_C(self):
+        self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC())
+
+    def test_relay_C_SSL(self):
+        self._ssl_check()
+        self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC(), "amqps")
+
+    def test_relay_valgrind(self):
+        self.valgrind_test()
+        self._do_relay_test(MessengerReceiverValgrind(), MessengerReceiverValgrind(), MessengerSenderValgrind())
+
+    def test_star_topology_C(self):
+        self._do_star_topology_test( MessengerReceiverC, MessengerSenderC )
+
+    def test_star_topology_C_SSL(self):
+        self._ssl_check()
+        self._do_star_topology_test( MessengerReceiverC, MessengerSenderC, "amqps" )
+
+    def test_star_topology_valgrind(self):
+        self.valgrind_test()
+        self._do_star_topology_test( MessengerReceiverValgrind, MessengerSenderValgrind )
+
+    def test_oneway_reactor(self):
+        self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())
+
+    def test_oneway_reactor_valgrind(self):
+        self.valgrind_test()
+        self._do_oneway_test(ReactorReceiverValgrind(), ReactorSenderValgrind())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message