activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [27/52] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Tue, 01 Mar 2016 16:38:23 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 97cd6f6..9729793 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -32,6 +32,7 @@ import junit.framework.TestCase;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.SubscriptionStatistics;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
@@ -53,12 +54,13 @@ import org.slf4j.LoggerFactory;
 /**
  * @author gtully
  * @see https://issues.apache.org/activemq/browse/AMQ-2020
- */
+ **/
 public class QueueDuplicatesFromStoreTest extends TestCase {
+   private static final Logger LOG = LoggerFactory
+           .getLogger(QueueDuplicatesFromStoreTest.class);
 
-   private static final Logger LOG = LoggerFactory.getLogger(QueueDuplicatesFromStoreTest.class);
-
-   ActiveMQQueue destination = new ActiveMQQueue("queue-" + QueueDuplicatesFromStoreTest.class.getSimpleName());
+   ActiveMQQueue destination = new ActiveMQQueue("queue-"
+           + QueueDuplicatesFromStoreTest.class.getSimpleName());
    BrokerService brokerService;
 
    final static String mesageIdRoot = "11111:22222:";
@@ -89,7 +91,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
    }
 
    public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
-      doTestNoDuplicateAfterCacheFullAndAcked(1024 * 10);
+      doTestNoDuplicateAfterCacheFullAndAcked(1024*10);
    }
 
    public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
@@ -97,13 +99,15 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
    }
 
    public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
-      final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
-      final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(destination);
+      final PersistenceAdapter persistenceAdapter =  brokerService.getPersistenceAdapter();
+      final MessageStore queueMessageStore =
+              persistenceAdapter.createQueueMessageStore(destination);
       final ConnectionContext contextNotInTx = new ConnectionContext();
       final ConsumerInfo consumerInfo = new ConsumerInfo();
       final DestinationStatistics destinationStatistics = new DestinationStatistics();
       consumerInfo.setExclusive(true);
-      final Queue queue = new Queue(brokerService, destination, queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
+      final Queue queue = new Queue(brokerService, destination,
+              queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
 
       // a workaround for this issue
       // queue.setUseCache(false);
@@ -134,34 +138,38 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
       // pull from store in small windows
       Subscription subscription = new Subscription() {
 
+         private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
+
          @Override
          public void add(MessageReference node) throws Exception {
             if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
-               errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + node.getMessageId().getProducerSequenceId());
+               errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
+                       + node.getMessageId().getProducerSequenceId());
             }
-            assertEquals("is in order", enqueueCounter.get(), node.getMessageId().getProducerSequenceId());
+            assertEquals("is in order", enqueueCounter.get(), node
+                    .getMessageId().getProducerSequenceId());
             receivedLatch.countDown();
             enqueueCounter.incrementAndGet();
             node.decrementReferenceCount();
          }
 
          @Override
