activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [44/50] [abbrv] activemq-artemis git commit: ARTEMIS-789 Add OpenWire Tests for MULTICAST, ANYCAST
Date Fri, 09 Dec 2016 19:49:28 GMT
ARTEMIS-789 Add OpenWire Tests for MULTICAST,ANYCAST


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a182a135
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a182a135
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a182a135

Branch: refs/heads/master
Commit: a182a135e963c1a4355c3f7d7d72d5ee6b70ed14
Parents: b742a35
Author: Howard Gao <howard.gao@gmail.com>
Authored: Thu Dec 1 20:01:57 2016 +0800
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |   6 +-
 .../core/protocol/openwire/amq/AMQConsumer.java |   7 +-
 .../core/protocol/openwire/amq/AMQSession.java  |  10 +-
 .../integration/addressing/MulticastTest.java   | 189 +++++++++++++++++++
 .../integration/openwire/BasicOpenWireTest.java |   9 +-
 5 files changed, 211 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index fb7a364..d6add20 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
@@ -63,6 +62,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
@@ -716,13 +716,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
          if (binding == null) {
             if (dest.isTemporary()) {
-               internalSession.createQueue(qName, qName, null, dest.isTemporary(), false);
+               internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), false);
             } else {
                ConnectionInfo connInfo = getState().getInfo();
                CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
                server.getSecurityStore().check(qName, checkType, this);
                server.checkQueueCreationLimit(getUsername());
-               server.createQueue(qName, ActiveMQDefaultConfiguration.DEFAULT_ROUTING_TYPE, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), true, false);
+               server.createQueue(qName, RoutingType.ANYCAST, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), true, false);
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 1803cc8..ef54b59 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.command.ConsumerControl;
@@ -130,6 +131,10 @@ public class AMQConsumer {
 
       SimpleString queueName;
 
+      AddressInfo addressInfo = session.getCoreServer().getAddressInfo(address);
+      if (addressInfo != null) {
+         addressInfo.addRoutingType(RoutingType.MULTICAST);
+      }
       if (isDurable) {
          queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
          QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
@@ -160,7 +165,7 @@ public class AMQConsumer {
       } else {
          queueName = new SimpleString(UUID.randomUUID().toString());
 
-         session.getCoreSession().createQueue(address, queueName, selector, true, false);
+         session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, true, false);
 
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index a92a379..c64374a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@@ -34,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -173,7 +173,7 @@ public class AMQSession implements SessionCallback {
 
          if (!queueBinding.isExists()) {
             if (isAutoCreate) {
-               server.createQueue(queueName, ActiveMQDefaultConfiguration.DEFAULT_ROUTING_TYPE, queueName, null, true, isTemporary);
+               server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, isTemporary);
                connection.addKnownDestination(queueName);
             } else {
                hasQueue = false;
@@ -279,6 +279,7 @@ public class AMQSession implements SessionCallback {
       messageSend.setBrokerInTime(System.currentTimeMillis());
 
       ActiveMQDestination destination = messageSend.getDestination();
+
       ActiveMQDestination[] actualDestinations = null;
       if (destination.isComposite()) {
          actualDestinations = destination.getCompositeDestinations();
@@ -382,6 +383,11 @@ public class AMQSession implements SessionCallback {
             checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
          }
 
+         if (actualDestinations[i].isQueue()) {
+            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
+         } else {
+            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
+         }
          RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
 
          if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java
new file mode 100644
index 0000000..487a9f9
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.addressing;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.TimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class MulticastTest extends ActiveMQTestBase {
+
+   private SimpleString baseAddress = new SimpleString("multicast.address");
+
+   private AddressInfo addressInfo;
+
+   private ActiveMQServer server;
+
+   private ClientSessionFactory sessionFactory;
+
+   @Before
+   public void setup() throws Exception {
+      server = createServer(true);
+      server.start();
+
+      server.waitForActivation(10, TimeUnit.SECONDS);
+
+      ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory = sl.createSessionFactory();
+
+      addSessionFactory(sessionFactory);
+
+      addressInfo = new AddressInfo(baseAddress);
+      addressInfo.addRoutingType(RoutingType.MULTICAST);
+      server.createOrUpdateAddressInfo(addressInfo);
+   }
+
+   @Test
+   public void testTxCommitReceive() throws Exception {
+
+      Queue q1 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+      Queue q2 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+
+      ClientSession session = sessionFactory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+      ClientProducer producer = session.createProducer(baseAddress);
+
+      final int num = 10;
+
+      for (int i = 0; i < num; i++) {
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+         m.getBodyBuffer().writeString("AnyCast" + i);
+         producer.send(m);
+      }
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      session.commit();
+
+      assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q1.getMessageCount()));
+      assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q2.getMessageCount()));
+
+      ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2};
+      for (int i = 0; i < consumers.length; i++) {
+
+         for (int j = 0; j < num; j++) {
+            ClientMessage m = consumers[i].receive(2000);
+            assertNotNull(m);
+            System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
+         }
+
+         assertNull(consumers[i].receive(200));
+         session.commit();
+
+         assertNull(consumers[i].receive(200));
+      }
+
+      q1.deleteQueue();
+      q2.deleteQueue();
+   }
+
+   @Test
+   public void testTxRollbackReceive() throws Exception {
+
+      Queue q1 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".1"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+      Queue q2 = server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(".2"), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+
+      ClientSession session = sessionFactory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+      ClientProducer producer = session.createProducer(baseAddress);
+
+      final int num = 10;
+
+      for (int i = 0; i < num; i++) {
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+         m.getBodyBuffer().writeString("AnyCast" + i);
+         producer.send(m);
+      }
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      session.commit();
+      session.close();
+
+      assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q1.getMessageCount()));
+      assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> num == q2.getMessageCount()));
+
+      ClientSession session1 = sessionFactory.createSession(false, false);
+      ClientSession session2 = sessionFactory.createSession(false, false);
+      session1.start();
+      session2.start();
+
+      consumer1 = session1.createConsumer(q1.getName());
+      consumer2 = session2.createConsumer(q2.getName());
+
+      ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2};
+      ClientSession[] sessions = new ClientSession[] {session1, session2};
+      Queue[] queues = new Queue[] {q1, q2};
+
+      for (int i = 0; i < consumers.length; i++) {
+
+         for (int j = 0; j < num; j++) {
+            ClientMessage m = consumers[i].receive(2000);
+            assertNotNull(m);
+            System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
+         }
+
+         assertNull(consumers[i].receive(200));
+         sessions[i].rollback();
+         sessions[i].close();
+
+         sessions[i] = sessionFactory.createSession(false, false);
+         sessions[i].start();
+
+         //receive same after rollback
+         consumers[i] = sessions[i].createConsumer(queues[i].getName());
+
+         for (int j = 0; j < num; j++) {
+            ClientMessage m = consumers[i].receive(2000);
+            assertNotNull(m);
+            System.out.println("consumer" + i + " received: " + m.getBodyBuffer().readString());
+         }
+
+         assertNull(consumers[i].receive(200));
+         sessions[i].commit();
+
+         assertNull(consumers[i].receive(200));
+         sessions[i].close();
+      }
+
+      q1.deleteQueue();
+      q2.deleteQueue();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a182a135/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index 7c40834..e6026c4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -33,6 +33,7 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.junit.After;
 import org.junit.Before;
