Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 56926 invoked from network); 23 Mar 2011 22:58:41 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 23 Mar 2011 22:58:41 -0000 Received: (qmail 13179 invoked by uid 500); 23 Mar 2011 22:58:41 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13124 invoked by uid 500); 23 Mar 2011 22:58:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13117 invoked by uid 99); 23 Mar 2011 22:58:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Mar 2011 22:58:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Mar 2011 22:58:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 93D3623889D7; Wed, 23 Mar 2011 22:58:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1084797 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover: FailoverConsumerOutstandingCommitTest.java FailoverConsumerUnconsumedTest.java FailoverPrefetchZeroTest.java FailoverTransactionTest.java Date: Wed, 23 Mar 2011 22:58:18 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110323225818.93D3623889D7@eris.apache.org> Author: tabish Date: Wed Mar 23 22:58:18 2011 New Revision: 1084797 URL: http://svn.apache.org/viewvc?rev=1084797&view=rev Log: Update the tests so that they're not dependent on port 61616 Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=1084797&r1=1084796&r2=1084797&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java Wed Mar 23 22:58:18 2011 @@ -52,55 +52,58 @@ import org.junit.After; import org.junit.Test; public class FailoverConsumerOutstandingCommitTest { - + private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class); - private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; + private static final String QUEUE_NAME = "FailoverWithOutstandingCommit"; private static final String MESSAGE_TEXT = "Test message "; - private String url = "tcp://localhost:61616"; - final int prefetch = 10; - BrokerService broker; - - public void startCleanBroker() throws Exception { - startBroker(true); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); + private static final String TRANSPORT_URI = "tcp://localhost:0"; + private String url; + final int prefetch = 10; + BrokerService broker; + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); broker.start(); - } + } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = new BrokerService(); - broker.addConnector(url); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - - // optimizedDispatche and sync dispatch ensure that the dispatch happens - // before the commit reply that the consumer.clearDispatchList is waiting for. - defaultEntry.setOptimizedDispatch(true); + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + + // optimizedDispatche and sync dispatch ensure that the dispatch happens + // before the commit reply that the consumer.clearDispatchList is waiting for. + defaultEntry.setOptimizedDispatch(true); policyMap.setDefaultEntry(defaultEntry); broker.setDestinationPolicy(policyMap); - - return broker; - } - - @Test - public void testFailoverConsumerDups() throws Exception { - doTestFailoverConsumerDups(true); - } - - public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { - + + url = broker.getTransportConnectors().get(0).getConnectUri().toString(); + + return broker; + } + + @Test + public void testFailoverConsumerDups() throws Exception { + doTestFailoverConsumerDups(true); + } + + public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { + broker = createBroker(true); - + broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { @Override @@ -108,7 +111,7 @@ public class FailoverConsumerOutstanding TransactionId xid, boolean onePhase) throws Exception { // so commit will hang as if reply is lost context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping broker before commit..."); try { @@ -122,17 +125,17 @@ public class FailoverConsumerOutstanding } }); broker.start(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); cf.setDispatchAsync(false); - + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection.start(); - + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch); - + final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -144,9 +147,9 @@ public class FailoverConsumerOutstanding public void onMessage(Message message) { LOG.info("consume one and commit"); - + assertNotNull("got message", message); - + try { consumerSession.commit(); } catch (JMSException e) { @@ -157,7 +160,7 @@ public class FailoverConsumerOutstanding LOG.info("done commit"); } }); - + // may block if broker shutodwn happens quickly Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { @@ -171,15 +174,15 @@ public class FailoverConsumerOutstanding LOG.info("producer done"); } }); - + // will be stopped by the plugin broker.waitUntilStopped(); - broker = createBroker(false); + broker = createBroker(false, url); broker.start(); assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("another message was recieved after failover", messagesReceived.await(20, TimeUnit.SECONDS)); - + connection.close(); } @@ -187,12 +190,12 @@ public class FailoverConsumerOutstanding public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(false); } - + @Test public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(true); } - + public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception { final boolean watchTopicAdvisories = true; broker = createBroker(true); @@ -233,7 +236,7 @@ public class FailoverConsumerOutstanding final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=" + prefetch); - + final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal" + "?consumer.prefetchSize=" + prefetch); @@ -280,7 +283,7 @@ public class FailoverConsumerOutstanding // will be stopped by the plugin broker.waitUntilStopped(); - broker = createBroker(false); + broker = createBroker(false, url); broker.start(); assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); @@ -291,8 +294,8 @@ public class FailoverConsumerOutstanding assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText()); assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS)); assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText()); - - + + connection.close(); } @@ -312,28 +315,28 @@ public class FailoverConsumerOutstanding final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); final MessageConsumer testConsumer = consumerSession.createConsumer(destination); assertNull("no message yet", testConsumer.receiveNoWait()); - + produceMessage(producerSession, destination, 1); producerSession.close(); // consume then rollback after restart Message msg = testConsumer.receive(5000); assertNotNull(msg); - + // restart with outstanding delivered message broker.stop(); broker.waitUntilStopped(); - broker = createBroker(false); + broker = createBroker(false, url); broker.start(); - + consumerSession.rollback(); - + // receive again msg = testConsumer.receive(10000); assertNotNull("got message again after rollback", msg); consumerSession.commit(); - + // close before sweep consumerSession.close(); msg = receiveMessage(cf, destination); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java?rev=1084797&r1=1084796&r2=1084797&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java Wed Mar 23 22:58:18 2011 @@ -51,51 +51,55 @@ import org.junit.Test; // see https://issues.apache.org/activemq/browse/AMQ-2573 public class FailoverConsumerUnconsumedTest { - + private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class); - private static final String QUEUE_NAME = "FailoverWithUnconsumed"; - private String url = "tcp://localhost:61616"; - final int prefetch = 10; - BrokerService broker; - - public void startCleanBroker() throws Exception { - startBroker(true); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); + private static final String QUEUE_NAME = "FailoverWithUnconsumed"; + private static final String TRANSPORT_URI = "tcp://localhost:0"; + private String url; + final int prefetch = 10; + BrokerService broker; + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); broker.start(); - } + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + + this.url = broker.getTransportConnectors().get(0).getConnectUri().toString(); - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = new BrokerService(); - broker.addConnector(url); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - return broker; - } - - @Test - public void testFailoverConsumerDups() throws Exception { - doTestFailoverConsumerDups(true); - } - - @Test + return broker; + } + + @Test + public void testFailoverConsumerDups() throws Exception { + doTestFailoverConsumerDups(true); + } + + @Test public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception { doTestFailoverConsumerDups(false); } - - public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { - - final int maxConsumers = 4; + + public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { + + final int maxConsumers = 4; broker = createBroker(true); - + broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { int consumerCount; @@ -106,7 +110,7 @@ public class FailoverConsumerUnconsumedT final ConsumerInfo info) throws Exception { if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1:0)) { context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { + Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("Stopping broker on consumer: " + info.getConsumerId()); try { @@ -122,13 +126,13 @@ public class FailoverConsumerUnconsumedT } }); broker.start(); - + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); cf.setWatchTopicAdvisories(watchTopicAdvisories); - + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); connection.start(); - + final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); @@ -136,9 +140,9 @@ public class FailoverConsumerUnconsumedT for (int i=0; i