activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [activemq-artemis] branch main updated: ARTEMIS-3323 - ensure openwire message id is unique and consistent for the life of a broker when converted from core
Date Tue, 01 Jun 2021 18:43:33 GMT
This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ce9030  ARTEMIS-3323 - ensure openwire message id is unique and consistent for the
life of a broker when converted from core
7ce9030 is described below

commit 7ce9030e9f1235ce3040d2111780a8bbb62bf927
Author: gtully <gary.tully@gmail.com>
AuthorDate: Tue Jun 1 12:16:29 2021 +0100

    ARTEMIS-3323 - ensure openwire message id is unique and consistent for the life of a broker
when converted from core
---
 .../openwire/OpenWireMessageConverter.java         |  39 ++++----
 .../core/protocol/openwire/amq/AMQConsumer.java    |  23 ++---
 .../core/protocol/openwire/amq/AMQSession.java     |   2 +-
 .../openwire/OpenWireMessageConverterTest.java     |  94 +++++++++++++++++++
 .../protocol/openwire/amq/AMQConsumerTest.java     |  13 ++-
 .../cluster/distribution/ClusterTestBase.java      |   6 +-
 .../cluster/MessageRedistributionTest.java         | 102 +++++++++++++++++++++
 .../openwire/interop/GeneralInteropTest.java       |  41 +++++++++
 .../artemis/tests/unit/util/UUIDGeneratorTest.java |  16 ++++
 9 files changed, 292 insertions(+), 44 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3348b0f..136f7b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.activemq.command.ActiveMQBytesMessage;
@@ -87,8 +88,6 @@ public final class OpenWireMessageConverter {
    private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
    private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
    private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX
+ "DATASTRUCTURE");
-   private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
-   private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE;
    private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
    private static final SimpleString AMQ_MSG_ORIG_DESTINATION =  new SimpleString(AMQ_PREFIX
+ "ORIG_DESTINATION");
    private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
