activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [33/44] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Tue, 23 Feb 2016 19:39:29 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
deleted file mode 100644
index d05a5c7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit test for virtual topics and DLQ messaging. See individual test for more
- * detail
- */
-public class VirtualTopicDLQTest extends TestCase {
-
-   private static BrokerService broker;
-
-   private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class);
-
-   static final String jmsConnectionURI = "failover:(vm://localhost)";
-
-   // Virtual Topic that the test publishes numberMessages messages to
-   private static final String virtualTopicName = "VirtualTopic.Test";
-
-   // Queues that receive all the messages sent to the virtual topic
-   private static final String consumer1Prefix = "Consumer.A.";
-   private static final String consumer2Prefix = "Consumer.B.";
-   private static final String consumer3Prefix = "Consumer.C.";
-
-   // Expected Individual Dead Letter Queue names that are tied to the
-   // Subscriber Queues
-   private static final String dlqPrefix = "ActiveMQ.DLQ.Topic.";
-
-   // Number of messages
-   private static final int numberMessages = 6;
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      try {
-         broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true);
-         broker.start();
-         broker.waitUntilStarted();
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-         throw e;
-      }
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      try {
-         // Purge the DLQ's so counts are correct for next run
-         purgeDestination(dlqPrefix + consumer1Prefix + virtualTopicName);
-         purgeDestination(dlqPrefix + consumer2Prefix + virtualTopicName);
-         purgeDestination(dlqPrefix + consumer3Prefix + virtualTopicName);
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-         broker = null;
-      }
-   }
-
-   /*
-    * This test verifies that all undelivered messages sent to consumers
-    * listening on a queue associated with a virtual topic will be forwarded to
-    * separate DLQs.
-    *
-    * Note that the deadLetterStrategy in the broker config needs to have
-    * enableAudit set to false so that duplicate messages sent from a topic to
-    * individual consumers are forwarded to the DLQ.
-    *
-    * <deadLetterStrategy> <bean
-    * xmlns="http://www.springframework.org/schema/beans"
-    * class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"
-    * > <property name="useQueueForQueueMessages" value="true"></property>
-    * <property name="processNonPersistent" value="true"></property> <property
-    * name="processExpired" value="false"></property> <property
-    * name="enableAudit" value="false"></property>
-    *
-    * </bean> </deadLetterStrategy>
-    */
-   @Test
-   public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception {
-
-      TestConsumer consumer1 = null;
-      TestConsumer consumer2 = null;
-      TestConsumer consumer3 = null;
-      TestConsumer dlqConsumer1 = null;
-      TestConsumer dlqConsumer2 = null;
-      TestConsumer dlqConsumer3 = null;
-
-      try {
-
-         // The first 2 consumers will rollback, ultimately causing messages
-         // to land on the DLQ
-         consumer1 = new TestConsumer(consumer1Prefix + virtualTopicName, false, numberMessages, true);
-         thread(consumer1, false);
-
-         consumer2 = new TestConsumer(consumer2Prefix + virtualTopicName, false, numberMessages, true);
-         thread(consumer2, false);
-
-         // TestConsumer that does not throw exceptions, messages should not
-         // land on DLQ
-         consumer3 = new TestConsumer(consumer3Prefix + virtualTopicName, false, numberMessages, false);
-         thread(consumer3, false);
-
-         // TestConsumers that read the expected per-subscriber Dead Letter Queues
-         dlqConsumer1 = new TestConsumer(dlqPrefix + consumer1Prefix + virtualTopicName, false, numberMessages, false);
-         thread(dlqConsumer1, false);
-
-         dlqConsumer2 = new TestConsumer(dlqPrefix + consumer2Prefix + virtualTopicName, false, numberMessages, false);
-         thread(dlqConsumer2, false);
-
-         dlqConsumer3 = new TestConsumer(dlqPrefix + consumer3Prefix + virtualTopicName, false, numberMessages, false);
-         thread(dlqConsumer3, false);
-
-         // Give the consumers a second to start
-         Thread.sleep(1000);
-
-         // Start the producer
-         TestProducer producer = new TestProducer(virtualTopicName, true, numberMessages);
-         thread(producer, false);
-
-         assertTrue("sent all producer messages in time, count is: " + producer.getLatch().getCount(), producer.getLatch().await(10, TimeUnit.SECONDS));
-         LOG.info("producer successful, count = " + producer.getLatch().getCount());
-
-         assertTrue("remaining consumer1 count should be zero, is: " + consumer1.getLatch().getCount(), consumer1.getLatch().await(10, TimeUnit.SECONDS));
-         LOG.info("consumer1 successful, count = " + consumer1.getLatch().getCount());
-
-         assertTrue("remaining consumer2 count should be zero, is: " + consumer2.getLatch().getCount(), consumer2.getLatch().await(10, TimeUnit.SECONDS));
-         LOG.info("consumer2 successful, count = " + consumer2.getLatch().getCount());
-
-         assertTrue("remaining consumer3 count should be zero, is: " + consumer3.getLatch().getCount(), consumer3.getLatch().await(10, TimeUnit.SECONDS));
-         LOG.info("consumer3 successful, count = " + consumer3.getLatch().getCount());
-
-         assertTrue("remaining dlqConsumer1 count should be zero, is: " + dlqConsumer1.getLatch().getCount(), dlqConsumer1.getLatch().await(10, TimeUnit.SECONDS));
-         LOG.info("dlqConsumer1 successful, count = " + dlqConsumer1.getLatch().getCount());
-
-         assertTrue("remaining dlqConsumer2 count should be zero, is: " + dlqConsumer2.getLatch().getCount(), dlqConsumer2.getLatch().await(10, TimeUnit.SECONDS));
-         LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount());
-         // consumer3 commits every message, so dlqConsumer3 must not have received anything
-         assertTrue("remaining dlqConsumer3 count should be " + numberMessages + ", is: " + dlqConsumer3.getLatch().getCount(), dlqConsumer3.getLatch().getCount() == numberMessages);
-         LOG.info("dlqConsumer3 successful, count = " + dlqConsumer3.getLatch().getCount());
-
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-         throw e;
-      }
-      finally {
-         // Tell consumers to stop (don't read any more messages after this)
-         if (consumer1 != null)
-            consumer1.setStop(true);
-         if (consumer2 != null)
-            consumer2.setStop(true);
-         if (consumer3 != null)
-            consumer3.setStop(true);
-         if (dlqConsumer1 != null)
-            dlqConsumer1.setStop(true);
-         if (dlqConsumer2 != null)
-            dlqConsumer2.setStop(true);
-         if (dlqConsumer3 != null)
-            dlqConsumer3.setStop(true);
-      }
-   }
-
-   private static Thread thread(Runnable runnable, boolean daemon) {
-      Thread brokerThread = new Thread(runnable);
-      brokerThread.setDaemon(daemon);
-      brokerThread.start();
-      return brokerThread;
-   }
-
-   private class TestProducer implements Runnable {
-
-      private String destinationName = null;
-      private boolean isTopic = true;
-      private int numberMessages = 0;
-      private CountDownLatch latch = null;
-
-      public TestProducer(String destinationName, boolean isTopic, int numberMessages) {
-         this.destinationName = destinationName;
-         this.isTopic = isTopic;
-         this.numberMessages = numberMessages;
-         latch = new CountDownLatch(numberMessages);
-      }
-
-      public CountDownLatch getLatch() {
-         return latch;
-      }
-
-      @Override
-      public void run() {
-         ActiveMQConnectionFactory connectionFactory = null;
-         ActiveMQConnection connection = null;
-         ActiveMQSession session = null;
-         Destination destination = null;
-
-         try {
-            LOG.info("Started TestProducer for destination (" + destinationName + ")");
-
-            connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
-            connection = (ActiveMQConnection) connectionFactory.createConnection();
-            connection.start();
-            session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            if (isTopic) {
-               destination = session.createTopic(this.destinationName);
-            }
-            else {
-               destination = session.createQueue(this.destinationName);
-            }
-
-            // Create a MessageProducer from the Session to the Topic or
-            // Queue
-            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
-            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
-            for (int i = 0; i < numberMessages; i++) {
-               TextMessage message = session.createTextMessage("I am a message :: " + String.valueOf(i));
-               try {
-                  producer.send(message);
-
-               }
-               catch (Exception deeperException) {
-                  LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException);
-               }
-
-               latch.countDown();
-               Thread.sleep(1000);
-            }
-
-            LOG.info("Finished TestProducer for destination (" + destinationName + ")");
-
-         }
-         catch (Exception e) {
-            LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
-            e.printStackTrace();
-
-         }
-         finally {
-            try {
-               // Clean up
-               if (session != null)
-                  session.close();
-               if (connection != null)
-                  connection.close();
-
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
-            }
-         }
-      }
-   }
-
-   private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
-
-      private String destinationName = null;
-      private boolean isTopic = true;
-      private CountDownLatch latch = null;
-      private int maxRedeliveries = 0;
-      private int receivedMessageCounter = 0;
-      private boolean bFakeFail = false;
-      private boolean bStop = false;
-
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private ActiveMQConnection connection = null;
-      private Session session = null;
-      private MessageConsumer consumer = null;
-
-      public TestConsumer(String destinationName, boolean isTopic, int expectedNumberMessages, boolean bFakeFail) {
-         this.destinationName = destinationName;
-         this.isTopic = isTopic;
-         this.bFakeFail = bFakeFail; // assign first: the latch size computed below depends on it
-         latch = new CountDownLatch(expectedNumberMessages * (bFakeFail ? (maxRedeliveries + 1) : 1));
-      }
-
-      public CountDownLatch getLatch() {
-         return latch;
-      }
-
-      @Override
-      public void run() {
-
-         try {
-            LOG.info("Started TestConsumer for destination (" + destinationName + ")");
-
-            connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
-            connection = (ActiveMQConnection) connectionFactory.createConnection();
-            connection.start();
-            session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
-            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
-            policy.setInitialRedeliveryDelay(1);
-            policy.setUseExponentialBackOff(false);
-            policy.setMaximumRedeliveries(maxRedeliveries);
-
-            connection.setExceptionListener(this);
-
-            Destination destination = null;
-            if (isTopic) {
-               destination = session.createTopic(destinationName);
-            }
-            else {
-               destination = session.createQueue(destinationName);
-            }
-
-            consumer = session.createConsumer(destination);
-            consumer.setMessageListener(this);
-
-            while (!bStop) {
-               Thread.sleep(100);
-            }
-
-            LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString());
-
-         }
-         catch (Exception e) {
-            LOG.error("Consumer (" + destinationName + ") Caught: " + e);
-            e.printStackTrace();
-         }
-         finally {
-            try {
-               // Clean up
-               if (consumer != null)
-                  consumer.close();
-               if (session != null)
-                  session.close();
-               if (connection != null)
-                  connection.close();
-
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
-            }
-         }
-      }
-
-      @Override
-      public synchronized void onException(JMSException ex) {
-         ex.printStackTrace();
-         LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occurred.  Shutting down client.");
-      }
-
-      public synchronized void setStop(boolean bStop) {
-         this.bStop = bStop;
-      }
-
-      @Override
-      public synchronized void onMessage(Message message) {
-         receivedMessageCounter++;
-         latch.countDown();
-
-         LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + " :: Number messages received " + this.receivedMessageCounter);
-
-         try {
-            LOG.info("Consumer for destination (" + destinationName + ") Received message id :: " + message.getJMSMessageID());
-
-            if (!bFakeFail) {
-               LOG.info("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
-               session.commit();
-            }
-            else {
-               LOG.info("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
-               session.rollback(); // rolls back all the consumed messages
-               // on the session to
-            }
-
-         }
-         catch (JMSException ex) {
-            ex.printStackTrace();
-            LOG.error("Error reading JMS Message from destination " + destinationName + ".");
-         }
-      }
-   }
-
-   private static void purgeDestination(String destination) throws Exception {
-      final Queue dest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue(destination));
-      dest.purge();
-      assertEquals(0, dest.getDestinationStatistics().getMessages().getCount());
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
deleted file mode 100644
index 925b82c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test case for  https://issues.apache.org/jira/browse/AMQ-3004
- */
-
-public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
-   protected Connection connection;
-
-   public void testVirtualTopicSelectorDisconnect() throws Exception {
-      testVirtualTopicDisconnect("odd = 'no'", 3000, 1500);
-   }
-
-   public void testVirtualTopicNoSelectorDisconnect() throws Exception {
-      testVirtualTopicDisconnect(null, 3000, 3000);
-   }
-
-   public void testVirtualTopicDisconnect(String messageSelector, int total, int expected) throws Exception {
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.start();
-
-      final ConsumerBean messageList = new ConsumerBean();
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      Destination producerDestination = getProducerDestination();
-      Destination destination = getConsumerDsetination();
-
-      LOG.info("Sending to: " + producerDestination);
-      LOG.info("Consuming from: " + destination);
-
-      MessageConsumer consumer = createConsumer(session, destination, messageSelector);
-
-      MessageListener listener = new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            messageList.onMessage(message);
-            try {
-               message.acknowledge();
-            }
-            catch (JMSException e) {
-               e.printStackTrace();
-            }
-         }
-      };
-
-      consumer.setMessageListener(listener);
-
-      // create topic producer
-      MessageProducer producer = session.createProducer(producerDestination);
-      assertNotNull(producer);
-
-      int disconnectCount = total / 3;
-      int reconnectCount = (total * 2) / 3;
-
-      for (int i = 0; i < total; i++) {
-         producer.send(createMessage(session, i));
-
-         if (i == disconnectCount) {
-            consumer.close();
-         }
-         if (i == reconnectCount) {
-            consumer = createConsumer(session, destination, messageSelector);
-            consumer.setMessageListener(listener);
-         }
-      }
-
-      assertMessagesArrived(messageList, expected, 10000);
-   }
-
-   protected Destination getConsumerDsetination() {
-      return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
-   }
-
-   protected Destination getProducerDestination() {
-      return new ActiveMQTopic("VirtualTopic.TEST");
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      super.setUp();
-   }
-
-   protected MessageConsumer createConsumer(Session session,
-                                            Destination destination,
-                                            String messageSelector) throws JMSException {
-      if (messageSelector != null) {
-         return session.createConsumer(destination, messageSelector);
-      }
-      else {
-         return session.createConsumer(destination);
-      }
-   }
-
-   protected TextMessage createMessage(Session session, int i) throws JMSException {
-      TextMessage textMessage = session.createTextMessage("message: " + i);
-      if (i % 2 != 0) {
-         textMessage.setStringProperty("odd", "yes");
-      }
-      else {
-         textMessage.setStringProperty("odd", "no");
-      }
-      textMessage.setIntProperty("i", i);
-      return textMessage;
-   }
-
-   protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) {
-      messageList.assertMessagesArrived(expected, timeout);
-
-      messageList.flushMessages();
-
-      LOG.info("validate no other messages on queues");
-      try {
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         Destination destination1 = getConsumerDsetination();
-
-         MessageConsumer c1 = session.createConsumer(destination1, null);
-         c1.setMessageListener(messageList);
-
-         LOG.info("send one simple message that should go to both consumers");
-         MessageProducer producer = session.createProducer(getProducerDestination());
-         assertNotNull(producer);
-
-         producer.send(session.createTextMessage("Last Message"));
-
-         messageList.assertMessagesArrived(1);
-
-      }
-      catch (JMSException e) {
-         e.printStackTrace();
-         fail("unexpected exception while waiting for last messages: " + e);
-      }
-   }
-
-   protected String getBrokerConfigUri() {
-      return "org/apache/activemq/broker/virtual/disconnected-selector.xml";
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      XBeanBrokerFactory factory = new XBeanBrokerFactory();
-      BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
-      return answer;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
deleted file mode 100644
index 0f2af0a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.Vector;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-
-/**
- *
- *
- */
-public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
-
-   private Vector<Connection> connections = new Vector<>();
-   public int ackMode = Session.AUTO_ACKNOWLEDGE;
-
-   public static Test suite() {
-      return suite(VirtualTopicPubSubTest.class);
-   }
-
-   public void initCombosForTestVirtualTopicCreation() {
-      addCombinationValues("ackMode", new Object[]{Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
-   }
-
-   private boolean doneTwice = false;
-
-   public void testVirtualTopicCreation() throws Exception {
-      doTestVirtualTopicCreation(10);
-   }
-
-   public void doTestVirtualTopicCreation(int total) throws Exception {
-
-      ConsumerBean messageList = new ConsumerBean() {
-         @Override
-         public synchronized void onMessage(Message message) {
-            super.onMessage(message);
-            if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
-               try {
-                  message.acknowledge();
-               }
-               catch (JMSException e) {
-                  e.printStackTrace();
-               }
-            }
-
-         }
-      };
-      messageList.setVerbose(true);
-
-      String queueAName = getVirtualTopicConsumerName();
-      // create consumer 'cluster'
-      ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
-      ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
-
-      Session session = createStartAndTrackConnection().createSession(false, ackMode);
-      MessageConsumer c1 = session.createConsumer(queue1);
-
-      session = createStartAndTrackConnection().createSession(false, ackMode);
-      MessageConsumer c2 = session.createConsumer(queue2);
-
-      c1.setMessageListener(messageList);
-      c2.setMessageListener(messageList);
-
-      // create topic producer
-      Session producerSession = createStartAndTrackConnection().createSession(false, ackMode);
-      MessageProducer producer = producerSession.createProducer(new ActiveMQTopic(getVirtualTopicName()));
-      assertNotNull(producer);
-
-      for (int i = 0; i < total; i++) {
-         producer.send(producerSession.createTextMessage("message: " + i));
-      }
-
-      messageList.assertMessagesArrived(total);
-
-      // do twice so we confirm messages do not get redelivered after client acknowledgement
-      if (doneTwice == false) {
-         doneTwice = true;
-         doTestVirtualTopicCreation(0);
-      }
-   }
-
-   private Connection createStartAndTrackConnection() throws Exception {
-      Connection connection = createConnection();
-      connection.start();
-      connections.add(connection);
-      return connection;
-   }
-
-   protected String getVirtualTopicName() {
-      return "VirtualTopic.TEST";
-   }
-
-   protected String getVirtualTopicConsumerName() {
-      return "Consumer.A.VirtualTopic.TEST";
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      for (Connection connection : connections) {
-         connection.close();
-      }
-      super.tearDown();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
deleted file mode 100644
index 1d7ea71..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.net.URI;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-
-/**
- *
- *
- */
-public class VirtualTopicPubSubUsingXBeanTest extends VirtualTopicPubSubTest {
-
-   @Override
-   protected String getVirtualTopicConsumerName() {
-      return "VirtualTopicConsumers.ConsumerNumberOne.FOO";
-   }
-
-   @Override
-   protected String getVirtualTopicName() {
-      return "FOO";
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      XBeanBrokerFactory factory = new XBeanBrokerFactory();
-      BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
-
-      // lets disable persistence as we are a test
-      answer.setPersistent(false);
-
-      return answer;
-   }
-
-   protected String getBrokerConfigUri() {
-      return "org/apache/activemq/broker/virtual/global-virtual-topics.xml";
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
deleted file mode 100644
index d94dd18..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VirtualTopicSelectorTest extends CompositeTopicTest {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class);
-
-   @Override
-   protected Destination getConsumer1Dsetination() {
-      return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
-   }
-
-   @Override
-   protected Destination getConsumer2Dsetination() {
-      return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
-   }
-
-   @Override
-   protected Destination getProducerDestination() {
-      return new ActiveMQTopic("VirtualTopic.TEST");
-   }
-
-   @Override
-   protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
-      messageList1.assertMessagesArrived(total / 2);
-      messageList2.assertMessagesArrived(total / 2);
-
-      messageList1.flushMessages();
-      messageList2.flushMessages();
-
-      LOG.info("validate no other messages on queues");
-      try {
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         Destination destination1 = getConsumer1Dsetination();
-         Destination destination2 = getConsumer2Dsetination();
-         MessageConsumer c1 = session.createConsumer(destination1, null);
-         MessageConsumer c2 = session.createConsumer(destination2, null);
-         c1.setMessageListener(messageList1);
-         c2.setMessageListener(messageList2);
-
-         LOG.info("send one simple message that should go to both consumers");
-         MessageProducer producer = session.createProducer(getProducerDestination());
-         assertNotNull(producer);
-
-         producer.send(session.createTextMessage("Last Message"));
-
-         messageList1.assertMessagesArrived(1);
-         messageList2.assertMessagesArrived(1);
-
-      }
-      catch (JMSException e) {
-         e.printStackTrace();
-         fail("unexpeced ex while waiting for last messages: " + e);
-      }
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      // use message selectors on consumers that need to propagate up to the virtual
-      // topic dispatch so that un matched messages do not linger on subscription queues
-      messageSelector1 = "odd = 'yes'";
-      messageSelector2 = "odd = 'no'";
-
-      BrokerService broker = new BrokerService();
-      broker.setPersistent(false);
-
-      VirtualTopic virtualTopic = new VirtualTopic();
-      // the new config that enables selectors on the intercepter
-      virtualTopic.setSelectorAware(true);
-      VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
-      interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
-      broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
-      return broker;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
deleted file mode 100644
index 4abf811..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.jmx.MBeanTest;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-
-public class VirtualTopicsAndDurableSubsTest extends MBeanTest {
-
-   private Connection connection;
-
-   public void testVirtualTopicCreationAndDurableSubs() throws Exception {
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.setClientID(getAClientID());
-      connection.start();
-
-      ConsumerBean messageList = new ConsumerBean();
-      messageList.setVerbose(true);
-
-      String queueAName = getVirtualTopicConsumerName();
-      // create consumer 'cluster'
-      ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
-      ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer c1 = session.createConsumer(queue1);
-      MessageConsumer c2 = session.createConsumer(queue2);
-
-      c1.setMessageListener(messageList);
-      c2.setMessageListener(messageList);
-
-      // create topic producer
-      MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
-      assertNotNull(producer);
-
-      int total = 10;
-      for (int i = 0; i < total; i++) {
-         producer.send(session.createTextMessage("message: " + i));
-      }
-      messageList.assertMessagesArrived(total);
-
-      //Add and remove durable subscriber after using VirtualTopics
-      assertCreateAndDestroyDurableSubscriptions();
-   }
-
-   protected String getAClientID() {
-      return "VirtualTopicCreationAndDurableSubs";
-   }
-
-   protected String getVirtualTopicName() {
-      return "VirtualTopic.TEST";
-   }
-
-   protected String getVirtualTopicConsumerName() {
-      return "Consumer.A.VirtualTopic.TEST";
-   }
-
-   protected String getDurableSubscriberName() {
-      return "Sub1";
-   }
-
-   protected String getDurableSubscriberTopicName() {
-      return "simple.topic";
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (connection != null) {
-         connection.close();
-      }
-      super.tearDown();
-   }
-
-   //Overrides test cases from MBeanTest to avoid having them run.
-   @Override
-   public void testMBeans() throws Exception {
-   }
-
-   @Override
-   public void testMoveMessages() throws Exception {
-   }
-
-   @Override
-   public void testRetryMessages() throws Exception {
-   }
-
-   @Override
-   public void testMoveMessagesBySelector() throws Exception {
-   }
-
-   @Override
-   public void testCopyMessagesBySelector() throws Exception {
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
deleted file mode 100644
index ed3bc73..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-  <broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
-    <destinationInterceptors>
-      <virtualDestinationInterceptor>
-        <virtualDestinations>
-          <compositeQueue name="MY.QUEUE">
-            <forwardTo>
-              <queue physicalName="FOO" />
-              <topic physicalName="BAR" />
-            </forwardTo>
-          </compositeQueue>
-        </virtualDestinations>
-      </virtualDestinationInterceptor>
-    </destinationInterceptors>
-
-  </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
deleted file mode 100644
index ded6471..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-  <broker xmlns="http://activemq.apache.org/schema/core">
-    <destinationInterceptors>
-      <virtualDestinationInterceptor>
-        <virtualDestinations>
-          <compositeTopic name="MY.TOPIC">
-            <forwardTo>
-              <queue physicalName="FOO" />
-              <topic physicalName="BAR" />
-            </forwardTo>
-          </compositeTopic>
-        </virtualDestinations>
-      </virtualDestinationInterceptor>
-    </destinationInterceptors>
-
-  </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
deleted file mode 100644
index 2772910..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
-        xmlns="http://www.springframework.org/schema/beans"
-        xmlns:amq="http://activemq.apache.org/schema/core"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-    <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
-        <destinationInterceptors>
-            <virtualDestinationInterceptor>
-                <virtualDestinations>
-                    <virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/>
-                </virtualDestinations>
-            </virtualDestinationInterceptor>
-        </destinationInterceptors>
-        <plugins>
-            <virtualSelectorCacheBrokerPlugin persistFile = "target/selectorcache.data"/>
-        </plugins>
-    </broker>
-</beans>
-<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
deleted file mode 100644
index d51f03c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-  <broker xmlns="http://activemq.apache.org/schema/core">
-    <destinationInterceptors>
-      <virtualDestinationInterceptor>
-        <virtualDestinations>
-          <compositeQueue name="MY.QUEUE">
-            <forwardTo>
-              <filteredDestination selector="odd = 'yes'" queue="FOO"/>
-              <filteredDestination selector="i = 5" topic="BAR"/>
-            </forwardTo>
-          </compositeQueue>
-        </virtualDestinations>
-      </virtualDestinationInterceptor>
-    </destinationInterceptors>
-
-  </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
deleted file mode 100644
index ddd0667..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-  <broker xmlns="http://activemq.apache.org/schema/core">
-    <destinationInterceptors>
-      <virtualDestinationInterceptor>
-        <virtualDestinations>
-          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
-        </virtualDestinations>
-      </virtualDestinationInterceptor>
-    </destinationInterceptors>
-
-  </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
deleted file mode 100644
index d725436..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<!-- START SNIPPET: example -->
-<beans
-  xmlns="http://www.springframework.org/schema/beans"
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-    
-
-    <!-- 
-        The <broker> element is used to configure the ActiveMQ broker. 
-    -->
-    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="bcBroker">
- 
-        <destinationInterceptors>
-      <virtualDestinationInterceptor>
-         <virtualDestinations>
-            <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." />
-         </virtualDestinations>
-      </virtualDestinationInterceptor>
-   </destinationInterceptors>
-              
-   <destinationPolicy>
-      <policyMap>
-         <policyEntries>
-            <policyEntry queue=">" memoryLimit="128 mb" >
-               <deadLetterStrategy>
-                  <bean xmlns="http://www.springframework.org/schema/beans"
-                        class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
-                     <property name="useQueueForQueueMessages" value="true"></property>
-                     <property name="processNonPersistent" value="true"></property>
-                     <property name="processExpired" value="false"></property>
-                     <property name="enableAudit" value="false"></property>
-                     
-                  </bean>
-               </deadLetterStrategy>
-            </policyEntry>
-            <policyEntry topic=">" memoryLimit="128 mb" >
-               <deadLetterStrategy>
-                  <bean xmlns="http://www.springframework.org/schema/beans"
-                        class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
-                     <property name="useQueueForQueueMessages" value="true"></property>
-                     <property name="processNonPersistent" value="true"></property>
-                     <property name="processExpired" value="false"></property>
-                     <property name="enableAudit" value="false"></property>
-          
-                  </bean>
-               </deadLetterStrategy>
-             </policyEntry>
-         </policyEntries>
-      </policyMap>
-   </destinationPolicy>
-       
-        <managementContext>
-            <managementContext createConnector="false"/>
-        </managementContext>
-
-    </broker>
-
-    
-    
-</beans>
-<!-- END SNIPPET: example -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
deleted file mode 100644
index fcce72e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
-  <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
-
-
-    <destinationInterceptors>
-      <!--  custom destination interceptor -->
-      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.virtual.DestinationInterceptorDurableSubTest$SimpleDestinationInterceptor" />
-
-      <virtualDestinationInterceptor>
-        <virtualDestinations>
-          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
-        </virtualDestinations>
-      </virtualDestinationInterceptor>
-    </destinationInterceptors>
-
-    <managementContext>
-      <managementContext createConnector="true" connectorPort="1299"/>
-    </managementContext>
-  </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
deleted file mode 100644
index 0568757..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-
-/**
- * An AMQ-1282 Test
- */
-public class AMQ1282 extends TestCase {
-
-   private ConnectionFactory factory;
-   private Connection connection;
-   private MapMessage message;
-
-   @Override
-   protected void setUp() throws Exception {
-      factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-      connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      message = session.createMapMessage();
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      connection.close();
-      super.tearDown();
-   }
-
-   public void testUnmappedBooleanMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Boolean.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Boolean actual = message.getBoolean("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         assertEquals(expected, ex);
-      }
-   }
-
-   public void testUnmappedIntegerMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Integer.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Integer actual = message.getInt("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-
-   public void testUnmappedShortMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Short.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Short actual = message.getShort("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-
-   public void testUnmappedLongMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Long.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Long actual = message.getLong("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-
-   public void testUnmappedStringMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = String.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         String actual = message.getString("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-
-   public void testUnmappedCharMessage() throws JMSException {
-      try {
-         message.getChar("foo");
-         fail("should have thrown NullPointerException");
-      }
-      catch (NullPointerException success) {
-         assertNotNull(success);
-      }
-   }
-
-   public void testUnmappedByteMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Byte.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Byte actual = message.getByte("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-
-   public void testUnmappedDoubleMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Double.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Double actual = message.getDouble("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-
-   public void testUnmappedFloatMessage() throws JMSException {
-      Object expected;
-      try {
-         expected = Float.valueOf(null);
-      }
-      catch (Exception ex) {
-         expected = ex;
-      }
-      try {
-         Float actual = message.getFloat("foo");
-         assertEquals(expected, actual);
-      }
-      catch (Exception ex) {
-         Class<?> aClass = expected.getClass();
-         assertTrue(aClass.isInstance(ex));
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
deleted file mode 100644
index 78a6088..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-
-/**
- *
- *
- */
-public class AMQ1687Test extends EmbeddedBrokerTestSupport {
-
-   private Connection connection;
-
-   @Override
-   protected ConnectionFactory createConnectionFactory() throws Exception {
-      //prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log.
-      return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.prefetchPolicy.all=5");
-   }
-
-   public void testVirtualTopicCreation() throws Exception {
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.start();
-
-      ConsumerBean messageList = new ConsumerBean();
-      messageList.setVerbose(true);
-
-      String queueAName = getVirtualTopicConsumerName();
-      String queueBName = getVirtualTopicConsumerNameB();
-
-      // create consumer 'cluster'
-      ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
-      ActiveMQQueue queue2 = new ActiveMQQueue(queueBName);
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer c1 = session.createConsumer(queue1);
-      MessageConsumer c2 = session.createConsumer(queue2);
-
-      c1.setMessageListener(messageList);
-      c2.setMessageListener(messageList);
-
-      // create topic producer
-      ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
-      MessageProducer producer = session.createProducer(topic);
-      assertNotNull(producer);
-
-      int total = 100;
-      for (int i = 0; i < total; i++) {
-         producer.send(session.createTextMessage("message: " + i));
-      }
-
-      messageList.assertMessagesArrived(total * 2);
-   }
-
-   protected String getVirtualTopicName() {
-      return "VirtualTopic.TEST";
-   }
-
-   protected String getVirtualTopicConsumerName() {
-      return "Consumer.A.VirtualTopic.TEST";
-   }
-
-   protected String getVirtualTopicConsumerNameB() {
-      return "Consumer.B.VirtualTopic.TEST";
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      this.bindAddress = "tcp://localhost:0";
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (connection != null) {
-         connection.close();
-      }
-      super.tearDown();
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f76f4d1f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
deleted file mode 100644
index 2f7b8fe..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import java.net.URI;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test validates that the AMQ consumer blocks on redelivery of a message,
- * through all redeliveries, until the message is either successfully consumed
- * or sent to the DLQ.
- */
-public class AMQ1853Test {
-
-   private static BrokerService broker;
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class);
-   static final String jmsConnectionURI = "failover:(vm://localhost)";
-
-   // Virtual Topic that the test publishes 10 messages to
-   private static final String queueFail = "Queue.BlockingConsumer.QueueFail";
-
-   // Number of messages
-
-   private final int producerMessages = 5;
-   private final int totalNumberMessages = producerMessages * 2;
-   private final int maxRedeliveries = 2;
-   private final int redeliveryDelay = 1000;
-
-   private Map<String, AtomicInteger> messageList = null;
-
-   @Before
-   public void setUp() throws Exception {
-      broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
-      broker.setUseJmx(false);
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-         broker = null;
-      }
-   }
-
-   @Test
-   public void testConsumerMessagesAreNotOrdered() throws Exception {
-
-      TestConsumer consumerAllFail = null;
-      messageList = new Hashtable<>();
-
-      try {
-
-         // The first 2 consumers will rollback, ultimately causing messages to land on the DLQ
-
-         TestProducer producerAllFail = new TestProducer(queueFail);
-         thread(producerAllFail, false);
-
-         consumerAllFail = new TestConsumer(queueFail, true);
-         thread(consumerAllFail, false);
-
-         // Give the consumers a second to start
-         Thread.sleep(1000);
-
-         thread(producerAllFail, false);
-
-         // Give the consumers a second to start
-         Thread.sleep(1000);
-
-         producerAllFail.getLatch().await();
-
-         LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount());
-         LOG.info("final message list size =  " + messageList.size());
-
-         assertTrue("message list size =  " + messageList.size() + " exptected:" + totalNumberMessages, Wait.waitFor(new Condition() {
-                       @Override
-                       public boolean isSatisified() throws Exception {
-                          return totalNumberMessages == messageList.size();
-                       }
-                    }));
-
-         consumerAllFail.getLatch().await();
-
-         LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount());
-
-         Iterator<String> keys = messageList.keySet().iterator();
-         for (AtomicInteger counter : messageList.values()) {
-            String message = keys.next();
-            LOG.info("final count for message " + message + " counter =  " + counter.get());
-            assertTrue("for message " + message + " counter =  " + counter.get(), counter.get() == maxRedeliveries + 1);
-         }
-
-         assertFalse(consumerAllFail.messageReceiptIsOrdered());
-      }
-      finally {
-         if (consumerAllFail != null) {
-            consumerAllFail.setStop(true);
-         }
-      }
-   }
-
-   private static Thread thread(Runnable runnable, boolean daemon) {
-      Thread brokerThread = new Thread(runnable);
-      brokerThread.setDaemon(daemon);
-      brokerThread.start();
-      return brokerThread;
-   }
-
-   private class TestProducer implements Runnable {
-
-      private CountDownLatch latch = null;
-      private String destinationName = null;
-
-      public TestProducer(String destinationName) {
-         this.destinationName = destinationName;
-         // We run the producer 2 times
-         latch = new CountDownLatch(totalNumberMessages);
-      }
-
-      public CountDownLatch getLatch() {
-         return latch;
-      }
-
-      @Override
-      public void run() {
-
-         ActiveMQConnectionFactory connectionFactory = null;
-         ActiveMQConnection connection = null;
-         ActiveMQSession session = null;
-         Destination destination = null;
-
-         try {
-            LOG.info("Started TestProducer for destination (" + destinationName + ")");
-
-            connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
-            connection = (ActiveMQConnection) connectionFactory.createConnection();
-            connection.setCopyMessageOnSend(false);
-            connection.start();
-            session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            destination = session.createQueue(this.destinationName);
-
-            // Create a MessageProducer from the Session to the Topic or Queue
-            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
-            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
-            for (int i = 0; i < (producerMessages); i++) {
-               TextMessage message = session.createTextMessage();
-               message.setLongProperty("TestTime", (System.currentTimeMillis()));
-               try {
-                  producer.send(message);
-                  LOG.info("Producer (" + destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n");
-
-                  latch.countDown();
-                  LOG.info(" Latch count  " + latch.getCount());
-                  LOG.info("Producer message list size = " + messageList.keySet().size());
-                  messageList.put(message.getJMSMessageID(), new AtomicInteger(0));
-                  LOG.info("Producer message list size = " + messageList.keySet().size());
-
-               }
-               catch (Exception deeperException) {
-                  LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException);
-               }
-
-               Thread.sleep(1000);
-            }
-
-            LOG.info("Finished TestProducer for destination (" + destinationName + ")");
-
-         }
-         catch (Exception e) {
-            LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
-         }
-         finally {
-            try {
-               if (session != null) {
-                  session.close();
-               }
-               if (connection != null) {
-                  connection.close();
-               }
-            }
-            catch (Exception e) {
-               LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
-            }
-         }
-      }
-   }
-
-   private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
-
-      private CountDownLatch latch = null;
-      private int receivedMessageCounter = 0;
-      private boolean bFakeFail = false;
-      String destinationName = null;
-      boolean bMessageReceiptIsOrdered = true;
-      boolean bStop = false;
-      String previousMessageId = null;
-
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private ActiveMQConnection connection = null;
-      private Session session = null;
-      private MessageConsumer consumer = null;
-
-      public TestConsumer(String destinationName, boolean bFakeFail) {
-         this.bFakeFail = bFakeFail;
-         latch = new CountDownLatch(totalNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1));
-         this.destinationName = destinationName;
-      }
-
-      public CountDownLatch getLatch() {
-         return latch;
-      }
-
-      public boolean messageReceiptIsOrdered() {
-         return bMessageReceiptIsOrdered;
-      }
-
-      @Override
-      public void run() {
-
-         try {
-            LOG.info("Started TestConsumer for destination (" + destinationName + ")");
-
-            connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
-            connection = (ActiveMQConnection) connectionFactory.createConnection();
-            connection.setNonBlockingRedelivery(true);
-            session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
-            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
-            policy.setInitialRedeliveryDelay(redeliveryDelay);
-            policy.setBackOffMultiplier(-1);
-            policy.setRedeliveryDelay(redeliveryDelay);
-            policy.setMaximumRedeliveryDelay(-1);
-            policy.setUseExponentialBackOff(false);
-            policy.setMaximumRedeliveries(maxRedeliveries);
-
-            connection.setExceptionListener(this);
-            Destination destination = session.createQueue(destinationName);
-            consumer = session.createConsumer(destination);
-            consumer.setMessageListener(this);
-
-            connection.start();
-
-            while (!bStop) {
-               Thread.sleep(100);
-            }
-
-            LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString());
-
-         }
-         catch (Exception e) {
-            LOG.error("Consumer (" + destinationName + ") Caught: " + e);
-         }
-         finally {
-            try {
-               if (consumer != null) {
-                  consumer.close();
-               }
-               if (session != null) {
-                  session.close();
-               }
-               if (connection != null) {
-                  connection.close();
-               }
-            }
-            catch (Exception e) {
-               LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
-            }
-         }
-      }
-
-      @Override
-      public synchronized void onException(JMSException ex) {
-         LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occurred.  Shutting down client.");
-      }
-
-      public synchronized void setStop(boolean bStop) {
-         this.bStop = bStop;
-      }
-
-      @Override
-      public synchronized void onMessage(Message message) {
-         receivedMessageCounter++;
-         latch.countDown();
-
-         LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() +
-                     " :: Number messages received " + this.receivedMessageCounter);
-
-         try {
-
-            if (receivedMessageCounter % (maxRedeliveries + 1) == 1) {
-               previousMessageId = message.getJMSMessageID();
-            }
-
-            if (bMessageReceiptIsOrdered) {
-               bMessageReceiptIsOrdered = previousMessageId.trim().equals(message.getJMSMessageID());
-            }
-
-            final String jmsMessageId = message.getJMSMessageID();
-            assertTrue("Did not find expected ", Wait.waitFor(new Wait.Condition() {
-               @Override
-               public boolean isSatisified() throws Exception {
-                  return messageList.containsKey(jmsMessageId);
-               }
-            }));
-
-            AtomicInteger counter = messageList.get(jmsMessageId);
-            counter.incrementAndGet();
-
-            LOG.info("Consumer for destination (" + destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n" + previousMessageId + " = previousMessageId\n" + bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n" + ">>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\n" + "message counter = " + counter.get());
-
-            if (!bFakeFail) {
-               LOG.debug("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
-               session.commit();
-            }
-            else {
-               LOG.debug("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
-               session.rollback(); // rolls back all the consumed messages on the session to
-            }
-
-         }
-         catch (Exception ex) {
-            ex.printStackTrace();
-            LOG.error("Error reading JMS Message from destination " + destinationName + ".");
-         }
-      }
-   }
-}


Mime
View raw message