-         public void add(ConnectionContext context, Destination destination) throws Exception {
+         public void add(ConnectionContext context, Destination destination)
+                 throws Exception {
          }
 
          @Override
          public int countBeforeFull() {
             if (isFull()) {
                return 0;
-            }
-            else {
+            } else {
                return fullWindow - (int) (enqueueCounter.get() - ackedCount.get());
             }
          }
 
          @Override
          public void destroy() {
-         }
+         };
 
          @Override
          public void gc() {
@@ -253,7 +261,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
+         public boolean matches(MessageReference node,
+                                MessageEvaluationContext context) throws IOException {
             return true;
          }
 
@@ -263,11 +272,13 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+         public void processMessageDispatchNotification(
+                 MessageDispatchNotification mdn) throws Exception {
          }
 
          @Override
-         public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+         public Response pullMessage(ConnectionContext context,
+                                     MessagePull pull) throws Exception {
             return null;
          }
 
@@ -277,7 +288,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
+         public List<MessageReference> remove(ConnectionContext context,
+                                              Destination destination) throws Exception {
             return null;
          }
 
@@ -286,7 +298,9 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
+         public void setSelector(String selector)
+                 throws InvalidSelectorException,
+                 UnsupportedOperationException {
          }
 
          @Override
@@ -294,7 +308,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
+         public boolean addRecoveredMessage(ConnectionContext context,
+                                            MessageReference message) throws Exception {
             return false;
          }
 
@@ -304,16 +319,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+         public void acknowledge(ConnectionContext context, MessageAck ack)
+                 throws Exception {
          }
 
          @Override
-         public int getCursorMemoryHighWaterMark() {
+         public int getCursorMemoryHighWaterMark(){
             return 0;
          }
 
          @Override
-         public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+         public void setCursorMemoryHighWaterMark(
+                 int cursorMemoryHighWaterMark) {
          }
 
          @Override
@@ -336,14 +353,24 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
          }
 
          @Override
-         public void incrementConsumedCount() {
+         public void incrementConsumedCount(){
 
          }
 
          @Override
-         public void resetConsumedCount() {
+         public void resetConsumedCount(){
 
          }
+
+         @Override
+         public SubscriptionStatistics getSubscriptionStatistics() {
+            return subscriptionStatistics;
+         }
+
+         @Override
+         public long getInFlightMessageSize() {
+            return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+         }
       };
 
       queue.addSubscription(contextNotInTx, subscription);
@@ -356,9 +383,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
                for (int j = 0; j < ackBatchSize; j++, removeIndex++) {
                   ackedCount.incrementAndGet();
                   MessageAck ack = new MessageAck();
-                  ack.setLastMessageId(new MessageId(mesageIdRoot + removeIndex));
+                  ack.setLastMessageId(new MessageId(mesageIdRoot
+                          + removeIndex));
                   ack.setMessageCount(1);
-                  queue.removeMessage(contextNotInTx, subscription, new IndirectMessageReference(getMessage(removeIndex)), ack);
+                  queue.removeMessage(contextNotInTx, subscription,
+                          new IndirectMessageReference(
+                                  getMessage(removeIndex)), ack);
                   queue.wakeup();
 
                }
@@ -373,7 +403,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
 
       assertTrue("There are no errors: " + errors, errors.isEmpty());
       assertEquals(count, enqueueCounter.get());
-      assertEquals("store count is correct", count - removeIndex, queueMessageStore.getMessageCount());
+      assertEquals("store count is correct", count - removeIndex,
+              queueMessageStore.getMessageCount());
    }
 
    private Message getMessage(int i) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index b38a965..c9d0339 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -61,7 +61,6 @@ import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
-
 import junit.framework.TestCase;
 
 public class SubscriptionAddRemoveQueueTest extends TestCase {
@@ -177,16 +176,20 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
 
    public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
 
-      List<MessageReference> dispatched = Collections.synchronizedList(new ArrayList<MessageReference>());
+      private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
+      List<MessageReference> dispatched =
+              Collections.synchronizedList(new ArrayList<MessageReference>());
+
 
       @Override
-      public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+      public void acknowledge(ConnectionContext context, MessageAck ack)
+              throws Exception {
       }
 
       @Override
       public void add(MessageReference node) throws Exception {
          // immediate dispatch
-         QueueMessageReference qmr = (QueueMessageReference) node;
+         QueueMessageReference  qmr = (QueueMessageReference)node;
          qmr.lock(this);
          dispatched.add(qmr);
       }
@@ -400,5 +403,15 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
          return 10;
       }
 
+      @Override
+      public SubscriptionStatistics getSubscriptionStatistics() {
+         return subscriptionStatistics;
+      }
+
+      @Override
+      public long getInFlightMessageSize() {
+         return subscriptionStatistics.getInflightMessageSize().getTotalSize();
+      }
+
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
deleted file mode 100644
index ab388f0..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.region.cursors;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.StoreUsage;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.TempUsage;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Modified CursorSupport Unit test to reproduce the negative queue issue.
- *
- * Keys to reproducing:
- * 1) Consecutive queues with listener on first sending to second queue
- * 2) Push each queue to the memory limit
- * This seems to help reproduce the issue more consistently, but
- * we have seen times in our production environment where the
- * negative queue can occur without. Our memory limits are
- * very high in production and it still happens in varying
- * frequency.
- * 3) Prefetch
- * Lowering the prefetch down to 10 and below seems to help
- * reduce occurrences.
- * 4) # of consumers per queue
- * The issue occurs less with fewer consumers
- *
- * Things that do not affect reproduction:
- * 1) Spring - we use spring in our production applications, but this test case works
- * with or without it.
- * 2) transacted
- */
-public class NegativeQueueTest extends AutoFailTestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class);
-
-   public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
-
-   private static final String QUEUE_1_NAME = "conn.test.queue.1";
-   private static final String QUEUE_2_NAME = "conn.test.queue.2";
-
-   private static final long QUEUE_MEMORY_LIMIT = 2097152;
-   private static final long MEMORY_USAGE = 400000000;
-   private static final long TEMP_USAGE = 200000000;
-   private static final long STORE_USAGE = 1000000000;
-   // ensure we exceed the cache 70%
-   private static final int MESSAGE_COUNT = 2100;
-
-   protected static final boolean TRANSACTED = true;
-   protected static final boolean DEBUG = true;
-   protected static int NUM_CONSUMERS = 20;
-   protected static int PREFETCH_SIZE = 1000;
-
-   protected BrokerService broker;
-   protected String bindAddress = "tcp://localhost:0";
-
-   public void testWithDefaultPrefetch() throws Exception {
-      PREFETCH_SIZE = 1000;
-      NUM_CONSUMERS = 20;
-      blastAndConsume();
-   }
-
-   public void x_testWithDefaultPrefetchFiveConsumers() throws Exception {
-      PREFETCH_SIZE = 1000;
-      NUM_CONSUMERS = 5;
-      blastAndConsume();
-   }
-
-   public void x_testWithDefaultPrefetchTwoConsumers() throws Exception {
-      PREFETCH_SIZE = 1000;
-      NUM_CONSUMERS = 2;
-      blastAndConsume();
-   }
-
-   public void testWithDefaultPrefetchOneConsumer() throws Exception {
-      PREFETCH_SIZE = 1000;
-      NUM_CONSUMERS = 1;
-      blastAndConsume();
-   }
-
-   public void testWithMediumPrefetch() throws Exception {
-      PREFETCH_SIZE = 50;
-      NUM_CONSUMERS = 20;
-      blastAndConsume();
-   }
-
-   public void x_testWithSmallPrefetch() throws Exception {
-      PREFETCH_SIZE = 10;
-      NUM_CONSUMERS = 20;
-      blastAndConsume();
-   }
-
-   public void testWithNoPrefetch() throws Exception {
-      PREFETCH_SIZE = 1;
-      NUM_CONSUMERS = 20;
-      blastAndConsume();
-   }
-
-   public void blastAndConsume() throws Exception {
-      LOG.info(getName());
-      ConnectionFactory factory = createConnectionFactory();
-
-      //get proxy queues for statistics lookups
-      Connection proxyConnection = factory.createConnection();
-      proxyConnection.start();
-      Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME));
-      final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME));
-
-      // LOAD THE QUEUE
-      Connection producerConnection = factory.createConnection();
-      producerConnection.start();
-      Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
-      Destination queue = session.createQueue(QUEUE_1_NAME);
-      MessageProducer producer = session.createProducer(queue);
-      List<TextMessage> senderList = new ArrayList<>();
-      for (int i = 0; i < MESSAGE_COUNT; i++) {
-         TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
-         senderList.add(msg);
-         producer.send(msg);
-         if (TRANSACTED)
-            session.commit();
-         if (DEBUG && i % 100 == 0) {
-            int index = (i / 100) + 1;
-            System.out.print(index - ((index / 10) * 10));
-         }
-      }
-
-      //get access to the Queue info
-      if (DEBUG) {
-         System.out.println("");
-         System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
-         System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
-         System.out.println("Queue1 Memory Available = " + proxyQueue1.getMemoryLimit());
-      }
-
-      // FLUSH THE QUEUE
-      final CountDownLatch latch1 = new CountDownLatch(1);
-      final CountDownLatch latch2 = new CountDownLatch(1);
-      Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS];
-      List<Message> consumerList1 = new ArrayList<>();
-      Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS];
-      Connection[] producerConnections2 = new Connection[NUM_CONSUMERS];
-      List<Message> consumerList2 = new ArrayList<>();
-
-      for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
-         producerConnections2[ix] = factory.createConnection();
-         producerConnections2[ix].start();
-         consumerConnections1[ix] = getConsumerConnection(factory);
-         Session consumerSession = consumerConnections1[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME));
-         consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1));
-      }
-
-      latch1.await(200000, TimeUnit.MILLISECONDS);
-      if (DEBUG) {
-         System.out.println("");
-         System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
-         System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
-         System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit());
-      }
-
-      for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
-         consumerConnections2[ix] = getConsumerConnection(factory);
-         Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME));
-         consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
-      }
-
-      boolean success = Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            boolean done = latch2.await(10, TimeUnit.SECONDS);
-            if (DEBUG) {
-               System.out.println("");
-               System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
-               System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
-               System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
-               System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
-               System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit());
-            }
-            return done;
-         }
-      }, 300 * 1000);
-      if (!success) {
-         dumpAllThreads("blocked waiting on 2");
-      }
-      assertTrue("got all expected messages on 2", success);
-
-      producerConnection.close();
-      for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
-         consumerConnections1[ix].close();
-         consumerConnections2[ix].close();
-         producerConnections2[ix].close();
-      }
-
-      //let the consumer statistics on queue2 have time to update
-      Thread.sleep(500);
-
-      if (DEBUG) {
-         System.out.println("");
-         System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize());
-         System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage());
-         System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize());
-         System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage());
-      }
-
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return 0 == proxyQueue1.getQueueSize();
-         }
-      });
-      assertEquals("Queue1 has gone negative,", 0, proxyQueue1.getQueueSize());
-
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return 0 == proxyQueue2.getQueueSize();
-         }
-      });
-      assertEquals("Queue2 has gone negative,", 0, proxyQueue2.getQueueSize());
-      proxyConnection.close();
-
-   }
-
-   private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException {
-      final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
-
-      ObjectName queueViewMBeanName = new ObjectName(prefix + queue.getQueueName());
-      QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
-
-      return proxy;
-   }
-
-   protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
-      Connection connection = fac.createConnection();
-      connection.start();
-      return connection;
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      if (broker == null) {
-         broker = createBroker();
-      }
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-      }
-   }
-
-   protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
-      Properties props = new Properties();
-      props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE);
-      props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE);
-      props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE);
-      cf.setProperties(props);
-      return cf;
-   }
-
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      configureBroker(answer);
-      answer.start();
-      answer.waitUntilStarted();
-      bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
-      return answer;
-   }
-
-   protected void configureBroker(BrokerService answer) throws Exception {
-      PolicyEntry policy = new PolicyEntry();
-      policy.setMemoryLimit(QUEUE_MEMORY_LIMIT);
-      policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
-
-      // disable the cache to be sure setBatch is the problem
-      // will get lots of duplicates
-      // real problem is sync between cursor and store add - leads to out or order messages
-      // in the cursor so setBatch can break.
-      // policy.setUseCache(false);
-
-      PolicyMap pMap = new PolicyMap();
-      pMap.setDefaultEntry(policy);
-      answer.setDestinationPolicy(pMap);
-      answer.setDeleteAllMessagesOnStartup(true);
-      answer.addConnector("tcp://localhost:0");
-
-      MemoryUsage memoryUsage = new MemoryUsage();
-      memoryUsage.setLimit(MEMORY_USAGE);
-      memoryUsage.setPercentUsageMinDelta(20);
-
-      TempUsage tempUsage = new TempUsage();
-      tempUsage.setLimit(TEMP_USAGE);
-
-      StoreUsage storeUsage = new StoreUsage();
-      storeUsage.setLimit(STORE_USAGE);
-
-      SystemUsage systemUsage = new SystemUsage();
-      systemUsage.setMemoryUsage(memoryUsage);
-      systemUsage.setTempUsage(tempUsage);
-      systemUsage.setStoreUsage(storeUsage);
-      answer.setSystemUsage(systemUsage);
-   }
-
-   /**
-    * Message listener that is given the Session for transacted consumers
-    */
-   class SessionAwareMessageListener implements MessageListener {
-
-      private final List<Message> consumerList;
-      private final CountDownLatch latch;
-      private final Session consumerSession;
-      private Session producerSession;
-      private MessageProducer producer;
-
-      public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList) {
-         this(null, consumerSession, null, latch, consumerList);
-      }
-
-      public SessionAwareMessageListener(Connection producerConnection,
-                                         Session consumerSession,
-                                         String outQueueName,
-                                         CountDownLatch latch,
-                                         List<Message> consumerList) {
-         this.consumerList = consumerList;
-         this.latch = latch;
-         this.consumerSession = consumerSession;
-
-         if (producerConnection != null) {
-            try {
-               producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE);
-               Destination queue = producerSession.createQueue(outQueueName);
-               producer = producerSession.createProducer(queue);
-            }
-            catch (JMSException e) {
-               e.printStackTrace();
-            }
-         }
-      }
-
-      @Override
-      public void onMessage(Message msg) {
-         try {
-            if (producer == null) {
-               // sleep to act as a slow consumer
-               // which will force a mix of direct and polled dispatching
-               // using the cursor on the broker
-               Thread.sleep(50);
-            }
-            else {
-               producer.send(msg);
-               if (TRANSACTED)
-                  producerSession.commit();
-            }
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-         }
-
-         synchronized (consumerList) {
-            consumerList.add(msg);
-            if (DEBUG && consumerList.size() % 100 == 0) {
-               int index = consumerList.size() / 100;
-               System.out.print(index - ((index / 10) * 10));
-            }
-            if (consumerList.size() == MESSAGE_COUNT) {
-               latch.countDown();
-            }
-         }
-         if (TRANSACTED) {
-            try {
-               consumerSession.commit();
-            }
-            catch (JMSException e) {
-               e.printStackTrace();
-            }
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
deleted file mode 100644
index 2d8adb5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- *
- */
-public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
-
-   protected int total = 10;
-   protected Connection connection;
-   public String messageSelector1, messageSelector2 = null;
-
-   public void testVirtualTopicCreation() throws Exception {
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.start();
-
-      ConsumerBean messageList1 = new ConsumerBean();
-      ConsumerBean messageList2 = new ConsumerBean();
-      messageList1.setVerbose(true);
-      messageList2.setVerbose(true);
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      Destination producerDestination = getProducerDestination();
-      Destination destination1 = getConsumer1Dsetination();
-      Destination destination2 = getConsumer2Dsetination();
-
-      LOG.info("Sending to: " + producerDestination);
-      LOG.info("Consuming from: " + destination1 + " and " + destination2);
-
-      MessageConsumer c1 = session.createConsumer(destination1, messageSelector1);
-      MessageConsumer c2 = session.createConsumer(destination2, messageSelector2);
-
-      c1.setMessageListener(messageList1);
-      c2.setMessageListener(messageList2);
-
-      // create topic producer
-      MessageProducer producer = session.createProducer(producerDestination);
-      assertNotNull(producer);
-
-      for (int i = 0; i < total; i++) {
-         producer.send(createMessage(session, i));
-      }
-
-      assertMessagesArrived(messageList1, messageList2);
-   }
-
-   protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
-      messageList1.assertMessagesArrived(total);
-      messageList2.assertMessagesArrived(total);
-   }
-
-   protected TextMessage createMessage(Session session, int i) throws JMSException {
-      TextMessage textMessage = session.createTextMessage("message: " + i);
-      if (i % 2 != 0) {
-         textMessage.setStringProperty("odd", "yes");
-      }
-      else {
-         textMessage.setStringProperty("odd", "no");
-      }
-      textMessage.setIntProperty("i", i);
-      return textMessage;
-   }
-
-   protected Destination getConsumer1Dsetination() {
-      return new ActiveMQQueue("FOO");
-   }
-
-   protected Destination getConsumer2Dsetination() {
-      return new ActiveMQTopic("BAR");
-   }
-
-   protected Destination getProducerDestination() {
-      return new ActiveMQQueue("MY.QUEUE");
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (connection != null) {
-         connection.close();
-      }
-      super.tearDown();
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      XBeanBrokerFactory factory = new XBeanBrokerFactory();
-      BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
-      return answer;
-   }
-
-   protected String getBrokerConfigUri() {
-      return "org/apache/activemq/broker/virtual/composite-queue.xml";
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
deleted file mode 100644
index 9ada103..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Destination;
-
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- *
- *
- */
-public class CompositeTopicTest extends CompositeQueueTest {
-
-   @Override
-   protected Destination getConsumer1Dsetination() {
-      return new ActiveMQQueue("FOO");
-   }
-
-   @Override
-   protected Destination getConsumer2Dsetination() {
-      return new ActiveMQTopic("BAR");
-   }
-
-   @Override
-   protected Destination getProducerDestination() {
-      return new ActiveMQTopic("MY.TOPIC");
-   }
-
-   @Override
-   protected String getBrokerConfigUri() {
-      return "org/apache/activemq/broker/virtual/composite-topic.xml";
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
deleted file mode 100644
index 39e9d2a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.io.IOException;
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test for AMQ-4571.
- * checks that durable subscription is fully unregistered
- * when using nested destination interceptors.
- */
-public class DestinationInterceptorDurableSubTest extends EmbeddedBrokerTestSupport {
-
-   private static final transient Logger LOG = LoggerFactory.getLogger(DestinationInterceptorDurableSubTest.class);
-   private MBeanServerConnection mbsc = null;
-   public static final String JMX_CONTEXT_BASE_NAME = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=";
-
-   /**
-    * Tests AMQ-4571.
-    *
-    * @throws Exception
-    */
-   public void testVirtualTopicRemoval() throws Exception {
-
-      LOG.debug("Running testVirtualTopicRemoval()");
-      String clientId1 = "myId1";
-      String clientId2 = "myId2";
-
-      Connection conn = null;
-      Session session = null;
-
-      try {
-         assertTrue(broker.isStarted());
-
-         // create durable sub 1
-         conn = createConnection();
-         conn.setClientID(clientId1);
-         conn.start();
-         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         // Topic topic = session.createTopic(destination.getPhysicalName());
-         TopicSubscriber sub1 = session.createDurableSubscriber((Topic) destination, clientId1);
-
-         // create durable sub 2
-         TopicSubscriber sub2 = session.createDurableSubscriber((Topic) destination, clientId2);
-
-         // verify two subs registered in JMX
-         assertSubscriptionCount(destination.getPhysicalName(), 2);
-         assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
-         assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
-
-         // delete sub 1
-         sub1.close();
-         session.unsubscribe(clientId1);
-
-         // verify only one sub registered in JMX
-         assertSubscriptionCount(destination.getPhysicalName(), 1);
-         assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
-         assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
-
-         // delete sub 2
-         sub2.close();
-         session.unsubscribe(clientId2);
-
-         // verify no sub registered in JMX
-         assertSubscriptionCount(destination.getPhysicalName(), 0);
-         assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
-         assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
-      }
-      finally {
-         session.close();
-         conn.close();
-      }
-   }
-
-   /**
-    * Connects to broker using JMX
-    *
-    * @return The JMX connection
-    * @throws IOException in case of any errors
-    */
-   protected MBeanServerConnection connectJMXBroker() throws IOException {
-      // connect to broker via JMX
-      JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:1299/jmxrmi");
-      JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
-      MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
-      LOG.debug("JMX connection established");
-      return mbsc;
-   }
-
-   /**
-    * Asserts that the Subscriptions JMX attribute of a topic has the expected
-    * count.
-    *
-    * @param topicName     name of the topic destination
-    * @param expectedCount expected number of subscriptions
-    * @return
-    */
-   protected boolean assertSubscriptionCount(String topicName, int expectedCount) {
-      try {
-         if (mbsc == null) {
-            mbsc = connectJMXBroker();
-         }
-         // query broker queue size
-         ObjectName[] tmp = (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
-         assertEquals(expectedCount, tmp.length);
-      }
-      catch (Exception ex) {
-         LOG.error(ex.getMessage());
-         return false;
-      }
-      return true;
-   }
-
-   /**
-    * Checks if a subscriptions for topic topicName with subName is registered in JMX
-    *
-    * @param topicName physical name of topic destination (excluding prefix 'topic://')
-    * @param subName   name of the durable subscription
-    * @return true if registered, false otherwise
-    */
-   protected boolean isSubRegisteredInJmx(String topicName, String subName) {
-
-      try {
-         if (mbsc == null) {
-            mbsc = connectJMXBroker();
-         }
-
-         // A durable sub is registered under the Subscriptions JMX attribute of the topic and
-         // as its own ObjectInstance under the topic's Consumer namespace.
-         // AMQ-4571 only removed the latter not the former on unsubscribe(), so we need
-         // to check against both.
-         ObjectName[] names = (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
-         ObjectInstance instance = mbsc.getObjectInstance(new ObjectName(JMX_CONTEXT_BASE_NAME +
-                                                                                             topicName +
-                                                                                             ",endpoint=Consumer,clientId=myId1,consumerId=Durable(myId1_" +
-                                                                                             subName +
-                                                                                             ")"));
-
-         if (instance == null)
-            return false;
-
-         for (int i = 0; i < names.length; i++) {
-            if (names[i].toString().contains(subName))
-               return true;
-         }
-      }
-      catch (InstanceNotFoundException ine) {
-         //this may be expected so log at info level
-         LOG.info(ine.toString());
-         return false;
-      }
-      catch (Exception ex) {
-         LOG.error(ex.toString());
-         return false;
-      }
-      return false;
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      XBeanBrokerFactory factory = new XBeanBrokerFactory();
-      BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
-
-      // lets disable persistence as we are a test
-      answer.setPersistent(false);
-      useTopic = true;
-      return answer;
-   }
-
-   protected String getBrokerConfigUri() {
-      return "org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml";
-   }
-
-   /**
-    * Simple but custom topic interceptor.
-    * To be used for testing nested interceptors in conjunction with
-    * virtual topic interceptor.
-    */
-   public static class SimpleDestinationInterceptor implements DestinationInterceptor {
-
-      private final Logger LOG = LoggerFactory.getLogger(SimpleDestinationInterceptor.class);
-      private BrokerService broker;
-
-      public SimpleDestinationInterceptor() {
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
-       */
-      public void setBrokerService(BrokerService brokerService) {
-         LOG.info("setBrokerService()");
-         this.broker = brokerService;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.broker.region.DestinationInterceptor#intercept(org.apache.activemq.broker.region.Destination)
-       */
-      @Override
-      public Destination intercept(final Destination destination) {
-         LOG.info("intercept({})", destination.getName());
-
-         if (!destination.getActiveMQDestination().getPhysicalName().startsWith("ActiveMQ")) {
-            return new DestinationFilter(destination) {
-               @Override
-               public void send(ProducerBrokerExchange context, Message message) throws Exception {
-                  // Send message to Destination
-                  if (LOG.isDebugEnabled()) {
-                     LOG.debug("SimpleDestinationInterceptor: Sending message to destination:" + this.getActiveMQDestination().getPhysicalName());
-                  }
-                  // message.setDestination(destination.getActiveMQDestination());
-                  super.send(context, message);
-               }
-            };
-         }
-         return destination;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.broker.region.DestinationInterceptor#remove(org.apache.activemq.broker.region.Destination)
-       */
-      @Override
-      public void remove(Destination destination) {
-         LOG.info("remove({})", destination.getName());
-         this.broker = null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.broker.region.DestinationInterceptor#create(org.apache.activemq.broker.Broker, org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination)
-       */
-      @Override
-      public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
-         LOG.info("create(" + broker.getBrokerName() + ", " + context.toString() + ", " + destination.getPhysicalName());
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
deleted file mode 100644
index e91ae4b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import org.apache.activemq.spring.ConsumerBean;
-
-/**
- *
- */
-public class FilteredQueueTest extends CompositeQueueTest {
-
-   @Override
-   protected String getBrokerConfigUri() {
-      return "org/apache/activemq/broker/virtual/filtered-queue.xml";
-   }
-
-   @Override
-   protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
-      messageList1.assertMessagesArrived(total / 2);
-      messageList2.assertMessagesArrived(1);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
deleted file mode 100644
index 5ca00b7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.StoreUsage;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.TempUsage;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-/**
- * This test will determine that the producer flow control does not kick in.
- * The original MirroredQueue implementation was causing the queue to update
- * the topic memory usage instead of the queue memory usage.
- * The reason is that the message memory usage instance will not be updated
- * unless it is null.  This was the case when the message was initially sent
- * to the topic but then it was non-null when it was being sent to the queue.
- * When the region destination was set, the associated memory usage was not
- * updated to the passed queue destination and thus the memory usage of the
- * topic was being updated instead.
- *
- * @author Claudio Corsi
- */
-public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport {
-
-   private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class);
-
-   private static final long ONE_MB = 0x0100000;
-   private static final long TEN_MB = ONE_MB * 10;
-   private static final long TWENTY_MB = TEN_MB * 2;
-
-   private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent";
-
-   @Override
-   protected boolean isPersistent() {
-      return true;
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      // Create the broker service instance....
-      BrokerService broker = super.createBroker();
-      // Create and add the mirrored queue destination interceptor ....
-      DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1];
-      MirroredQueue mq = new MirroredQueue();
-      mq.setCopyMessage(true);
-      mq.setPrefix("");
-      mq.setPostfix(".qmirror");
-      destinationInterceptors[0] = mq;
-      broker.setDestinationInterceptors(destinationInterceptors);
-      // Create the destination policy for the topics and queues
-      PolicyMap policyMap = new PolicyMap();
-      List<PolicyEntry> entries = new LinkedList<>();
-      // Create Topic policy entry
-      PolicyEntry policyEntry = new PolicyEntry();
-      super.useTopic = true;
-      ActiveMQDestination destination = super.createDestination(">");
-      Assert.isTrue(destination.isTopic(), "Created destination was not a topic");
-      policyEntry.setDestination(destination);
-      policyEntry.setProducerFlowControl(true);
-      policyEntry.setMemoryLimit(ONE_MB); // x10
-      entries.add(policyEntry);
-      // Create Queue policy entry
-      policyEntry = new PolicyEntry();
-      super.useTopic = false;
-      destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT);
-      Assert.isTrue(destination.isQueue(), "Created destination was not a queue");
-      policyEntry.setDestination(destination);
-      policyEntry.setProducerFlowControl(true);
-      policyEntry.setMemoryLimit(TEN_MB);
-      entries.add(policyEntry);
-      policyMap.setPolicyEntries(entries);
-      broker.setDestinationPolicy(policyMap);
-      // Set destinations
-      broker.setDestinations(new ActiveMQDestination[]{destination});
-      // Set system usage
-      SystemUsage memoryManager = new SystemUsage();
-      MemoryUsage memoryUsage = new MemoryUsage();
-      memoryUsage.setLimit(TEN_MB);
-      memoryManager.setMemoryUsage(memoryUsage);
-      StoreUsage storeUsage = new StoreUsage();
-      storeUsage.setLimit(TWENTY_MB);
-      memoryManager.setStoreUsage(storeUsage);
-      TempUsage tempDiskUsage = new TempUsage();
-      tempDiskUsage.setLimit(TEN_MB);
-      memoryManager.setTempUsage(tempDiskUsage);
-      broker.setSystemUsage(memoryManager);
-      // Set the persistent adapter
-      KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
-      persistenceAdapter.setJournalMaxFileLength((int) TEN_MB);
-      // Delete all current messages...
-      IOHelper.deleteFile(persistenceAdapter.getDirectory());
-      broker.setPersistenceAdapter(persistenceAdapter);
-      return broker;
-   }
-
-   @Override
-   @Before
-   protected void setUp() throws Exception {
-      super.setUp();
-   }
-
-   @Override
-   @After
-   protected void tearDown() throws Exception {
-      super.tearDown();
-   }
-
-   @Test(timeout = 40000)
-   public void testNoMemoryUsageIncreaseForTopic() throws Exception {
-      Connection connection = super.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT);
-      MessageProducer producer = session.createProducer(destination);
-      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      char[] m = new char[1024];
-      Arrays.fill(m, 'x');
-      // create some messages that have 1k each
-      for (int i = 1; i < 12000; i++) {
-         producer.send(session.createTextMessage(new String(m)));
-         logger.debug("Sent message: " + i);
-      }
-      producer.close();
-      session.close();
-      connection.stop();
-      connection.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
deleted file mode 100644
index 127f04c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class MirroredQueueTest extends EmbeddedBrokerTestSupport {
-
-   private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueueTest.class);
-   private Connection connection;
-
-   public void testSendingToQueueIsMirrored() throws Exception {
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.start();
-
-      ConsumerBean messageList = new ConsumerBean();
-      messageList.setVerbose(true);
-
-      Destination consumeDestination = createConsumeDestination();
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      LOG.info("Consuming from: " + consumeDestination);
-
-      MessageConsumer c1 = session.createConsumer(consumeDestination);
-      c1.setMessageListener(messageList);
-
-      // create topic producer
-      ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
-      LOG.info("Sending to: " + sendDestination);
-
-      MessageProducer producer = session.createProducer(sendDestination);
-      assertNotNull(producer);
-
-      int total = 10;
-      for (int i = 0; i < total; i++) {
-         producer.send(session.createTextMessage("message: " + i));
-      }
-
-      ///Thread.sleep(1000000);
-
-      messageList.assertMessagesArrived(total);
-
-      LOG.info("Received: " + messageList);
-   }
-
-   public void testTempMirroredQueuesClearDown() throws Exception {
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      TemporaryQueue tempQueue = session.createTemporaryQueue();
-      RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
-      assertTrue(rb.getDestinationMap().size() == 5);
-      tempQueue.delete();
-      assertTrue(rb.getDestinationMap().size() == 4);
-   }
-
-   protected Destination createConsumeDestination() {
-      return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
-   }
-
-   protected String getQueueName() {
-      return "My.Queue";
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseMirroredQueues(true);
-      answer.setPersistent(isPersistent());
-      answer.addConnector(bindAddress);
-      return answer;
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (connection != null) {
-         connection.close();
-      }
-      super.tearDown();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
deleted file mode 100644
index 6acaad1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Destination;
-
-import org.apache.activemq.command.ActiveMQQueue;
-
-/**
- *
- *
- */
-public class MirroredQueueUsingVirtualTopicQueueTest extends MirroredQueueTest {
-
-   @Override
-   protected Destination createConsumeDestination() {
-      String queueName = "Consumer.A.VirtualTopic.Mirror." + getQueueName();
-      return new ActiveMQQueue(queueName);
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
deleted file mode 100644
index 85e14c7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.CompositeTopic;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VirtualDestPerfTest {
-
-   private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
-   public int messageSize = 5 * 1024;
-   public int messageCount = 10000;
-   ActiveMQTopic target = new ActiveMQTopic("target");
-   BrokerService brokerService;
-   ActiveMQConnectionFactory connectionFactory;
-
-   @Test
-   @Ignore("comparison test - 'new' no wait on future with async send broker side is always on")
-   public void testAsyncSendBurstToFillCache() throws Exception {
-      startBroker(4, true, true);
-      connectionFactory.setUseAsyncSend(true);
-
-      // a burst of messages to fill the cache
-      messageCount = 22000;
-      messageSize = 10 * 1024;
-
-      LinkedHashMap<Integer, Long> results = new LinkedHashMap<>();
-
-      final ActiveMQQueue queue = new ActiveMQQueue("targetQ");
-      for (Integer numThreads : new Integer[]{1, 2}) {
-         ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-         final AtomicLong numMessagesToSend = new AtomicLong(messageCount);
-         purge();
-         long startTime = System.currentTimeMillis();
-         for (int i = 0; i < numThreads; i++) {
-            executor.execute(new Runnable() {
-               @Override
-               public void run() {
-                  try {
-                     produceMessages(numMessagesToSend, queue);
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            });
-         }
-         executor.shutdown();
-         executor.awaitTermination(5, TimeUnit.MINUTES);
-         long endTime = System.currentTimeMillis();
-         long seconds = (endTime - startTime) / 1000;
-         LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds);
-         results.put(numThreads, seconds);
-         LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount());
-      }
-
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-      LOG.info("Results: {}", results);
-   }
-
-   private void purge() throws Exception {
-      ObjectName[] queues = brokerService.getAdminView().getQueues();
-      if (queues.length == 1) {
-         QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false);
-         queueViewMBean.purge();
-      }
-   }
-
-   @Test
-   @Ignore("comparison test - takes too long and really needs a peek at the graph")
-   public void testPerf() throws Exception {
-      LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<>();
-      LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<>();
-
-      for (int i = 2; i < 11; i++) {
-         for (Boolean concurrent : new Boolean[]{true, false}) {
-            startBroker(i, concurrent, false);
-
-            long startTime = System.currentTimeMillis();
-            produceMessages(new AtomicLong(messageCount), target);
-            long endTime = System.currentTimeMillis();
-            long seconds = (endTime - startTime) / 1000;
-            LOG.info("For routes {} duration {}", i, seconds);
-            if (concurrent) {
-               resultsT.put(i, seconds);
-            }
-            else {
-               resultsF.put(i, seconds);
-            }
-            brokerService.stop();
-            brokerService.waitUntilStopped();
-         }
-      }
-      LOG.info("results T{} F{}", resultsT, resultsF);
-      LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en" + "&xaxis1=" + toStr(resultsT.keySet()) + "&yaxis1=" + toStr(resultsT.values()) + "&group1=concurrent" + "&xaxis2=" + toStr(resultsF.keySet()) + "&yaxis2=" + toStr(resultsF.values()) + "&group2=serial" + "&from=linejsp");
-   }
-
-   private String toStr(Collection set) {
-      return set.toString().replace(",", "%0D%0A").replace("[", "").replace("]", "").replace(" ", "");
-   }
-
-   protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception {
-      final ByteSequence payLoad = new ByteSequence(new byte[messageSize]);
-      Connection connection = connectionFactory.createConnection();
-      MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination);
-      messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      ActiveMQBytesMessage message = new ActiveMQBytesMessage();
-      message.setContent(payLoad);
-      while (messageCount.decrementAndGet() >= 0) {
-         messageProducer.send(message);
-      }
-      connection.close();
-   }
-
-   private void startBroker(int fanoutCount,
-                            boolean concurrentSend,
-                            boolean concurrentStoreAndDispatchQueues) throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.setUseVirtualTopics(true);
-      brokerService.addConnector("tcp://0.0.0.0:0");
-      brokerService.setAdvisorySupport(false);
-      PolicyMap destPolicyMap = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-      defaultEntry.setExpireMessagesPeriod(0);
-      defaultEntry.setOptimizedDispatch(true);
-      defaultEntry.setCursorMemoryHighWaterMark(110);
-      destPolicyMap.setDefaultEntry(defaultEntry);
-      brokerService.setDestinationPolicy(destPolicyMap);
-
-      CompositeTopic route = new CompositeTopic();
-      route.setName("target");
-      route.setForwardOnly(true);
-      route.setConcurrentSend(concurrentSend);
-      Collection<ActiveMQQueue> routes = new ArrayList<>();
-      for (int i = 0; i < fanoutCount; i++) {
-         routes.add(new ActiveMQQueue("route." + i));
-      }
-      route.setForwardTo(routes);
-      VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
-      interceptor.setVirtualDestinations(new VirtualDestination[]{route});
-      brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
-      brokerService.start();
-
-      connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
-      connectionFactory.setWatchTopicAdvisories(false);
-      if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
-
-         //with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default
-         // will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce
-         // order issues
-         ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues);
-      }
-   }
-}


Mime
View raw message