qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-116 QPIDJMS-112 Don't send drain requests when the consumer is stopped or connection is not started.
Date Fri, 25 Sep 2015 15:32:41 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master a76b51ac1 -> 03b491240


QPIDJMS-116 QPIDJMS-112 Don't send drain requests when the consumer is
stopped or connection is not started.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/03b49124
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/03b49124
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/03b49124

Branch: refs/heads/master
Commit: 03b491240d88ff5dde1b3759d000e7963ee1492d
Parents: a76b51a
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Sep 25 11:32:17 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Sep 25 11:32:17 2015 -0400

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  2 +-
 .../integration/ConsumerIntegrationTest.java    | 73 +++++++++++++++++++-
 .../jms/consumer/JmsMessageConsumerTest.java    | 39 -----------
 3 files changed, 73 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03b49124/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index e347f1c..98a2bc5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -675,7 +675,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
      * @param forcePull TODO
      */
     protected void performPullIfRequired(long timeout, boolean forcePull) throws JMSException
{
-        if ((isPullConsumer() || forcePull) && !messageQueue.isClosed() &&
messageQueue.isEmpty()) {
+        if ((isPullConsumer() || forcePull) && messageQueue.isRunning() &&
messageQueue.isEmpty()) {
             connection.pull(getConsumerId(), timeout);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03b49124/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 75fe373..9388755 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -20,13 +20,16 @@ package org.apache.qpid.jms.integration;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.IllegalStateException;
@@ -45,6 +48,7 @@ import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -347,11 +351,78 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
             try {
                 consumer.receiveNoWait();
-
                 fail("An exception should have been thrown");
             } catch (JMSException jmse) {
                 // Expected
             }
         }
     }
+
+    @Test(timeout=20000)
+    public void testSetMessageListenerAfterStartAndSend() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            final int messageCount = 4;
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
messageCount);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            for (int i = 0; i < messageCount; i++) {
+                testPeer.expectDisposition(true, new AcceptedMatcher());
+            }
+
+            testPeer.expectDetach(true, true, true);
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    LOG.debug("Async consumer got Message: {}", m);
+                    counter.incrementAndGet();
+                    if (counter.get() == messageCount) {
+                        done.countDown();
+                    }
+                }
+            });
+
+            assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+            assertEquals(messageCount, counter.get());
+
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test //(timeout=20000)
+    public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
3);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            assertNull(consumer.receive(500));
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03b49124/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
index 6f20117..7a73726 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
@@ -281,45 +281,6 @@ public class JmsMessageConsumerTest extends AmqpTestSupport {
         }
     }
 
-    @Test(timeout=60000)
-    public void testSetMessageListenerAfterStartAndSend() throws Exception {
-        final int msgCount = 4;
-        final Connection connection = createAmqpConnection();
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch done = new CountDownLatch(1);
-        this.connection = connection;
-
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue destination = session.createQueue(name.getMethodName());
-        MessageConsumer consumer = session.createConsumer(destination);
-        sendToAmqQueue(msgCount);
-
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message m) {
-                LOG.debug("Async consumer got Message: {}", m);
-                counter.incrementAndGet();
-                if (counter.get() == msgCount) {
-                    done.countDown();
-                }
-            }
-        });
-
-        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        assertEquals(msgCount, counter.get());
-    }
-
-    @Test(timeout=60000)
-    public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
-        connection = createAmqpConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue destination = session.createQueue(name.getMethodName());
-        MessageConsumer consumer = session.createConsumer(destination);
-        sendToAmqQueue(3);
-        assertNull(consumer.receive(2000));
-    }
-
     @Test(timeout = 60000)
     public void testMessagesAreAckedAMQProducer() throws Exception {
         int messagesSent = 3;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message