activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1558 Message Grouping Openwire Interoperability Issue
Date Fri, 15 Dec 2017 02:31:14 GMT
ARTEMIS-1558 Message Grouping Openwire Interoperability Issue

Openwire message grouping doesn't work because the groupID of
a message is not passed correctly.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1c156e0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1c156e0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1c156e0d

Branch: refs/heads/master
Commit: 1c156e0d47c164e15fc5a469e2b26252b634a178
Parents: 9fd24f1
Author: Howard Gao <howard.gao@gmail.com>
Authored: Thu Dec 14 13:44:51 2017 +0800
Committer: Justin Bertram <jbertram@apache.org>
Committed: Thu Dec 14 20:30:53 2017 -0600

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      |   4 +-
 .../jms/cluster/TemporaryQueueClusterTest.java  |   1 -
 .../openwire/OpenWireGroupingTest.java          | 150 +++++++++++++++++++
 3 files changed, 152 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c156e0d/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 2f9fee4..a5bb0f9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -80,7 +80,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
    private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
    private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
-   private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID";
+   private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString();
    private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
    private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
    private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
@@ -698,7 +698,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
 
-      Object value = coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID);
+      Object value = coreMessage.getGroupID();
       if (value != null) {
          String groupId = value.toString();
          amqMsg.setGroupID(groupId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c156e0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TemporaryQueueClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TemporaryQueueClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TemporaryQueueClusterTest.java
index ae3a93a..4caf67e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TemporaryQueueClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TemporaryQueueClusterTest.java
@@ -83,7 +83,6 @@ public class TemporaryQueueClusterTest extends JMSClusteredTestBase {
       }
    }
 
-   // TODO: this is broken because temporary queues are no longer created with the "jms.temp-queue" prefix which means the cluster-connection won't match it
    @Test
    public void testTemporaryQueue() throws Exception {
       jmsServer1.createQueue(false, QUEUE_NAME, null, false, "/queue/target");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c156e0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireGroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireGroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireGroupingTest.java
new file mode 100644
index 0000000..d85c019
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireGroupingTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class OpenWireGroupingTest extends BasicOpenWireTest {
+
+   //whether to use core send grouping messages
+   private boolean coreSend;
+   //whether to use core receive grouping messages
+   private boolean coreReceive;
+
+   @Parameterized.Parameters(name = "core-send={0} core-receive={1}")
+   public static Collection<Object[]> params() {
+      return Arrays.asList(new Object[][]{{true, true},
+                                          {true, false},
+                                          {false, true},
+                                          {false, false}});
+   }
+
+   public OpenWireGroupingTest(boolean coreSend, boolean coreReceive) {
+      this.coreSend = coreSend;
+      this.coreReceive = coreReceive;
+   }
+
+   @Test
+   public void testGrouping() throws Exception {
+
+      String jmsxgroupID = null;
+
+      ConnectionFactory sendFact = coreSend ? coreCf : factory;
+      ConnectionFactory receiveFact = coreReceive ? coreCf : factory;
+
+      final int num = 10;
+      try (Connection coreConn = sendFact.createConnection()) {
+
+         Session session = coreConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+
+         for (int j = 0; j < num; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            setProperty(message);
+
+            producer.send(message);
+
+            String prop = message.getStringProperty("JMSXGroupID");
+
+            assertNotNull(prop);
+
+            if (jmsxgroupID != null) {
+               assertEquals(jmsxgroupID, prop);
+            } else {
+               jmsxgroupID = prop;
+            }
+         }
+      }
+      try (Connection connection = receiveFact.createConnection()) {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+
+         MessageConsumer consumer1 = session.createConsumer(queue);
+         MessageConsumer consumer2 = session.createConsumer(queue);
+         MessageConsumer consumer3 = session.createConsumer(queue);
+
+         connection.start();
+
+         List<MessageConsumer> otherConsumers = new ArrayList<>();
+         otherConsumers.add(consumer1);
+         otherConsumers.add(consumer2);
+         otherConsumers.add(consumer3);
+
+         //find out which one broker picks up
+         MessageConsumer groupConsumer = null;
+         for (MessageConsumer consumer : otherConsumers) {
+            TextMessage tm = (TextMessage) consumer.receive(2000);
+            if (tm != null) {
+               assertEquals("Message" + 0, tm.getText());
+               otherConsumers.remove(consumer);
+               groupConsumer = consumer;
+               break;
+            }
+         }
+         assertNotNull(groupConsumer);
+
+         //All msgs should go to the group consumer
+         for (int j = 1; j < num; j++) {
+
+            TextMessage tm = (TextMessage) groupConsumer.receive(2000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
+         }
+
+         for (MessageConsumer consumer : otherConsumers) {
+            assertNull(consumer.receive(100));
+         }
+      }
+
+   }
+
+   protected void setProperty(Message message) {
+      if (coreSend) {
+         ((ActiveMQMessage) message).getCoreMessage().putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, new SimpleString("foo"));
+      } else {
+         org.apache.activemq.command.ActiveMQMessage m = (org.apache.activemq.command.ActiveMQMessage) message;
+         m.setGroupID("foo");
+      }
+   }
+}


Mime
View raw message