qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-257 Defer consumer close if delivered messages still pending
Date Tue, 31 Jan 2017 19:53:08 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 6d1adfd22 -> 98c362188


QPIDJMS-257 Defer consumer close if delivered messages still pending

If a consumer in client Ack mode has outstanding messages that were
delivered but not yet ack'd, defer the close until those messages are
acknowledged. Closing the session overrides the deferral: a consumer
whose close is still deferred is closed when its parent session closes.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/98c36218
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/98c36218
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/98c36218

Branch: refs/heads/master
Commit: 98c36218866d6297d633caed805002b46ae5c1d1
Parents: 6d1adfd
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jan 31 14:52:42 2017 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jan 31 14:52:42 2017 -0500

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   2 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  96 +++++--
 .../integration/ConsumerIntegrationTest.java    | 285 ++++++++++++++++++-
 3 files changed, 355 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/98c36218/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index ff19388..0cca6ff 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -78,7 +78,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer,
JmsMe
                                  String name, String selector, boolean noLocal) throws JMSException
{
         this.session = session;
         this.connection = session.getConnection();
-        this.acknowledgementMode = session.acknowledgementMode();
+        this.acknowledgementMode = isBrowser() ? Session.AUTO_ACKNOWLEDGE : session.acknowledgementMode();
 
         if (destination.isTemporary()) {
             connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/98c36218/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 9f9cd42..b88bc18 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -33,6 +33,7 @@ import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.meta.JmsConsumerId;
@@ -71,12 +72,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
     protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
     protected final AtomicLong incomingSequence = new AtomicLong(0);
 
+    protected boolean deferredClose;
+
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
         super(info, receiver, session);
 
         this.session = session;
     }
 
+    @Override
+    public void close(AsyncResult request) {
+        // If we have pending deliveries we remain open to allow for ACK or for a
+        // pending transaction that this consumer is active in to complete.
+        if (shouldDeferClose()) {
+            request.onSuccess();
+            deferredClose = true;
+        } else {
+            super.close(request);
+        }
+    }
+
     /**
      * Starts the consumer by setting the link credit to the given prefetch value.
      *
@@ -85,7 +100,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
      */
     public void start(AsyncResult request) {
         JmsConsumerInfo consumerInfo = getResourceInfo();
-        if(consumerInfo.isListener() && consumerInfo.getPrefetchSize() == 0) {
+        if (consumerInfo.isListener() && consumerInfo.getPrefetchSize() == 0) {
             sendFlowForNoPrefetchListener();
         } else {
             sendFlowIfNeeded();
@@ -223,6 +238,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         }
 
         delivered.clear();
+
+        tryCompleteDeferredClose();
     }
 
     /**
@@ -296,6 +313,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         } else {
             LOG.warn("Unsupported Ack Type for message: {}", envelope);
         }
+
+        tryCompleteDeferredClose();
     }
 
     /**
@@ -438,6 +457,14 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
 
     private boolean processDelivery(Delivery incoming) throws Exception {
         incoming.setDefaultDeliveryState(Released.getInstance());
+
+        // If we are awaiting to close for some conditions to be met then we don't
+        // need to decode or dispatch the message.
+        if (deferredClose) {
+            getEndpoint().advance();
+            return true;
+        }
+
         JmsMessage message = null;
         try {
             message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage();
@@ -452,26 +479,28 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
             return false;
         }
 
-        getEndpoint().advance();
-
-        // Let the message do any final processing before sending it onto a consumer.
-        // We could defer this to a later stage such as the JmsConnection or even in
-        // the JmsMessageConsumer dispatch method if we needed to.
-        message.onDispatch();
+        try {
+            // Let the message do any final processing before sending it onto a consumer.
+            // We could defer this to a later stage such as the JmsConnection or even in
+            // the JmsMessageConsumer dispatch method if we needed to.
+            message.onDispatch();
 
-        JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
-        envelope.setMessage(message);
-        envelope.setConsumerId(getResourceInfo().getId());
-        // Store link to delivery in the hint for use in acknowledge requests.
-        envelope.setProviderHint(incoming);
-        envelope.setMessageId(message.getFacade().getProviderMessageIdObject());
+            JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
+            envelope.setMessage(message);
+            envelope.setConsumerId(getResourceInfo().getId());
+            // Store link to delivery in the hint for use in acknowledge requests.
+            envelope.setProviderHint(incoming);
+            envelope.setMessageId(message.getFacade().getProviderMessageIdObject());
 
-        // Store reference to envelope in delivery context for recovery
-        incoming.setContext(envelope);
+            // Store reference to envelope in delivery context for recovery
+            incoming.setContext(envelope);
 
-        deliver(envelope);
+            deliver(envelope);
 
-        return true;
+            return true;
+        } finally {
+            getEndpoint().advance();
+        }
     }
 
     protected long getNextIncomingSequenceNumber() {
@@ -600,7 +629,38 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
         }
     }
 
-    //----- Inner classes used in message pull operations --------------------//
+    private boolean shouldDeferClose() {
+        return !delivered.isEmpty();
+    }
+
+    private void tryCompleteDeferredClose() {
+        if (deferredClose && delivered.isEmpty()) {
+            close(new DeferredCloseRequest());
+        }
+    }
+
+    //----- Inner class used to report on deferred close ---------------------//
+
+    protected final class DeferredCloseRequest implements AsyncResult {
+
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.trace("Failed deferred close of consumer: {} - {}", getConsumerId(), result.getMessage());
+            getParent().getProvider().fireNonFatalProviderException(JmsExceptionSupport.create(result));
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Completed deferred close of consumer: {}", getConsumerId());
+        }
+
+        @Override
+        public boolean isComplete() {
+            return isClosed();
+        }
+    }
+
+    //----- Inner class used in message pull operations ----------------------//
 
     protected static final class ScheduledRequest implements AsyncResult {
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/98c36218/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index d1cab10..00af430 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -58,7 +59,6 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -351,32 +351,37 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName);
 
             int consumeCount = 2;
+            Message receivedMessage = null;
+
             for (int i = 1; i <= consumeCount; i++) {
-                Message receivedMessage = consumer.receive(3000);
+                receivedMessage = consumer.receive(3000);
 
                 assertNotNull(receivedMessage);
                 assertTrue(receivedMessage instanceof TextMessage);
             }
 
-            testPeer.expectDetach(false, true, false);
-
-            // Expect the messages that were not acked to be to be either
-            // modified or released depending on whether the app saw them
+            // Expect the messages that were not delivered to be released.
             for (int i = 1; i <= consumeCount; i++) {
-                testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+                testPeer.expectDisposition(true, new AcceptedMatcher());
             }
+
+            receivedMessage.acknowledge();
+
+            testPeer.expectDetach(false, true, false);
+
             for (int i = consumeCount + 1 ; i <= messageCount; i++) {
                 testPeer.expectDisposition(true, new ReleasedMatcher());
             }
+
             testPeer.expectEnd();
 
             consumer.close();
             session.close();
 
-            testPeer.waitForAllHandlersToComplete(3000);
-
             testPeer.expectClose();
             connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
         }
     }
 
@@ -1304,4 +1309,266 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testCloseClientAckAsyncConsumerCanStillAckMessages() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            int messageCount = 5;
+            int consumeCount = 5;
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
messageCount);
+
+            final CountDownLatch consumedLatch = new CountDownLatch(consumeCount);
+            final AtomicReference<Message> receivedMessage = new AtomicReference<>();
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    receivedMessage.set(message);
+                    consumedLatch.countDown();
+                }
+            });
+
+            assertTrue("Did not consume all messages", consumedLatch.await(10, TimeUnit.SECONDS));
+
+            // Close should be deferred as these messages were delivered but not acknowledged.
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                testPeer.expectDisposition(true, new AcceptedMatcher());
+            }
+
+            // Ack the last read message, which should accept all previous messages as well.
+            receivedMessage.get().acknowledge();
+
+            // Now the consumer should close.
+            testPeer.expectDetach(true, true, true);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testCloseClientAckSyncConsumerCanStillAckMessages() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            int messageCount = 5;
+            int consumeCount = 4;
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
messageCount);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message receivedMessage = null;
+
+            for (int i = 1; i <= consumeCount; i++) {
+                receivedMessage = consumer.receive(3000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
+
+            // Close should be deferred as these messages were delivered but not acknowledged.
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                testPeer.expectDisposition(true, new AcceptedMatcher());
+            }
+
+            // Ack the last read message, which should accept all previous messages as well.
+            receivedMessage.acknowledge();
+
+            // Now the consumer should close.
+            testPeer.expectDetach(true, true, true);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testConsumerWithDeferredCloseActsAsClosed() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            int messageCount = 5;
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
messageCount);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            int consumeCount = 3;
+            Message receivedMessage = null;
+
+            for (int i = 1; i <= consumeCount; i++) {
+                receivedMessage = consumer.receive(3000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
+
+            // Close should be deferred as these messages were delivered but not acknowledged.
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            // Should not be able to consume from the consumer once closed.
+            try {
+                consumer.receive();
+                fail("Should throw as this consumer is closed.");
+            } catch (IllegalStateException ise) {}
+
+            try {
+                consumer.receive(100);
+                fail("Should throw as this consumer is closed.");
+            } catch (IllegalStateException ise) {}
+
+            try {
+                consumer.receiveNoWait();
+                fail("Should throw as this consumer is closed.");
+            } catch (IllegalStateException ise) {}
+
+            for (int i = 1; i <= consumeCount; i++) {
+                testPeer.expectDisposition(true, new AcceptedMatcher());
+            }
+
+            // Now the consumer should close.
+            testPeer.expectDetach(true, true, true);
+
+            // Ack the last read message, which should accept all previous messages as well.
+            receivedMessage.acknowledge();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testDeferredCloseTimeoutAlertsExceptionListener() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch errorLatch = new CountDownLatch(1);
+
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setCloseTimeout(500);
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    if (exception instanceof JmsOperationTimedOutException) {
+                        errorLatch.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
1);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message receivedMessage = consumer.receive(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+
+            // Close should be deferred as these messages were delivered but not acknowledged.
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            // Ack the read message, which should accept all previous messages as well.
+            receivedMessage.acknowledge();
+
+            // Now the consumer should close.
+            testPeer.expectDetach(true, false, true);
+
+            assertTrue("Did not get timed out error", errorLatch.await(10, TimeUnit.SECONDS));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testSessionCloseDoesNotDeferConsumerClose() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
1);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            Message receivedMessage = consumer.receive(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+
+            testPeer.waitForAllHandlersToComplete(3000);
+            testPeer.expectEnd();
+
+            session.close();
+
+            // Consumer and Session should be closed, not acknowledge allowed
+            try {
+                receivedMessage.acknowledge();
+                fail("Should not be allowed to call acknowledge after session closed.");
+            } catch (IllegalStateException ise) {
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message