activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2915 duplicate temp queues using OpenWire
Date Thu, 24 Sep 2020 16:48:43 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new cdc283f  ARTEMIS-2915 duplicate temp queues using OpenWire
     new c5bb2cc  This closes #3276
cdc283f is described below

commit cdc283fba523248b2a83cca04d389a5f4d3680d0
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Thu Sep 24 11:27:20 2020 -0500

    ARTEMIS-2915 duplicate temp queues using OpenWire
---
 .../core/protocol/openwire/OpenWireConnection.java | 26 ++++++++++++++++++---
 .../integration/openwire/SimpleOpenWireTest.java   | 27 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e091d79..3ad7ba0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -853,9 +853,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
 
       if (dest.isTemporary()) {
-         //Openwire needs to store the DestinationInfo in order to send
-         //Advisory messages to clients
-         this.state.addTempDestination(info);
+         //Openwire needs to store the DestinationInfo in order to send Advisory messages to clients
+         if (!tempDestinationExists(info.getDestination().getPhysicalName())) {
+            this.state.addTempDestination(info);
+            if (logger.isDebugEnabled()) {
+               logger.debug(this + " added temp destination to state: " + info.getDestination().getPhysicalName() + "; " + state.getTempDestinations().size());
+            }
+         }
       }
 
       if (created && !AdvisorySupport.isAdvisoryTopic(dest)) {
@@ -963,6 +967,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void tempQueueDeleted(SimpleString bindingName) {
       ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
       state.removeTempDestination(dest);
+      if (logger.isDebugEnabled()) {
+         logger.debug(this + " removed temp destination from state: " + bindingName + "; " + state.getTempDestinations().size());
+      }
 
       if (!AdvisorySupport.isAdvisoryTopic(dest)) {
          AMQConnectionContext context = getContext();
@@ -1121,6 +1128,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
+   private boolean tempDestinationExists(String name) {
+      boolean result = false;
+
+      for (DestinationInfo destinationInfo : state.getTempDestinations()) {
+         if (destinationInfo.getDestination().getPhysicalName().equals(name)) {
+            result = true;
+            break;
+         }
+      }
+
+      return result;
+   }
+
    CommandProcessor commandProcessorInstance = new CommandProcessor();
 
    // This will listen for commands through the protocolmanager
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index dac4794..979d276 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -115,6 +116,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    }
 
    @Test
+   public void testDuplicateTemporaryDestination() throws Exception {
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination queue = session.createTemporaryQueue();
+      for (int i = 0; i < 10; i++) {
+         MessageProducer producer = session.createProducer(queue);
+         producer.close();
+      }
+
+      int tempDestinationCount = 0;
+      for (RemotingConnection remotingConnection : server.getRemotingService().getConnections()) {
+         if (remotingConnection instanceof OpenWireConnection) {
+            OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection;
+            if (openWireConnection.getState() != null && openWireConnection.getState().getTempDestinations() != null) {
+               tempDestinationCount += openWireConnection.getState().getTempDestinations().size();
+            }
+         }
+      }
+
+      assertTrue(tempDestinationCount <= 1);
+
+      session.close();
+      connection.close();
+   }
+
+   @Test
    public void testTransactionalSimple() throws Exception {
       try (Connection connection = factory.createConnection()) {
 


Mime
View raw message