qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/2] qpid-jms git commit: QPIDJMS-51 Further refine the QueueBrowser implementation removing the now unneeded AmqpQueueBrowser class and merging the implementation up into the pull consumer code in AmqpConsumer. Adding in tests for QueueBrowser and som
Date Mon, 31 Aug 2015 20:31:59 GMT
QPIDJMS-51 Further refine the QueueBrowser implementation removing the
now unneeded AmqpQueueBrowser class and merging the implementation up
into the pull consumer code in AmqpConsumer.  Adding in tests for
QueueBrowser and some failover tests for pull functionality. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9afe5daa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9afe5daa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9afe5daa

Branch: refs/heads/master
Commit: 9afe5daaaef7427ba0783afea7716a7ab4d75ec1
Parents: f2f2cca
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Aug 31 16:31:49 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Aug 31 16:31:49 2015 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  52 ++--
 .../jms/provider/amqp/AmqpQueueBrowser.java     | 103 -------
 .../qpid/jms/provider/amqp/AmqpSession.java     |   9 +-
 .../QueueBrowserIntegrationTest.java            | 149 ++++++++++
 .../failover/FailoverIntegrationTest.java       | 289 +++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 131 ++++++++-
 6 files changed, 602 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 9ba2730..22a2932 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -153,7 +153,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
             Receiver receiver = getEndpoint();
             if (receiver.getRemoteCredit() <= 0) {
                 if (receiver.getQueued() <= 0) {
-                    pullRequest.onFailure(null);
+                    pullRequest.onFailure();
                 } else {
                     pullRequest.onSuccess();
                 }
@@ -248,6 +248,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         }
 