@@ -131,7 +130,7 @@ public final class OpenWireMessageConverter {
       } else if (contents != null) {
          final boolean messageCompressed = messageSend.isCompressed();
          if (messageCompressed) {
-            coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
+            coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, true);
          }
 
          switch (coreType) {
@@ -468,7 +467,7 @@ public final class OpenWireMessageConverter {
          SimpleString key = new SimpleString(entry.getKey());
          Object value = entry.getValue();
          if (value instanceof UTF8Buffer) {
-            value = ((UTF8Buffer) value).toString();
+            value = value.toString();
          }
          TypedProperties.setObjectProperty(key, value, props);
       }
@@ -498,8 +497,8 @@ public final class OpenWireMessageConverter {
    public static MessageDispatch createMessageDispatch(MessageReference reference,
                                                        ICoreMessage message,
                                                        WireFormat marshaller,
-                                                       AMQConsumer consumer) throws IOException
{
-      ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
+                                                       AMQConsumer consumer, UUID serverNodeUUID)
throws IOException {
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer,
serverNodeUUID);
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -526,11 +525,11 @@ public final class OpenWireMessageConverter {
    private static ActiveMQMessage toAMQMessage(MessageReference reference,
                                                ICoreMessage coreMessage,
                                                WireFormat marshaller,
-                                               AMQConsumer consumer) throws IOException {
+                                               AMQConsumer consumer, UUID serverNodeUUID)
throws IOException {
       final ActiveMQMessage amqMsg;
       final byte coreType = coreMessage.getType();
       final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
-      final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+      final boolean isCompressed = compressProp != null && compressProp;
       final byte[] bytes;
       final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
       buffer.resetReaderIndex();
@@ -591,12 +590,12 @@ public final class OpenWireMessageConverter {
       amqMsg.setArrival(arrival);
 
       final Object brokerPath = coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
-      if (brokerPath != null && brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length()
> 0) {
+      if (brokerPath instanceof SimpleString && ((SimpleString)brokerPath).length()
> 0) {
          setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString());
       }
 
       final Object clusterPath = coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
-      if (clusterPath != null && clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length()
> 0) {
+      if (clusterPath instanceof SimpleString && ((SimpleString)clusterPath).length()
> 0) {
          setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString());
       }
 
@@ -626,20 +625,15 @@ public final class OpenWireMessageConverter {
 
       amqMsg.setGroupSequence(coreMessage.getGroupSequence());
 
+      final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID);
       final MessageId mid;
-      final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
       if (midBytes != null) {
          ByteSequence midSeq = new ByteSequence(midBytes);
          mid = (MessageId) marshaller.unmarshal(midSeq);
       } else {
-         final SimpleString connectionId = (SimpleString) coreMessage.getObjectProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
-         if (connectionId != null) {
-            mid = new MessageId("ID:" + connectionId.toString() + ":-1:-1:-1", coreMessage.getMessageID());
-         } else {
-            //JMSMessageID should be started with "ID:"
-            String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
-            mid = new MessageId(midd);
-         }
+         //JMSMessageID should be started with "ID:" and needs to be globally unique (node
+ journal id)
+         String midd = "ID:" + serverNodeUUID + ":-1:-1:-1";
+         mid = new MessageId(midd, coreMessage.getMessageID());
       }
 
       amqMsg.setMessageId(mid);
@@ -673,7 +667,7 @@ public final class OpenWireMessageConverter {
       }
 
       final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
-      if (userId != null && userId instanceof SimpleString && ((SimpleString)userId).length()
> 0) {
+      if (userId instanceof SimpleString && ((SimpleString)userId).length() >
0) {
          amqMsg.setUserID(((SimpleString)userId).toString());
       }
 
@@ -694,7 +688,7 @@ public final class OpenWireMessageConverter {
 
       final Set<SimpleString> props = coreMessage.getPropertyNames();
       if (props != null) {
-         setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer);
+         setAMQMsgObjectProperties(amqMsg, coreMessage, props);
       }
 
       if (bytes != null) {
@@ -945,8 +939,7 @@ public final class OpenWireMessageConverter {
 
    private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
                                                  final ICoreMessage coreMessage,
-                                                 final Set<SimpleString> props,
-                                                 final AMQConsumer consumer) throws IOException
{
+                                                 final Set<SimpleString> props) throws
IOException {
       for (SimpleString s : props) {
          final String keyStr = s.toString();
          if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) &&
(keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 89eafe7..d68fa91 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
-import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -60,10 +59,8 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveInfo;
 
 public class AMQConsumer {
-   private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
    private final AMQSession session;
    private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
-   private final boolean hasNotificationDestination;
    private final ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private ServerConsumer serverConsumer;
@@ -85,7 +82,6 @@ public class AMQConsumer {
                       boolean internalAddress) {
       this.session = amqSession;
       this.openwireDestination = d;
-      this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
       this.info = info;
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
@@ -132,7 +128,7 @@ public class AMQConsumer {
             preAck = true;
          }
          String id = info.getClientId() != null ? info.getClientId() : this.getId().getConnectionId();
-         String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'"
+ id + "'";
+         String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME + "<>'" +
id + "'";
          if (selector == null) {
             selector = new SimpleString(noLocalSelector);
          } else {
@@ -250,7 +246,7 @@ public class AMQConsumer {
       }
    }
 
-   public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount)
{
+   public int handleDeliver(MessageReference reference, ICoreMessage message) {
       MessageDispatch dispatch;
       try {
          MessagePullHandler pullHandler = messagePullHandler.get();
@@ -264,15 +260,12 @@ public class AMQConsumer {
             message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
          }
          //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can
share the session.wireFormat()
-         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(),
this);
+         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(),
this, session.getCoreServer().getNodeManager().getUUID());
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
          currentWindow.decrementAndGet();
          return size;
-      } catch (IOException e) {
-         ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", e);
-         return 0;
       } catch (Throwable t) {
          ActiveMQServerLogger.LOGGER.warn("Error during message dispatch", t);
          return 0;
@@ -399,9 +392,6 @@ public class AMQConsumer {
       serverConsumer.close(false);
    }
 
-   public boolean hasNotificationDestination() {
-      return hasNotificationDestination;
-   }
 
    public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
       return openwireDestination;
@@ -488,12 +478,11 @@ public class AMQConsumer {
       }
    }
 
-   public boolean removeRolledback(MessageReference messageReference) {
+   public void removeRolledback(MessageReference messageReference) {
       final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs();
-      if (rolledbackMessageRefs == null) {
-         return false;
+      if (rolledbackMessageRefs != null) {
+         rolledbackMessageRefs.remove(messageReference);
       }
-      return rolledbackMessageRefs.remove(messageReference);
    }
 
    public void addRolledback(MessageReference messageReference) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index d4de2ee..f96b6c9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -311,7 +311,7 @@ public class AMQSession implements SessionCallback {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
       //clear up possible rolledback ids.
       theConsumer.removeRolledback(reference);
-      return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
+      return theConsumer.handleDeliver(reference, message.toCore());
    }
 
    @Override
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
new file mode 100644
index 0000000..d29676f
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import org.apache.activemq.ActiveMQMessageAuditNoSync;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+
+public class OpenWireMessageConverterTest {
+
+   final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
+   final WireFormat openWireFormat =  formatFactory.createWireFormat();
+   final byte[] content = new byte[] {'a','a'};
+   final String address = "Q";
+   final ActiveMQDestination destination = new ActiveMQQueue(address);
+   final UUID nodeUUID = UUIDGenerator.getInstance().generateUUID();
+
+   @Test
+   public void createMessageDispatch() throws Exception {
+
+      ActiveMQMessageAuditNoSync mqMessageAuditNoSync = new ActiveMQMessageAuditNoSync();
+
+      for (int i = 0; i < 10; i++) {
+
+         ICoreMessage msg = new CoreMessage().initBuffer(100);
+         msg.setMessageID(i);
+         msg.getBodyBuffer().writeBytes(content);
+         msg.setAddress(address);
+
+         MessageReference messageReference = new MessageReferenceImpl(msg, Mockito.mock(Queue.class));
+         AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+         Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+
+         MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference,
msg, openWireFormat, amqConsumer, nodeUUID);
+
+         MessageId messageId = dispatch.getMessage().getMessageId();
+         assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
+      }
+
+
+      for (int i = 10; i < 20; i++) {
+
+         CoreMessage msg = new CoreMessage().initBuffer(100);
+         msg.setMessageID(i);
+         msg.getBodyBuffer().writeBytes(content);
+         msg.setAddress(address);
+
+         // share a connection id
+         msg.getProperties().putProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, "MyClient");
+
+
+         MessageReference messageReference = new MessageReferenceImpl(msg, Mockito.mock(Queue.class));
+         AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+         Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+
+         MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference,
msg, openWireFormat, amqConsumer, nodeUUID);
+
+         MessageId messageId = dispatch.getMessage().getMessageId();
+         assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
+      }
+
+   }
+}
\ No newline at end of file
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
index 5c1a879..b20f209 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
@@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
@@ -55,6 +58,12 @@ public class AMQConsumerTest {
    }
 
    private AMQConsumer getConsumer(int prefetchSize) throws Exception {
+      UUID nodeId = UUIDGenerator.getInstance().generateUUID();
+      ActiveMQServer coreServer = Mockito.mock(ActiveMQServer.class);
+      NodeManager nodeManager = Mockito.mock(NodeManager.class);
+      Mockito.when(coreServer.getNodeManager()).thenReturn(nodeManager);
+      Mockito.when(nodeManager.getUUID()).thenReturn(nodeId);
+
       ServerSession coreSession = Mockito.mock(ServerSession.class);
       Mockito.when(coreSession.createConsumer(ArgumentMatchers.anyLong(), ArgumentMatchers.nullable(SimpleString.class),
                                               ArgumentMatchers.nullable(SimpleString.class),
ArgumentMatchers.anyInt(),
@@ -62,7 +71,7 @@ public class AMQConsumerTest {
                                               ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class));
       AMQSession session = Mockito.mock(AMQSession.class);
       Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
-      Mockito.when(session.getCoreServer()).thenReturn(Mockito.mock(ActiveMQServer.class));
+      Mockito.when(session.getCoreServer()).thenReturn(coreServer);
       Mockito.when(session.getCoreSession()).thenReturn(coreSession);
       Mockito.when(session.convertWildcard(ArgumentMatchers.any(ActiveMQDestination.class))).thenReturn("");
 
@@ -81,7 +90,7 @@ public class AMQConsumerTest {
 
       Assert.assertTrue(consumer.hasCredits());
 
-      consumer.handleDeliver(reference, message, 0);
+      consumer.handleDeliver(reference, message);
 
       Assert.assertFalse(consumer.hasCredits());
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 4a88fcc..a6aeeaa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -130,6 +130,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
 
    protected ServerLocator[] locators;
 
+   protected boolean isForceUniqueStorageManagerIds() {
+      return true;
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -1934,7 +1938,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
          log.debug("started server " + servers[node]);
          waitForServerToStart(servers[node]);
 
-         if (servers[node].getStorageManager() != null) {
+         if (servers[node].getStorageManager() != null && isForceUniqueStorageManagerIds())
{
             for (int i = 0; i < node * 1000; i++) {
                // it is common to have messages landing with similar IDs on separate nodes,
which could hide a few issues.
                // so we make them unequal
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
index d83491a..ceaad7b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.activemq.artemis.tests.integration.openwire.cluster;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -35,7 +37,9 @@ import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 import java.util.Collection;
@@ -44,6 +48,12 @@ import java.util.concurrent.TimeUnit;
 
 public class MessageRedistributionTest extends ClusterTestBase {
 
+   @Override
+   protected boolean isForceUniqueStorageManagerIds() {
+      // we want to verify messageId uniqueness across brokers
+      return false;
+   }
+
    @Test
    public void testRemoteConsumerClose() throws Exception {
 
@@ -77,6 +87,98 @@ public class MessageRedistributionTest extends ClusterTestBase {
    }
 
    @Test
+   public void testFailoverNonClusteredBrokersInteropWithCoreProducer() throws Exception
{
+
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      startServers(0, 1);
+
+      servers[0].getAddressSettingsRepository().getMatch("#").setRedeliveryDelay(0).setRedistributionDelay(0);
+      servers[1].getAddressSettingsRepository().getMatch("#").setRedeliveryDelay(0).setRedistributionDelay(0);
+
+      setupSessionFactory(0, true);
+      setupSessionFactory(1, true);
+
+      createAddressInfo(0, "q", RoutingType.ANYCAST, -1, false);
+      createAddressInfo(1, "q", RoutingType.ANYCAST, -1, false);
+      createQueue(0, "q", "q", null, true, RoutingType.ANYCAST);
+      createQueue(1, "q", "q", null, true, RoutingType.ANYCAST);
+
+
+      final int numMessagesPerNode = 1000;
+      produceWithCoreTo(0, numMessagesPerNode);
+      produceWithCoreTo(1, numMessagesPerNode);
+
+      // consume with openwire from both brokers which both start with journal id = 0, should
be in lock step
+
+      String zero = getServerUri(0);
+      String one = getServerUri(1);
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + zero
+ "," + one + ")?jms.prefetchPolicy.all=10&randomize=false&timeout=400&reconnectDelay=500&useExponentialBackOff=false&initialReconnectDelay=500&nested.wireFormat.maxInactivityDuration=500&nested.wireFormat.maxInactivityDurationInitalDelay=500&nested.ignoreRemoteWireFormat=true&nested.soTimeout=500&nested.connectionTimeout=400&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400");
+      factory.setWatchTopicAdvisories(false);
+
+      CountDownLatch continueLatch = new CountDownLatch(1);
+      CountDownLatch received = new CountDownLatch(numMessagesPerNode * 2);
+      final Connection conn = factory.createConnection();
+      conn.start();
+
+      ((ActiveMQConnection)conn).setClientInternalExceptionListener(Throwable::printStackTrace);
+
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination dest = ActiveMQDestination.createDestination("q", ActiveMQDestination.QUEUE_TYPE);
+      session.createConsumer(dest).setMessageListener(message -> {
+         try {
+            received.countDown();
+         } catch (Exception exception) {
+            exception.printStackTrace();
+         }
+      });
+
+
+      assertTrue(Wait.waitFor(new org.apache.activemq.artemis.utils.Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return received.getCount() <= numMessagesPerNode;
+         }
+      }));
+
+      // force a failover to the other broker
+      servers[0].stop(false, true);
+
+      // get all the messages, our openwire audit does not detect any duplicate
+      assertTrue(Wait.waitFor(() -> {
+         return received.await(1, TimeUnit.SECONDS);
+      }));
+
+      conn.close();
+   }
+
+   private void produceWithCoreTo(int serveId, final int numMessagesPerNode) throws Exception
{
+
+      String targetUrl = getServerUri(serveId);
+      Connection jmsConn = null;
+      try {
+         org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory coreCf = ActiveMQJMSClient.createConnectionFactory(targetUrl,
"cf" + serveId);
+         jmsConn = coreCf.createConnection();
+         jmsConn.setClientID("theProducer");
+         Session coreSession = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TextMessage msg = coreSession.createTextMessage("TEXT");
+         Queue queue = coreSession.createQueue("q");
+         MessageProducer producer = coreSession.createProducer(queue);
+         for (int i = 0; i < numMessagesPerNode; i++) {
+            msg.setIntProperty("MM", i);
+            msg.setIntProperty("SN", serveId);
+            producer.send(msg);
+         }
+      } finally {
+         if (jmsConn != null) {
+            jmsConn.close();
+         }
+      }
+   }
+
+   @Test
    public void testAdvisoriesNotClustered() throws Exception {
 
       setupServer(0, true, true);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
index 869f6f2..28715b1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
@@ -250,10 +250,51 @@ public class GeneralInteropTest extends BasicOpenWireTest {
       }
    }
 
+
+   @Test
+   public void testReceiveTwiceTheSameCoreMessage() throws Exception {
+
+      final String text = "HelloAgain";
+      sendMultipleTextMessagesUsingCoreJms(queueName, text, 1);
+
+      String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.MaxInactivityDuration=5000)";
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString);
+      Connection connection = connectionFactory.createConnection();
+      try {
+         connection.setClientID("clientId");
+         connection.start();
+
+         Message message = null;
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageConsumer consumer = session.createConsumer(queue);
+         message = consumer.receive(4000);
+         assertNotNull(message);
+
+         String id1 = message.getJMSMessageID();
+         consumer.close();
+
+         // consume again!
+         consumer = session.createConsumer(queue);
+         message = consumer.receive(4000);
+         assertNotNull(message);
+
+         String id2 = message.getJMSMessageID();
+
+         assertEquals(id1, id2);
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+
    private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num)
throws Exception {
       Connection jmsConn = null;
       try {
          jmsConn = coreCf.createConnection();
+         jmsConn.setClientID("PROD");
          Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queue = session.createQueue(queueName);
          MessageProducer producer = session.createProducer(queue);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
index 2eaa419..1ffdfea 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
@@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 
 public class UUIDGeneratorTest extends ActiveMQTestBase {
@@ -42,6 +44,20 @@ public class UUIDGeneratorTest extends ActiveMQTestBase {
       assertEquals(javaId.toString(), nativeId.toString());
    }
 
+
+   @Test
+   public void testDifferentInTightLoop() throws Exception {
+      UUIDGenerator gen = UUIDGenerator.getInstance();
+
+      final int numIterations = 10000;
+      Set<org.apache.activemq.artemis.utils.UUID> uuidSet = new HashSet<>();
+      for (int i = 0; i < numIterations; i++) {
+         org.apache.activemq.artemis.utils.UUID nativeId = gen.generateUUID();
+         uuidSet.add(nativeId);
+      }
+      assertEquals("All there", numIterations, uuidSet.size());
+   }
+
    @Test
    public void testGetHardwareAddress() throws Exception {
       byte[] bytes = UUIDGenerator.getHardwareAddress();

Mime
View raw message