Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3BA8917BC4 for ; Mon, 31 Aug 2015 20:31:59 +0000 (UTC) Received: (qmail 56578 invoked by uid 500); 31 Aug 2015 20:31:59 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 56486 invoked by uid 500); 31 Aug 2015 20:31:59 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 56431 invoked by uid 99); 31 Aug 2015 20:31:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 31 Aug 2015 20:31:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E1B48E03CD; Mon, 31 Aug 2015 20:31:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@qpid.apache.org Date: Mon, 31 Aug 2015 20:31:59 -0000 Message-Id: <01befd28507e4fc38ec5965f0a759cd3@git.apache.org> In-Reply-To: <12ec0b67c3fc464181f1bcc666ee8375@git.apache.org> References: <12ec0b67c3fc464181f1bcc666ee8375@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] qpid-jms git commit: QPIDJMS-51 Further refine the QueueBrowser implementation removing the now unneeded AmqpQueueBrowser class and merging the implementation up into the pull consumer code in AmqpConsumer. Adding in tests for QueueBrowser and som QPIDJMS-51 Further refine the QueueBrowser implementation removing the now unneeded AmqpQueueBrowser class and merging the implementation up into the pull consumer code in AmqpConsumer. 
Adding in tests for QueueBrowser and some failover tests for pull functionality. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9afe5daa Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9afe5daa Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9afe5daa Branch: refs/heads/master Commit: 9afe5daaaef7427ba0783afea7716a7ab4d75ec1 Parents: f2f2cca Author: Timothy Bish Authored: Mon Aug 31 16:31:49 2015 -0400 Committer: Timothy Bish Committed: Mon Aug 31 16:31:49 2015 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 52 ++-- .../jms/provider/amqp/AmqpQueueBrowser.java | 103 ------- .../qpid/jms/provider/amqp/AmqpSession.java | 9 +- .../QueueBrowserIntegrationTest.java | 149 ++++++++++ .../failover/FailoverIntegrationTest.java | 289 +++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 131 ++++++++- 6 files changed, 602 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 9ba2730..22a2932 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -153,7 +153,7 @@ public class AmqpConsumer extends AmqpAbstractResource 0) { // We need to drain the credit if no message arrives. 
If that @@ -421,12 +427,16 @@ public class AmqpConsumer extends AmqpAbstractResource completionTask; @@ -668,6 +677,11 @@ public class AmqpConsumer extends AmqpAbstractResource future = getSession().schedule(new Runnable() { - - @Override - public void run() { - // Try for one last time to pull a message down, if this - // fails then we can end the browse otherwise the link credit - // will get updated on the next sent disposition and we will - // end up back here if no more messages arrive. - LOG.trace("Browser {} attemptig to force a message dispatch"); - getEndpoint().drain(1); - pullRequest = new PullRequest(); - session.getProvider().pumpToProtonTransport(); - } - }, timeout); - - pullRequest = new BrowseEndPullRequest(future); - } - } - - @Override - protected void configureSource(Source source) { - if (resource.isBrowser()) { - source.setDistributionMode(COPY); - } - - super.configureSource(source); - } - - @Override - public boolean isBrowser() { - return true; - } - - @Override - public String toString() { - return "AmqpQueueBrowser { " + this.resource.getConsumerId() + " }"; - } - - //----- Inner classes used in message pull operations --------------------// - - protected class BrowseEndPullRequest extends TimedPullRequest { - - public BrowseEndPullRequest(ScheduledFuture completionTask) { - super(completionTask); - } - - @Override - public void onFailure(Throwable result) { - // Nothing to do, the timer will take care of the end of browse signal. 
- } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 27186c4..96f4719 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -137,14 +137,7 @@ public class AmqpSession extends AmqpAbstractResource { } public AmqpConsumer createConsumer(JmsConsumerInfo consumerInfo) { - AmqpConsumer result = null; - - if (consumerInfo.isBrowser()) { - result = new AmqpQueueBrowser(this, consumerInfo); - } else { - result = new AmqpConsumer(this, consumerInfo); - } - + AmqpConsumer result = new AmqpConsumer(this, consumerInfo); result.setPresettle(connection.isPresettleConsumers()); return result; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java index 021f4e3..0937cca 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java @@ -16,18 +16,23 @@ */ package org.apache.qpid.jms.integration; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Enumeration; import javax.jms.Connection; +import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; +import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; @@ -38,11 +43,15 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompos import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; public class QueueBrowserIntegrationTest extends QpidJmsTestCase { + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + //----- Test basic create and destroy mechanisms -------------------------// + @Test(timeout=30000) public void testCreateQueueBrowserWithoutEnumeration() throws IOException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -91,6 +100,37 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { } } + //----- Tests for expected behaviors of a QueueBrowser implementation ----// + + @Test(timeout=30000) + public void testQueueBrowserNextElementWithNoMessage() throws IOException, Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // Expected the browser to create a consumer and send credit. 
+ testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); + testPeer.expectDetach(true, true, true); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration queueView = browser.getEnumeration(); + assertNotNull(queueView); + assertNull(queueView.nextElement()); + + browser.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + //----- Tests that cover QueueBrowser and Session Ack mode interaction ---// + @Test(timeout=30000) public void testCreateQueueBrowseAutoAckSession() throws IOException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -196,4 +236,113 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(3000); } } + + //----- Tests that cover QueueBrowser when prefetch is zero --------------// + + @Test(timeout=30000) + public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + jmsConnection.getPrefetchPolicy().setAll(0); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // Expected the browser to create a consumer and send credit. 
+ testPeer.expectReceiverAttach(); + testPeer.expectDetach(true, true, true); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration queueView = browser.getEnumeration(); + assertNotNull(queueView); + + browser.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + + @Test(timeout=30000) + public void testQueueBrowseHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + jmsConnection.getPrefetchPolicy().setAll(0); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // Expected the browser to create a consumer and send credit. + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1))); + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); + testPeer.expectDetach(true, true, true); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration queueView = browser.getEnumeration(); + assertNotNull(queueView); + assertFalse(queueView.hasMoreElements()); + + browser.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + + @Test(timeout=30000) + public void testQueueBrowseHasMoreElementsZeroPrefetchDrainedMessage() throws IOException, Exception { + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + jmsConnection.getPrefetchPolicy().setAll(0); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); 
+ + // Expected the browser to create a consumer and send credit. + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1))); + + // After timeout the browser should drain and here we want to ensure that if a + // message arrives that we handle it correctly. + testPeer.expectLinkFlowRespondWithTransfer( + null, null, null, null, amqpValueNullContent, 1, true, false, equalTo(UnsignedInteger.valueOf(1)), 1, false); + + // Message gets ack'd right away + testPeer.expectDispositionThatIsAcceptedAndSettled(); + + // Next attempt should not get a message and trigger a false on hasMoreElements. + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1))); + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); + + testPeer.expectDetach(true, true, true); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration queueView = browser.getEnumeration(); + assertNotNull(queueView); + assertTrue(queueView.hasMoreElements()); + Message message = (Message) queueView.nextElement(); + assertNotNull(message); + assertFalse(queueView.hasMoreElements()); + + browser.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 1092bc1..3ff9ce2 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -19,9 +19,13 @@ package org.apache.qpid.jms.provider.failover; import static 
org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.net.URI; +import java.util.Enumeration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -30,8 +34,10 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import org.apache.qpid.jms.JmsConnection; @@ -40,9 +46,12 @@ import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,6 +235,286 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 20000) + public void testFailoverHandlesDropPullConsumerReceiveNoWait() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final 
CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + // Connect to the first peer + originalPeer.expectSaslAnonymousConnect(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlowThenDrop(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.getPrefetchPolicy().setQueuePrefetch(0); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + + // --- Post Failover Expectations of FinalPeer --- // + + // Create session+producer, send a persistent message on auto-ack session for synchronous send + finalPeer.expectSaslAnonymousConnect(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectDetach(true, true, true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + + assertNull(consumer.receiveNoWait()); + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + consumer.close(); + + 
finalPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testFailoverHandlesDropPullConsumerReceiveWithTimeout() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + // Connect to the first peer + originalPeer.expectSaslAnonymousConnect(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlowThenDrop(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.getPrefetchPolicy().setQueuePrefetch(0); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + + // --- Post Failover Expectations of FinalPeer --- // + + // Create session+producer, send a persistent message on auto-ack session for synchronous send + finalPeer.expectSaslAnonymousConnect(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlow(); + 
finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); + finalPeer.expectDetach(true, true, true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + + assertNull(consumer.receive(2000)); + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + consumer.close(); + + finalPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testFailoverHandlesDropPullConsumerReceive() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + // Connect to the first peer + originalPeer.expectSaslAnonymousConnect(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlowThenDrop(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.getPrefetchPolicy().setQueuePrefetch(0); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + 
connection.start(); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + + // --- Post Failover Expectations of FinalPeer --- // + + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + // Create session+producer, send a persistent message on auto-ack session for synchronous send + finalPeer.expectSaslAnonymousConnect(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + finalPeer.expectDispositionThatIsAcceptedAndSettled(); + finalPeer.expectDetach(true, true, true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + + assertNotNull(consumer.receive()); + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + consumer.close(); + + finalPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testFailoverHandlesDropAfterQueueBrowserDrain() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + // Connect to the first peer + originalPeer.expectSaslAnonymousConnect(); + originalPeer.expectBegin(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void 
onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + + // Create session+producer, send a persistent message on auto-ack session for synchronous send + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlow(); + originalPeer.expectLinkFlowThenDrop(); + + // --- Post Failover Expectations of FinalPeer --- // + + // Create session+producer, send a persistent message on auto-ack session for synchronous send + finalPeer.expectSaslAnonymousConnect(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlow(); + finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); + finalPeer.expectDetach(true, true, true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + QueueBrowser browser = session.createBrowser(queue); + Enumeration queueView = browser.getEnumeration(); + + assertNotNull(queueView); + assertFalse(queueView.hasMoreElements()); + + browser.close(); + } + } + private JmsConnection establishAnonymousConnecton(TestAmqpPeer... 
peers) throws JMSException { if(peers.length == 0) { throw new IllegalArgumentException("No test peers were given, at least 1 required"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index aab25a3..c788347 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -60,7 +60,6 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame; import org.apache.qpid.jms.test.testpeer.describedtypes.Source; import org.apache.qpid.jms.test.testpeer.describedtypes.Target; import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame; -import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType; @@ -1617,4 +1616,134 @@ public class TestAmqpPeer implements AutoCloseable addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, exitAfterHeader)); } + + public void expectLinkFlowThenDrop() + { + AmqpPeerRunnable exitAfterFlow = new AmqpPeerRunnable() { + @Override + public void run() { + _driverRunnable.exitReadLoopEarly(); + } + }; + + final FlowMatcher flowMatcher = new FlowMatcher().onCompletion(exitAfterFlow); + + addHandler(flowMatcher); + } + +// public void expectLinkFlowThenDrop() +// { +// AmqpPeerRunnable exitAfterHeader = new AmqpPeerRunnable() { 
+// @Override +// public void run() { +// _driverRunnable.exitReadLoopEarly(); +// } +// }; +// +// if (nextIncomingId == null && count > 0) +// { +// throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested"); +// } +// +// Matcher drainMatcher = null; +// if(drain) +// { +// drainMatcher = equalTo(true); +// } +// else +// { +// drainMatcher = Matchers.anyOf(equalTo(false), nullValue()); +// } +// +// Matcher remoteNextIncomingIdMatcher = null; +// if(nextIncomingId != null) +// { +// remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId)); +// } +// else +// { +// remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE); +// } +// +// final FlowMatcher flowMatcher = new FlowMatcher() +// .withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count))) +// .withDrain(drainMatcher) +// .withNextIncomingId(remoteNextIncomingIdMatcher); +// +// CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); +// boolean addComposite = false; +// +// if (appPropertiesDescribedType == null && addMessageNumberProperty) { +// appPropertiesDescribedType = new ApplicationPropertiesDescribedType(); +// } +// +// for(int i = 0; i < count; i++) +// { +// final int nextId = nextIncomingId + i; +// +// String tagString = "theDeliveryTag" + nextId; +// Binary dtag = new Binary(tagString.getBytes()); +// +// if(addMessageNumberProperty) { +// appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i); +// } +// +// final TransferFrame transferResponse = new TransferFrame() +// .setDeliveryId(UnsignedInteger.valueOf(nextId)) +// .setDeliveryTag(dtag) +// .setMessageFormat(UnsignedInteger.ZERO) +// .setSettled(false); +// +// Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, +// propertiesDescribedType, appPropertiesDescribedType, content); +// +// // The response frame channel will be dynamically 
set based on the incoming frame. Using the -1 is an illegal placeholder. +// final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload); +// transferResponseSender.setValueProvider(new ValueProvider() +// { +// @Override +// public void setValues() +// { +// transferResponse.setHandle(flowMatcher.getReceivedHandle()); +// transferResponseSender.setChannel(flowMatcher.getActualChannel()); +// } +// }); +// +// addComposite = true; +// composite.add(transferResponseSender); +// } +// +// if(drain && sendDrainFlowResponse) +// { +// final FlowFrame drainResponse = new FlowFrame(); +// drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldn't be hard coded +// drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldn't be hard coded +// drainResponse.setLinkCredit(UnsignedInteger.ZERO); +// drainResponse.setDrain(true); +// +// // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. 
+// final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null); +// flowResponseSender.setValueProvider(new ValueProvider() +// { +// @Override +// public void setValues() +// { +// flowResponseSender.setChannel(flowMatcher.getActualChannel()); +// drainResponse.setHandle(flowMatcher.getReceivedHandle()); +// drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher)); +// drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count)); +// drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); +// } +// }); +// +// addComposite = true; +// composite.add(flowResponseSender); +// } +// +// if(addComposite) { +// flowMatcher.onCompletion(composite); +// } +// +// addHandler(flowMatcher); +// } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org