activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [35/50] [abbrv] activemq-artemis git commit: ARTEMIS-780 Update Shared Queue API to use Address model
Date Fri, 09 Dec 2016 19:49:19 GMT
ARTEMIS-780 Update Shared Queue API to use Address model


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/279383a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/279383a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/279383a7

Branch: refs/heads/master
Commit: 279383a7985dd9b92ab89584bf7eda7029343be8
Parents: c480351
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue Nov 29 14:11:27 2016 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  5 +++
 .../wireformat/CreateSharedQueueMessage_V2.java |  1 -
 .../jms/client/ActiveMQMessageProducer.java     |  4 ++
 .../core/postoffice/impl/LocalQueueBinding.java | 18 ++++++--
 .../core/postoffice/impl/PostOfficeImpl.java    |  5 +++
 .../core/server/impl/ActiveMQServerImpl.java    | 19 +++++++-
 .../artemis/jms/tests/MessageProducerTest.java  | 46 ++++++++++++++++++++
 7 files changed, 92 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 1ea9309..80116ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -112,6 +112,11 @@ public interface Message {
     */
    SimpleString HDR_VALIDATED_USER = new SimpleString("_AMQ_VALIDATED_USER");
 
+   /**
+    * The Routing Type for this message.  Ensures that this message is only routed to queues
with matching routing type.
+    */
+   SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
+
    byte DEFAULT_TYPE = 0;
 
    byte OBJECT_TYPE = 2;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 40b9cb5..c8bf86e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -64,7 +64,6 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage
{
       return buff.toString();
    }
 
-
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeSimpleString(address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 47d9ff2..aa4754b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -491,6 +492,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender,
To
       ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
       coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
 
+      byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
+      coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType);
+
       try {
          /**
           * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 30e3768..d02f0f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -18,12 +18,13 @@ package org.apache.activemq.artemis.core.postoffice.impl;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.Bindable;
-import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public class LocalQueueBinding implements QueueBinding {
@@ -117,12 +118,23 @@ public class LocalQueueBinding implements QueueBinding {
 
    @Override
    public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
-      queue.route(message, context);
+      if (isMatchRoutingType(message)) {
+         queue.route(message, context);
+      }
    }
 
    @Override
    public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception
{
-      queue.routeWithAck(message, context);
+      if (isMatchRoutingType(message)) {
+         queue.routeWithAck(message, context);
+      }
+   }
+
+   private boolean isMatchRoutingType(ServerMessage message) {
+      if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) {
+         return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType();
+      }
+      return true;
    }
 
    public boolean isQueueBinding() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index dc73680..2fc3409 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -655,6 +655,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener,
Binding
                               final RoutingContext context,
                               final boolean direct,
                               boolean rejectDuplicates) throws Exception {
+
       RoutingStatus result = RoutingStatus.OK;
       // Sanity check
       if (message.getRefCount() > 0) {
@@ -663,6 +664,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener,
Binding
 
       SimpleString address = message.getAddress();
 
+      if (address.toString().equals("testQueue")) {
+         System.out.println("f");
+      }
+
       setPagingStore(message);
 
       AtomicBoolean startedTX = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index f6a0ebd..d6e626c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1542,7 +1542,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
-   public void createSharedQueue(final SimpleString address, RoutingType routingType, final
SimpleString name, final SimpleString filterString,
+   public void createSharedQueue(final SimpleString address,
+                                 RoutingType routingType,
+                                 final SimpleString name,
+                                 final SimpleString filterString,
                                  final SimpleString user,
                                  boolean durable) throws Exception {
       //force the old contract about address
@@ -1558,7 +1561,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
-      final Queue queue = createQueue(address, routingType, name, filterString, user, durable,
!durable, false);
+      final Queue queue = createQueue(address,
+                                      name,
+                                      routingType,
+                                      filterString,
+                                      user,
+                                      durable,
+                                      !durable,
+                                      true,
+                                      !durable,
+                                      false,
+                                      Queue.MAX_CONSUMERS_UNLIMITED,
+                                      false,
+                                      true);
 
       if (!queue.getAddress().equals(address)) {
          throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/279383a7/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
index d001f5b..c5fb964 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
@@ -25,19 +25,29 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage;
 import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
 import org.junit.Test;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 public class MessageProducerTest extends JMSTestCase {
 
    @Test
@@ -695,6 +705,42 @@ public class MessageProducerTest extends JMSTestCase {
 
       ProxyAssertSupport.assertTrue(listener.exception instanceof javax.jms.IllegalStateException);
    }
+
+   @Test
+   public void testSendToQueueOnlyWhenTopicWithSameAddress() throws Exception {
+      SimpleString addr = SimpleString.toSimpleString("testAddr");
+
+      Set<RoutingType> supportedRoutingTypes = new HashSet<>();
+      supportedRoutingTypes.add(RoutingType.ANYCAST);
+      supportedRoutingTypes.add(RoutingType.MULTICAST);
+
+      servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(addr, supportedRoutingTypes));
+      servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null,
false, false);
+
+      Connection pconn = createConnection();
+      pconn.start();
+
+      Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Queue queue = ps.createQueue(addr.toString());
+      Topic topic = ps.createTopic(addr.toString());
+
+      MessageConsumer queueConsumer = ps.createConsumer(queue);
+      MessageConsumer topicConsumer = ps.createConsumer(topic);
+
+      MessageProducer queueProducer = ps.createProducer(queue);
+      queueProducer.send(ps.createMessage());
+
+      assertNotNull(queueConsumer.receive(1000));
+      assertNull(topicConsumer.receive(1000));
+
+      MessageProducer topicProducer = ps.createProducer(topic);
+      topicProducer.send(ps.createMessage());
+
+      assertNull(queueConsumer.receive(1000));
+      assertNotNull(topicConsumer.receive(1000));
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------


Mime
View raw message