Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 36697 invoked from network); 23 Mar 2010 18:01:50 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 23 Mar 2010 18:01:50 -0000 Received: (qmail 50966 invoked by uid 500); 23 Mar 2010 18:01:50 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 50949 invoked by uid 500); 23 Mar 2010 18:01:50 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 50942 invoked by uid 99); 23 Mar 2010 18:01:50 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Mar 2010 18:01:50 +0000 X-ASF-Spam-Status: No, hits=-1999.9 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Mar 2010 18:01:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 131712388AAA; Tue, 23 Mar 2010 18:01:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r926686 [6/6] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/ cpp/docs/api/ cpp/docs/src/ cpp/examples/ cpp/examples/messaging/ cpp/examples/pub-sub/ cpp/include/qmf/engine/ cpp/include/qpid/agent/ cpp/include/qpid/client/amqp0_10/ cpp/incl... Date: Tue, 23 Mar 2010 18:00:56 -0000 To: commits@qpid.apache.org From: kgiusti@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100323180100.131712388AAA@eris.apache.org> Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original) +++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue Mar 23 18:00:49 2010 @@ -77,7 +77,7 @@ public class AddressBasedDestinationTest } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name supplied in the address " + + assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } @@ -140,7 +140,7 @@ public class AddressBasedDestinationTest } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name supplied in the address " + + assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } @@ -150,7 +150,7 @@ public class AddressBasedDestinationTest } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name supplied in the address " + + assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } @@ -167,7 +167,7 @@ public class AddressBasedDestinationTest } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name supplied in the address " + + assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } assertFalse("Queue should not be created",( Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original) +++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Mar 23 18:00:49 2010 @@ -334,6 +334,7 @@ public class DurableSubscriptionTest ext { _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull(msg); assertEquals("B", ((TextMessage) msg).getText()); } Modified: qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java (original) +++ qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java Tue Mar 23 18:00:49 2010 @@ -339,7 +339,7 @@ public class FaultTest extends AbstractX { assertEquals("Wrong error code: ", XAException.XAER_PROTO, e.errorCode); } - } + } /** * Strategy: @@ -355,7 +355,7 @@ public class FaultTest extends AbstractX _xaResource.end(xid, XAResource.TMSUCCESS); xid = getNewXid(); _xaResource.start(xid, XAResource.TMNOFLAGS); - assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0); + assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 1000); } /** @@ -381,5 +381,29 @@ public class FaultTest extends AbstractX assertEquals("Wrong error code: ", XAException.XA_RBTIMEOUT, e.errorCode); } } + + /** + * Strategy: + * Set the transaction timeout to 1000 + */ + public void testTransactionTimeoutAfterCommit() throws Exception + { + Xid xid = getNewXid(); + + _xaResource.start(xid, XAResource.TMNOFLAGS); + _xaResource.setTransactionTimeout(1000); + assertEquals("Wrong timeout", 1000,_xaResource.getTransactionTimeout()); + + //_xaResource.prepare(xid); + _xaResource.end(xid, XAResource.TMSUCCESS); + _xaResource.commit(xid, true); + + _xaResource.setTransactionTimeout(2000); + assertEquals("Wrong timeout", 2000,_xaResource.getTransactionTimeout()); + + xid = getNewXid(); + _xaResource.start(xid, XAResource.TMNOFLAGS); + assertEquals("Wrong timeout", 2000, _xaResource.getTransactionTimeout()); + } } Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/08StandaloneExcludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:919043-926606 Modified: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes (original) +++ qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes Tue Mar 23 18:00:49 2010 @@ -56,10 +56,13 @@ org.apache.qpid.test.client.timeouts.Syn // QPID-1262, QPID-1119 : This test fails occasionally due to potential protocol issue. org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* -// c++ broker doesn't support priorities, TTL or message bouncing +// c++ broker doesn't support priorities, message bouncing org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#* org.apache.qpid.server.queue.PriorityTest#* + +// c++ broker expires messages on delivery or when the queue cleaner thread runs. org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL +org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTLwithDurableSubscription // QPID-1727 , QPID-1726 :c++ broker does not support flow to disk on transient queues. Also it requries a persistent store impl. for Apache org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#* Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/CPPExcludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/Excludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/Excludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/Excludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaExcludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/JavaExcludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaStandaloneExcludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaTransientExcludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/XAExcludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/XAExcludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/XAExcludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/XAExcludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/clean-dir ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/clean-dir:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir:805429-821809 +/qpid/trunk/qpid/java/test-profiles/clean-dir:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.async.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.cluster.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.cluster.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.noprefetch.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.excludes ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes:805429-821809 +/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/default.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/default.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java-derby.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java.testprofile ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809 +/qpid/trunk/qpid/java/test-profiles/java.testprofile:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/log4j-test.xml ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/log4j-test.xml:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml:805429-821809 +/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test-provider.properties ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/test-provider.properties:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties:805429-821809 +/qpid/trunk/qpid/java/test-profiles/test-provider.properties:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test_resources/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /incubator/qpid/trunk/qpid/java/test-profiles/test_resources:443187-726139 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/test_resources:795950-829653 /qpid/branches/java-network-refactor/qpid/java/test-profiles/test_resources:805429-821809 +/qpid/trunk/qpid/java/test-profiles/test_resources:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/python/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,3 +1,4 @@ /qpid/branches/0.5.x-dev/qpid/python:892761,894875 /qpid/branches/java-network-refactor/qpid/python:805429-825319 /qpid/branches/qmfv2/qpid/python:902858,902894 +/qpid/trunk/qpid/python:919043-926606 Propchange: qpid/branches/qmf-devel0.7a/qpid/python/examples/api/spout ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,2 +1,3 @@ /qpid/branches/qmfv2/qpid/python/examples/api/spout:902858,902894 /qpid/branches/qpid.rnr/python/examples/api/spout:894071-896158 +/qpid/trunk/qpid/python/examples/api/spout:919043-926606 Modified: qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/examples/pubsub/verify.in Tue Mar 23 18:00:49 2010 @@ -1,5 +1,18 @@ ==== topic_publisher.py.out ==== topic_subscriber.py.out | remove_uuid | sort +Messages on 'europe' queue: +Messages on 'news' queue: +Messages on 'usa' queue: +Messages on 'weather' queue: +Queues created - please start the topic producer +Subscribing local queue 'local_europe' to europe-' +Subscribing local queue 'local_news' to news-' +Subscribing local queue 'local_usa' to usa-' +Subscribing local queue 'local_weather' to weather-' +That's all, folks! +That's all, folks! +That's all, folks! +That's all, folks! europe.news 0 europe.news 0 europe.news 1 @@ -20,19 +33,6 @@ europe.weather 3 europe.weather 3 europe.weather 4 europe.weather 4 -Messages on 'europe' queue: -Messages on 'news' queue: -Messages on 'usa' queue: -Messages on 'weather' queue: -Queues created - please start the topic producer -Subscribing local queue 'local_europe' to europe-' -Subscribing local queue 'local_news' to news-' -Subscribing local queue 'local_usa' to usa-' -Subscribing local queue 'local_weather' to weather-' -That's all, folks! -That's all, folks! -That's all, folks! -That's all, folks! usa.news 0 usa.news 0 usa.news 1 Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py Tue Mar 23 18:00:49 2010 @@ -38,11 +38,10 @@ EXPECT_EXIT_FAIL=2 # Expect to e EXPECT_RUNNING=3 # Expect to still be running at end of test EXPECT_UNKNOWN=4 # No expectation, don't check exit status. -def is_exe(fpath): - return os.path.exists(fpath) and os.access(fpath, os.X_OK) - def find_exe(program): """Find an executable in the system PATH""" + def is_exe(fpath): + return os.path.isfile(fpath) and os.access(fpath, os.X_OK) dir, name = os.path.split(program) if dir: if is_exe(program): return program @@ -144,13 +143,13 @@ class Popen(popen2.Popen3): expect - if set verify expectation at end of test. drain - if true (default) drain stdout/stderr to files. """ - assert find_exe(cmd[0]) + assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] popen2.Popen3.__init__(self, self.cmd, True) self.expect = expect self.was_shutdown = False # Set if we deliberately kill/terminate the process - self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid) + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid) msg = "Process %s" % self.pname self.stdin = ExceptionWrapper(self.tochild, msg) self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg) @@ -179,6 +178,7 @@ class Popen(popen2.Popen3): def stop(self): # Clean up at end of test. self.drain() + self.stdin.close() if self.expect == EXPECT_UNKNOWN: try: self.kill() # Just make sure its dead except: pass @@ -267,8 +267,9 @@ class Broker(Popen): cmd += ["--data-dir", self.datadir] Popen.__init__(self, cmd, expect, drain=False) test.cleanup_stop(self) - self._host = "localhost" + self._host = "127.0.0.1" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) + self._log_ready = False def host(self): return self._host @@ -302,8 +303,8 @@ class Broker(Popen): c.close() def _prep_sender(self, queue, durable, xprops): - s = queue + "; {create:always, node-properties:{durable:" + str(durable) - if xprops != None: s += ", x-properties:{" + xprops + "}" + s = queue + "; {create:always, node:{durable:" + str(durable) + if xprops != None: s += ", x-declare:{" + xprops + "}" return s + "}}" def send_message(self, queue, message, durable=True, xprops=None, session=None): @@ -344,16 +345,17 @@ class Broker(Popen): def log_ready(self): """Return true if the log file exists and contains a broker ready message""" + if self._log_ready: return True if not os.path.exists(self.log): return False - ready_msg = re.compile("notice Broker running") f = file(self.log) try: for l in f: - if ready_msg.search(l): return True + if "notice Broker running" in l: + self._log_ready = True + return True return False finally: f.close() - # FIXME aconway 2010-03-02: rename to wait_ready def ready(self): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. @@ -361,7 +363,7 @@ class Broker(Popen): raise Exception("Timed out waiting for broker %s" % self.name) # Make a connection, this will wait for extended cluster init to finish. try: self.connect().close() - except: raise RethrownException("Broker %s failed ready test %s"%self.name) + except: raise RethrownException("Broker %s failed ready test"%self.name) class Cluster: """A cluster of brokers in a test.""" @@ -427,6 +429,7 @@ class BrokerTest(TestCase): for p in self.stopem: try: p.stop() except Exception, e: err.append(str(e)) + if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) def cleanup_stop(self, stopable): @@ -446,7 +449,7 @@ class BrokerTest(TestCase): if (wait): try: b.ready() except Exception, e: - raise Exception("Failed to start broker %s: %s" % ( b.name, e)) + raise Exception("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): Propchange: qpid/branches/qmf-devel0.7a/qpid/python/qpid/concurrency.py ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,2 +1,3 @@ /qpid/branches/qmfv2/qpid/python/qpid/concurrency.py:902858,902894 /qpid/branches/qpid.rnr/python/qpid/concurrency.py:894071-896158 +/qpid/trunk/qpid/python/qpid/concurrency.py:919043-926606 Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/constants.py Tue Mar 23 18:00:49 2010 @@ -17,11 +17,16 @@ # under the License. # +__SELF__ = object() + class Constant: - def __init__(self, name, value=None): + def __init__(self, name, value=__SELF__): self.name = name - self.value = value + if value is __SELF__: + self.value = self + else: + self.value = value def __repr__(self): return self.name @@ -30,3 +35,6 @@ AMQP_PORT = 5672 AMQPS_PORT = 5671 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + +REJECTED = Constant("REJECTED") +RELEASED = Constant("RELEASED") Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py Tue Mar 23 18:00:49 2010 @@ -18,7 +18,7 @@ # import socket, struct, sys, time -from logging import getLogger +from logging import getLogger, DEBUG from qpid import compat from qpid import sasl from qpid.concurrency import synchronized @@ -27,13 +27,13 @@ from qpid.exceptions import Timeout, Ver from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address -from qpid.messaging.constants import UNLIMITED +from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import ConnectError -from qpid.messaging.message import get_codec, Message +from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import connect -from qpid.validator import And, Context, Map, Types, Values +from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread log = getLogger("qpid.messaging") @@ -78,9 +78,8 @@ class Pattern: sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#"))) -FILTER_DEFAULTS = { - "topic": Pattern("*"), - "amq.failover": Pattern("DUMMY") +SUBJECT_DEFAULTS = { + "topic": "#" } # XXX @@ -130,7 +129,14 @@ class SessionState: id = self.sent self.write_cmd(query, lambda: handler(self.results.pop(id))) - def write_cmd(self, cmd, action=noop): + def apply_overrides(self, cmd, overrides): + for k, v in overrides.items(): + cmd[k.replace('-', '_')] = v + + def write_cmd(self, cmd, action=noop, overrides=None): + if overrides: + self.apply_overrides(cmd, overrides) + if action != noop: cmd.sync = True if self.detached: @@ -154,28 +160,36 @@ class SessionState: self.driver.write_op(op) POLICIES = Values("always", "sender", "receiver", "never") +RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", + "exactly-once") -class Bindings: - - def validate(self, o, ctx): - t = ctx.containers[1].get("type", "queue") - if t != "queue": - return "bindings are only permitted on nodes of type queue" +DECLARE = Map({}, restricted=False) +BINDINGS = List(Map({ + "exchange": Types(basestring), + "queue": Types(basestring), + "key": Types(basestring), + "arguments": Map({}, restricted=False) + })) COMMON_OPTS = { - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node-properties": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-properties": Map({ - "type": Types(basestring), - "bindings": And(Types(list), Bindings()) - }, - restricted=False) - }) - } + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-declare": DECLARE, + "x-bindings": BINDINGS + }), + "link": Map({ + "name": Types(basestring), + "durable": Types(bool), + "reliability": RELIABILITY, + "x-declare": DECLARE, + "x-bindings": BINDINGS, + "x-subscribe": Map({}, restricted=False) + }) + } RECEIVE_MODES = Values("browse", "consume") @@ -196,36 +210,46 @@ class LinkIn: _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv _rcv.draining = False + _rcv.on_unlink = [] def do_link(self, sst, rcv, _rcv, type, subtype, action): + link_opts = _rcv.options.get("link", {}) + # XXX: default? + reliability = link_opts.get("reliability", "unreliable") + declare = link_opts.get("x-declare", {}) + subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) - filter = _rcv.options.get("filter") - if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[subtype] - elif _rcv.subject and filter: - # XXX - raise Exception("can't supply both subject and filter") - elif _rcv.subject: - # XXX - f = Pattern(_rcv.subject) - else: - f = filter - f._bind(sst, _rcv.name, _rcv._queue) + default_name = "%s.%s" % (rcv.session.name, _rcv.destination) + _rcv._queue = link_opts.get("name", default_name) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, + durable=link_opts.get("durable", False), + exclusive=True, + auto_delete=(reliability == "unreliable")), + overrides=declare) + _rcv.on_unlink = [QueueDelete(_rcv._queue)] + subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype) + sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject)) + bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject) elif type == "queue": _rcv._queue = _rcv.name if _rcv.options.get("mode", "consume") == "browse": acq_mode = acquire_mode.not_acquired + bindings = get_bindings(link_opts, queue=_rcv._queue) + sst.write_cmds(bindings) sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, - acquire_mode = acq_mode)) + acquire_mode = acq_mode), + overrides=subscribe) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): - sst.write_cmd(MessageCancel(_rcv.destination), action) + link_opts = _rcv.options.get("link", {}) + reliability = link_opts.get("reliability") + cmds = [MessageCancel(_rcv.destination)] + cmds.extend(_rcv.on_unlink) + sst.write_cmds(cmds, action) def del_link(self, sst, rcv, _rcv): del sst.destinations[_rcv.destination] @@ -240,13 +264,16 @@ class LinkOut: _snd.closing = False def do_link(self, sst, snd, _snd, type, subtype, action): + link_opts = _snd.options.get("link", {}) if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject + bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject) elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - action() + bindings = get_bindings(link_opts, queue=_snd.name) + sst.write_cmds(bindings, action) def do_unlink(self, sst, snd, _snd, action=noop): action() @@ -435,6 +462,19 @@ class Driver: self._host = (self._host + 1) % len(self._hosts) self.close_engine(e) +DEFAULT_DISPOSITION = Disposition(None) + +def get_bindings(opts, queue=None, exchange=None, key=None): + bindings = opts.get("x-bindings", []) + cmds = [] + for b in bindings: + exchange = b.get("exchange", exchange) + queue = b.get("queue", queue) + key = b.get("key", key) + args = b.get("arguments", {}) + cmds.append(ExchangeBind(queue, exchange, key, args)) + return cmds + class Engine: def __init__(self, connection): @@ -783,12 +823,6 @@ class Engine: err = self.declare(sst, lnk, action) else: err = ("no such queue: %s" % lnk.name,) - elif type == "queue": - try: - cmds = self.bindings(lnk) - sst.write_cmds(cmds, lambda: action(type, subtype)) - except address.ParseError, e: - err = (e,) else: action(type, subtype) @@ -829,23 +863,21 @@ class Engine: def declare(self, sst, lnk, action): name = lnk.name - props = lnk.options.get("node-properties", {}) + props = lnk.options.get("node", {}) durable = props.get("durable", DURABLE_DEFAULT) type = props.get("type", "queue") - xprops = props.get("x-properties", {}) + declare = props.get("x-declare", {}) if type == "topic": cmd = ExchangeDeclare(exchange=name, durable=durable) + bindings = get_bindings(props, exchange=name) elif type == "queue": cmd = QueueDeclare(queue=name, durable=durable) + bindings = get_bindings(props, queue=name) else: raise ValueError(type) - for f in cmd.FIELDS: - if f.name != "arguments" and xprops.has_key(f.name): - cmd[f.name] = xprops.pop(f.name) - if xprops: - cmd.arguments = xprops + sst.apply_overrides(cmd, declare) if type == "topic": if cmd.type is None: @@ -855,11 +887,7 @@ class Engine: subtype = None cmds = [cmd] - if type == "queue": - try: - cmds.extend(self.bindings(lnk)) - except address.ParseError, e: - return (e,) + cmds.extend(bindings) def declared(): self.address_cache[name] = (type, subtype) @@ -867,16 +895,6 @@ class Engine: sst.write_cmds(cmds, declared) - def bindings(self, lnk): - props = lnk.options.get("node-properties", {}) - xprops = props.get("x-properties", {}) - bindings = xprops.get("bindings", []) - cmds = [] - for b in bindings: - n, s, o = address.parse(b) - cmds.append(ExchangeBind(lnk.name, n, s, o)) - return cmds - def delete(self, sst, name, action): def deleted(): del self.address_cache[name] @@ -915,19 +933,49 @@ class Engine: if ssn.acked: messages = [m for m in ssn.acked if m not in sst.acked] if messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + ids = RangedSet() + + disposed = [(DEFAULT_DISPOSITION, [])] + for m in messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + if m._transfer_id is None: + continue + ids.add(m._transfer_id) + disp = m._disposition or DEFAULT_DISPOSITION + last, msgs = disposed[-1] + if disp.type is last.type and disp.options == last.options: + msgs.append(m) + else: + disposed.append((disp, [m])) + for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - def ack_ack(): - for m in messages: - ssn.acked.remove(m) - if not ssn.transactional: - sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids), ack_ack) - log.debug("SACK[%s]: %s", ssn.log_id, m) + + def ack_acker(msgs): + def ack_ack(): + for m in msgs: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + return ack_ack + + for disp, msgs in disposed: + if not msgs: continue + if disp.type is None: + op = MessageAccept + elif disp.type is RELEASED: + op = MessageRelease + elif disp.type is REJECTED: + op = MessageReject + sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), + **disp.options), + ack_acker(msgs)) + if log.isEnabledFor(DEBUG): + for m in msgs: + log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) + sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -948,7 +996,7 @@ class Engine: for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(MessageRelease(ids, True)) sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): @@ -1055,8 +1103,11 @@ class Engine: if mp.application_headers is None: mp.application_headers = {} mp.application_headers[TO] = msg.to - if msg.durable: - dp.delivery_mode = delivery_mode.persistent + if msg.durable is not None: + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + else: + dp.delivery_mode = delivery_mode.non_persistent if msg.priority is not None: dp.priority = msg.priority if msg.ttl is not None: @@ -1109,7 +1160,8 @@ class Engine: if mp.reply_to is not None: msg.reply_to = reply_to2addr(mp.reply_to) msg.correlation_id = mp.correlation_id - msg.durable = dp.delivery_mode == delivery_mode.persistent + if dp.delivery_mode is not None: + msg.durable = dp.delivery_mode == delivery_mode.persistent msg.priority = dp.priority msg.ttl = dp.ttl msg.redelivered = dp.redelivered Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 18:00:49 2010 @@ -295,14 +295,29 @@ class Session: create: , delete: , assert: , - node-properties: { + node: { type: , durable: , - x-properties: { - bindings: ["/", ...], - : - } + x-declare: { ... ... } + x-bindings: [, ... ] } + link: { + name: , + durable: , + reliability: , + x-declare: { ... ... } + x-bindings: [, ... ] + x-subscribe: { ... ... } + } + } + + Bindings are specified as a map with the following options:: + + { + exchange: , + queue: , + key: , + arguments: } The create, delete, and assert policies specify who should perfom @@ -316,14 +331,12 @@ class Session: The node-type is one of: - I{topic}: a topic node will default to the topic exchange, - x-properties may be used to specify other exchange types + x-declare may be used to specify other exchange types - I{queue}: this is the default node-type - The x-properties map permits arbitrary additional keys and values to - be specified. These keys and values are passed through when creating - a node or asserting facts about an existing node. Any passthrough - keys and values that do not match a standard field of the underlying - exchange or queue declare command will be sent in the arguments map. + The x-declare map permits protocol specific keys and values to be + specified. These keys and values are passed through when creating a + node or asserting facts about an existing node. Examples -------- @@ -353,18 +366,18 @@ class Session: You can customize the properties of the queue:: - my-queue; {create: always, node-properties: {durable: True}} + my-queue; {create: always, node: {durable: True}} You can create a topic instead if you want:: - my-queue; {create: always, node-properties: {type: topic}} + my-queue; {create: always, node: {type: topic}} You can assert that the address resolves to a node with particular properties:: my-transient-topic; { assert: always, - node-properties: { + node: { type: topic, durable: False } @@ -508,7 +521,7 @@ class Session: raise Empty @synchronized - def acknowledge(self, message=None, sync=True): + def acknowledge(self, message=None, disposition=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @@ -530,6 +543,7 @@ class Session: raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) self._wakeup() self._ewait(lambda: len(self.acked) < self.ack_capacity) + m._disposition = disposition self.unacked.remove(m) self.acked.append(m) Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/message.py Tue Mar 23 18:00:49 2010 @@ -129,7 +129,7 @@ class Message: "correlation_id", "priority", "ttl"]: value = self.__dict__[name] if value is not None: args.append("%s=%r" % (name, value)) - for name in ["durable", "properties"]: + for name in ["durable", "redelivered", "properties"]: value = self.__dict__[name] if value: args.append("%s=%r" % (name, value)) if self.content_type != get_type(self.content): @@ -141,4 +141,15 @@ class Message: args.append(repr(self.content)) return "Message(%s)" % ", ".join(args) -__all__ = ["Message"] +class Disposition: + + def __init__(self, type, **options): + self.type = type + self.options = options + + def __repr__(self): + args = [str(self.type)] + \ + ["%s=%r" % (k, v) for k, v in self.options.items()] + return "Disposition(%s)" % ", ".join(args) + +__all__ = ["Message", "Disposition"] Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/__init__.py Tue Mar 23 18:00:49 2010 @@ -59,6 +59,9 @@ class Base(Test): else: return "%s[%s, %s]" % (base, count, self.test_id) + def message(self, base, count = None, **kwargs): + return Message(content=self.content(base, count), **kwargs) + def ping(self, ssn): PING_Q = 'ping-queue; {create: always, delete: always}' # send a message @@ -70,16 +73,52 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None, timeout=0, expected=None): - contents = [] + def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False): + messages = [] try: - while limit is None or len(contents) < limit: - contents.append(rcv.fetch(timeout=timeout).content) + while limit is None or len(messages) < limit: + messages.append(rcv.fetch(timeout=timeout)) except Empty: pass if expected is not None: - assert expected == contents, "expected %s, got %s" % (expected, contents) - return contents + self.assertEchos(expected, messages, redelivered) + return messages + + def diff(self, m1, m2): + result = {} + for attr in ("id", "subject", "user_id", "to", "reply_to", + "correlation_id", "durable", "priority", "ttl", + "redelivered", "properties", "content_type", + "content"): + a1 = getattr(m1, attr) + a2 = getattr(m2, attr) + if a1 != a2: + result[attr] = (a1, a2) + return result + + def assertEcho(self, msg, echo, redelivered=False): + if not isinstance(msg, Message) or not isinstance(echo, Message): + if isinstance(msg, Message): + msg = msg.content + if isinstance(echo, Message): + echo = echo.content + assert msg == echo, "expected %s, got %s" % (msg, echo) + else: + delta = self.diff(msg, echo) + mttl, ettl = delta.pop("ttl", (0, 0)) + if redelivered: + assert echo.redelivered, \ + "expected %s to be redelivered: %s" % (msg, echo) + if delta.has_key("redelivered"): + del delta["redelivered"] + assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl) + assert mttl >= ettl, "%s, %s" % (mttl, ettl) + assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta) + + def assertEchos(self, msgs, echoes, redelivered=False): + assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes) + for m, e in zip(msgs, echoes): + self.assertEcho(m, e, redelivered) def assertEmpty(self, rcv): contents = self.drain(rcv) Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 18:00:49 2010 @@ -227,21 +227,60 @@ class SessionTests(Base): def testAcknowledgeAsyncAckCapUNLIMITED(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue, durable=self.durable()) - contents = [] + def testRelease(self): + msgs = [self.message("testRelease", i) for i in range(3)] + snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) + self.ssn.acknowledge(echos[2], Disposition(RELEASED)) + self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) + self.drain(rcv, expected=msgs[2:3]) + self.ssn.acknowledge() + + def testReject(self): + msgs = [self.message("testReject", i) for i in range(3)] + snd = self.ssn.sender(""" + test-reject-queue; { + create: always, + delete: always, + node: { + x-declare: { + alternate-exchange: 'amq.topic' + } + } + } +""") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + rej = self.ssn.receiver("amq.topic") + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(REJECTED)) + self.ssn.acknowledge(echos[2], + Disposition(REJECTED, code=3, text="test-reject")) + self.drain(rej, expected=msgs[1:]) + self.ssn.acknowledge() + + def send(self, ssn, target, base, count=1): + snd = ssn.sender(target, durable=self.durable()) + messages = [] for i in range(count): - c = self.content(base, i) + c = self.message(base, i) snd.send(c) - contents.append(c) + messages.append(c) snd.close() - return contents + return messages def txTest(self, commit): TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, TX_Q, "txTest", 3) + messages = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) @@ -255,10 +294,10 @@ class SessionTests(Base): if commit: txssn.commit() self.assertEmpty(rcv) - assert contents == self.drain(copy_rcv) + self.drain(copy_rcv, expected=messages) else: txssn.rollback() - assert contents == self.drain(rcv) + self.drain(rcv, expected=messages, redelivered=True) self.assertEmpty(copy_rcv) self.ssn.acknowledge() @@ -271,13 +310,13 @@ class SessionTests(Base): def txTestSend(self, commit): TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: txssn.commit() - assert contents == self.drain(rcv) + self.drain(rcv, expected=messages) self.ssn.acknowledge() else: txssn.rollback() @@ -297,18 +336,17 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - assert contents == self.drain(txrcv) + messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + self.drain(txrcv, expected=messages) if commit: txssn.acknowledge() else: txssn.rollback() - drained = self.drain(txrcv) - assert contents == drained, "expected %s, got %s" % (contents, drained) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.rollback() - assert contents == self.drain(txrcv) + self.drain(txrcv, expected=messages, redelivered=True) txssn.commit() # commit without ack self.assertEmpty(txrcv) @@ -316,7 +354,7 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) - assert contents == self.drain(txrcv) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.commit() rcv = self.ssn.receiver(TX_ACK_QD) @@ -477,7 +515,7 @@ class ReceiverTests(Base): snd = self.ssn.sender("""test-double-close; { create: always, delete: sender, - node-properties: { + node: { type: topic } } @@ -533,9 +571,9 @@ class AddressTests(Base): assert "error in options: %s" % error == str(e), e def testIllegalKey(self): - self.badOption("{create: always, node-properties: " + self.badOption("{create: always, node: " "{this-property-does-not-exist: 3}}", - "node-properties: this-property-does-not-exist: " + "node: this-property-does-not-exist: " "illegal key") def testWrongValue(self): @@ -543,23 +581,17 @@ class AddressTests(Base): "('always', 'sender', 'receiver', 'never')") def testWrongType1(self): - self.badOption("{node-properties: asdf}", - "node-properties: asdf is not a map") + self.badOption("{node: asdf}", + "node: asdf is not a map") def testWrongType2(self): - self.badOption("{node-properties: {durable: []}}", - "node-properties: durable: [] is not a bool") - - def testNonQueueBindings(self): - self.badOption("{node-properties: {type: topic, x-properties: " - "{bindings: []}}}", - "node-properties: x-properties: bindings: " - "bindings are only permitted on nodes of type queue") + self.badOption("{node: {durable: []}}", + "node: durable: [] is not a bool") def testCreateQueue(self): snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " - "node-properties: {type: queue, durable: False, " - "x-properties: {auto_delete: true}}}") + "node: {type: queue, durable: False, " + "x-declare: {auto_delete: true}}}") content = self.content("testCreateQueue") snd.send(content) rcv = self.ssn.receiver("test-create-queue") @@ -569,10 +601,10 @@ class AddressTests(Base): addr = """test-create-exchange; { create: always, delete: always, - node-properties: { + node: { type: topic, durable: False, - x-properties: {auto_delete: true, %s} + x-declare: {auto_delete: true, %s} } }""" % props snd = self.ssn.sender(addr) @@ -639,15 +671,15 @@ class AddressTests(Base): # XXX: need to figure out close after error self.conn._remove_session(self.ssn) - def testBindings(self): + def testNodeBindingsQueue(self): snd = self.ssn.sender(""" -test-bindings-queue; { +test-node-bindings-queue; { create: always, delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"] - } + node: { + x-bindings: [{exchange: "amq.topic", key: "a.#"}, + {exchange: "amq.direct", key: "b"}, + {exchange: "amq.topic", key: "c.*"}] } } """) @@ -658,49 +690,80 @@ test-bindings-queue; { snd_a.send("two") snd_b.send("three") snd_c.send("four") - rcv = self.ssn.receiver("test-bindings-queue") + rcv = self.ssn.receiver("test-node-bindings-queue") self.drain(rcv, expected=["one", "two", "three", "four"]) - def testBindingsAdditive(self): - m1 = self.content("testBindingsAdditive", 1) - m2 = self.content("testBindingsAdditive", 2) - m3 = self.content("testBindingsAdditive", 3) - m4 = self.content("testBindingsAdditive", 4) - + def testNodeBindingsTopic(self): + rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") + rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") + rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") + rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") snd = self.ssn.sender(""" -test-bindings-additive-queue; { +test-node-bindings-topic; { create: always, delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a"] - } + node: { + type: topic, + x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, + {queue: test-node-bindings-topic-queue-a, key: "a.#"}, + {queue: test-node-bindings-topic-queue-b, key: "b"}, + {queue: test-node-bindings-topic-queue-c, key: "c.*"}] } } """) + m1 = Message("one") + m2 = Message(subject="a.foo", content="two") + m3 = Message(subject="b", content="three") + m4 = Message(subject="c.bar", content="four") + snd.send(m1) + snd.send(m2) + snd.send(m3) + snd.send(m4) + self.drain(rcv, expected=[m1, m2, m3, m4]) + self.drain(rcv_a, expected=[m2]) + self.drain(rcv_b, expected=[m3]) + self.drain(rcv_c, expected=[m4]) + + def testLinkBindings(self): + m_a = self.message("testLinkBindings", 1, subject="a") + m_b = self.message("testLinkBindings", 2, subject="b") - snd_a = self.ssn.sender("amq.topic/a") - snd_b = self.ssn.sender("amq.topic/b") + self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") + snd = self.ssn.sender("amq.topic") - snd_a.send(m1) - snd_b.send(m2) + snd.send(m_a) + snd.send(m_b) + snd.close() - rcv = self.ssn.receiver("test-bindings-additive-queue") - self.drain(rcv, expected=[m1]) + rcv = self.ssn.receiver("test-link-bindings-queue") + self.assertEmpty(rcv) + + snd = self.ssn.sender(""" +amq.topic; { + link: { + x-bindings: [{queue: test-link-bindings-queue, key: a}] + } +} +""") - new_snd = self.ssn.sender(""" -test-bindings-additive-queue; { - node-properties: { - x-properties: { - bindings: ["amq.topic/b"] - } + snd.send(m_a) + snd.send(m_b) + + self.drain(rcv, expected=[m_a]) + rcv.close() + + rcv = self.ssn.receiver(""" +test-link-bindings-queue; { + link: { + x-bindings: [{exchange: "amq.topic", key: b}] } } """) - new_snd.send(m3) - snd_b.send(m4) - self.drain(rcv, expected=[m3, m4]) + snd.send(m_a) + snd.send(m_b) + + self.drain(rcv, expected=[m_a, m_b]) def testSubjectOverride(self): snd = self.ssn.sender("amq.topic/a") @@ -726,6 +789,32 @@ test-bindings-additive-queue; { assert e2.subject == "b", "subject: %s" % e2.subject self.assertEmpty(rcv) + def doReliabilityTest(self, reliability, messages, expected): + snd = self.ssn.sender("amq.topic") + rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) + for m in messages: + snd.send(m) + self.conn.disconnect() + self.conn.connect() + self.drain(rcv, expected=expected) + + def testReliabilityUnreliable(self): + msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] + self.doReliabilityTest("unreliable", msgs, []) + + def testReliabilityAtLeastOnce(self): + msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] + self.doReliabilityTest("at-least-once", msgs, msgs) + + def testLinkName(self): + msgs = [self.message("testLinkName", i) for i in range(3)] + snd = self.ssn.sender("amq.topic") + trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") + qrcv = self.ssn.receiver("test-link-name") + for m in msgs: + snd.send(m) + self.drain(qrcv, expected=msgs) + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" @@ -838,8 +927,7 @@ class SenderTests(Base): msgs = [self.content("asyncTest", i) for i in range(15)] for m in msgs: self.snd.send(m, sync=False) - drained = self.drain(self.rcv, timeout=self.delay()) - assert msgs == drained, "expected %s, got %s" % (msgs, drained) + self.drain(self.rcv, timeout=self.delay(), expected=msgs) self.ssn.acknowledge() def testSendAsyncCapacity0(self): Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/message.py Tue Mar 23 18:00:49 2010 @@ -68,20 +68,7 @@ class MessageEchoTests(Base): def check(self, msg): self.snd.send(msg) echo = self.rcv.fetch(0) - - assert msg.id == echo.id - assert msg.subject == echo.subject - assert msg.user_id == echo.user_id - assert msg.to == echo.to - assert msg.reply_to == echo.reply_to - assert msg.correlation_id == echo.correlation_id - assert msg.durable == echo.durable - assert msg.priority == echo.priority - assert msg.ttl == echo.ttl - assert msg.properties == echo.properties - assert msg.content_type == echo.content_type - assert msg.content == echo.content, "%s, %s" % (msg, echo) - + self.assertEcho(msg, echo) self.ssn.acknowledge(echo) def testStringContent(self): Modified: qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/python/qpid/validator.py Tue Mar 23 18:00:49 2010 @@ -54,6 +54,20 @@ class Types: else: return "%s is not one of: %s" % (o, ", ".join([t.__name__ for t in self.types])) +class List: + + def __init__(self, condition): + self.condition = condition + + def validate(self, o, ctx): + if not isinstance(o, list): + return "%s is not a list" % o + + ctx.push(o) + for v in o: + err = self.condition.validate(v, ctx) + if err: return err + class Map: def __init__(self, map, restricted=True): Propchange: qpid/branches/qmf-devel0.7a/qpid/ruby/ext/sasl/extconf.rb ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -0,0 +1 @@ +/qpid/trunk/qpid/ruby/ext/sasl/extconf.rb:919043-926606 Modified: qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py?rev=926686&r1=926685&r2=926686&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py Tue Mar 23 18:00:49 2010 @@ -430,6 +430,16 @@ class HeadersExchangeTests(TestHelper): self.myBasicPublish({"irrelevant":0}) self.assertEmpty(self.q) + def testMatchVoidValue(self): + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":None}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"bob"}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + class MiscellaneousErrorsTests(TestHelper): """ Propchange: qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 23 18:00:49 2010 @@ -1,2 +1,3 @@ /qpid/branches/qmfv2/qpid/python/tests_0-9/queue.py:902858,902894 /qpid/branches/qpid.rnr/python/tests_0-9/queue.py:894071-896158 +/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:919043-926606 --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org