This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 7158d8a ARTEMIS-1592 AutoDeleted queues removed before sendNotifcation on consumer.close()
new fad7aa2 This closes #2622
7158d8a is described below
commit 7158d8a8a4610bdee45f4da5a16dd1f22e7fb7f6
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Mon Apr 15 14:55:39 2019 -0400
ARTEMIS-1592 AutoDeleted queues removed before sendNotifcation on consumer.close()
This could cause errors on Notification udpates and clustering.
---
.../core/server/impl/ServerConsumerImpl.java | 6 +-
.../distribution/AutoDeleteDistributedTest.java | 187 +++++++++++++++++++++
2 files changed, 191 insertions(+), 2 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 2dc834b..c709d4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -572,8 +572,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
tx.rollback();
- messageQueue.recheckRefCount(session.getSessionContext());
-
if (!browseOnly) {
TypedProperties props = new TypedProperties();
@@ -601,6 +599,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
managementService.sendNotification(notification);
}
+
+ // The check here has to be done after the notification is sent, otherwise the queue
will be removed before the consumer.close reach other nodes on a cluster
+ messageQueue.recheckRefCount(session.getSessionContext());
+
if (server.hasBrokerConsumerPlugins()) {
server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed));
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteDistributedTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteDistributedTest.java
new file mode 100644
index 0000000..857d6b4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteDistributedTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
+import javax.jms.MessageListener;
+import javax.naming.NamingException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoDeleteDistributedTest extends ClusterTestBase {
+
+ private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ start();
+ }
+
+ private void start() throws Exception {
+ setupServers();
+
+ setRedistributionDelay(0);
+
+ setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+ servers[0].start();
+ servers[1].start();
+ servers[2].start();
+ }
+
+ protected boolean isNetty() {
+ return true;
+ }
+
+ @Override
+ protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration
serverTotc) {
+ super.setSessionFactoryCreateLocator(node, ha, serverTotc);
+
+ locators[node].setConsumerWindowSize(0);
+
+ }
+
+ @Test
+ public void testAutoDelete() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+
+ try {
+
+ AtomicBoolean error = new AtomicBoolean(false);
+
+ int messageCount = 30;
+
+ JMSContext client1JmsContext = createContext(0);
+ JMSContext client2JmsContext = createContext(1);
+
+ final JMSConsumer client2JmsConsumer = client2JmsContext.createConsumer(client2JmsContext.createQueue("queues.myQueue"));
+ final CountDownLatch onMessageReceived = new CountDownLatch(messageCount);
+ client2JmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(final javax.jms.Message m) {
+ System.out.println("Message received. " + m);
+ onMessageReceived.countDown();
+ }
+ });
+
+ /*
+ * sending a message to broker1
+ */
+ {
+
+ final CountDownLatch onMessageSent = new CountDownLatch(1);
+ final JMSProducer jmsProducer = client1JmsContext.createProducer();
+ jmsProducer.setAsync(new javax.jms.CompletionListener() {
+ @Override
+ public void onCompletion(final javax.jms.Message m) {
+ System.out.println("Message sent. " + m);
+ onMessageSent.countDown();
+ }
+
+ @Override
+ public void onException(final javax.jms.Message m, final Exception ex) {
+ ex.printStackTrace();
+ error.set(true);
+ }
+ });
+ for (int i = 0; i < messageCount; i++) {
+ final BytesMessage jmsMsg = client1JmsContext.createBytesMessage();
+ jmsMsg.setJMSType("MyType");
+ jmsProducer.send(client1JmsContext.createQueue("queues.myQueue"), jmsMsg);
+ }
+
+ System.out.println("Waiting for message to be sent...");
+ onMessageSent.await(5, TimeUnit.SECONDS);
+ }
+
+ System.out.println("Waiting for message to be received...");
+ Assert.assertTrue(onMessageReceived.await(5, TimeUnit.SECONDS));
+
+ client2JmsConsumer.close();
+ Assert.assertFalse(error.get());
+
+ Thread.sleep(100); // I couldn't make it to fail without a minimal sleep here
+ Assert.assertFalse(AssertionLoggerHandler.findText("java.lang.IllegalStateException"));
+ Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find binding"));
+ } finally {
+ AssertionLoggerHandler.stopCapture();
+ }
+ }
+
+ private JMSContext createContext(int server) throws NamingException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:" + (61617
+ server));
+ return cf.createContext();
+ }
+
+ protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws
Exception {
+ setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(),
0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(),
1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(),
2, 0, 1);
+ }
+
+ protected void setRedistributionDelay(final long delay) {
+ AddressSettings as = new AddressSettings().setRedistributionDelay(delay);
+
+ getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(2).getAddressSettingsRepository().addMatch("queues.*", as);
+ }
+
+ protected void setupServers() throws Exception {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+ servers[0].getConfiguration().addAddressesSetting("*", new AddressSettings().setAutoCreateAddresses(true)
//
+ .setAutoCreateQueues(true) //
+ .setAutoDeleteAddresses(true) //
+ .setAutoDeleteQueues(true) // --> this causes IllegalStateExceptions
+ .setDefaultPurgeOnNoConsumers(true));
+ }
+
+ protected void stopServers() throws Exception {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
+
+ stopServers(0, 1, 2);
+
+ clearServer(0, 1, 2);
+ }
+
+}
|