activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [27/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:58 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java
new file mode 100644
index 0000000..e65d819
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerManagementTest;
+
+/**
+ * Tests management of in memory scheduler via JMS client.
+ */
+public class InMemoryJobSchedulerManagementTest extends JobSchedulerManagementTest {
+
+    @Override
+    protected boolean isPersistent() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java
new file mode 100644
index 0000000..ac90070
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class InMemoryJobSchedulerStoreTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobSchedulerStoreTest.class);
+
+    @Test(timeout = 120 * 1000)
+    public void testRestart() throws Exception {
+        InMemoryJobSchedulerStore store = new InMemoryJobSchedulerStore();
+        File directory = new File("target/test/ScheduledDB");
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store.setDirectory(directory);
+        final int NUMBER = 1000;
+        store.start();
+        List<ByteSequence> list = new ArrayList<ByteSequence>();
+        for (int i = 0; i < NUMBER; i++) {
+            ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes());
+            list.add(buff);
+        }
+
+        JobScheduler js = store.getJobScheduler("test");
+        js.startDispatching();
+        int count = 0;
+        long startTime = 10 * 60 * 1000;
+        long period = startTime;
+        for (ByteSequence job : list) {
+            js.schedule("id:" + (count++), job, "", startTime, period, -1);
+        }
+
+        List<Job> test = js.getAllJobs();
+        LOG.debug("Found {} jobs in the store before restart", test.size());
+        assertEquals(list.size(), test.size());
+        store.stop();
+        store.start();
+        js = store.getJobScheduler("test");
+        test = js.getAllJobs();
+        LOG.debug("Found {} jobs in the store after restart", test.size());
+        assertEquals(0, test.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
new file mode 100644
index 0000000..36771b0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.broker.scheduler.JobSchedulerTest;
+
+/**
+ * In-Memory store based variation of the JobSchedulerTest
+ */
+public class InMemoryJobSchedulerTest extends JobSchedulerTest {
+
+    @Override
+    public void testAddStopThenDeliver() throws Exception {
+        // In Memory store that's stopped doesn't retain the jobs.
+    }
+
+    @Override
+    protected JobSchedulerStore createJobSchedulerStore() throws Exception {
+        return new InMemoryJobSchedulerStore();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java
new file mode 100644
index 0000000..fb87905
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler.memory;
+
+import org.apache.activemq.broker.scheduler.JobSchedulerTxTest;
+
+/**
+ * In memory version of the TX test case
+ */
+public class InMemoryJobSchedulerTxTest extends JobSchedulerTxTest {
+
+    @Override
+    protected boolean isPersistent() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml
new file mode 100644
index 0000000..7d37217
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<!-- START SNIPPET: spring -->
+<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <!-- an embedded broker -->
+
+  <bean id="broker" class="org.apache.activemq.broker.BrokerService" 
+    init-method="start">
+    <property name="transportConnectorURIs">
+      <list>
+        <value>tcp://localhost:61616</value>
+        <value>tcp://localhost:61636</value>
+      </list>
+    </property>
+  </bean>
+
+  <!-- JMS ConnectionFactory to use -->
+  <bean id="jmsFactory"
+    class="org.apache.activemq.ActiveMQConnectionFactory">
+    <property name="brokerURL" value="tcp://localhost:61636" />
+  </bean>
+
+  <!-- Spring JMS Template -->
+  <bean id="myJmsTemplate"
+    class="org.springframework.jms.core.JmsTemplate">
+    <property name="connectionFactory">
+      <!-- lets wrap in a pool to avoid creating a connection per send -->
+      <bean
+        class="org.springframework.jms.connection.SingleConnectionFactory">
+        <property name="targetConnectionFactory">
+          <ref local="jmsFactory" />
+        </property>
+      </bean>
+    </property>
+  </bean>
+  
+  <!-- Spring JMS Template -->
+  <bean id="consumerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
+    <property name="connectionFactory" ref="jmsFactory"/>
+  </bean>
+
+  <!-- a sample POJO which uses a Spring JmsTemplate -->
+  <bean id="producer" class="org.apache.activemq.spring.SpringProducer">
+    <property name="template">
+      <ref bean="myJmsTemplate"></ref>
+    </property>
+
+    <property name="destination">
+      <ref bean="destination" />
+    </property>
+
+    <property name="messageCount">
+      <value>10</value>
+    </property>
+  </bean>
+
+
+  <!-- a sample POJO consumer -->
+  <bean id="consumer" class="org.apache.activemq.spring.SpringConsumer">
+    <property name="template">
+      <ref bean="consumerJmsTemplate"></ref>
+    </property>
+
+    <property name="destination">
+      <ref bean="destination" />
+    </property>
+  </bean>
+
+  <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"
+    autowire="constructor">
+    <constructor-arg>
+      <value>org.apache.activemq.spring.Test.spring.topic</value>
+    </constructor-arg>
+  </bean>
+
+</beans>
+
+<!-- END SNIPPET: spring -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java
new file mode 100644
index 0000000..1e9633a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+
+/**
+ * Once the wire format is completed we can test against real persistence storage.
+ * 
+ * 
+ */
+public class DefaultStoreBrokerTest extends BrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?deleteAllMessagesOnStartup=true"));
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost"));
+    }
+    
+    public static Test suite() {
+        return suite(DefaultStoreBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java
new file mode 100644
index 0000000..e89ca04
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.net.URI;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.RecoveryBrokerTest;
+
+/**
+ * Used to verify that recovery works correctly against 
+ * 
+ * 
+ */
+public class DefaultStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?deleteAllMessagesOnStartup=true"));
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost"));
+    }
+    
+    public static Test suite() {
+        return suite(DefaultStoreRecoveryBrokerTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java
new file mode 100644
index 0000000..a6d78b4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProgressPrinter;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class LoadTester extends JmsTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoadTester.class);
+
+    protected int messageSize = 1024 * 64;
+    protected int produceCount = 10000;
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getServer().getConnectURI());
+        factory.setUseAsyncSend(true);
+        return factory;
+    }
+
+    public void testQueueSendThenAddConsumer() throws Exception {
+        ProgressPrinter printer = new ProgressPrinter(produceCount, 20);
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        connection.setUseCompression(false);
+        connection.getPrefetchPolicy().setAll(10);
+        connection.start();
+        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        LOG.info("Sending " + produceCount + " messages that are " + (messageSize / 1024.0) + "k large, for a total of " + (produceCount * messageSize / (1024.0 * 1024.0))
+                 + " megs of data.");
+        // Send a message to the broker.
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < produceCount; i++) {
+            printer.increment();
+            BytesMessage msg = session.createBytesMessage();
+            msg.writeBytes(new byte[messageSize]);
+            producer.send(msg);
+        }
+        long end1 = System.currentTimeMillis();
+
+        LOG.info("Produced messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+
+        printer = new ProgressPrinter(produceCount, 10);
+        start = System.currentTimeMillis();
+        MessageConsumer consumer = session.createConsumer(destination);
+        for (int i = 0; i < produceCount; i++) {
+            printer.increment();
+            assertNotNull("Getting message: " + i, consumer.receive(20000));
+        }
+        end1 = System.currentTimeMillis();
+        LOG.info("Consumed messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+
+    }
+
+    public static Test suite() {
+        return suite(LoadTester.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
new file mode 100644
index 0000000..fb0296c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import junit.framework.Test;
+import org.apache.activemq.broker.BrokerRestartTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+
+public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport {
+    final ArrayList<String> expected = new ArrayList<String>();
+    final ActiveMQDestination destination = new ActiveMQQueue("TEST");
+    public PendingQueueMessageStoragePolicy queuePendingPolicy;
+
+    @Override
+    protected void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+
+    public void initCombosForTestRecovery() throws Exception {
+        addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[] {new FilePendingQueueMessageStoragePolicy(), new VMPendingQueueMessageStoragePolicy()});
+        PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[] {
+                new KahaDBPersistenceAdapter(),
+                new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())
+        };
+        for (PersistenceAdapter adapter : persistenceAdapters) {
+            adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
+        }
+        addCombinationValues("persistenceAdapter", persistenceAdapters);
+    }
+
+    public void testRecovery() throws Exception {
+        sendSomeMessagesThatWillExpireIn5AndThenOne();
+
+        broker.stop();
+        broker.waitUntilStopped();
+        TimeUnit.SECONDS.sleep(6);
+        broker = createRestartedBroker();
+        broker.start();
+
+        consumeExpected();
+    }
+
+    private void consumeExpected() throws Exception {
+        // Setup the consumer and receive the message.
+         StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        Message m = receiveMessage(connection);
+        assertNotNull("Should have received message " + expected.get(0) + " by now!", m);
+        assertEquals(expected.get(0), m.getMessageId().toString());
+        MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
+        connection.send(ack);
+
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+    }
+
+    private void sendSomeMessagesThatWillExpireIn5AndThenOne() throws Exception {
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+
+        int MESSAGE_COUNT = 10;
+        for(int i=0; i < MESSAGE_COUNT; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setExpiration(System.currentTimeMillis()+5000);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        connection.send(message);
+        expected.add(message.getMessageId().toString());
+
+        connection.request(closeConnectionInfo(connectionInfo));
+    }
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policy = super.getDefaultPolicy();
+        policy.setPendingQueuePolicy(queuePendingPolicy);
+        policy.setExpireMessagesPeriod(0);
+        return policy;
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    public static Test suite() {
+        return suite(RecoverExpiredMessagesTest.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml
new file mode 100644
index 0000000..4c8254b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker" persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
+    <persistenceAdapter>
+      <kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/kaha-broker.db"  maxDataFileLength = "1024"/>
+    </persistenceAdapter>
+
+     <transportConnectors>
+      <transportConnector uri="tcp://localhost:0"/>
+    </transportConnectors>
+
+  </broker>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/loadtester.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/loadtester.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/loadtester.xml
new file mode 100644
index 0000000..6383e84
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/loadtester.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <amq:systemUsage id="memory-manager" >
+    <amq:memoryUsage>
+  	  <amq:memoryUsage limit = "1048576" /> <!--  1 meg limit -->
+  	</amq:memoryUsage>
+  </amq:systemUsage>
+  
+  <broker useJmx="true" deleteAllMessagesOnStartup="true" systemUsage="#memory-manager"  xmlns="http://activemq.apache.org/schema/core">
+  
+    <persistenceFactory>
+      <journalPersistenceAdapterFactory 
+      useQuickJournal="false" journalLogFiles="2" dataDirectory="loadtest"/>
+    </persistenceFactory>
+  
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:0"/>
+    </transportConnectors>
+        
+  </broker>
+  
+  <!-- The Derby Datasource that will be used by the Broker -->
+  <!-- 
+  <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource" destroy-method="close">
+    <property name="serverName" value="localhost"/>
+    <property name="databaseName" value="activemq"/>
+    <property name="portNumber" value="0"/>
+    <property name="user" value="activemq"/>
+    <property name="password" value="activemq"/>
+    <property name="dataSourceName" value="postgres"/>
+    <property name="initialConnections" value="1"/>
+    <property name="maxConnections" value="10"/>
+  </bean>
+  -->
+   
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java
new file mode 100644
index 0000000..d08fc5e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class DestinationsPluginTest {
+
+    BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected BrokerService createBroker() {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setPlugins(new BrokerPlugin[]{new DestinationsPlugin()});
+        broker.setDataDirectory("target/test");
+        return broker;
+    }
+
+    @Test
+    public void testDestinationSave() throws Exception {
+
+        BrokerView brokerView = broker.getAdminView();
+        brokerView.addQueue("test-queue");
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+
+        ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations();
+        for (ActiveMQDestination destination : destinations) {
+            if (destination.isQueue()) {
+                assertEquals("test-queue", destination.getPhysicalName());
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java
new file mode 100644
index 0000000..9361859
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * 
+ */
+public class PluginBrokerTest extends JmsTopicSendReceiveTest {
+    private static final Logger LOG = LoggerFactory.getLogger(PluginBrokerTest.class);
+    private BrokerService broker;
+
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }   
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker("org/apache/activemq/util/plugin-broker.xml");
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        LOG.info("Loading broker configuration from the classpath with URI: " + uri);
+        return BrokerFactory.createBroker(new URI("xbean:" + uri));
+    }
+
+	protected void assertMessageValid(int index, Message message)
+			throws JMSException {
+		// check if broker path has been set 
+		assertEquals("localhost", message.getStringProperty("BrokerPath"));
+		ActiveMQMessage amqMsg = (ActiveMQMessage)message;
+		if (index == 7) {
+			// check custom expiration
+			assertTrue("expiration is in range, depends on two distinct calls to System.currentTimeMillis", 1500 < amqMsg.getExpiration() - amqMsg.getTimestamp());
+		} else if (index == 9) {
+			// check ceiling
+			assertTrue("expiration ceeling is in range, depends on two distinct calls to System.currentTimeMillis", 59500 < amqMsg.getExpiration() - amqMsg.getTimestamp());
+		} else {
+			// check default expiration
+			assertEquals(1000, amqMsg.getExpiration() - amqMsg.getTimestamp());
+		}
+		super.assertMessageValid(index, message);
+	}
+	
+    protected void sendMessage(int index, Message message) throws Exception {
+    	if (index == 7) {
+    		producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 2000);
+    	} else if (index == 9) {
+    		producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 200000);
+    	} else {
+    		super.sendMessage(index, message);
+    	}
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
new file mode 100644
index 0000000..dd12768
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/RedeliveryPluginTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import junit.framework.TestCase;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ErrorBroker;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedeliveryPluginTest extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginTest.class);
+    RedeliveryPlugin underTest = new RedeliveryPlugin();
+
+    public void testInstallPluginValidation() throws Exception {
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
+        defaultEntry.setInitialRedeliveryDelay(500);
+        redeliveryPolicyMap.setDefaultEntry(defaultEntry);
+        underTest.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setSchedulerSupport(false);
+        Broker broker = new ErrorBroker("hi") {
+            @Override
+            public BrokerService getBrokerService() {
+                return brokerService;
+            }
+        };
+
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on no scheduler support");
+        } catch (Exception expected) {
+            LOG.info("expected: " + expected);
+        }
+
+        brokerService.setSchedulerSupport(true);
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on small initial delay");
+        } catch (Exception expected) {
+            LOG.info("expected: " + expected);
+        }
+
+        defaultEntry.setInitialRedeliveryDelay(5000);
+        defaultEntry.setRedeliveryDelay(500);
+        brokerService.setSchedulerSupport(true);
+        try {
+            underTest.installPlugin(broker);
+            fail("expect exception on small redelivery delay");
+        } catch (Exception expected) {
+            LOG.info("expected: " + expected);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
new file mode 100644
index 0000000..1a91f88
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TimeStampingBrokerPluginTest extends TestCase {
+
+	BrokerService broker;
+	TransportConnector tcpConnector;
+	MessageProducer producer;
+	MessageConsumer consumer;
+	Connection connection;
+	Session session;
+	Destination destination;
+	String queue = "TEST.FOO";
+	long expiry = 500;
+	
+	@Before
+	public void setUp() throws Exception {
+		TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
+    	tsbp.setZeroExpirationOverride(expiry);
+    	tsbp.setTtlCeiling(expiry);
+    	
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setPlugins(new BrokerPlugin[] {tsbp});
+        tcpConnector = broker.addConnector("tcp://localhost:0");
+        
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessExpired(true);
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(true);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+        
+        broker.start();
+        // Create a ConnectionFactory
+        ActiveMQConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
+
+        // Create a Connection
+        connection = connectionFactory.createConnection();
+        connection.start();
+
+        // Create a Session
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the destination Queue
+        destination = session.createQueue(queue);
+
+        // Create a MessageProducer from the Session to the Topic or Queue
+        producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+	}
+	
+	@After
+	public void tearDown() throws Exception {
+	     // Clean up
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+        broker.stop();
+	}
+	@Test
+    public void testExpirationSet() throws Exception {
+    	
+        // Create a messages
+        Message sentMessage = session.createMessage();
+
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        producer.send(sentMessage);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        consumer = session.createConsumer(destination);
+
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        // assert message timestamp is in window
+        assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() + "\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null);
+
+        // assert message expiration is in window
+        assertTrue("Before send: " + beforeSend + " Msg ts: " + receivedMessage.getJMSTimestamp() + " Msg Expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() && receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry));
+    }
+    @Test
+    public void testExpirationCelingSet() throws Exception {
+    	
+        // Create a messages
+        Message sentMessage = session.createMessage();
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        long sendExpiry =  beforeSend + (expiry*22);
+        sentMessage.setJMSExpiration(sendExpiry);
+
+        producer.send(sentMessage);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        consumer = session.createConsumer(destination);
+
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        // assert message timestamp is in window
+        assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() + "\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null);
+
+        // assert message expiration is in window
+        assertTrue("Sent expiry: " + sendExpiry + " Recv ts: " + receivedMessage.getJMSTimestamp() + " Recv expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() && receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry));
+    }
+    
+    @Test
+    public void testExpirationDLQ() throws Exception {
+    	
+        // Create a messages
+        Message sentMessage = session.createMessage();
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        long sendExpiry =  beforeSend + expiry;
+        sentMessage.setJMSExpiration(sendExpiry);
+
+        producer.send(sentMessage);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        consumer = session.createConsumer(destination);
+
+        Thread.sleep(expiry+250);
+        
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // Message should roll to DLQ
+        assertNull(receivedMessage);
+                
+        // Close old consumer, setup DLQ listener
+        consumer.close();
+        consumer = session.createConsumer(session.createQueue("DLQ."+queue));
+        
+        // Get mesage from DLQ
+        receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        // assert message timestamp is in window
+        //System.out.println("Recv: " + receivedMessage.getJMSExpiration());
+        assertEquals("Expiration should be zero" + receivedMessage.getJMSExpiration() + "\n", receivedMessage.getJMSExpiration(), 0);
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java
new file mode 100644
index 0000000..35e9cdb
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ */
+package org.apache.activemq.broker.util;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests TraceBrokerPathPlugin by creating two brokers linked by a network connector, and checking to see if the consuming end receives the expected value in the trace property
+ * @author Raul Kripalani
+ *
+ */
+public class TraceBrokerPathPluginTest extends TestCase {
+
+	BrokerService brokerA;
+	BrokerService brokerB;
+	TransportConnector tcpConnectorA;
+	TransportConnector tcpConnectorB;
+	MessageProducer producer;
+	MessageConsumer consumer;
+	Connection connectionA;
+	Connection connectionB;
+	Session sessionA;
+	Session sessionB;
+	String queue = "TEST.FOO";
+	String traceProperty = "BROKER_PATH";
+	
+	@Before
+	public void setUp() throws Exception {
+		TraceBrokerPathPlugin tbppA = new TraceBrokerPathPlugin();
+		tbppA.setStampProperty(traceProperty);
+		
+		TraceBrokerPathPlugin tbppB = new TraceBrokerPathPlugin();
+		tbppB.setStampProperty(traceProperty);
+    	
+        brokerA = new BrokerService();
+        brokerA.setBrokerName("brokerA");
+        brokerA.setPersistent(false);
+        brokerA.setUseJmx(true);
+        brokerA.setPlugins(new BrokerPlugin[] {tbppA});
+        tcpConnectorA = brokerA.addConnector("tcp://localhost:0");
+
+        brokerB = new BrokerService();
+        brokerB.setBrokerName("brokerB");
+        brokerB.setPersistent(false);
+        brokerB.setUseJmx(true);
+        brokerB.setPlugins(new BrokerPlugin[] {tbppB});
+        tcpConnectorB = brokerB.addConnector("tcp://localhost:0");
+        
+        brokerA.addNetworkConnector("static:(" + tcpConnectorB.getConnectUri().toString() + ")");
+        
+        brokerB.start();
+        brokerB.waitUntilStarted();
+        brokerA.start();
+        brokerA.waitUntilStarted();
+        
+        // Initialise connection to A and MessageProducer
+        connectionA = new ActiveMQConnectionFactory(tcpConnectorA.getConnectUri()).createConnection();
+        connectionA.start();
+        sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = sessionA.createProducer(sessionA.createQueue(queue));
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        
+        // Initialise connection to B and MessageConsumer
+        connectionB = new ActiveMQConnectionFactory(tcpConnectorB.getConnectUri()).createConnection();
+        connectionB.start();
+        sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = sessionB.createConsumer(sessionB.createQueue(queue));
+        
+	}
+	
+	@After
+	public void tearDown() throws Exception {
+	     // Clean up
+        producer.close();
+        consumer.close();
+        sessionA.close();
+        sessionB.close();
+        connectionA.close();
+        connectionB.close();
+        brokerA.stop();
+        brokerB.stop();
+	}
+	
+	@Test
+    public void testTraceBrokerPathPlugin() throws Exception {
+        Message sentMessage = sessionA.createMessage();
+        producer.send(sentMessage);
+        Message receivedMessage = consumer.receive(1000);
+
+        // assert we got the message
+        assertNotNull(receivedMessage);
+        
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        assertEquals("brokerA,brokerB", receivedMessage.getStringProperty(traceProperty));
+        
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
new file mode 100644
index 0000000..3621a14
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * 
+ */
+public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
+    
+    protected int total = 10;
+    protected Connection connection;
+    public String messageSelector1, messageSelector2 = null;
+
+
+    public void testVirtualTopicCreation() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean messageList1 = new ConsumerBean();
+        ConsumerBean messageList2 = new ConsumerBean();
+        messageList1.setVerbose(true);
+        messageList2.setVerbose(true);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        Destination producerDestination = getProducerDestination();
+        Destination destination1 = getConsumer1Dsetination();
+        Destination destination2 = getConsumer2Dsetination();
+        
+        LOG.info("Sending to: " + producerDestination);
+        LOG.info("Consuming from: " + destination1 + " and " + destination2);
+        
+        MessageConsumer c1 = session.createConsumer(destination1, messageSelector1);
+        MessageConsumer c2 = session.createConsumer(destination2, messageSelector2);
+
+        c1.setMessageListener(messageList1);
+        c2.setMessageListener(messageList2);
+
+        // create topic producer
+        MessageProducer producer = session.createProducer(producerDestination);
+        assertNotNull(producer);
+
+        for (int i = 0; i < total; i++) {
+            producer.send(createMessage(session, i));
+        }
+
+        assertMessagesArrived(messageList1, messageList2);
+    }
+
+    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
+        messageList1.assertMessagesArrived(total);
+        messageList2.assertMessagesArrived(total);
+    }
+
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        TextMessage textMessage = session.createTextMessage("message: " + i);
+        if (i % 2 != 0) {
+            textMessage.setStringProperty("odd", "yes");
+        } else {
+            textMessage.setStringProperty("odd", "no");
+        }
+        textMessage.setIntProperty("i", i);
+        return textMessage;
+    }
+
+    protected Destination getConsumer1Dsetination() {
+        return new ActiveMQQueue("FOO");
+    }
+
+    protected Destination getConsumer2Dsetination() {
+        return new ActiveMQTopic("BAR");
+    }
+
+    protected Destination getProducerDestination() {
+        return new ActiveMQQueue("MY.QUEUE");
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+        return answer;
+    }
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/composite-queue.xml";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
new file mode 100644
index 0000000..991e3a5
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ *
+ * 
+ */
+public class CompositeTopicTest extends CompositeQueueTest {
+    
+    protected Destination getConsumer1Dsetination() {
+        return new ActiveMQQueue("FOO");
+    }
+
+    protected Destination getConsumer2Dsetination() {
+        return new ActiveMQTopic("BAR");
+    }
+
+    protected Destination getProducerDestination() {
+        return new ActiveMQTopic("MY.TOPIC");
+    }
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/composite-topic.xml";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
new file mode 100644
index 0000000..b6ba22d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test for AMQ-4571.
+ * checks that durable subscription is fully unregistered 
+ * when using nested destination interceptors.
+ */
+public class DestinationInterceptorDurableSubTest extends EmbeddedBrokerTestSupport {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(DestinationInterceptorDurableSubTest.class);
+    private MBeanServerConnection mbsc = null;
+    public static final String JMX_CONTEXT_BASE_NAME = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=";
+
+    /**
+     * Tests AMQ-4571.
+     * @throws Exception
+     */
+    public void testVirtualTopicRemoval() throws Exception {
+
+        LOG.debug("Running testVirtualTopicRemoval()");
+        String clientId1 = "myId1";
+        String clientId2 = "myId2";
+
+        Connection conn = null;
+        Session session = null;
+
+        try {
+            assertTrue(broker.isStarted());
+
+            // create durable sub 1
+            conn = createConnection();
+            conn.setClientID(clientId1);
+            conn.start();
+            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            // Topic topic = session.createTopic(destination.getPhysicalName());
+            TopicSubscriber sub1 = session.createDurableSubscriber((Topic) destination, clientId1);
+
+            // create durable sub 2
+            TopicSubscriber sub2 = session.createDurableSubscriber((Topic) destination, clientId2);
+
+            // verify two subs registered in JMX 
+            assertSubscriptionCount(destination.getPhysicalName(), 2);
+            assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
+            assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
+
+            // delete sub 1
+            sub1.close();
+            session.unsubscribe(clientId1);
+
+            // verify only one sub registered in JMX
+            assertSubscriptionCount(destination.getPhysicalName(), 1);
+            assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
+            assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
+
+            // delete sub 2
+            sub2.close();
+            session.unsubscribe(clientId2);
+
+            // verify no sub registered in JMX
+            assertSubscriptionCount(destination.getPhysicalName(), 0);
+            assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1));
+            assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2));
+        } finally {
+            session.close();
+            conn.close();
+        }
+    }
+
+
+    /**
+     * Connects to broker using JMX
+     * @return The JMX connection
+     * @throws IOException in case of any errors
+     */
+    protected MBeanServerConnection connectJMXBroker() throws IOException {
+        // connect to broker via JMX
+        JMXServiceURL url =  new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:1299/jmxrmi");
+        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+        MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+        LOG.debug("JMX connection established");
+        return mbsc;
+    }
+
+    /**
+     * Asserts that the Subscriptions JMX attribute of a topic has the expected
+     * count. 
+     * @param topicName name of the topic destination
+     * @param expectedCount expected number of subscriptions
+     * @return
+     */
+    protected boolean assertSubscriptionCount(String topicName, int expectedCount) {
+        try {
+            if (mbsc == null) {
+                mbsc = connectJMXBroker();
+            }
+            // query broker queue size
+            ObjectName[] tmp = (ObjectName[])mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
+            assertEquals(expectedCount, tmp.length);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Checks if a subscriptions for topic topicName with subName is registered in JMX
+     * 
+     * @param topicName physical name of topic destination (excluding prefix 'topic://')
+     * @param subName name of the durable subscription
+     * @return true if registered, false otherwise
+     */
+    protected boolean isSubRegisteredInJmx(String topicName, String subName) {
+
+        try {
+            if (mbsc == null) {
+                mbsc = connectJMXBroker();
+            }
+
+            // A durable sub is registered under the Subscriptions JMX attribute of the topic and 
+            // as its own ObjectInstance under the topic's Consumer namespace.
+            // AMQ-4571 only removed the latter not the former on unsubscribe(), so we need 
+            // to check against both.
+            ObjectName[] names = (ObjectName[])mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions");
+            ObjectInstance instance = (ObjectInstance)mbsc.getObjectInstance(
+                new ObjectName(JMX_CONTEXT_BASE_NAME + 
+                    topicName + 
+                    ",endpoint=Consumer,clientId=myId1,consumerId=Durable(myId1_" + 
+                    subName + 
+                    ")")
+            );
+
+            if (instance == null) 
+                return false;
+
+            for (int i=0; i < names.length; i++) {
+                if (names[i].toString().contains(subName))
+                    return true;
+            }
+        } catch (InstanceNotFoundException ine) {
+            //this may be expected so log at info level
+            LOG.info(ine.toString());
+            return false;
+        }
+        catch (Exception ex) {
+            LOG.error(ex.toString());
+            return false;
+        }
+        return false;
+    }
+
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+
+        // lets disable persistence as we are a test
+        answer.setPersistent(false);
+        useTopic = true;
+        return answer;
+    }
+
+
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml";
+    }
+
+
+    /**
+     * Simple but custom topic interceptor.
+     * To be used for testing nested interceptors in conjunction with 
+     * virtual topic interceptor.
+     */
+    public static class SimpleDestinationInterceptor implements DestinationInterceptor {
+
+        private final Logger LOG = LoggerFactory.getLogger(SimpleDestinationInterceptor.class);
+        private BrokerService broker;
+
+        public SimpleDestinationInterceptor() {
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+         */
+        public void setBrokerService(BrokerService brokerService) {
+            LOG.info("setBrokerService()");
+            this.broker = brokerService;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.region.DestinationInterceptor#intercept(org.apache.activemq.broker.region.Destination)
+         */
+        public Destination intercept(final Destination destination) {
+            LOG.info("intercept({})", destination.getName());
+
+            if (!destination.getActiveMQDestination().getPhysicalName().startsWith("ActiveMQ")) {
+                return new DestinationFilter(destination) {
+                  public void send(ProducerBrokerExchange context, Message message) throws Exception {
+                    // Send message to Destination
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("SimpleDestinationInterceptor: Sending message to destination:"
+                          + this.getActiveMQDestination().getPhysicalName());
+                    }
+                    // message.setDestination(destination.getActiveMQDestination());
+                    super.send(context, message);
+                  }
+                };
+              }
+              return destination;
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.region.DestinationInterceptor#remove(org.apache.activemq.broker.region.Destination)
+         */
+        public void remove(Destination destination) {
+            LOG.info("remove({})", destination.getName());
+            this.broker = null;
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.region.DestinationInterceptor#create(org.apache.activemq.broker.Broker, org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination)
+         */
+        public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
+            LOG.info("create("+ broker.getBrokerName() + ", " + context.toString() + ", " + destination.getPhysicalName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
new file mode 100644
index 0000000..17a8706
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.spring.ConsumerBean;
+
+/**
+ * 
+ */
+public class FilteredQueueTest extends CompositeQueueTest {
+
+    @Override
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/filtered-queue.xml";
+    }
+
+    @Override
+    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
+        messageList1.assertMessagesArrived(total / 2);
+        messageList2.assertMessagesArrived(1);
+    }
+}


Mime
View raw message