activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1352 auto-create MULTICAST queue when AMQP client sends to topic
Date Fri, 29 Sep 2017 01:40:29 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6fc51b10d -> 3035a57e4


ARTEMIS-1352 auto-create MULTICAST queue when AMQP client sends to topic


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d1cd1e71
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d1cd1e71
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d1cd1e71

Branch: refs/heads/master
Commit: d1cd1e71a198927c5180a1a032d6796e82b86da8
Parents: 6fc51b1
Author: Justin Bertram <jbertram@apache.org>
Authored: Wed Sep 6 09:07:21 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Sep 28 21:40:18 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 23 ++++++++++-----
 .../proton/ProtonServerReceiverContext.java     | 15 ++++++----
 .../integration/amqp/AmqpMaxFrameSizeTest.java  |  3 ++
 .../tests/integration/client/ConsumerTest.java  | 30 ++++++++++++++++++++
 4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6807ada..21afbf9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -21,6 +21,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -276,15 +277,23 @@ public class AMQPSessionCallback implements SessionCallback {
       return queueQueryResult;
    }
 
-   public boolean bindingQuery(String address) throws Exception {
-      BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
-      if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
+   public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
+      SimpleString simpleAddress = SimpleString.toSimpleString(address);
+      BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+      if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
          try {
-            serverSession.createQueue(new SimpleString(address), new SimpleString(address), RoutingType.ANYCAST, null, false, true);
+            serverSession.createAddress(simpleAddress, routingType, true);
+         } catch (ActiveMQAddressExistsException e) {
+            // The address may have been created by another thread in the mean time.  Catch and do nothing.
+         }
+         bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
+      } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
+         try {
+            serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true);
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean time.  Catch and do nothing.
          }
-         bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+         bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
       }
       return bindingQueryResult.isExists();
    }
@@ -406,7 +415,7 @@ public class AMQPSessionCallback implements SessionCallback {
             return;
          }
 
-         if (!bindingQuery(message.getAddress().toString())) {
+         if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
          }
       }
@@ -660,7 +669,7 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    public RoutingType getDefaultRoutingType(String address) {
-      return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType();
+      return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType();
    }
 
    public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 2feb8da..eee35a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -119,12 +119,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
             if (address != null && !address.isEmpty()) {
                try {
-                  if (!sessionSPI.bindingQuery(address)) {
+                  if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
                   }
                } catch (ActiveMQAMQPNotFoundException e) {
                   throw e;
                } catch (Exception e) {
+                  e.printStackTrace();
                   throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
                }
 
@@ -177,11 +178,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    }
 
    private RoutingType getRoutingType(Symbol[] symbols) {
-      for (Symbol symbol : symbols) {
-         if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
-            return RoutingType.MULTICAST;
-         } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
-            return RoutingType.ANYCAST;
+      if (symbols != null) {
+         for (Symbol symbol : symbols) {
+            if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
+               return RoutingType.MULTICAST;
+            } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
+               return RoutingType.ANYCAST;
+            }
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
index 778cd40..c6e1008 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -40,6 +42,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
 
    @Test(timeout = 60000)
    public void testMultipleTransfers() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
 
       String testQueueName = "ConnectionFrameSize";
       int nMsgs = 200;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1cd1e71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 69c7f68..fc474aa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -231,6 +231,36 @@ public class ConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testAutoCreateMulticastAddress() throws Throwable {
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      assertNull(server.getAddressInfo(SimpleString.toSimpleString("topic")));
+
+      ConnectionFactory factorySend = createFactory(2);
+      Connection connection = factorySend.createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Topic topic = session.createTopic("topic");
+         MessageProducer producer = session.createProducer(topic);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage msg = session.createTextMessage("hello");
+         msg.setIntProperty("mycount", 0);
+         producer.send(msg);
+      } finally {
+         connection.close();
+      }
+
+      assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("topic")));
+      assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("topic")).getRoutingType());
+      assertEquals(0, server.getTotalMessageCount());
+   }
+
+   @Test
    public void testSendCoreReceiveAMQP() throws Throwable {
 
       if (!isNetty()) {


Mime
View raw message