activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-2027: handle aborted AMQP deliveries
Date Mon, 13 Aug 2018 14:52:46 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6f0988dc5 -> 005745e7a


ARTEMIS-2027: handle aborted AMQP deliveries


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e1ba608d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e1ba608d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e1ba608d

Branch: refs/heads/master
Commit: e1ba608d64883f071442340ed091441187f48a01
Parents: 6f0988d
Author: Robbie Gemmell <robbie@apache.org>
Authored: Mon Aug 13 14:01:01 2018 +0100
Committer: Robbie Gemmell <robbie@apache.org>
Committed: Mon Aug 13 14:01:01 2018 +0100

----------------------------------------------------------------------
 artemis-protocols/artemis-amqp-protocol/pom.xml |  5 ++
 .../proton/ProtonServerReceiverContext.java     | 17 +++++
 .../proton/ProtonServerReceiverContextTest.java | 73 ++++++++++++++++++++
 3 files changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e1ba608d/artemis-protocols/artemis-amqp-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml
index 5f91a51..e383b37 100644
--- a/artemis-protocols/artemis-amqp-protocol/pom.xml
+++ b/artemis-protocols/artemis-amqp-protocol/pom.xml
@@ -123,5 +123,10 @@
          <groupId>org.osgi</groupId>
          <artifactId>osgi.cmpn</artifactId>
       </dependency>
+      <dependency>
+         <groupId>org.mockito</groupId>
+         <artifactId>mockito-core</artifactId>
+         <scope>test</scope>
+      </dependency>
    </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e1ba608d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index aad89a8..c3df1a7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -218,6 +218,23 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (!delivery.isReadable()) {
             return;
          }
+
+         if (delivery.isAborted()) {
+            receiver = ((Receiver) delivery.getLink());
+
+            // Aborting implicitly remotely settles, so advance
+            // receiver to the next delivery and settle locally.
+            receiver.advance();
+            delivery.settle();
+
+            // Replenish the credit if not doing a drain
+            if (!receiver.getDrain()) {
+               receiver.flow(1);
+            }
+
+            return;
+         }
+
          if (delivery.isPartial()) {
             return;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e1ba608d/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
new file mode 100644
index 0000000..88dfe3a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Test;
+
+public class ProtonServerReceiverContextTest {
+
+   @Test
+   public void testOnMessageWithAbortedDelivery() throws Exception {
+      doOnMessageWithAbortedDeliveryTestImpl(false);
+   }
+
+   @Test
+   public void testOnMessageWithAbortedDeliveryDrain() throws Exception {
+      doOnMessageWithAbortedDeliveryTestImpl(true);
+   }
+
+   private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
+      Receiver mockReceiver = mock(Receiver.class);
+      AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
+
+      when(mockConnContext.getAmqpCredits()).thenReturn(100);
+      when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
+      ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
+
+      Delivery mockDelivery = mock(Delivery.class);
+      when(mockDelivery.isReadable()).thenReturn(true);
+      when(mockDelivery.isAborted()).thenReturn(true);
+      when(mockDelivery.isPartial()).thenReturn(true);
+      when(mockDelivery.getLink()).thenReturn(mockReceiver);
+
+      if (drain) {
+         when(mockReceiver.getDrain()).thenReturn(true);
+      }
+
+      rc.onMessage(mockDelivery);
+
+      verify(mockReceiver, times(1)).advance();
+      verify(mockDelivery, times(1)).settle();
+
+      verify(mockReceiver, times(1)).getDrain();
+      if (!drain) {
+         verify(mockReceiver, times(1)).flow(1);
+      }
+      verifyNoMoreInteractions(mockReceiver);
+   }
+
+}


Mime
View raw message