Subject: svn commit: r559055 [1/2] - in /incubator/qpid/trunk/qpid/python: ./ qpid/ tests/ tests_0-10/ tests_0-9/ Date: Tue, 24 Jul 2007 14:08:33 -0000 To: qpid-commits@incubator.apache.org From: gsim@apache.org Message-Id: <20070724140836.C924F1A981A@eris.apache.org> Author: gsim Date: Tue Jul 24 07:08:32 2007 New Revision: 559055 URL: http://svn.apache.org/viewvc?view=rev&rev=559055 Log: Some initial 0-10 support including placeholders for new domains, use of execution layer for synchronising methods with 
no explicit responses and a new set of tests (mainly just copies of the 0-9 ones, but these will be altered as 0-10 support progresses). Added: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (with props) incubator/qpid/trunk/qpid/python/tests_0-10/ incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/basic.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/example.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/execution.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/message.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/query.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/testlib.py (with props) incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (with props) Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py incubator/qpid/trunk/qpid/python/qpid/peer.py incubator/qpid/trunk/qpid/python/qpid/spec.py incubator/qpid/trunk/qpid/python/qpid/testlib.py incubator/qpid/trunk/qpid/python/tests/codec.py incubator/qpid/trunk/qpid/python/tests_0-9/broker.py Added: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (added) +++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Tue Jul 24 07:08:32 2007 @@ -0,0 +1,4 @@ +tests_0-10.message.MessageTests.test_checkpoint +tests_0-10.message.MessageTests.test_reject +tests_0-10.basic.BasicTests.test_get + Propchange: 
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?view=diff&rev=559055&r1=559054&r2=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/codec.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/codec.py Tue Jul 24 07:08:32 2007 @@ -258,17 +258,9 @@ codec = Codec(enc) if tbl: for key, value in tbl.items(): - # Field names MUST start with a letter, '$' or '#' and may - # continue with letters, '$' or '#', digits, or underlines, to - # a maximum length of 128 characters. - if len(key) > 128: raise ValueError("field table key too long: '%s'" % key) - m = Codec.KEY_CHECK.match(key) - if m == None or m.end() != len(key): - raise ValueError("invalid field table key: '%s'" % key) - codec.encode_shortstr(key) if isinstance(value, basestring): codec.write("S") @@ -338,3 +330,25 @@ return self.decode_longstr() else: return ReferenceId(self.decode_longstr()) + + # new domains for 0-10: + + def encode_uuid(self, s): + self.encode_longstr(s) + + def decode_uuid(self): + return self.decode_longstr() + + def encode_rfc1982_long(self, s): + self.encode_long(s) + + def decode_rfc1982_long(self): + return self.decode_long() + + #Not done yet + def encode_rfc1982_long_set(self, s): + self.encode_short(0) + + def decode_rfc1982_long_set(self): + self.decode_short() + return 0; Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=559055&r1=559054&r2=559055 
============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/peer.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/peer.py Tue Jul 24 07:08:32 2007 @@ -189,7 +189,9 @@ self.completion = ExecutionCompletion() # Use reliable framing if version == 0-9. - self.reliable = (spec.major == 0 and spec.minor == 9) + # (also for 0-10 while transitioning...) + self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10)) + self.use_execution_layer = (spec.major == 0 and spec.minor == 10) self.synchronous = True def close(self, reason): @@ -199,6 +201,7 @@ self.reason = reason self.incoming.close() self.responses.close() + self.completion.close() def write(self, frame, content = None): if self.closed: @@ -261,6 +264,11 @@ self.request(frame, self.queue_response, content) if not frame.method.responses: + if self.use_execution_layer and type.klass.name != "execution": + self.execution_flush() + self.completion.wait() + if self.closed: + raise Closed(self.reason) return None try: resp = self.responses.get() @@ -352,6 +360,9 @@ #the following test is a hack until the track/sub-channel is available if method.klass.name != "execution": self.command_id = self.sequence.next() + + def close(self): + self.completed.set() def complete(self, mark): self.mark = mark Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=559055&r1=559054&r2=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/spec.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/spec.py Tue Jul 24 07:08:32 2007 @@ -237,7 +237,11 @@ "long": 0, "longlong": 0, "timestamp": 0, - "content": None} + "content": None, + "uuid": "", + "rfc1982_long": 0, + "rfc1982_long_set": 0 + } def define_method(self, name): g = {Method.METHOD: self} @@ -306,7 +310,9 @@ for root in [spec_root] 
+ map(lambda x: mllib.xml_parse(x)["amqp"], errata): # constants for nd in root.query["constant"]: - const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]), + val = nd["@value"] + if val.startswith("0x"): continue + const = Constant(spec, pythonize(nd["@name"]), int(val), nd["@class"], get_docs(nd)) try: spec.constants.add(const) Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=559055&r1=559054&r2=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Tue Jul 24 07:08:32 2007 @@ -125,6 +125,8 @@ self.tests=findmodules("tests") if self.use08spec(): self.tests+=findmodules("tests_0-8") + elif self.spec.major == 0 and self.spec.minor == 10: + self.tests+=findmodules("tests_0-10") else: self.tests+=findmodules("tests_0-9") @@ -214,10 +216,10 @@ def exchange_declare(self, channel=None, ticket=0, exchange='', type='', passive=False, durable=False, - auto_delete=False, internal=False, nowait=False, + auto_delete=False, arguments={}): channel = channel or self.channel - reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) + reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) self.exchanges.append((channel,exchange)) return reply Modified: incubator/qpid/trunk/qpid/python/tests/codec.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/codec.py?view=diff&rev=559055&r1=559054&r2=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests/codec.py (original) +++ incubator/qpid/trunk/qpid/python/tests/codec.py Tue Jul 24 07:08:32 2007 @@ -432,13 +432,6 @@ """ 
self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') - # ------------------------------------------- - def test_field_table_invalid_field_name(self): - """ - invalid field name - """ - self.failUnlessRaises(Exception, self.codec.encode_table, {'1key1':'value1'}) - # ---------------------------------------------------- def test_field_table_invalid_field_name_length(self): """ Added: incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,20 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class AlternateExchangeTests(TestBase): + """ + Tests for the new mechanism for message returns introduced in 0-10 + and available in 0-9 for preview + """ + + def test_unroutable(self): + """ + Test that unroutable messages are delivered to the alternate-exchange if specified + """ + channel = self.channel + #create an exchange with an alternate defined + channel.exchange_declare(exchange="secondary", type="fanout") + channel.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary") + + #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages + channel.queue_declare(queue="returns", exclusive=True) + channel.queue_bind(queue="returns", exchange="secondary") + channel.message_consume(destination="a", queue="returns") + returned = self.client.queue("a") + + #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages + channel.queue_declare(queue="processed", exclusive=True) + channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key") + channel.message_consume(destination="b", queue="processed") + processed = self.client.queue("b") + + #publish to the primary exchange + #...one message that makes it to the 'processed' queue: + channel.message_transfer(destination="primary", routing_key="my-key", body="Good") + #...and one that does not: + channel.message_transfer(destination="primary", routing_key="unused-key", body="Bad") + + #delete the exchanges + channel.exchange_delete(exchange="primary") + channel.exchange_delete(exchange="secondary") + + #verify behaviour + self.assertEqual("Good", processed.get(timeout=1).body) + self.assertEqual("Bad", returned.get(timeout=1).body) + self.assertEmpty(processed) + self.assertEmpty(returned) + + def test_queue_delete(self): + """ + Test that messages in a queue 
being deleted are delivered to the alternate-exchange if specified + """ + channel = self.channel + #set up a 'dead letter queue': + channel.exchange_declare(exchange="dlq", type="fanout") + channel.queue_declare(queue="deleted", exclusive=True) + channel.queue_bind(exchange="dlq", queue="deleted") + channel.message_consume(destination="dlq", queue="deleted") + dlq = self.client.queue("dlq") + + #create a queue using the dlq as its alternate exchange: + channel.queue_declare(queue="delete-me", alternate_exchange="dlq") + #send it some messages: + channel.message_transfer(routing_key="delete-me", body="One") + channel.message_transfer(routing_key="delete-me", body="Two") + channel.message_transfer(routing_key="delete-me", body="Three") + #delete it: + channel.queue_delete(queue="delete-me") + #delete the dlq exchange: + channel.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + self.assertEqual("One", dlq.get(timeout=1).body) + self.assertEqual("Two", dlq.get(timeout=1).body) + self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + + def test_immediate(self): + """ + Test that a message published with immediate=True to a queue with no consumers is delivered to the alternate-exchange if specified + """ + channel = self.channel + #set up a 'dead letter queue': + channel.exchange_declare(exchange="dlq", type="fanout") + channel.queue_declare(queue="immediate", exclusive=True) + channel.queue_bind(exchange="dlq", queue="immediate") + channel.message_consume(destination="dlq", queue="immediate") + dlq = self.client.queue("dlq") + + #create a queue using the dlq as its alternate exchange: + channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True) + #send it a message: + channel.message_transfer(routing_key="no-consumers", body="no one wants me", immediate=True) + + #check the message was delivered to the dlq: + self.assertEqual("no one wants me", dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + #cleanup: + 
channel.queue_delete(queue="no-consumers") + channel.exchange_delete(exchange="dlq") + + + def test_delete_while_used_by_queue(self): + """ + Ensure an exchange still in use as an alternate-exchange for a + queue can't be deleted + """ + channel = self.channel + channel.exchange_declare(exchange="alternate", type="fanout") + channel.queue_declare(queue="q", exclusive=True, alternate_exchange="alternate") + try: + channel.exchange_delete(exchange="alternate") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except Closed, e: + #cleanup: + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.exchange_delete(exchange="alternate") + channel.channel_close(200, "ok") + other.close() + + self.assertConnectionException(530, e.args[0]) + + + + def test_delete_while_used_by_exchange(self): + """ + Ensure an exchange still in use as an alternate-exchange for + another exchange can't be deleted + """ + channel = self.channel + channel.exchange_declare(exchange="alternate", type="fanout") + channel.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate") + try: + channel.exchange_delete(exchange="alternate") + #cleanup: + channel.exchange_delete(exchange="e") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except Closed, e: + #cleanup: + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.exchange_delete(exchange="e") + channel.exchange_delete(exchange="alternate") + channel.channel_close(200, "ok") + other.close() + + self.assertConnectionException(530, e.args[0]) + + + def assertEmpty(self, queue): + try: + msg = queue.get(timeout=1) + self.fail("Queue not empty: " + str(msg)) + except Empty: None + Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/python/tests_0-10/basic.py URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/basic.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/basic.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/basic.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,396 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BasicTests(TestBase): + """Tests for 'methods' on the amqp basic 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True) + #establish two consumers one of which excludes delivery of locally sent messages + channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") + channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) + channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True) + + #check that an exclusive consumer prevents other consumer being created: + channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last 
consumer: + channel = self.client.channel(2) + channel.channel_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + channel.basic_consume(consumer_tag="first", queue="test-queue-2") + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + channel.basic_consume(queue="invalid-queue") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.basic_consume(queue="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True) + + #check that attempts to use duplicate tags are detected and prevented: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + try: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True) + channel.basic_consume(consumer_tag="my-consumer", 
queue="test-queue-4") + channel.basic_publish(routing_key="test-queue-4", content=Content("One")) + + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.content.body) + + #cancel should stop messages being delivered + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg.content.body) + except Empty: None + + #cancellation of non-existent consumers should be handled without error + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_cancel(consumer_tag="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue", exclusive=True) + + reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) + queue = self.client.queue(reply.consumer_tag) + + channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = 
queue.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + def test_recover_requeue(self): + """ + Test requeuing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue", exclusive=True) + + subscription = channel.basic_consume(queue="test-requeue", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + channel.basic_publish(routing_key="test-requeue", content=Content("One")) + channel.basic_publish(routing_key="test-requeue", content=Content("Two")) + channel.basic_publish(routing_key="test-requeue", content=Content("Three")) + channel.basic_publish(routing_key="test-requeue", content=Content("Four")) + channel.basic_publish(routing_key="test-requeue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_cancel(consumer_tag=subscription.consumer_tag) + + channel.basic_recover(requeue=True) + + subscription2 = channel.basic_consume(queue="test-requeue") + queue2 = self.client.queue(subscription2.consumer_tag) + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + self.assertEqual(True, msg3b.redelivered) + self.assertEqual(True, msg5b.redelivered) + + try: + extra = queue2.get(timeout=1) + self.fail("Got 
unexpected message in second queue: " + extra.content.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.content.body) + except Empty: None + + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 5: + channel.basic_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + 
channel.basic_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) + + def test_get(self): + """ + Test basic_get method + """ + channel = self.channel + channel.queue_declare(queue="test-get", exclusive=True) + + #publish some messages (no_ack=True) + for i in range(1, 11): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(1, 11): + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") + + 
#repeat for no_ack=False + for i in range(11, 21): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + for i in range(11, 21): + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_ok") + self.assertEqual("Message %d" % i, reply.content.body) + if(i == 13): + channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) + if(i in [15, 17, 19]): + channel.basic_ack(delivery_tag=reply.delivery_tag) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") + + #recover(requeue=True) + channel.basic_recover(requeue=True) + + #get the unacked messages again (14, 16, 18, 20) + for i in [14, 16, 18, 20]: + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_ok") + self.assertEqual("Message %d" % i, reply.content.body) + channel.basic_ack(delivery_tag=reply.delivery_tag) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") + + channel.basic_recover(requeue=True) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/basic.py ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,126 @@ +# +# 
Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BrokerTests(TestBase): + """Tests for basic Broker functionality""" + + def test_ack_and_no_ack(self): + """ + First, this test tries to receive a message with a no-ack + consumer. Second, this test tries to explicitly receive and + acknowledge a message with an acknowledging consumer. 
+ """ + ch = self.channel + self.queue_declare(ch, queue = "myqueue") + + # No ack consumer + ctag = "tag1" + ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True) + body = "test no-ack" + ch.message_transfer(routing_key = "myqueue", body = body) + msg = self.client.queue(ctag).get(timeout = 5) + self.assert_(msg.body == body) + + # Acknowledging consumer + self.queue_declare(ch, queue = "otherqueue") + ctag = "tag2" + ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False) + body = "test ack" + ch.message_transfer(routing_key = "otherqueue", body = body) + msg = self.client.queue(ctag).get(timeout = 5) + msg.ok() + self.assert_(msg.body == body) + + def test_simple_delivery_immediate(self): + """ + Test simple message delivery where consume is issued before publish + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + consumer_tag = "tag1" + channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + queue = self.client.queue(consumer_tag) + + body = "Immediate Delivery" + channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) + msg = queue.get(timeout=5) + self.assert_(msg.body == body) + + # TODO: Ensure we fail if immediate=True and there's no consumer. + + + def test_simple_delivery_queued(self): + """ + Test basic message delivery where publish is issued before consume + (i.e. 
requires queueing of the message) + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + body = "Queued Delivery" + channel.message_transfer(destination="test-exchange", routing_key="key", body=body) + + consumer_tag = "tag1" + channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + queue = self.client.queue(consumer_tag) + msg = queue.get(timeout=5) + self.assert_(msg.body == body) + + def test_invalid_channel(self): + channel = self.client.channel(200) + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for invalid channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + def test_closed_channel(self): + channel = self.client.channel(200) + channel.channel_open() + channel.channel_close() + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for closed channel") + except Closed, e: + if isinstance(e.args[0], str): self.fail(e) + self.assertConnectionException(504, e.args[0]) + + def test_channel_flow(self): + channel = self.channel + channel.queue_declare(queue="flow_test_queue", exclusive=True) + channel.message_consume(destination="my-tag", queue="flow_test_queue") + incoming = self.client.queue("my-tag") + + channel.channel_flow(active=False) + channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz") + try: + incoming.get(timeout=1) + self.fail("Received message when flow turned off.") + except Empty: None + + channel.channel_flow(active=True) + msg = incoming.get(timeout=1) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body) Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py ------------------------------------------------------------------------------ svn:eol-style = native Added: 
incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,587 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from struct import pack, unpack +from time import sleep + +class DtxTests(TestBase): + """ + Tests for the amqp dtx related classes. + + Tests of the form test_simple_xxx test the basic transactional + behaviour. The approach here is to 'swap' a message from one queue + to another by consuming and re-publishing in the same + transaction. That transaction is then completed in different ways + and the appropriate result verified. + + The other tests enforce more specific rules and behaviour on a + per-method or per-field basis. 
+ """ + + XA_RBROLLBACK = 1 + XA_RBTIMEOUT = 2 + XA_OK = 8 + + def test_simple_commit(self): + """ + Test basic one-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "commit") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("commit", "queue-b") + + def test_simple_prepare_commit(self): + """ + Test basic two-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-commit") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("prepare-commit", "queue-b") + + + def test_simple_rollback(self): + """ + Test basic rollback behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "rollback") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("rollback", "queue-a") + + def test_simple_prepare_rollback(self): + """ + Test basic rollback behaviour after the transaction has been prepared. 
+ """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-rollback") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("prepare-rollback", "queue-a") + + def test_select_required(self): + """ + check that an error is flagged if select is not issued before + start or end + """ + channel = self.channel + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx) + + #if we get here we have failed, but need to do some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Channel not selected for use with dtx, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_already_known(self): + """ + Verify that an attempt to start an association with a + transaction that is already known is not allowed (unless the + join flag is set). 
+ """ + #create two channels on different connections & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + other = self.connect() + channel2 = other.channel(1) + channel2.channel_open() + channel2.dtx_demarcation_select() + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other without the join set + failed = False + try: + channel2.dtx_demarcation_start(xid=tx) + except Closed, e: + failed = True + error = e + + #cleanup: + if not failed: + channel2.dtx_demarcation_end(xid=tx) + other.close() + channel1.dtx_demarcation_end(xid=tx) + channel1.dtx_coordination_rollback(xid=tx) + + #verification: + if failed: self.assertConnectionException(503, e.args[0]) + else: self.fail("Xid already known, expected exception!") + + def test_forget_xid_on_completion(self): + """ + Verify that a xid is 'forgotten' - and can therefore be used + again - once it is completed. + """ + channel = self.channel + #do some transactional work & complete the transaction + self.test_simple_commit() + + #start association for the same xid as the previously completed txn + tx = self.xid("my-xid") + channel.dtx_demarcation_start(xid=tx) + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + + def test_start_join_and_resume(self): + """ + Ensure the correct error is signalled when both the join and + resume flags are set on starting an association between a + channel and a transaction. 
+ """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx, join=True, resume=True) + #failed, but need some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Join and resume both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_join(self): + """ + Verify 'join' behaviour, where a channel is associated with a + transaction that is already associated with another channel. + """ + #create two channels & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + channel2 = self.client.channel(2) + channel2.channel_open() + channel2.dtx_demarcation_select() + + #setup + channel1.queue_declare(queue="one", exclusive=True) + channel1.queue_declare(queue="two", exclusive=True) + channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") + channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other with the join flag set + channel2.dtx_demarcation_start(xid=tx, join=True) + + #do work through each channel + self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' + + #mark end on both channels + channel1.dtx_demarcation_end(xid=tx) + channel2.dtx_demarcation_end(xid=tx) + + #commit and check + channel1.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + + def test_suspend_resume(self): + """ + Test suspension and resumption of an association + """ + channel = self.channel + channel.dtx_demarcation_select() + + #setup + 
channel.queue_declare(queue="one", exclusive=True) + channel.queue_declare(queue="two", exclusive=True) + channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") + channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + + tx = self.xid("dummy") + + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' + channel.dtx_demarcation_end(xid=tx, suspend=True) + + channel.dtx_demarcation_start(xid=tx, resume=True) + self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' + channel.dtx_demarcation_end(xid=tx) + + #commit and check + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_end_suspend_and_fail(self): + """ + Verify that the correct error is signalled if the suspend and + fail flag are both set when disassociating a transaction from + the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("suspend_and_fail") + channel.dtx_demarcation_start(xid=tx) + try: + channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) + self.fail("Suspend and fail both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.dtx_coordination_rollback(xid=tx) + channel.channel_close() + other.close() + + + def test_end_unknown_xid(self): + """ + Verifies that the correct exception is thrown when an attempt + is made to end the association for a xid not previously + associated with the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("unknown-xid") + try: + channel.dtx_demarcation_end(xid=tx) + self.fail("Attempted to end association with unknown xid, expected exception!") + except Closed, e: + #FYI: this is 
currently *not* the exception specified, but I think the spec is wrong! Confirming... + self.assertConnectionException(503, e.args[0]) + + def test_end(self): + """ + Verify that the association is terminated by end and subsequent + operations are non-transactional + """ + channel = self.client.channel(2) + channel.channel_open() + channel.queue_declare(queue="tx-queue", exclusive=True) + + #publish a message under a transaction + channel.dtx_demarcation_select() + tx = self.xid("dummy") + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") + channel.dtx_demarcation_end(xid=tx) + + #now that association with txn is ended, publish another message + channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") + + #check the second message is available, but not the first + self.assertMessageCount(1, "tx-queue") + channel.message_consume(queue="tx-queue", destination="results", no_ack=False) + msg = self.client.queue("results").get(timeout=1) + self.assertEqual("two", msg.message_id) + channel.message_cancel(destination="results") + #ack the message then close the channel + msg.ok() + channel.channel_close() + + channel = self.channel + #commit the transaction and check that the first message (and + #only the first message) is then delivered + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "tx-queue") + self.assertMessageId("one", "tx-queue") + + def test_invalid_commit_one_phase_true(self): + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. 
+ """ + other = self.connect() + tester = other.channel(1) + tester.channel_open() + tester.queue_declare(queue="dummy", exclusive=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(routing_key="dummy", body="whatever") + tester.dtx_demarcation_end(xid=tx) + tester.dtx_coordination_prepare(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=True) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.channel_close() + other.close() + self.fail("Invalid use of one_phase=True, expected exception!") + + def test_invalid_commit_one_phase_false(self): + """ + Test that a commit with one_phase = False is rejected if the + transaction in question has not yet been prepared. + """ + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.channel_open() + tester.queue_declare(queue="dummy", exclusive=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(routing_key="dummy", body="whatever") + tester.dtx_demarcation_end(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=False) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.channel_close() + other.close() + self.fail("Invalid use of one_phase=False, expected exception!") + + def test_implicit_end(self): + """ + Test that an association is implicitly ended when the channel + is closed (whether by exception or explicit client request) + and the transaction in question is marked as rollback only. 
+ """ + channel1 = self.channel + channel2 = self.client.channel(2) + channel2.channel_open() + + #setup: + channel2.queue_declare(queue="dummy", exclusive=True) + channel2.message_transfer(routing_key="dummy", body="whatever") + tx = self.xid("dummy") + + channel2.dtx_demarcation_select() + channel2.dtx_demarcation_start(xid=tx) + channel2.message_get(queue="dummy", destination="dummy") + self.client.queue("dummy").get(timeout=1).ok() + channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.channel_close() + + self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) + channel1.dtx_coordination_rollback(xid=tx) + + def test_get_timeout(self): + """ + Check that get-timeout returns the correct value (and that a + transaction with a timeout can complete normally) + """ + channel = self.channel + tx = self.xid("dummy") + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) + channel.dtx_coordination_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status) + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + + def test_set_timeout(self): + """ + Test the timeout of a transaction results in the expected + behaviour + """ + #open new channel to allow self.channel to be used in checking the queue + channel = self.client.channel(2) + channel.channel_open() + #setup: + tx = self.xid("dummy") + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_declare(queue="queue-b", exclusive=True) + channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "queue-a", "queue-b") + channel.dtx_coordination_set_timeout(xid=tx, timeout=2) + 
sleep(3) + #check that the work has been rolled back already + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("timeout", "queue-a") + #check the correct codes are returned when we try to complete the txn + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status) + + + + def test_recover(self): + """ + Test basic recover behaviour + """ + channel = self.channel + + channel.dtx_demarcation_select() + channel.queue_declare(queue="dummy", exclusive=True) + + prepared = [] + for i in range(1, 10): + tx = self.xid("tx%s" % (i)) + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(routing_key="dummy", body="message%s" % (i)) + channel.dtx_demarcation_end(xid=tx) + if i in [2, 5, 6, 8]: + channel.dtx_coordination_prepare(xid=tx) + prepared.append(tx) + else: + channel.dtx_coordination_rollback(xid=tx) + + indoubt = channel.dtx_coordination_recover().in_doubt + #convert indoubt table to a list of xids (note: this will change for 0-10) + data = indoubt["xids"] + xids = [] + pos = 0 + while pos < len(data): + size = unpack("!B", data[pos])[0] + start = pos + 1 + end = start + size + xid = data[start:end] + xids.append(xid) + pos = end + + #rollback the prepared transactions returned by recover + for x in xids: + channel.dtx_coordination_rollback(xid=x) + + #validate against the expected list of prepared transactions + actual = set(xids) + expected = set(prepared) + intersection = actual.intersection(expected) + + if intersection != expected: + missing = expected.difference(actual) + extra = actual.difference(expected) + for x in missing: + channel.dtx_coordination_rollback(xid=x) + self.fail("Recovered xids not as expected. 
missing: %s; extra: %s" % (missing, extra)) + + def xid(self, txid, branchqual = ''): + return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual + + def txswap(self, tx, id): + channel = self.channel + #declare two queues: + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_declare(queue="queue-b", exclusive=True) + #put message with specified id on one queue: + channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") + + #start the transaction: + channel.dtx_demarcation_select() + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status) + + #'swap' the message from one queue to the other, under that transaction: + self.swap(self.channel, "queue-a", "queue-b") + + #mark the end of the transactional work: + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status) + + def swap(self, channel, src, dest): + #consume from src: + channel.message_get(destination="temp-swap", queue=src) + msg = self.client.queue("temp-swap").get(timeout=1) + msg.ok(); + + #re-publish to dest + channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) + + def assertMessageCount(self, expected, queue): + self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) + + def assertMessageId(self, expected, queue): + self.channel.message_consume(queue=queue, destination="results", no_ack=True) + self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) + self.channel.message_cancel(destination="results") Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/python/tests_0-10/example.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/example.py?view=auto&rev=559055 
============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/example.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/example.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class ExampleTest (TestBase): + """ + An example Qpid test, illustrating the unittest framework and the + python Qpid client. The test class must inherit TestCase. The + test code uses the Qpid client to interact with a qpid broker and + verify it behaves as expected. + """ + + def test_example(self): + """ + An example test. Note that test functions must start with 'test_' + to be recognized by the test framework. + """ + + # By inheriting TestBase, self.client is automatically connected + # and self.channel is automatically opened as channel(1) + # Other channel methods mimic the protocol. + channel = self.channel + + # Now we can send regular commands. If you want to see what the method + # arguments mean or what other commands are available, you can use the + # python builtin help() method. 
For example: + #help(channel) + #help(channel.exchange_declare) + + # If you want to browse the available protocol methods without being + # connected to a live server you can use the amqp-doc utility: + # + # Usage amqp-doc [] [ ... ] + # + # Options: + # -e, --regexp use regex instead of glob when matching + + # Now that we know what commands are available we can use them to + # interact with the server. + + # Here we use ordinal arguments. + self.exchange_declare(channel, 0, "test", "direct") + + # Here we use keyword arguments. + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") + + # Call Channel.message_consume to register as a consumer. + # All the protocol methods return a message object. The message object + # has fields corresponding to the reply method fields, plus a content + # field that is filled if the reply includes content. In this case the + # interesting field is the consumer_tag. + channel.message_consume(queue="test-queue", destination="consumer_tag") + + # We can use the Client.queue(...) method to access the queue + # corresponding to our consumer_tag. + queue = self.client.queue("consumer_tag") + + # Now let's publish a message and see if our consumer gets it. To do + # this we need to import the Content class. + body = "Hello World!" + channel.message_transfer(destination="test", + routing_key="key", + body = body) + + # Now we'll wait for the message to arrive. We can use the timeout + # argument in case the server hangs. By default queue.get() will wait + # until a message arrives or the connection to the server dies. + msg = queue.get(timeout=10) + + # And check that we got the right response with assertEqual + self.assertEqual(body, msg.body) + + # Now acknowledge the message. 
+ msg.ok() + Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/example.py ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,327 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Tests for exchange behaviour. + +Test classes ending in 'RuleTests' are derived from rules in amqp.xml. +""" + +import Queue, logging +from qpid.testlib import TestBase +from qpid.content import Content +from qpid.client import Closed + + +class StandardExchangeVerifier: + """Verifies standard exchange behavior. 
+ + Used as base class for classes that test standard exchanges.""" + + def verifyDirectExchange(self, ex): + """Verify that ex behaves like a direct exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + + def verifyFanOutExchange(self, ex): + """Verify that ex behaves like a fanout exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex) + self.queue_declare(queue="p") + self.channel.queue_bind(queue="p", exchange=ex) + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + + def verifyTopicExchange(self, ex): + """Verify that ex behaves like a topic exchange""" + self.queue_declare(queue="a") + self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + # Shouldn't match + self.channel.message_transfer(destination=ex, routing_key="a.b", body="") + self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="") + self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="") + self.channel.message_transfer(destination=ex, routing_key="a.b", body="") + self.assert_(q.empty()) + + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties=headers) + self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver + self.assertEmpty(q); + + +class 
RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server SHOULD implement these standard exchange types: topic, headers. + + Client attempts to declare an exchange with each of these standard types. + """ + + def testDirect(self): + """Declare and test a direct exchange""" + self.exchange_declare(0, exchange="d", type="direct") + self.verifyDirectExchange("d") + + def testFanout(self): + """Declare and test a fanout exchange""" + self.exchange_declare(0, exchange="f", type="fanout") + self.verifyFanOutExchange("f") + + def testTopic(self): + """Declare and test a topic exchange""" + self.exchange_declare(0, exchange="t", type="topic") + self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") + + +class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance is amq. followed by the exchange type name. + + Client creates a temporary queue and attempts to bind to each required + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if + those types are defined). + """ + def testAmqDirect(self): self.verifyDirectExchange("amq.direct") + + def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") + + def testAmqTopic(self): self.verifyTopicExchange("amq.topic") + + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") + +class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST predeclare a direct exchange to act as the default exchange + for content Publish methods and for default queue bindings. 
+ + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + """ + def testDefaultExchange(self): + # Test automatic binding by queue name. + self.queue_declare(queue="d") + self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default exchange + self.verifyDirectExchange("") + + +# TODO aconway 2006-09-27: Fill in empty tests: + +class DefaultAccessRuleTests(TestBase): + """ + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + """ + +class ExtensionsRuleTests(TestBase): + """ + The server MAY implement other exchange types as wanted. + """ + + +class DeclareMethodMinimumRuleTests(TestBase): + """ + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + The client creates as many exchanges as it can until the server reports + an error; the number of exchanges successfully created must be at least + sixteen. + """ + + +class DeclareMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access to + the realm in which the exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeclareMethodExchangeFieldReservedRuleTests(TestBase): + """ + Exchange names starting with "amq." are reserved for predeclared and + standardised exchanges. The client MUST NOT attempt to create an exchange + starting with "amq.". + + + """ + + +class DeclareMethodTypeFieldTypedRuleTests(TestBase): + """ + Exchanges cannot be redeclared with different types. 
The client MUST NOT + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + """ + + +class DeclareMethodTypeFieldSupportRuleTests(TestBase): + """ + The client MUST NOT attempt to create an exchange with a type that the + server does not support. + + + """ + + +class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): + """ + If set, and the exchange does not already exist, the server MUST raise a + channel exception with reply code 404 (not found). + """ + def test(self): + try: + self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) + self.fail("Expected 404 for passive declaration of unknown exchange.") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + +class DeclareMethodDurableFieldSupportRuleTests(TestBase): + """ + The server MUST support both durable and transient exchanges. + + + """ + + +class DeclareMethodDurableFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the durable field if the exchange already exists. + + + """ + + +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the auto-delete field if the exchange already + exists. + + + """ + + +class DeleteMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access + rights to the exchange's access realm. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeleteMethodExchangeFieldExistsRuleTests(TestBase): + """ + The client MUST NOT attempt to delete an exchange that does not exist. + """ + + +class HeadersExchangeTests(TestBase): + """ + Tests for headers exchange functionality. 
+ """ + def setUp(self): + TestBase.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties=headers) + + def myBasicPublish(self, headers): + self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) + + def testMatchAll(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Won't match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + + +class MiscellaneousErrorsTests(TestBase): + """ + Test some miscellaneous error conditions + """ + def testTypeNotKnown(self): + try: + self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") + self.fail("Expected 503 for declaration of unknown exchange type.") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def testDifferentDeclaredType(self): + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") + try: + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") + self.fail("Expected 530 for redeclaration of exchange with different type.") + except Closed, e: + self.assertConnectionException(530, 
e.args[0]) + #cleanup + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + c2.exchange_delete(exchange="test_different_declared_type_exchange") + Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/python/tests_0-10/execution.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/execution.py?view=auto&rev=559055 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-10/execution.py (added) +++ incubator/qpid/trunk/qpid/python/tests_0-10/execution.py Tue Jul 24 07:08:32 2007 @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class ExecutionTests (TestBase): + def test_flush(self): + channel = self.channel + for i in [1, 2, 3]: + channel.basic_publish() + channel.execution_flush() + assert(channel.completion.wait(channel.completion.command_id, timeout=1)) Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/execution.py ------------------------------------------------------------------------------ svn:eol-style = native
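Note on the routing assertions in tests_0-10/exchange.py above: verifyTopicExchange and HeadersExchangeTests only make sense given particular matching rules, so here is a minimal sketch of the rules those tests appear to assume. It is an illustration only, not code from this commit or from any broker. The names topic_matches and headers_match are hypothetical, and treating a binding without an "x-match" argument as "all" is an assumption.

```python
# Illustrative sketch only; these helpers are hypothetical and are not part
# of the qpid Python client or of this commit.


def _match_words(pattern, key):
    """Match a list of pattern words against a list of routing-key words."""
    if not pattern:
        # An exhausted pattern matches only an exhausted key.
        return not key
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # '#' stands for zero or more words: try every possible split point.
        return any(_match_words(rest, key[i:]) for i in range(len(key) + 1))
    if not key:
        return False
    if head == "*" or head == key[0]:
        # '*' stands for exactly one word; a literal must be equal.
        return _match_words(rest, key[1:])
    return False


def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a topic pattern."""
    return _match_words(pattern.split("."), routing_key.split("."))


def headers_match(binding_args, headers):
    """Return True if message headers satisfy headers-exchange binding args.

    Assumption: a binding with no "x-match" entry behaves like "all".
    """
    required = dict((k, v) for k, v in binding_args.items() if k != "x-match")
    hits = [k in headers and headers[k] == v for k, v in required.items()]
    if binding_args.get("x-match") == "any":
        return any(hits)
    return all(hits)


if __name__ == "__main__":
    # Same cases as verifyTopicExchange.
    assert topic_matches("a.#.b.*", "a.x.b.x")
    assert not topic_matches("a.#.b.*", "a.b")
    # Same cases as testMatchAll and testMatchAny.
    binding = {"x-match": "all", "name": "fred", "age": 3}
    assert headers_match(binding, {"name": "fred", "age": 3, "extra": "ignoreme"})
    assert not headers_match(binding, {"name": "fred", "age": 2})
```

The recursive form keeps the '#' case easy to read; it is fine for the short keys used in these tests, though a real broker would likely use something more efficient.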