@@ -65,15 +66,15 @@ public class BasicOpenWireTest extends OpenWireTestBase {
    public void setUp() throws Exception {
       super.setUp();
       SimpleString coreQueue = new SimpleString(queueName);
-      this.server.createQueue(coreQueue, RoutingType.MULTICAST, coreQueue, null, false, false);
+      this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false);
       testQueues.put(queueName, coreQueue);
 
       SimpleString coreQueue2 = new SimpleString(queueName2);
-      this.server.createQueue(coreQueue2, RoutingType.MULTICAST, coreQueue2, null, false, false);
+      this.server.createQueue(coreQueue2, RoutingType.ANYCAST, coreQueue2, null, false, false);
       testQueues.put(queueName2, coreQueue2);
 
       SimpleString durableQueue = new SimpleString(durableQueueName);
-      this.server.createQueue(durableQueue, RoutingType.MULTICAST, durableQueue, null, true, false);
+      this.server.createQueue(durableQueue, RoutingType.ANYCAST, durableQueue, null, true, false);
       testQueues.put(durableQueueName, durableQueue);
 
       if (!enableSecurity) {
@@ -139,7 +140,7 @@ public class BasicOpenWireTest extends OpenWireTestBase {
       SimpleString coreQ = testQueues.get(qname);
       if (coreQ == null) {
          coreQ = new SimpleString(qname);
-         this.server.createQueue(coreQ, RoutingType.MULTICAST, coreQ, null, false, false);
+         this.server.createQueue(coreQ, RoutingType.ANYCAST, coreQ, null, false, false);
          testQueues.put(qname, coreQ);
       }
    }


Mime
View raw message