qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject qpid-jms git commit: QPIDJMS-85: ensure all deliveries have state when settled
Date Wed, 15 Jul 2015 15:05:45 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 45edd842e -> 32839a599


QPIDJMS-85: ensure all deliveries have state when settled


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/32839a59
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/32839a59
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/32839a59

Branch: refs/heads/master
Commit: 32839a59908040c492e0e11d7a00bd3f8548a1fa
Parents: 45edd84
Author: Robert Gemmell <robbie@apache.org>
Authored: Wed Jul 15 16:03:22 2015 +0100
Committer: Robert Gemmell <robbie@apache.org>
Committed: Wed Jul 15 16:03:22 2015 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 38 +++++++++----
 .../integration/ConsumerIntegrationTest.java    | 58 ++++++++++++++++++++
 .../jms/integration/SessionIntegrationTest.java |  2 +-
 3 files changed, 86 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32839a59/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 46212af..a5405ac 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -54,6 +55,7 @@ import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
@@ -75,6 +77,16 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
 
+    private static final Modified MODIFIED_FAILED = new Modified();
+    private static final Modified MODIFIED_UNDELIVERABLE = new Modified();
+    static
+    {
+        MODIFIED_FAILED.setDeliveryFailed(true);
+
+        MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
+        MODIFIED_UNDELIVERABLE.setUndeliverableHere(true);
+    }
+
     protected final AmqpSession session;
     protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
     protected boolean presettle;
@@ -228,12 +240,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
 
         source.setOutcomes(outcomes);
-
-        Modified modified = new Modified();
-        modified.setDeliveryFailed(true);
-        modified.setUndeliverableHere(false);
-
-        source.setDefaultOutcome(modified);
+        source.setDefaultOutcome(MODIFIED_FAILED);
 
         if (resource.isNoLocal()) {
             filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
@@ -296,6 +303,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             if (!isPresettle()) {
                 delivered.put(envelope, delivery);
             }
+            setDefaultDeliveryState(delivery, MODIFIED_FAILED);
             sendFlowIfNeeded();
         } else if (ackType.equals(ACK_TYPE.CONSUMED)) {
             // A Consumer may not always send a DELIVERED ack so we need to
@@ -412,7 +420,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private void processDelivery(Delivery incoming) throws Exception {
-
+        setDefaultDeliveryState(incoming, Released.getInstance());
         Message amqpMessage = decodeIncomingMessage(incoming);
         long deliveryCount = amqpMessage.getDeliveryCount();
         int maxRedeliveries = getJmsResource().getRedeliveryPolicy().getMaxRedeliveries();
@@ -457,6 +465,17 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         deliver(envelope);
     }
 
+    private void setDefaultDeliveryState(Delivery incoming, DeliveryState state) {
+        // TODO: temporary to maintain runtime compatibility with older
+        // Proton releases. Replace with direct invocation in future.
+        try {
+            Method m = incoming.getClass().getMethod("setDefaultDeliveryState", DeliveryState.class);
+            m.invoke(incoming, state);
+        } catch (Exception e) {
+            LOG.trace("Exception while setting defaultDeliveryState", e);
+        }
+    }
+
     protected long getNextIncomingSequenceNumber() {
         return incomingSequence.incrementAndGet();
     }
@@ -508,10 +527,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     protected void deliveryFailed(Delivery incoming) {
-        Modified disposition = new Modified();
-        disposition.setUndeliverableHere(true);
-        disposition.setDeliveryFailed(true);
-        incoming.disposition(disposition);
+        incoming.disposition(MODIFIED_UNDELIVERABLE);
         incoming.settle();
         sendFlowIfNeeded();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32839a59/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 55e7df6..a84c3de 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.jms.integration;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -27,12 +28,16 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.junit.Test;
 
@@ -131,4 +136,57 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testCloseDurableSubscriberWithUnackedAnUnconsumedPrefetchedMessages() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            String subscriptionName = "mySubscription";
+
+            Topic topic = session.createTopic(topicName);
+
+            int messageCount = 5;
+            // Create a consumer and fill the prefetch with some messages,
+            // which we will consume some of but ack none of.
+            testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
+
+            MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName);
+
+            int consumeCount = 2;
+            for (int i = 1; i <= consumeCount; i++) {
+                Message receivedMessage = consumer.receive(3000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
+
+            testPeer.expectDetach(false, true, false);
+
+            // Expect the messages that were not acked to be either
+            // modified or released depending on whether the app saw them
+            for (int i = 1; i <= consumeCount; i++) {
+                testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+            }
+            for (int i = consumeCount + 1 ; i <= messageCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+            testPeer.expectEnd();
+
+            consumer.close();
+            session.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/32839a59/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index ec6094b..e7e5bea 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1228,7 +1228,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             sourceMatcher.withAddress(equalTo(queueName));
             sourceMatcher.withDynamic(equalTo(false));
             sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL));
-            ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(false));
+            ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue());
             sourceMatcher.withDefaultOutcome(outcomeMatcher);
 
             testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message