activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [26/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:57 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
new file mode 100644
index 0000000..4c19c7b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+/**
+ * This test will determine that the producer flow control does not kick in.
+ * The original MirroredQueue implementation was causing the queue to update
+ * the topic memory usage instead of the queue memory usage.
+ * The reason is that the message memory usage instance will not be updated
+ * unless it is null.  This was the case when the message was initially sent
+ * to the topic but then it was non-null when it was being sent to the queue.
+ * When the region destination was set, the associated memory usage was not
+ * updated to the passed queue destination and thus the memory usage of the
+ * topic was being updated instead.
+ *
+ * @author Claudio Corsi
+ */
+public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class);
+
+    private static final long ONE_MB = 0x0100000;
+    private static final long TEN_MB = ONE_MB * 10;
+    private static final long TWENTY_MB = TEN_MB * 2;
+
+    private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent";
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // Create the broker service instance....
+        BrokerService broker = super.createBroker();
+        // Create and add the mirrored queue destination interceptor ....
+        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1];
+        MirroredQueue mq = new MirroredQueue();
+        mq.setCopyMessage(true);
+        mq.setPrefix("");
+        mq.setPostfix(".qmirror");
+        destinationInterceptors[0] = mq;
+        broker.setDestinationInterceptors(destinationInterceptors);
+        // Create the destination policy for the topics and queues
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new LinkedList<PolicyEntry>();
+        // Create Topic policy entry
+        PolicyEntry policyEntry = new PolicyEntry();
+        super.useTopic = true;
+        ActiveMQDestination destination = super.createDestination(">");
+        Assert.isTrue(destination.isTopic(), "Created destination was not a topic");
+        policyEntry.setDestination(destination);
+        policyEntry.setProducerFlowControl(true);
+        policyEntry.setMemoryLimit(ONE_MB); // x10
+        entries.add(policyEntry);
+        // Create Queue policy entry
+        policyEntry = new PolicyEntry();
+        super.useTopic = false;
+        destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT);
+        Assert.isTrue(destination.isQueue(), "Created destination was not a queue");
+        policyEntry.setDestination(destination);
+        policyEntry.setProducerFlowControl(true);
+        policyEntry.setMemoryLimit(TEN_MB);
+        entries.add(policyEntry);
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+        // Set destinations
+        broker.setDestinations(new ActiveMQDestination[] { destination });
+        // Set system usage
+        SystemUsage memoryManager = new SystemUsage();
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setLimit(TEN_MB);
+        memoryManager.setMemoryUsage(memoryUsage);
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setLimit(TWENTY_MB);
+        memoryManager.setStoreUsage(storeUsage);
+        TempUsage tempDiskUsage = new TempUsage();
+        tempDiskUsage.setLimit(TEN_MB);
+        memoryManager.setTempUsage(tempDiskUsage);
+        broker.setSystemUsage(memoryManager);
+        // Set the persistent adapter
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setJournalMaxFileLength((int)TEN_MB);
+        // Delete all current messages...
+        IOHelper.deleteFile(persistenceAdapter.getDirectory());
+        broker.setPersistenceAdapter(persistenceAdapter);
+        return broker;
+    }
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test(timeout=40000)
+    public void testNoMemoryUsageIncreaseForTopic() throws Exception {
+        Connection connection = super.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            char[] m = new char[1024];
+            Arrays.fill(m, 'x');
+            // create some messages that have 1k each
+            for (int i = 1; i < 12000; i++) {
+                 producer.send(session.createTextMessage(new String(m)));
+                 logger.debug("Sent message: " + i);
+            }
+            producer.close();
+            session.close();
+            connection.stop();
+            connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
new file mode 100644
index 0000000..acbe3b9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class MirroredQueueTest extends EmbeddedBrokerTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueueTest.class);
+    private Connection connection;
+
+    public void testSendingToQueueIsMirrored() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+
+        Destination consumeDestination = createConsumeDestination();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        LOG.info("Consuming from: " + consumeDestination);
+
+        MessageConsumer c1 = session.createConsumer(consumeDestination);
+        c1.setMessageListener(messageList);
+
+        // create topic producer
+        ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
+        LOG.info("Sending to: " + sendDestination);
+
+        MessageProducer producer = session.createProducer(sendDestination);
+        assertNotNull(producer);
+
+        int total = 10;
+        for (int i = 0; i < total; i++) {
+            producer.send(session.createTextMessage("message: " + i));
+        }
+
+        ///Thread.sleep(1000000);
+
+        messageList.assertMessagesArrived(total);
+
+        LOG.info("Received: " + messageList);
+    }
+
+    public void testTempMirroredQueuesClearDown() throws Exception{
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue tempQueue = session.createTemporaryQueue();
+        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
+                RegionBroker.class);
+        assertTrue(rb.getDestinationMap().size()==5);
+        tempQueue.delete();
+        assertTrue(rb.getDestinationMap().size()==4);
+    }
+
+    protected Destination createConsumeDestination() {
+        return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
+    }
+
+    protected String getQueueName() {
+        return "My.Queue";
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseMirroredQueues(true);
+        answer.setPersistent(isPersistent());
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
new file mode 100644
index 0000000..f8cccf4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ *
+ * 
+ */
+public class MirroredQueueUsingVirtualTopicQueueTest extends MirroredQueueTest {
+
+    @Override
+    protected Destination createConsumeDestination() {
+        String queueName = "Consumer.A.VirtualTopic.Mirror." + getQueueName();
+        return new ActiveMQQueue(queueName);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
new file mode 100644
index 0000000..b822f5d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class VirtualDestPerfTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
+    public int messageSize = 5*1024;
+    public int messageCount = 10000;
+    ActiveMQTopic target = new ActiveMQTopic("target");
+    BrokerService brokerService;
+    ActiveMQConnectionFactory connectionFactory;
+
+    @Test
+    @Ignore("comparison test - 'new' no wait on future with async send broker side is always on")
+    public void testAsyncSendBurstToFillCache() throws Exception {
+        startBroker(4, true, true);
+        connectionFactory.setUseAsyncSend(true);
+
+        // a burst of messages to fill the cache
+        messageCount = 22000;
+        messageSize = 10*1024;
+
+        LinkedHashMap<Integer, Long> results = new LinkedHashMap<Integer, Long>();
+
+        final ActiveMQQueue queue = new ActiveMQQueue("targetQ");
+        for (Integer numThreads : new Integer[]{1, 2}) {
+            ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+            final AtomicLong numMessagesToSend = new AtomicLong(messageCount);
+            purge();
+            long startTime = System.currentTimeMillis();
+            for (int i=0;i<numThreads;i++) {
+                executor.execute(new Runnable(){
+                    @Override
+                    public void run() {
+                        try {
+                            produceMessages(numMessagesToSend, queue);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+            executor.shutdown();
+            executor.awaitTermination(5, TimeUnit.MINUTES);
+            long endTime = System.currentTimeMillis();
+            long seconds = (endTime - startTime) / 1000;
+            LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds);
+            results.put(numThreads, seconds);
+            LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount());
+        }
+
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        LOG.info("Results: {}", results);
+    }
+
+    private void purge() throws Exception {
+        ObjectName[] queues = brokerService.getAdminView().getQueues();
+        if (queues.length == 1) {
+            QueueViewMBean queueViewMBean = (QueueViewMBean)
+                brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false);
+            queueViewMBean.purge();
+        }
+    }
+
+    @Test
+    @Ignore("comparison test - takes too long and really needs a peek at the graph")
+    public void testPerf() throws Exception {
+        LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<Integer, Long>();
+        LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<Integer, Long>();
+
+        for (int i=2;i<11;i++) {
+            for (Boolean concurrent : new Boolean[]{true, false}) {
+                startBroker(i, concurrent, false);
+
+                long startTime = System.currentTimeMillis();
+                produceMessages(new AtomicLong(messageCount), target);
+                long endTime = System.currentTimeMillis();
+                long seconds = (endTime - startTime) / 1000;
+                LOG.info("For routes {} duration {}", i, seconds);
+                if (concurrent) {
+                    resultsT.put(i, seconds);
+                } else {
+                    resultsF.put(i, seconds);
+                }
+                brokerService.stop();
+                brokerService.waitUntilStopped();
+            }
+        }
+        LOG.info("results T{} F{}", resultsT, resultsF);
+        LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en"
+                + "&xaxis1=" + toStr(resultsT.keySet())
+                + "&yaxis1=" + toStr(resultsT.values())
+                + "&group1=concurrent"
+                + "&xaxis2=" + toStr(resultsF.keySet())
+                + "&yaxis2=" + toStr(resultsF.values())
+                + "&group2=serial"
+                + "&from=linejsp");
+    }
+
+    private String toStr(Collection set) {
+        return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", "");
+    }
+
+    protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception {
+        final ByteSequence payLoad = new ByteSequence(new byte[messageSize]);
+        Connection connection = connectionFactory.createConnection();
+        MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination);
+        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+        message.setContent(payLoad);
+        while (messageCount.decrementAndGet() >= 0) {
+            messageProducer.send(message);
+        }
+        connection.close();
+    }
+
+    private void startBroker(int fanoutCount, boolean concurrentSend, boolean concurrentStoreAndDispatchQueues) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseVirtualTopics(true);
+        brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.setAdvisorySupport(false);
+        PolicyMap destPolicyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
+        defaultEntry.setOptimizedDispatch(true);
+        defaultEntry.setCursorMemoryHighWaterMark(110);
+        destPolicyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(destPolicyMap);
+
+        CompositeTopic route = new CompositeTopic();
+        route.setName("target");
+        route.setForwardOnly(true);
+        route.setConcurrentSend(concurrentSend);
+        Collection<ActiveMQQueue> routes = new ArrayList<ActiveMQQueue>();
+        for (int i=0; i<fanoutCount; i++) {
+            routes.add(new ActiveMQQueue("route." + i));
+        }
+        route.setForwardTo(routes);
+        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new VirtualDestination[]{route});
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+        brokerService.start();
+
+        connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
+        connectionFactory.setWatchTopicAdvisories(false);
+        if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+
+            //with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default
+            // will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce
+            // order issues
+            ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
new file mode 100644
index 0000000..7c853cf
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for virtual topics and DLQ messaging. See individual test for more
+ * detail
+ *
+ */
+public class VirtualTopicDLQTest extends TestCase {
+    private static BrokerService broker;
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class);
+
+    static final String jmsConnectionURI = "failover:(vm://localhost)";
+
+    // Virtual Topic that the test publishes 10 messages to
+    private static final String virtualTopicName = "VirtualTopic.Test";
+
+    // Queues that receive all the messages send to the virtual topic
+    private static final String consumer1Prefix = "Consumer.A.";
+    private static final String consumer2Prefix = "Consumer.B.";
+    private static final String consumer3Prefix = "Consumer.C.";
+
+    // Expected Individual Dead Letter Queue names that are tied to the
+    // Subscriber Queues
+    private static final String dlqPrefix = "ActiveMQ.DLQ.Topic.";
+
+    // Number of messages
+    private static final int numberMessages = 6;
+
+    @Before
+    public void setUp() throws Exception {
+        try {
+            broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true);
+            broker.start();
+            broker.waitUntilStarted();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            // Purge the DLQ's so counts are correct for next run
+            purgeDestination(dlqPrefix + consumer1Prefix + virtualTopicName);
+            purgeDestination(dlqPrefix + consumer2Prefix + virtualTopicName);
+            purgeDestination(dlqPrefix + consumer3Prefix + virtualTopicName);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    /*
+     * This test verifies that all undelivered messages sent to a consumers
+     * listening on a queue associated with a virtual topic with be forwarded to
+     * separate DLQ's.
+     *
+     * Note that the broker config, deadLetterStrategy need to have the enable
+     * audit set to false so that duplicate message sent from a topic to
+     * individual consumers are forwarded to the DLQ
+     *
+     * <deadLetterStrategy> <bean
+     * xmlns="http://www.springframework.org/schema/beans"
+     * class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"
+     * > <property name="useQueueForQueueMessages" value="true"></property>
+     * <property name="processNonPersistent" value="true"></property> <property
+     * name="processExpired" value="false"></property> <property
+     * name="enableAudit" value="false"></property>
+     *
+     * </bean> </deadLetterStrategy>
+     */
+    @Test
+    public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception {
+
+        TestConsumer consumer1 = null;
+        TestConsumer consumer2 = null;
+        TestConsumer consumer3 = null;
+        TestConsumer dlqConsumer1 = null;
+        TestConsumer dlqConsumer2 = null;
+        TestConsumer dlqConsumer3 = null;
+
+        try {
+
+            // The first 2 consumers will rollback, ultimately causing messages
+            // to land on the DLQ
+            consumer1 = new TestConsumer(consumer1Prefix + virtualTopicName, false, numberMessages, true);
+            thread(consumer1, false);
+
+            consumer2 = new TestConsumer(consumer2Prefix + virtualTopicName, false, numberMessages, true);
+            thread(consumer2, false);
+
+            // TestConsumer that does not throw exceptions, messages should not
+            // land on DLQ
+            consumer3 = new TestConsumer(consumer3Prefix + virtualTopicName, false, numberMessages, false);
+            thread(consumer3, false);
+
+            // TestConsumer to read the expected Dead Letter Queue
+            dlqConsumer1 = new TestConsumer(dlqPrefix + consumer1Prefix + virtualTopicName, false, numberMessages, false);
+            thread(dlqConsumer1, false);
+
+            dlqConsumer2 = new TestConsumer(dlqPrefix + consumer2Prefix + virtualTopicName, false, numberMessages, false);
+            thread(dlqConsumer2, false);
+
+            dlqConsumer3 = new TestConsumer(dlqPrefix + consumer3Prefix + virtualTopicName, false, numberMessages, false);
+            thread(dlqConsumer3, false);
+
+            // Give the consumers a second to start
+            Thread.sleep(1000);
+
+            // Start the producer
+            TestProducer producer = new TestProducer(virtualTopicName, true, numberMessages);
+            thread(producer, false);
+
+            assertTrue("sent all producer messages in time, count is: " + producer.getLatch().getCount(), producer.getLatch().await(10, TimeUnit.SECONDS));
+            LOG.info("producer successful, count = " + producer.getLatch().getCount());
+
+            assertTrue("remaining consumer1 count should be zero, is: " + consumer1.getLatch().getCount(), consumer1.getLatch().await(10, TimeUnit.SECONDS));
+            LOG.info("consumer1 successful, count = " + consumer1.getLatch().getCount());
+
+            assertTrue("remaining consumer2 count should be zero, is: " + consumer2.getLatch().getCount(), consumer2.getLatch().await(10, TimeUnit.SECONDS));
+            LOG.info("consumer2 successful, count = " + consumer2.getLatch().getCount());
+
+            assertTrue("remaining consumer3 count should be zero, is: " + consumer3.getLatch().getCount(), consumer3.getLatch().await(10, TimeUnit.SECONDS));
+            LOG.info("consumer3 successful, count = " + consumer3.getLatch().getCount());
+
+            assertTrue("remaining dlqConsumer1 count should be zero, is: " + dlqConsumer1.getLatch().getCount(),
+                dlqConsumer1.getLatch().await(10, TimeUnit.SECONDS));
+            LOG.info("dlqConsumer1 successful, count = " + dlqConsumer1.getLatch().getCount());
+
+            assertTrue("remaining dlqConsumer2 count should be zero, is: " + dlqConsumer2.getLatch().getCount(),
+                dlqConsumer2.getLatch().await(10, TimeUnit.SECONDS));
+            LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount());
+
+            assertTrue("remaining dlqConsumer3 count should be " + numberMessages + ", is: " + dlqConsumer3.getLatch().getCount(), dlqConsumer3.getLatch()
+                .getCount() == numberMessages);
+            LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount());
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        } finally {
+            // Tell consumers to stop (don't read any more messages after this)
+            if (consumer1 != null)
+                consumer1.setStop(true);
+            if (consumer2 != null)
+                consumer2.setStop(true);
+            if (consumer3 != null)
+                consumer3.setStop(true);
+            if (dlqConsumer1 != null)
+                dlqConsumer1.setStop(true);
+            if (dlqConsumer2 != null)
+                dlqConsumer2.setStop(true);
+            if (dlqConsumer3 != null)
+                dlqConsumer3.setStop(true);
+        }
+    }
+
+    private static Thread thread(Runnable runnable, boolean daemon) {
+        Thread brokerThread = new Thread(runnable);
+        brokerThread.setDaemon(daemon);
+        brokerThread.start();
+        return brokerThread;
+    }
+
+    private class TestProducer implements Runnable {
+        private String destinationName = null;
+        private boolean isTopic = true;
+        private int numberMessages = 0;
+        private CountDownLatch latch = null;
+
+        public TestProducer(String destinationName, boolean isTopic, int numberMessages) {
+            this.destinationName = destinationName;
+            this.isTopic = isTopic;
+            this.numberMessages = numberMessages;
+            latch = new CountDownLatch(numberMessages);
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+
+        public void run() {
+            ActiveMQConnectionFactory connectionFactory = null;
+            ActiveMQConnection connection = null;
+            ActiveMQSession session = null;
+            Destination destination = null;
+
+            try {
+                LOG.info("Started TestProducer for destination (" + destinationName + ")");
+
+                connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
+                connection = (ActiveMQConnection) connectionFactory.createConnection();
+                connection.start();
+                session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                if (isTopic) {
+                    destination = session.createTopic(this.destinationName);
+                } else {
+                    destination = session.createQueue(this.destinationName);
+                }
+
+                // Create a MessageProducer from the Session to the Topic or
+                // Queue
+                ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+                for (int i = 0; i < numberMessages; i++) {
+                    TextMessage message = (TextMessage) session.createTextMessage("I am a message :: " + String.valueOf(i));
+                    try {
+                        producer.send(message);
+
+                    } catch (Exception deeperException) {
+                        LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException);
+                    }
+
+                    latch.countDown();
+                    Thread.sleep(1000);
+                }
+
+                LOG.info("Finished TestProducer for destination (" + destinationName + ")");
+
+            } catch (Exception e) {
+                LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
+                e.printStackTrace();
+
+            } finally {
+                try {
+                    // Clean up
+                    if (session != null)
+                        session.close();
+                    if (connection != null)
+                        connection.close();
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
+                }
+            }
+        }
+    }
+
+    private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
+        private String destinationName = null;
+        private boolean isTopic = true;
+        private CountDownLatch latch = null;
+        private int maxRedeliveries = 0;
+        private int receivedMessageCounter = 0;
+        private boolean bFakeFail = false;
+        private boolean bStop = false;
+
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private ActiveMQConnection connection = null;
+        private Session session = null;
+        private MessageConsumer consumer = null;
+
+        public TestConsumer(String destinationName, boolean isTopic, int expectedNumberMessages, boolean bFakeFail) {
+            this.destinationName = destinationName;
+            this.isTopic = isTopic;
+            latch = new CountDownLatch(expectedNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1));
+            this.bFakeFail = bFakeFail;
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+
+        public void run() {
+
+            try {
+                LOG.info("Started TestConsumer for destination (" + destinationName + ")");
+
+                connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
+                connection = (ActiveMQConnection) connectionFactory.createConnection();
+                connection.start();
+                session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+                RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+                policy.setInitialRedeliveryDelay(1);
+                policy.setUseExponentialBackOff(false);
+                policy.setMaximumRedeliveries(maxRedeliveries);
+
+                connection.setExceptionListener(this);
+
+                Destination destination = null;
+                if (isTopic) {
+                    destination = session.createTopic(destinationName);
+                } else {
+                    destination = session.createQueue(destinationName);
+                }
+
+                consumer = session.createConsumer(destination);
+                consumer.setMessageListener(this);
+
+                while (!bStop) {
+                    Thread.sleep(100);
+                }
+
+                LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages "
+                    + this.toString());
+
+            } catch (Exception e) {
+                LOG.error("Consumer (" + destinationName + ") Caught: " + e);
+                e.printStackTrace();
+            } finally {
+                try {
+                    // Clean up
+                    if (consumer != null)
+                        consumer.close();
+                    if (session != null)
+                        session.close();
+                    if (connection != null)
+                        connection.close();
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
+                }
+            }
+        }
+
+        public synchronized void onException(JMSException ex) {
+            ex.printStackTrace();
+            LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occured.  Shutting down client.");
+        }
+
+        public synchronized void setStop(boolean bStop) {
+            this.bStop = bStop;
+        }
+
+        public synchronized void onMessage(Message message) {
+            receivedMessageCounter++;
+            latch.countDown();
+
+            LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + " :: Number messages received "
+                + this.receivedMessageCounter);
+
+            try {
+                LOG.info("Consumer for destination (" + destinationName + ") Received message id :: " + message.getJMSMessageID());
+
+                if (!bFakeFail) {
+                    LOG.info("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
+                    session.commit();
+                } else {
+                    LOG.info("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
+                    session.rollback(); // rolls back all the consumed messages
+                                        // on the session to
+                }
+
+            } catch (JMSException ex) {
+                ex.printStackTrace();
+                LOG.error("Error reading JMS Message from destination " + destinationName + ".");
+            }
+        }
+    }
+
+    private static void purgeDestination(String destination) throws Exception {
+        final Queue dest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue(destination));
+        dest.purge();
+        assertEquals(0, dest.getDestinationStatistics().getMessages().getCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
new file mode 100644
index 0000000..8b95345
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test case for  https://issues.apache.org/jira/browse/AMQ-3004
+ */
+
+public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
+    protected Connection connection;
+
+    public void testVirtualTopicSelectorDisconnect() throws Exception {
+        testVirtualTopicDisconnect("odd = 'no'", 3000, 1500);
+    }
+
+    public void testVirtualTopicNoSelectorDisconnect() throws Exception {
+        testVirtualTopicDisconnect(null, 3000, 3000);
+    }
+
+    public void testVirtualTopicDisconnect(String messageSelector, int total , int expected) throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        final ConsumerBean messageList = new ConsumerBean();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        Destination producerDestination = getProducerDestination();
+        Destination destination = getConsumerDsetination();
+
+        LOG.info("Sending to: " + producerDestination);
+        LOG.info("Consuming from: " + destination );
+
+        MessageConsumer consumer = createConsumer(session, destination, messageSelector);
+
+        MessageListener listener = new MessageListener(){
+            public void onMessage(Message message){
+                messageList.onMessage(message);
+                try {
+                    message.acknowledge();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        consumer.setMessageListener(listener);
+
+
+        // create topic producer
+        MessageProducer producer = session.createProducer(producerDestination);
+        assertNotNull(producer);
+
+        int disconnectCount = total/3;
+        int reconnectCount = (total * 2)/3;
+
+        for (int i = 0; i < total; i++) {
+            producer.send(createMessage(session, i));
+
+            if (i==disconnectCount){
+               consumer.close();
+            }
+            if (i==reconnectCount){
+                consumer = createConsumer(session, destination, messageSelector);
+                consumer.setMessageListener(listener);
+            }
+        }
+
+        assertMessagesArrived(messageList, expected ,10000);
+    }
+            
+    protected Destination getConsumerDsetination() {
+        return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
+    }
+
+
+    protected Destination getProducerDestination() {
+        return new ActiveMQTopic("VirtualTopic.TEST");
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector) throws JMSException {
+        if (messageSelector != null) {
+            return session.createConsumer(destination, messageSelector);
+        } else {
+            return session.createConsumer(destination);
+        }
+    }
+
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        TextMessage textMessage = session.createTextMessage("message: " + i);
+        if (i % 2 != 0) {
+            textMessage.setStringProperty("odd", "yes");
+        } else {
+            textMessage.setStringProperty("odd", "no");
+        }
+        textMessage.setIntProperty("i", i);
+        return textMessage;
+    }
+
+
+
+    protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) {
+        messageList.assertMessagesArrived(expected,timeout);
+
+        messageList.flushMessages();
+
+        
+        LOG.info("validate no other messages on queues");
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                
+            Destination destination1 = getConsumerDsetination();
+
+            MessageConsumer c1 = session.createConsumer(destination1, null);
+            c1.setMessageListener(messageList);
+
+            
+            LOG.info("send one simple message that should go to both consumers");
+            MessageProducer producer = session.createProducer(getProducerDestination());
+            assertNotNull(producer);
+            
+            producer.send(session.createTextMessage("Last Message"));
+            
+            messageList.assertMessagesArrived(1);
+
+        } catch (JMSException e) {
+            e.printStackTrace();
+            fail("unexpeced ex while waiting for last messages: " + e);
+        }
+    }
+
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/disconnected-selector.xml";
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
new file mode 100644
index 0000000..42e6e60
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+
+/**
+ *
+ * 
+ */
+public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
+
+    private Vector<Connection> connections = new Vector<Connection>();
+    public int ackMode = Session.AUTO_ACKNOWLEDGE;
+
+    public static Test suite() {
+        return suite(VirtualTopicPubSubTest.class);
+    }
+
+    public void initCombosForTestVirtualTopicCreation() {
+        addCombinationValues("ackMode", new Object[] {new Integer(Session.AUTO_ACKNOWLEDGE), new Integer(Session.CLIENT_ACKNOWLEDGE) });
+    }
+
+    private boolean doneTwice = false;
+
+	public void testVirtualTopicCreation() throws Exception {
+	  doTestVirtualTopicCreation(10);
+	}
+
+	public void doTestVirtualTopicCreation(int total) throws Exception {
+
+        ConsumerBean messageList = new ConsumerBean() {
+            public synchronized void onMessage(Message message) {
+                super.onMessage(message);
+                if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
+                    try {
+                        message.acknowledge();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+
+            }
+        };
+        messageList.setVerbose(true);
+
+        String queueAName = getVirtualTopicConsumerName();
+        // create consumer 'cluster'
+        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
+        ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
+  
+        Session session = createStartAndTrackConnection().createSession(false, ackMode);
+        MessageConsumer c1 = session.createConsumer(queue1);
+         
+        session = createStartAndTrackConnection().createSession(false, ackMode);
+        MessageConsumer c2 = session.createConsumer(queue2);
+
+        c1.setMessageListener(messageList);
+        c2.setMessageListener(messageList);
+
+        // create topic producer
+        Session producerSession = createStartAndTrackConnection().createSession(false, ackMode);
+        MessageProducer producer = producerSession.createProducer(new ActiveMQTopic(getVirtualTopicName()));
+        assertNotNull(producer);
+
+        for (int i = 0; i < total; i++) {
+            producer.send(producerSession.createTextMessage("message: " + i));
+        }
+
+        messageList.assertMessagesArrived(total);
+
+        // do twice so we confirm messages do not get redelivered after client acknowledgement
+        if( doneTwice == false ) {
+            doneTwice = true;
+            doTestVirtualTopicCreation(0);
+		}
+    }
+
+    private Connection createStartAndTrackConnection() throws Exception {
+        Connection connection = createConnection();
+        connection.start();
+        connections.add(connection);
+        return connection;
+    }
+
+    protected String getVirtualTopicName() {
+        return "VirtualTopic.TEST";
+    }
+
+    protected String getVirtualTopicConsumerName() {
+        return "Consumer.A.VirtualTopic.TEST";
+    }
+
+
+    protected void tearDown() throws Exception {
+        for (Connection connection: connections) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
new file mode 100644
index 0000000..6952331
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+
+/**
+ *
+ * 
+ */
+public class VirtualTopicPubSubUsingXBeanTest extends VirtualTopicPubSubTest {
+
+    protected String getVirtualTopicConsumerName() {
+        return "VirtualTopicConsumers.ConsumerNumberOne.FOO";
+    }
+
+    protected String getVirtualTopicName() {
+        return "FOO";
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+        
+        // lets disable persistence as we are a test
+        answer.setPersistent(false);
+        
+        return answer;
+    }
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/global-virtual-topics.xml";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
new file mode 100644
index 0000000..3287bab
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualTopicSelectorTest extends CompositeTopicTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class);
+            
+    protected Destination getConsumer1Dsetination() {
+        return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
+    }
+
+    protected Destination getConsumer2Dsetination() {
+        return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
+    }
+    
+    protected Destination getProducerDestination() {
+        return new ActiveMQTopic("VirtualTopic.TEST");
+    }
+    
+    @Override
+    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
+        messageList1.assertMessagesArrived(total/2);
+        messageList2.assertMessagesArrived(total/2);
+ 
+        messageList1.flushMessages();
+        messageList2.flushMessages();
+        
+        LOG.info("validate no other messages on queues");
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                
+            Destination destination1 = getConsumer1Dsetination();
+            Destination destination2 = getConsumer2Dsetination();
+            MessageConsumer c1 = session.createConsumer(destination1, null);
+            MessageConsumer c2 = session.createConsumer(destination2, null);
+            c1.setMessageListener(messageList1);
+            c2.setMessageListener(messageList2);
+            
+            
+            LOG.info("send one simple message that should go to both consumers");
+            MessageProducer producer = session.createProducer(getProducerDestination());
+            assertNotNull(producer);
+            
+            producer.send(session.createTextMessage("Last Message"));
+            
+            messageList1.assertMessagesArrived(1);
+            messageList2.assertMessagesArrived(1);
+        
+        } catch (JMSException e) {
+            e.printStackTrace();
+            fail("unexpeced ex while waiting for last messages: " + e);
+        }
+    }
+    
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // use message selectors on consumers that need to propagate up to the virtual
+        // topic dispatch so that un matched messages do not linger on subscription queues
+        messageSelector1 = "odd = 'yes'";
+        messageSelector2 = "odd = 'no'";
+        
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+
+        VirtualTopic virtualTopic = new VirtualTopic();
+        // the new config that enables selectors on the intercepter
+        virtualTopic.setSelectorAware(true);
+        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
+        broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+        return broker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
new file mode 100644
index 0000000..d1e709f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.MBeanTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+
+public class VirtualTopicsAndDurableSubsTest extends MBeanTest {
+
+    private Connection connection;
+
+    public void testVirtualTopicCreationAndDurableSubs() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.setClientID(getAClientID());
+        connection.start();
+
+        ConsumerBean messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+        
+        String queueAName = getVirtualTopicConsumerName();
+        // create consumer 'cluster'
+        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
+        ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer c1 = session.createConsumer(queue1);
+        MessageConsumer c2 = session.createConsumer(queue2);
+
+        c1.setMessageListener(messageList);
+        c2.setMessageListener(messageList);
+
+        // create topic producer
+        MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
+        assertNotNull(producer);
+
+        int total = 10;
+        for (int i = 0; i < total; i++) {
+            producer.send(session.createTextMessage("message: " + i));
+        }
+        messageList.assertMessagesArrived(total);
+        
+        //Add and remove durable subscriber after using VirtualTopics
+        assertCreateAndDestroyDurableSubscriptions();
+    }
+
+    protected String getAClientID(){
+    	return "VirtualTopicCreationAndDurableSubs";
+    }
+
+    protected String getVirtualTopicName() {
+        return "VirtualTopic.TEST";
+    }
+
+
+    protected String getVirtualTopicConsumerName() {
+        return "Consumer.A.VirtualTopic.TEST";
+    }
+
+    protected String getDurableSubscriberName(){
+    	return "Sub1";
+    }
+    
+    protected String getDurableSubscriberTopicName(){
+    	return "simple.topic";
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+    
+    //Overrides test cases from MBeanTest to avoid having them run.
+    public void testMBeans() throws Exception {}
+    public void testMoveMessages() throws Exception {}
+    public void testRetryMessages() throws Exception {}
+    public void testMoveMessagesBySelector() throws Exception {}
+    public void testCopyMessagesBySelector() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
new file mode 100644
index 0000000..ed3bc73
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+  <broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
+    <destinationInterceptors>
+      <virtualDestinationInterceptor>
+        <virtualDestinations>
+          <compositeQueue name="MY.QUEUE">
+            <forwardTo>
+              <queue physicalName="FOO" />
+              <topic physicalName="BAR" />
+            </forwardTo>
+          </compositeQueue>
+        </virtualDestinations>
+      </virtualDestinationInterceptor>
+    </destinationInterceptors>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
new file mode 100644
index 0000000..ded6471
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+  <broker xmlns="http://activemq.apache.org/schema/core">
+    <destinationInterceptors>
+      <virtualDestinationInterceptor>
+        <virtualDestinations>
+          <compositeTopic name="MY.TOPIC">
+            <forwardTo>
+              <queue physicalName="FOO" />
+              <topic physicalName="BAR" />
+            </forwardTo>
+          </compositeTopic>
+        </virtualDestinations>
+      </virtualDestinationInterceptor>
+    </destinationInterceptors>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
new file mode 100644
index 0000000..2772910
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:amq="http://activemq.apache.org/schema/core"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+    <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
+        <destinationInterceptors>
+            <virtualDestinationInterceptor>
+                <virtualDestinations>
+                    <virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/>
+                </virtualDestinations>
+            </virtualDestinationInterceptor>
+        </destinationInterceptors>
+        <plugins>
+            <virtualSelectorCacheBrokerPlugin persistFile = "target/selectorcache.data"/>
+        </plugins>
+    </broker>
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
new file mode 100644
index 0000000..d51f03c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+  <broker xmlns="http://activemq.apache.org/schema/core">
+    <destinationInterceptors>
+      <virtualDestinationInterceptor>
+        <virtualDestinations>
+          <compositeQueue name="MY.QUEUE">
+            <forwardTo>
+              <filteredDestination selector="odd = 'yes'" queue="FOO"/>
+              <filteredDestination selector="i = 5" topic="BAR"/>
+            </forwardTo>
+          </compositeQueue>
+        </virtualDestinations>
+      </virtualDestinationInterceptor>
+    </destinationInterceptors>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
new file mode 100644
index 0000000..ddd0667
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+  <broker xmlns="http://activemq.apache.org/schema/core">
+    <destinationInterceptors>
+      <virtualDestinationInterceptor>
+        <virtualDestinations>
+          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
+        </virtualDestinations>
+      </virtualDestinationInterceptor>
+    </destinationInterceptors>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
new file mode 100644
index 0000000..d725436
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
@@ -0,0 +1,80 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    
+
+    <!-- 
+        The <broker> element is used to configure the ActiveMQ broker. 
+    -->
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="bcBroker">
+ 
+        <destinationInterceptors>
+      <virtualDestinationInterceptor>
+         <virtualDestinations>
+            <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." />
+         </virtualDestinations>
+      </virtualDestinationInterceptor>
+   </destinationInterceptors>
+              
+   <destinationPolicy>
+      <policyMap>
+         <policyEntries>
+            <policyEntry queue=">" memoryLimit="128 mb" >
+               <deadLetterStrategy>
+                  <bean xmlns="http://www.springframework.org/schema/beans"
+                        class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
+                     <property name="useQueueForQueueMessages" value="true"></property>
+                     <property name="processNonPersistent" value="true"></property>
+                     <property name="processExpired" value="false"></property>
+                     <property name="enableAudit" value="false"></property>
+                     
+                  </bean>
+               </deadLetterStrategy>
+            </policyEntry>
+            <policyEntry topic=">" memoryLimit="128 mb" >
+               <deadLetterStrategy>
+                  <bean xmlns="http://www.springframework.org/schema/beans"
+                        class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
+                     <property name="useQueueForQueueMessages" value="true"></property>
+                     <property name="processNonPersistent" value="true"></property>
+                     <property name="processExpired" value="false"></property>
+                     <property name="enableAudit" value="false"></property>
+          
+                  </bean>
+               </deadLetterStrategy>
+             </policyEntry>
+         </policyEntries>
+      </policyMap>
+   </destinationPolicy>
+       
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+    </broker>
+
+    
+    
+</beans>
+<!-- END SNIPPET: example -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
new file mode 100644
index 0000000..fcce72e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+  <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
+
+
+    <destinationInterceptors>
+      <!--  custom destination interceptor -->
+      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.virtual.DestinationInterceptorDurableSubTest$SimpleDestinationInterceptor" />
+
+      <virtualDestinationInterceptor>
+        <virtualDestinations>
+          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
+        </virtualDestinations>
+      </virtualDestinationInterceptor>
+    </destinationInterceptors>
+
+    <managementContext>
+      <managementContext createConnector="true" connectorPort="1299"/>
+    </managementContext>
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->


Mime
View raw message