+        if (resource.isBrowser()) {
+            source.setDistributionMode(COPY);
+        }
+
         Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(resource.getDestination());
         if(typeCapability != null) {
             source.setCapabilities(typeCapability);
@@ -327,7 +331,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
             }
             LOG.debug("Consumed Ack of message: {}", envelope);
             if (!delivery.isSettled()) {
-                if (session.isTransacted() && !isBrowser()) {
+                if (session.isTransacted() && !resource.isBrowser()) {
                     Binary txnId = session.getTransactionContext().getAmqpTransactionId();
                     if (txnId != null) {
                         TransactionalState txState = new TransactionalState();
@@ -405,11 +409,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
      */
     public void pull(final long timeout) {
         LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);
-        if (getEndpoint().getCredit() == 0 && getEndpoint().getQueued() == 0) {
+        if (getEndpoint().getQueued() == 0) {
             if (timeout < 0) {
-                getEndpoint().flow(1);
+                if (getEndpoint().getCredit() == 0) {
+                    getEndpoint().flow(1);
+                }
             } else if (timeout == 0) {
-                pullRequest = new PullRequest();
+                pullRequest = new DrainingPullRequest();
                 getEndpoint().drain(1);
             } else if (timeout > 0) {
                 // We need to drain the credit if no message arrives. If that
@@ -421,12 +427,16 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
                     public void run() {
                         if (getEndpoint().getRemoteCredit() != 0) {
                             getEndpoint().drain(0);
+                            pullRequest = new DrainingPullRequest();
                             session.getProvider().pumpToProtonTransport();
                         }
                     }
                 }, timeout);
 
-                getEndpoint().flow(1);
+                if (getEndpoint().getCredit() == 0) {
+                    getEndpoint().flow(1);
+                }
+
                 pullRequest = new TimedPullRequest(future);
             }
         }
@@ -466,7 +476,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
 
                     if (pullRequest != null) {
                         // Failure as we didn't get the desired message
-                        pullRequest.onFailure(null);
+                        pullRequest.onFailure();
                         pullRequest = null;
                     }
                 }
@@ -557,10 +567,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         return this.getEndpoint();
     }
 
-    public boolean isBrowser() {
-        return false;
-    }
-
     public boolean isPresettle() {
         return presettle;
     }
@@ -634,10 +640,18 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
 
     //----- Inner classes used in message pull operations --------------------//
 
-    protected class PullRequest implements AsyncResult {
+    protected interface PullRequest {
+
+        void onSuccess();
+
+        void onFailure();
+
+    }
+
+    protected class DrainingPullRequest implements PullRequest {
 
         @Override
-        public void onFailure(Throwable result) {
+        public void onFailure() {
             JmsInboundMessageDispatch pullDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
             pullDone.setConsumerId(getConsumerId());
             // Lack of setMessage on the dispatch is taken as signal no message arrived.
@@ -652,14 +666,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         public void onSuccess() {
             // Nothing to do here.
         }
-
-        @Override
-        public boolean isComplete() {
-            return false;
-        }
     }
 
-    protected class TimedPullRequest extends PullRequest {
+    protected class TimedPullRequest implements PullRequest {
 
         private final ScheduledFuture<?> completionTask;
 
@@ -668,6 +677,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         }
 
         @Override
+        public void onFailure() {
+            // Nothing to do here timer task should handle the no message case.
+        }
+
+        @Override
         public void onSuccess() {
             completionTask.cancel(false);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
deleted file mode 100644
index a71dc2c..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.util.concurrent.ScheduledFuture;
-
-import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Queue Browser implementation for AMQP
- */
-public class AmqpQueueBrowser extends AmqpConsumer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AmqpQueueBrowser.class);
-
-    public AmqpQueueBrowser(AmqpSession session, JmsConsumerInfo info) {
-        super(session, info);
-    }
-
-    @Override
-    public void pull(final long timeout) {
-
-        // Zero prefetch QueueBrowser behaves the same as a standard pull consumer.
-        if (resource.getPrefetchSize() == 0) {
-            super.pull(timeout);
-            return;
-        }
-
-        LOG.trace("Pull on browser {} with timeout = {}", getConsumerId(), timeout);
-
-        // Pull for browser is called when there are no available messages buffered.
-        // If we still have some to dispatch then no pull is needed otherwise we might
-        // need to attempt try and drain to end the browse.
-        if (getEndpoint().getQueued() == 0) {
-            final ScheduledFuture<?> future = getSession().schedule(new Runnable()
{
-
-                @Override
-                public void run() {
-                    // Try for one last time to pull a message down, if this
-                    // fails then we can end the browse otherwise the link credit
-                    // will get updated on the next sent disposition and we will
-                    // end up back here if no more messages arrive.
-                    LOG.trace("Browser {} attemptig to force a message dispatch");
-                    getEndpoint().drain(1);
-                    pullRequest = new PullRequest();
-                    session.getProvider().pumpToProtonTransport();
-                }
-            }, timeout);
-
-            pullRequest = new BrowseEndPullRequest(future);
-        }
-    }
-
-    @Override
-    protected void configureSource(Source source) {
-        if (resource.isBrowser()) {
-            source.setDistributionMode(COPY);
-        }
-
-        super.configureSource(source);
-    }
-
-    @Override
-    public boolean isBrowser() {
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "AmqpQueueBrowser { " + this.resource.getConsumerId() + " }";
-    }
-
-    //----- Inner classes used in message pull operations --------------------//
-
-    protected class BrowseEndPullRequest extends TimedPullRequest {
-
-        public BrowseEndPullRequest(ScheduledFuture<?> completionTask) {
-            super(completionTask);
-        }
-
-        @Override
-        public void onFailure(Throwable result) {
-            // Nothing to do, the timer will take care of the end of browse signal.
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 27186c4..96f4719 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -137,14 +137,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo,
Session> {
     }
 
     public AmqpConsumer createConsumer(JmsConsumerInfo consumerInfo) {
-        AmqpConsumer result = null;
-
-        if (consumerInfo.isBrowser()) {
-            result = new AmqpQueueBrowser(this, consumerInfo);
-        } else {
-            result = new AmqpConsumer(this, consumerInfo);
-        }
-
+        AmqpConsumer result = new AmqpConsumer(this, consumerInfo);
         result.setPresettle(connection.isPresettleConsumers());
         return result;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index 021f4e3..0937cca 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -16,18 +16,23 @@
  */
 package org.apache.qpid.jms.integration;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Enumeration;
 
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
 
+import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
@@ -38,11 +43,15 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompos
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
 
 public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
+
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
+    //----- Test basic create and destroy mechanisms -------------------------//
+
     @Test(timeout=30000)
     public void testCreateQueueBrowserWithoutEnumeration() throws IOException, Exception
{
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -91,6 +100,37 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    //----- Tests for expected behaviors of a QueueBrowser implementation ----//
+
+    @Test(timeout=30000)
+    public void testQueueBrowserNextElementWithNoMessage() throws IOException, Exception
{
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the browser to create a consumer and send credit.
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
+            testPeer.expectDetach(true, true, true);
+
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration<?> queueView = browser.getEnumeration();
+            assertNotNull(queueView);
+            assertNull(queueView.nextElement());
+
+            browser.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+    //----- Tests that cover QueueBrowser and Session Ack mode interaction ---//
+
     @Test(timeout=30000)
     public void testCreateQueueBrowseAutoAckSession() throws IOException, Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -196,4 +236,113 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    //----- Tests that cover QueueBrowser when prefetch is zero --------------//
+
+    @Test(timeout=30000)
+    public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception
{
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            JmsConnection jmsConnection = (JmsConnection) connection;
+            jmsConnection.getPrefetchPolicy().setAll(0);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the browser to create a consumer and send credit.
+            testPeer.expectReceiverAttach();
+            testPeer.expectDetach(true, true, true);
+
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration<?> queueView = browser.getEnumeration();
+            assertNotNull(queueView);
+
+            browser.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testQueueBrowseHasMoreElementsZeroPrefetchNoMessage() throws IOException,
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            JmsConnection jmsConnection = (JmsConnection) connection;
+            jmsConnection.getPrefetchPolicy().setAll(0);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the browser to create a consumer and send credit.
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
+            testPeer.expectDetach(true, true, true);
+
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration<?> queueView = browser.getEnumeration();
+            assertNotNull(queueView);
+            assertFalse(queueView.hasMoreElements());
+
+            browser.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testQueueBrowseHasMoreElementsZeroPrefetchDrainedMessage() throws IOException,
Exception {
+        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            JmsConnection jmsConnection = (JmsConnection) connection;
+            jmsConnection.getPrefetchPolicy().setAll(0);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the browser to create a consumer and send credit.
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
+
+            // After timeout the browser should drain and here we want to ensure that if
a
+            // message arrives that we handle it correctly.
+            testPeer.expectLinkFlowRespondWithTransfer(
+                null, null, null, null, amqpValueNullContent, 1, true, false, equalTo(UnsignedInteger.valueOf(1)),
1, false);
+
+            // Message gets ack'd right away
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            // Next attempt should not get a message and trigger a false on hasMoreElemets.
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
+
+            testPeer.expectDetach(true, true, true);
+
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration<?> queueView = browser.getEnumeration();
+            assertNotNull(queueView);
+            assertTrue(queueView.hasMoreElements());
+            Message message = (Message) queueView.nextElement();
+            assertNotNull(message);
+            assertFalse(queueView.hasMoreElements());
+
+            browser.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 1092bc1..3ff9ce2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -19,9 +19,13 @@ package org.apache.qpid.jms.provider.failover;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.net.URI;
+import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -30,8 +34,10 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.JmsConnection;
@@ -40,9 +46,12 @@ import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,6 +235,286 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testFailoverHandlesDropPullConsumerReceiveNoWait() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlowThenDrop();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.getPrefetchPolicy().setQueuePrefetch(0);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            // Create session+producer, send a persistent message on auto-ack session for
synchronous send
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectDetach(true, true, true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            assertNull(consumer.receiveNoWait());
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            consumer.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFailoverHandlesDropPullConsumerReceiveWithTimeout() throws Exception
{
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlowThenDrop();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.getPrefetchPolicy().setQueuePrefetch(0);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            // Create session+producer, send a persistent message on auto-ack session for
synchronous send
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow();
+            finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
+            finalPeer.expectDetach(true, true, true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            assertNull(consumer.receive(2000));
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            consumer.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFailoverHandlesDropPullConsumerReceive() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlowThenDrop();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.getPrefetchPolicy().setQueuePrefetch(0);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            // Create session+producer, send a persistent message on auto-ack session for
synchronous send
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            finalPeer.expectDispositionThatIsAcceptedAndSettled();
+            finalPeer.expectDetach(true, true, true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            assertNotNull(consumer.receive());
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            consumer.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFailoverHandlesDropAfterQueueBrowserDrain() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // Create session+producer, send a persistent message on auto-ack session for
synchronous send
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlow();
+            originalPeer.expectLinkFlowThenDrop();
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            // Create session+producer, send a persistent message on auto-ack session for
synchronous send
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow();
+            finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
+            finalPeer.expectDetach(true, true, true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration<?> queueView = browser.getEnumeration();
+
+            assertNotNull(queueView);
+            assertFalse(queueView.hasMoreElements());
+
+            browser.close();
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException
{
         if(peers.length == 0) {
             throw new IllegalArgumentException("No test peers were given, at least 1 required");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9afe5daa/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index aab25a3..c788347 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -60,7 +60,6 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Source;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Target;
 import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame;
-import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
@@ -1617,4 +1616,134 @@ public class TestAmqpPeer implements AutoCloseable
 
         addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
exitAfterHeader));
     }
+
+    public void expectLinkFlowThenDrop()
+    {
+        AmqpPeerRunnable exitAfterFlow = new AmqpPeerRunnable() {
+            @Override
+            public void run() {
+                _driverRunnable.exitReadLoopEarly();
+            }
+        };
+
+        final FlowMatcher flowMatcher = new FlowMatcher().onCompletion(exitAfterFlow);
+
+        addHandler(flowMatcher);
+    }
+
+//    public void expectLinkFlowThenDrop()
+//    {
+//        AmqpPeerRunnable exitAfterHeader = new AmqpPeerRunnable() {
+//            @Override
+//            public void run() {
+//                _driverRunnable.exitReadLoopEarly();
+//            }
+//        };
+//
+//        if (nextIncomingId == null && count > 0)
+//        {
+//            throw new IllegalArgumentException("The remote NextIncomingId must be specified
if transfers have been requested");
+//        }
+//
+//        Matcher<Boolean> drainMatcher = null;
+//        if(drain)
+//        {
+//            drainMatcher = equalTo(true);
+//        }
+//        else
+//        {
+//            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+//        }
+//
+//        Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null;
+//        if(nextIncomingId != null)
+//        {
+//             remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId));
+//        }
+//        else
+//        {
+//            remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE);
+//        }
+//
+//        final FlowMatcher flowMatcher = new FlowMatcher()
+//                        .withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)))
+//                        .withDrain(drainMatcher)
+//                        .withNextIncomingId(remoteNextIncomingIdMatcher);
+//
+//        CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+//        boolean addComposite = false;
+//
+//        if (appPropertiesDescribedType == null && addMessageNumberProperty) {
+//            appPropertiesDescribedType = new ApplicationPropertiesDescribedType();
+//        }
+//
+//        for(int i = 0; i < count; i++)
+//        {
+//            final int nextId = nextIncomingId + i;
+//
+//            String tagString = "theDeliveryTag" + nextId;
+//            Binary dtag = new Binary(tagString.getBytes());
+//
+//            if(addMessageNumberProperty) {
+//                appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i);
+//            }
+//
+//            final TransferFrame transferResponse = new TransferFrame()
+//            .setDeliveryId(UnsignedInteger.valueOf(nextId))
+//            .setDeliveryTag(dtag)
+//            .setMessageFormat(UnsignedInteger.ZERO)
+//            .setSettled(false);
+//
+//            Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType,
+//                    propertiesDescribedType, appPropertiesDescribedType, content);
+//
+//            // The response frame channel will be dynamically set based on the incoming
frame. Using the -1 is an illegal placeholder.
+//            final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP,
-1, transferResponse, payload);
+//            transferResponseSender.setValueProvider(new ValueProvider()
+//            {
+//                @Override
+//                public void setValues()
+//                {
+//                    transferResponse.setHandle(flowMatcher.getReceivedHandle());
+//                    transferResponseSender.setChannel(flowMatcher.getActualChannel());
+//                }
+//            });
+//
+//            addComposite = true;
+//            composite.add(transferResponseSender);
+//        }
+//
+//        if(drain && sendDrainFlowResponse)
+//        {
+//            final FlowFrame drainResponse = new FlowFrame();
+//            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be
hard coded
+//            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE));
//TODO: shouldnt be hard coded
+//            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+//            drainResponse.setDrain(true);
+//
+//            // The flow frame channel will be dynamically set based on the incoming frame.
Using the -1 is an illegal placeholder.
+//            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP,
-1, drainResponse, null);
+//            flowResponseSender.setValueProvider(new ValueProvider()
+//            {
+//                @Override
+//                public void setValues()
+//                {
+//                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+//                    drainResponse.setHandle(flowMatcher.getReceivedHandle());
+//                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+//                    drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher,
count));
+//                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+//                }
+//            });
+//
+//            addComposite = true;
+//            composite.add(flowResponseSender);
+//        }
+//
+//        if(addComposite) {
+//            flowMatcher.onCompletion(composite);
+//        }
+//
+//        addHandler(flowMatcher);
+//    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message