activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1277 - purgeOnNoConsumer is not working properly
Date Tue, 11 Jul 2017 14:56:21 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 32fa29f4f -> f8554c732


ARTEMIS-1277 - purgeOnNoConsumer is not working properly

make sure rollback doesn't add messages back

https://issues.apache.org/jira/browse/ARTEMIS-1277


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9ad3ad46
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9ad3ad46
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9ad3ad46

Branch: refs/heads/master
Commit: 9ad3ad465736ef79c6d511b573708fd0ca23a782
Parents: 32fa29f
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Tue Jul 11 11:25:48 2017 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Tue Jul 11 11:25:48 2017 +0100

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     |  4 +
 .../amqp/AmqpPurgeOnNoConsumersTest.java        | 83 ++++++++++++++++++++
 2 files changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9ad3ad46/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 002e511..82524ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2837,6 +2837,10 @@ public class QueueImpl implements Queue {
    }
 
    void postRollback(final LinkedList<MessageReference> refs) {
+      //if we have purged then ignore adding the messages back
+      if (purgeOnNoConsumers && getConsumerCount() == 0) {
+         return;
+      }
       addHead(refs, false);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9ad3ad46/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
new file mode 100644
index 0000000..068e1ce
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testQueueReceiverReadMessage() throws Exception {
+      String queue = "purgeQueue";
+      SimpleString ssQueue = new SimpleString(queue);
+      server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+      server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      final AmqpReceiver receiver = session.createReceiver(queue);
+
+      Queue queueView = getProxyToQueue(queue);
+      assertEquals(0, queueView.getMessageCount());
+
+      Thread t = new Thread(new Runnable() {
+         @Override
+         public void run() {
+            for (int i = 0; i < 4; i++) {
+               try {
+                  AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+                  receive.accept();
+                  assertNotNull(receive);
+               } catch (Exception e) {
+                  e.printStackTrace();
+               }
+            }
+            try {
+               receiver.close();
+            } catch (IOException e) {
+               e.printStackTrace();
+            }
+         }
+      });
+
+      t.start();
+
+      receiver.flow(5);
+
+      sendMessages(queue, 5);
+
+      t.join(5000);
+
+      assertEquals(0, queueView.getMessageCount());
+
+      connection.close();
+   }
+}


Mime
View raw message