activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: [ARTEMIS-1241] check for FQQN to parse out address and queue for auto creation
Date Wed, 17 Jan 2018 18:26:41 GMT
[ARTEMIS-1241] check for FQQN to parse out address and queue for auto creation


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea266cd7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea266cd7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea266cd7

Branch: refs/heads/master
Commit: ea266cd74b473bc22ace1cb95336a32286e98a56
Parents: 9e781be
Author: gtully <gary.tully@gmail.com>
Authored: Fri Jan 12 10:26:34 2018 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jan 17 13:23:23 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQSession.java  |  18 ++-
 .../integration/openwire/FQQNOpenWireTest.java  | 151 ++++++++++++++++++-
 2 files changed, 164 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea266cd7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index bca7eae..b0eb678 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -41,9 +41,11 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -197,7 +199,21 @@ public class AMQSession implements SessionCallback {
          try {
             if (!queueBinding.isExists()) {
                if (bindingQuery.isAutoCreateQueues()) {
-                  server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, isTemporary);
+                  SimpleString queueNameToUse = queueName;
+                  SimpleString addressToUse = queueName;
+                  RoutingType routingTypeToUse = RoutingType.ANYCAST;
+                  if (CompositeAddress.isFullyQualified(queueName.toString())) {
+                     CompositeAddress compositeAddress = CompositeAddress.getQueueName(queueName.toString());
+                     addressToUse = new SimpleString(compositeAddress.getAddress());
+                     queueNameToUse = new SimpleString(compositeAddress.getQueueName());
+                     if (bindingQuery.getAddressInfo() != null) {
+                        routingTypeToUse = bindingQuery.getAddressInfo().getRoutingType();
+                     } else {
+                        AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString());
+                        routingTypeToUse = as.getDefaultAddressRoutingType();
+                     }
+                  }
+                  server.createQueue(addressToUse, routingTypeToUse, queueNameToUse, null, true, isTemporary);
                   connection.addKnownDestination(queueName);
                } else {
                   hasQueue = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea266cd7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java
index ff819ec..1c20ff3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java
@@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.utils.CompositeAddress;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -315,15 +314,159 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
    }
 
    @Test
-   @Ignore("need to figure auto bindings creation")
-   public void testVirtualTopicFQQNAutoCreate() throws Exception {
+   public void testVirtualTopicFQQNAutoCreateQueue() throws Exception {
       Connection exConn = null;
 
       SimpleString topic = new SimpleString("VirtualTopic.Orders");
       SimpleString subscriptionQ = new SimpleString("Consumer.A");
 
+      // defaults are false via test setUp
+      this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+         exFact.setWatchTopicAdvisories(false);
+         exConn = exFact.createConnection();
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createTopic(topic.toString());
+         MessageProducer producer = session.createProducer(destination);
+
+         Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
+
+         MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
+         MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
+
+         TextMessage message = session.createTextMessage("This is a text message");
+         producer.send(message);
+
+         // only one consumer should get the message
+         TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
+         TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
+
+         assertTrue((messageReceivedA == null || messageReceivedB == null));
+         String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
+         assertEquals("This is a text message", text);
+
+         messageConsumerA.close();
+         messageConsumerB.close();
+
+      } finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+   }
+
+   @Test
+   public void testVirtualTopicFQQNAutoCreateQAndAddress() throws Exception {
+      Connection exConn = null;
+
+      SimpleString topic = new SimpleString("VirtualTopic.Orders");
+      SimpleString subscriptionQ = new SimpleString("Consumer.A");
+
+      // defaults are false via test setUp
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+         exFact.setWatchTopicAdvisories(false);
+         exConn = exFact.createConnection();
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createTopic(topic.toString());
+         MessageProducer producer = session.createProducer(destination);
+
+         Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
+
+         MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
+         MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
+
+         TextMessage message = session.createTextMessage("This is a text message");
+         producer.send(message);
+
+         // only one consumer should get the message
+         TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
+         TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
+
+         assertTrue((messageReceivedA == null || messageReceivedB == null));
+         String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
+         assertEquals("This is a text message", text);
+
+         messageConsumerA.close();
+         messageConsumerB.close();
+
+      } finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+   }
+
+   @Test
+   public void testVirtualTopicFQQNConsumerAutoCreateQAndAddress() throws Exception {
+      Connection exConn = null;
+
+      SimpleString topic = new SimpleString("VirtualTopic.Orders");
+      SimpleString subscriptionQ = new SimpleString("Consumer.A");
+
+      // defaults are false via test setUp
       this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
-      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.MULTICAST);
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+         exFact.setWatchTopicAdvisories(false);
+         exConn = exFact.createConnection();
+         exConn.start();
+
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createTopic(topic.toString());
+         Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
+
+         MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
+         MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
+
+         MessageProducer producer = session.createProducer(destination);
+         TextMessage message = session.createTextMessage("This is a text message");
+         producer.send(message);
+
+         // only one consumer should get the message
+         TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
+         TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
+
+         assertTrue((messageReceivedA == null || messageReceivedB == null));
+         String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
+         assertEquals("This is a text message", text);
+
+         messageConsumerA.close();
+         messageConsumerB.close();
+
+      } finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+   }
+
+   @Test
+   public void testVirtualTopicFQQNAutoCreateQWithExistingAddressWithAnyCastDefault() throws Exception {
+      Connection exConn = null;
+
+      SimpleString topic = new SimpleString("VirtualTopic.Orders");
+      SimpleString subscriptionQ = new SimpleString("Consumer.A");
+
+      // defaults are false via test setUp
+      this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(false);
+
+      // set default to anycast which would fail if used in queue auto creation
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.ANYCAST);
 
       try {
          ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();


Mime
View raw message