This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new cdc283f ARTEMIS-2915 duplicate temp queues using OpenWire
new c5bb2cc This closes #3276
cdc283f is described below
commit cdc283fba523248b2a83cca04d389a5f4d3680d0
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Thu Sep 24 11:27:20 2020 -0500
ARTEMIS-2915 duplicate temp queues using OpenWire
---
.../core/protocol/openwire/OpenWireConnection.java | 26 ++++++++++++++++++---
.../integration/openwire/SimpleOpenWireTest.java | 27 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e091d79..3ad7ba0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -853,9 +853,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
if (dest.isTemporary()) {
- //Openwire needs to store the DestinationInfo in order to send
- //Advisory messages to clients
- this.state.addTempDestination(info);
+ //Openwire needs to store the DestinationInfo in order to send Advisory messages to clients
+ if (!tempDestinationExists(info.getDestination().getPhysicalName())) {
+ this.state.addTempDestination(info);
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + " added temp destination to state: " + info.getDestination().getPhysicalName() + "; " + state.getTempDestinations().size());
+ }
+ }
}
if (created && !AdvisorySupport.isAdvisoryTopic(dest)) {
@@ -963,6 +967,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void tempQueueDeleted(SimpleString bindingName) {
ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
state.removeTempDestination(dest);
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + " removed temp destination from state: " + bindingName + "; " + state.getTempDestinations().size());
+ }
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = getContext();
@@ -1121,6 +1128,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
+ private boolean tempDestinationExists(String name) {
+ boolean result = false;
+
+ for (DestinationInfo destinationInfo : state.getTempDestinations()) {
+ if (destinationInfo.getDestination().getPhysicalName().equals(name)) {
+ result = true;
+ break;
+ }
+ }
+
+ return result;
+ }
+
CommandProcessor commandProcessorInstance = new CommandProcessor();
// This will listen for commands through the protocolmanager
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index dac4794..979d276 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -115,6 +116,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
@Test
+ public void testDuplicateTemporaryDestination() throws Exception {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = session.createTemporaryQueue();
+ for (int i = 0; i < 10; i++) {
+ MessageProducer producer = session.createProducer(queue);
+ producer.close();
+ }
+
+ int tempDestinationCount = 0;
+ for (RemotingConnection remotingConnection : server.getRemotingService().getConnections()) {
+ if (remotingConnection instanceof OpenWireConnection) {
+ OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection;
+ if (openWireConnection.getState() != null && openWireConnection.getState().getTempDestinations() != null) {
+ tempDestinationCount += openWireConnection.getState().getTempDestinations().size();
+ }
+ }
+ }
+
+ assertTrue(tempDestinationCount <= 1);
+
+ session.close();
+ connection.close();
+ }
+
+ @Test
public void testTransactionalSimple() throws Exception {
try (Connection connection = factory.createConnection()) {
|