activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [32/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:03 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
new file mode 100644
index 0000000..6f55e3d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.net.Socket;
+import java.util.Set;
+
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TransportConnectorMBeanTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TransportConnectorMBeanTest.class);
+
+    BrokerService broker;
+
+    @Test
+    public void verifyRemoteAddressInMbeanName() throws Exception {
+        doVerifyRemoteAddressInMbeanName(true);
+    }
+
+    @Test
+    public void verifyRemoteAddressNotInMbeanName() throws Exception {
+        doVerifyRemoteAddressInMbeanName(false);
+    }
+
+    @Test
+    public void verifyClientIdNetwork() throws Exception {
+        doVerifyClientIdNetwork(false);
+    }
+
+    @Test
+    public void verifyClientIdDuplexNetwork() throws Exception {
+        doVerifyClientIdNetwork(true);
+    }
+
+    private void doVerifyClientIdNetwork(boolean duplex) throws Exception {
+        createBroker(true);
+
+        BrokerService networked = new BrokerService();
+        networked.setBrokerName("networked");
+        networked.setPersistent(false);
+        NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString());
+        nc.setDuplex(duplex);
+        networked.start();
+
+        try {
+            assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    Set<ObjectName> registeredMbeans = getRegisteredMbeans();
+                    return match("_outbound", registeredMbeans);
+                }
+            }));
+
+        } finally {
+            networked.stop();
+        }
+    }
+
+    private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception {
+        createBroker(allowRemoteAddress);
+        ActiveMQConnection connection = createConnection();
+        Set<ObjectName> registeredMbeans = getRegisteredMbeans();
+        assertEquals("presence of mbean with clientId", true, match(connection.getClientID(), registeredMbeans));
+        assertEquals("presence of mbean with local port", allowRemoteAddress, match(extractLocalPort(connection), registeredMbeans));
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private boolean match(String s, Set<ObjectName> registeredMbeans) {
+        String encodedName = JMXSupport.encodeObjectNamePart(s);
+        for (ObjectName name : registeredMbeans) {
+            LOG.info("checking for match:" + encodedName + ", with: " + name.toString());
+            if (name.toString().contains(encodedName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private String extractLocalPort(ActiveMQConnection connection) throws Exception {
+        Socket socket = connection.getTransport().narrow(Socket.class);
+        return String.valueOf(socket.getLocalPort());
+    }
+
+    private Set<ObjectName> getRegisteredMbeans() throws Exception {
+        // need a little sleep to ensure JMX is up to date
+        Thread.sleep(200);
+        return broker.getManagementContext().queryNames(null, null);
+    }
+
+    private ActiveMQConnection createConnection() throws Exception {
+        final String opts = "?jms.watchTopicAdvisories=false";
+        ActiveMQConnection connection = (ActiveMQConnection)
+                new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + opts).createConnection();
+        connection.start();
+        return connection;
+    }
+
+    private void createBroker(boolean allowRemoteAddressInMbeanNames) throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("tcp://localhost:0");
+        broker.getManagementContext().setAllowRemoteAddressInMBeanNames(allowRemoteAddressInMbeanNames);
+        broker.start();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java
new file mode 100644
index 0000000..4cc57ba
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+public class mKahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+
+        MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+        List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+        FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+        defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+        adapters.add(defaultEntry);
+
+        FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter();
+        special.setDestination(new ActiveMQQueue("special"));
+        special.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+        adapters.add(special);
+
+        mKahaDB.setFilteredPersistenceAdapters(adapters);
+        broker.setPersistenceAdapter(mKahaDB);
+    }
+
+    public static Test suite() {
+        return suite(mKahaDBXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue("test,special");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
new file mode 100644
index 0000000..147d89a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+
+        MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+        List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+        FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+        defaultEntry.setPersistenceAdapter(new LevelDBPersistenceAdapter());
+        adapters.add(defaultEntry);
+
+        FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter();
+        special.setDestination(new ActiveMQQueue("special"));
+        special.setPersistenceAdapter(new LevelDBPersistenceAdapter());
+        adapters.add(special);
+
+        mKahaDB.setFilteredPersistenceAdapters(adapters);
+        broker.setPersistenceAdapter(mKahaDB);
+    }
+
+    public static Test suite() {
+        return suite(mLevelDBXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue("test,special");
+    }
+
+    public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+        // super.testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback();
+    }
+    public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
+        // super.testQueuePersistentUncommittedAcksLostOnRestart();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java
new file mode 100644
index 0000000..092c554
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/message/security/MessageAuthenticationTest.java
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.message.security;
+
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.security.MessageAuthorizationPolicy;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.spring.ConsumerBean;
+
+/**
+ * 
+ */
+public class MessageAuthenticationTest extends EmbeddedBrokerTestSupport {
+
+    private Connection connection;
+
+    public void testSendInvalidMessage() throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean messageList = new ConsumerBean();
+        messageList.setVerbose(true);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination destination = new ActiveMQQueue("MyQueue");
+
+        MessageConsumer c1 = session.createConsumer(destination);
+
+        c1.setMessageListener(messageList);
+
+        MessageProducer producer = session.createProducer(destination);
+        assertNotNull(producer);
+
+        producer.send(createMessage(session, "invalidBody", "myHeader", "xyz"));
+        producer.send(createMessage(session, "validBody", "myHeader", "abc"));
+
+        messageList.assertMessagesArrived(1);
+        assertEquals("validBody", ((TextMessage) messageList.flushMessages().get(0)).getText());
+    }
+
+    private javax.jms.Message createMessage(Session session, String body, String header, String value) throws JMSException {
+        TextMessage msg = session.createTextMessage(body);
+        msg.setStringProperty(header, value);
+        return msg;
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.setMessageAuthorizationPolicy(new MessageAuthorizationPolicy() {
+            public boolean isAllowedToConsume(ConnectionContext context, Message message) {
+                try {
+                    Object value = message.getProperty("myHeader");
+                    return "abc".equals(value);
+                }
+                catch (IOException e) {
+                    System.out.println("Caught: " + e);
+                    e.printStackTrace();
+                    return false;
+                }
+            }
+        });
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml
new file mode 100644
index 0000000..245d946
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/out-of-order-broker-elements.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library with validation off -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
+
+    <broker xmlns="http://activemq.apache.org/schema/core">
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="20 mb"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="1 gb" name="foo"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="100 mb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+        <destinations>
+            <queue physicalName="FOO.BAR"/>
+            <topic physicalName="SOME.TOPIC"/>
+        </destinations>
+
+    </broker>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
new file mode 100644
index 0000000..dcf4e69
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.partition;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.partition.PartitionBrokerPlugin;
+import org.apache.activemq.partition.dto.Partitioning;
+
+/**
+ */
+public class SpringPartitionBrokerTest extends TestCase {
+
+    public void testCreatePartitionBroker() throws Exception {
+
+        BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml");
+        assertEquals(1, broker.getPlugins().length);
+        PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0];
+        Partitioning config = plugin.getConfig();
+        assertEquals(2,  config.getBrokers().size());
+
+        Object o;
+        String json = "{\n" +
+        "  \"by_client_id\":{\n" +
+        "    \"client1\":{\"ids\":[\"broker1\"]},\n" +
+        "    \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" +
+        "  },\n" +
+        "  \"brokers\":{\n" +
+        "    \"broker1\":\"tcp://localhost:61616\",\n" +
+        "    \"broker2\":\"tcp://localhost:61616\"\n" +
+        "  }\n" +
+        "}";
+        Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class);
+        assertEquals(expected.toString(), config.toString());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
new file mode 100644
index 0000000..3cfd595
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    AbortSlowAckConsumerStrategy strategy;
+
+    public AbortSlowAckConsumer0Test(Boolean isTopic) {
+        super(isTopic);
+    }
+
+    @Override
+    protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        return strategy;
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        strategy = createSlowConsumerStrategy();
+        underTest = strategy;
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+    @Override
+    @Test
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
+        super.testSlowConsumerIsAbortedViaJmx();
+    }
+
+    @Test
+    public void testZeroPrefetchConsumerIsAborted() throws Exception {
+        strategy.setMaxTimeSinceLastAck(2000); // Make it shorter
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+
+        TimeUnit.SECONDS.sleep(15);
+
+        try {
+            consumer.receive(5000);
+            fail("Slow consumer not aborted.");
+        } catch (Exception ex) {
+        }
+    }
+
+    @Test
+    public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
+        strategy.setIgnoreIdleConsumers(false);
+        strategy.setMaxTimeSinceLastAck(2000); // Make it shorter
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+
+        startProducers(destination, 1);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+
+        // Consumer needs to be closed before the reeive call.
+        TimeUnit.SECONDS.sleep(15);
+
+        try {
+            consumer.receive(5000);
+            fail("Idle consumer not aborted.");
+        } catch (Exception ex) {
+        }
+    }
+
+    @Test
+    public void testIdleConsumerCanBeAborted() throws Exception {
+        strategy.setIgnoreIdleConsumers(false);
+        strategy.setMaxTimeSinceLastAck(2000); // Make it shorter
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 1);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+        message.acknowledge();
+
+        // Consumer needs to be closed before the reeive call.
+        TimeUnit.SECONDS.sleep(15);
+
+        try {
+            consumer.receive(5000);
+            fail("Idle consumer not aborted.");
+        } catch (Exception ex) {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
new file mode 100644
index 0000000..6d3e970
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.ConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test {
+
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) {
+        super(abortConnection, topic);
+    }
+
+    @Override
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
new file mode 100644
index 0000000..948613e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.ConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test {
+
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    public AbortSlowAckConsumer2Test(Boolean topic) {
+        super(topic);
+    }
+
+    @Override
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
new file mode 100644
index 0000000..9f23443
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.*;
+
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class);
+
+    @Parameterized.Parameters(name = "isTopic({0})")
+    public static Collection<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
+    }
+
+    public AbortSlowConsumer0Test(Boolean isTopic) {
+        this.topic = isTopic;
+    }
+
+    @Test
+    public void testRegularConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+
+    @Test
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
+        startConsumers(withPrefetch(2, destination));
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+
+        ActiveMQDestination amqDest = (ActiveMQDestination)destination;
+        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
+                (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
+                + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
+
+        DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
+        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+
+        assertNotNull(slowConsumerPolicyMBeanName);
+
+        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+                broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TabularData slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("one slow consumers", 1, slowOnes.size());
+
+        LOG.info("slow ones:"  + slowOnes);
+
+        CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
+        LOG.info("Slow one: " + slowOne);
+
+        assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
+        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+
+        slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("no slow consumers left", 0, slowOnes.size());
+
+        // verify mbean gone with destination
+        broker.getAdminView().removeTopic(amqDest.getPhysicalName());
+
+        try {
+            abortPolicy.getSlowConsumers();
+            fail("expect not found post destination removal");
+        } catch(UndeclaredThrowableException expected) {
+            assertTrue("correct exception: " + expected.getCause(),
+                    expected.getCause() instanceof InstanceNotFoundException);
+        }
+    }
+
+    private Destination withPrefetch(int i, Destination destination) {
+        String destWithPrefetch =
+                ((ActiveMQDestination) destination).getPhysicalName() + "?consumer.prefetchSize=" + i;
+        return topic ? new ActiveMQTopic(destWithPrefetch) : new ActiveMQQueue(destWithPrefetch);
+    }
+
+    @Test
+    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
+        consumerCount = 10;
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        allMessagesList.waitForMessagesToArrive(99);
+        allMessagesList.assertAtLeastMessagesReceived(99);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+        TimeUnit.SECONDS.sleep(5);
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+    @Test
+    public void testAbortAlreadyClosingConsumers() throws Exception {
+        consumerCount = 1;
+        startConsumers(withPrefetch(2, destination));
+        for (MessageIdList list : consumers.values()) {
+            list.setProcessingDelay(6 * 1000);
+        }
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(consumerCount);
+
+        for (MessageConsumer consumer : consumers.keySet()) {
+            LOG.info("closing consumer: " + consumer);
+            /// will block waiting for on message till 6secs expire
+            consumer.close();
+        }
+    }
+
+    @Test
+    public void testAbortConsumerOnDeadConnection() throws Exception {
+        TransportConnector transportConnector = broker.addConnector("tcp://0.0.0.0:0");
+        transportConnector.setBrokerService(broker);
+        transportConnector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
+        transportConnector.start();
+        SocketProxy socketProxy = new SocketProxy(transportConnector.getPublishableConnectURI());
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(socketProxy.getUrl());
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(4);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+        Connection c = connectionFactory.createConnection();
+        connections.add(c);
+        c.start();
+        Session session = c.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final ActiveMQMessageConsumer messageconsumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
+        startProducers(destination, 10);
+
+        messageconsumer.receive(4000).acknowledge();
+        assertNotNull(messageconsumer.receive(4000));
+        assertNotNull(messageconsumer.receive(4000));
+        assertNotNull(messageconsumer.receive(4000));
+
+        // close control command won't get through
+        socketProxy.pause();
+
+        ActiveMQDestination amqDest = (ActiveMQDestination)destination;
+        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
+                (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
+                + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
+
+        final DestinationViewMBean destView = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
+
+        assertTrue("Consumer gone from broker view", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("DestView {} comsumerCount {}", destView, destView.getConsumerCount());
+                return 0 == destView.getConsumerCount();
+            }
+        }));
+
+        socketProxy.goOn();
+
+        assertTrue("consumer was closed", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                boolean closed = false;
+                try {
+                    messageconsumer.receive(400);
+                } catch (javax.jms.IllegalStateException expected) {
+                    closed = expected.toString().contains("closed");
+                }
+                return closed;
+            }
+        }));
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        exceptions.add(exception);
+        exception.printStackTrace();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
new file mode 100644
index 0000000..e17b362
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class);
+
+    @Parameterized.Parameters(name = "abortConnection({0})-isTopic({1})")
+    public static Collection<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{
+                {Boolean.TRUE, Boolean.TRUE},
+                {Boolean.TRUE, Boolean.FALSE},
+                {Boolean.FALSE, Boolean.TRUE},
+                {Boolean.FALSE, Boolean.FALSE}});
+    }
+
+    public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) {
+        this.abortConnection = abortConnection;
+        this.topic = topic;
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSlowConsumerIsAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+        TimeUnit.SECONDS.sleep(5);
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testAbortAlreadyClosedConsumers() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing consumer: " + consumer);
+        consumer.close();
+
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testAbortAlreadyClosedConnection() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing connection: " + conn);
+        conn.close();
+
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
new file mode 100644
index 0000000..7263027
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map.Entry;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer2Test extends AbortSlowConsumerBase {
+
+    @Parameterized.Parameters(name = "isTopic({0})")
+    public static Collection<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
+    }
+
+    public AbortSlowConsumer2Test(Boolean isTopic) {
+        this.topic = isTopic;
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testLittleSlowConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(500);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 12);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
new file mode 100644
index 0000000..ee28112
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import junit.framework.Test;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+
+public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
+
+    protected AbortSlowConsumerStrategy underTest;
+    protected boolean abortConnection = false;
+    protected long checkPeriod = 2 * 1000;
+    protected long maxSlowDuration = 5 * 1000;
+    protected final List<Throwable> exceptions = new ArrayList<Throwable>();
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        exceptions.clear();
+        topic = true;
+        underTest = createSlowConsumerStrategy();
+        super.setUp();
+        createDestination();
+    }
+
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+        underTest.setAbortConnection(abortConnection);
+        underTest.setCheckPeriod(checkPeriod);
+        underTest.setMaxSlowDuration(maxSlowDuration);
+
+        policy.setSlowConsumerStrategy(underTest);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        exceptions.add(exception);
+        exception.printStackTrace();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
new file mode 100644
index 0000000..6c31237
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * 
+ */
+public class DeadLetterTest extends DeadLetterTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterTest.class);
+
+    protected int rollbackCount;
+
+    protected void doTest() throws Exception {
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        makeDlqConsumer();
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        for (int i = 0; i < messageCount; i++) {
+            consumeAndRollback(i);
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(1000);
+            assertMessage(msg, i);
+            assertNotNull("Should be a DLQ message for loop: " + i, msg);
+        }
+        session.commit();
+    }
+
+    protected void consumeAndRollback(int messageCounter) throws Exception {
+        for (int i = 0; i < rollbackCount; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message);
+            assertMessage(message, messageCounter);
+
+            session.rollback();
+        }
+        LOG.info("Rolled back: " + rollbackCount + " times");
+    }
+
+    protected void setUp() throws Exception {
+        transactedMode = true;
+        super.setUp();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory answer = super.createConnectionFactory();
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setMaximumRedeliveries(3);
+        policy.setBackOffMultiplier((short) 1);
+        policy.setInitialRedeliveryDelay(10);
+        policy.setUseExponentialBackOff(false);
+        answer.setRedeliveryPolicy(policy);
+        return answer;
+    }
+
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
new file mode 100644
index 0000000..b275f2e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public abstract class DeadLetterTestSupport extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterTestSupport.class);
+
+    protected int messageCount = 10;
+    protected long timeToLive;
+    protected Connection connection;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected boolean durableSubscriber;
+    protected Destination dlqDestination;
+    protected MessageConsumer dlqConsumer;
+    protected QueueBrowser dlqBrowser;
+    protected BrokerService broker;
+    protected boolean transactedMode;
+    protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
+    private Destination destination;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = createBroker();
+        broker.start();
+        connection = createConnection();
+        connection.setClientID(createClientId());
+
+        session = connection.createSession(transactedMode, acknowledgeMode);
+        connection.start();
+    }
+
+    protected String createClientId() {
+        return toString();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected abstract void doTest() throws Exception;
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
+        if(defaultDeadLetterStrategy!=null) {
+            defaultDeadLetterStrategy.setProcessNonPersistent(true);
+        }
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    protected void makeConsumer() throws JMSException {
+        Destination destination = getDestination();
+        LOG.info("Consuming from: " + destination);
+        if (durableSubscriber) {
+            consumer = session.createDurableSubscriber((Topic)destination, destination.toString());
+        } else {
+            consumer = session.createConsumer(destination);
+        }
+    }
+
+    protected void makeDlqConsumer() throws Exception {
+        dlqDestination = createDlqDestination();
+
+        LOG.info("Consuming from dead letter on: " + dlqDestination);
+        dlqConsumer = session.createConsumer(dlqDestination);
+    }
+    
+    protected void makeDlqBrowser() throws JMSException {
+        dlqDestination = createDlqDestination();
+
+        LOG.info("Browsing dead letter on: " + dlqDestination);
+        dlqBrowser = session.createBrowser((Queue)dlqDestination);    	
+    }
+
+    protected void sendMessages() throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(getDestination());
+        producer.setDeliveryMode(deliveryMode);
+        producer.setTimeToLive(timeToLive);
+
+        LOG.info("Sending " + messageCount + " messages to: " + getDestination());
+        for (int i = 0; i < messageCount; i++) {
+            Message message = createMessage(session, i);
+            producer.send(message);
+        }
+    }
+
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        return session.createTextMessage(getMessageText(i));
+    }
+
+    protected String getMessageText(int i) {
+        return "message: " + i;
+    }
+
+    protected void assertMessage(Message message, int i) throws Exception {
+        LOG.info("Received message: " + message);
+        assertNotNull("No message received for index: " + i, message);
+        assertTrue("Should be a TextMessage not: " + message, message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage)message;
+        assertEquals("text of message: " + i, getMessageText(i), textMessage.getText());
+    }
+
+    protected abstract Destination createDlqDestination();
+
+    public void testTransientTopicMessage() throws Exception {
+        super.topic = true;
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        durableSubscriber = true;
+        doTest();
+    }
+
+    public void testDurableTopicMessage() throws Exception {
+        super.topic = true;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        durableSubscriber = true;
+        doTest();
+    }
+
+    public void testTransientQueueMessage() throws Exception {
+        super.topic = false;
+        deliveryMode = DeliveryMode.NON_PERSISTENT;
+        durableSubscriber = false;
+        doTest();
+        validateConsumerPrefetch(this.getDestinationString(), 0);        
+    }
+        
+    public void testDurableQueueMessage() throws Exception {
+        super.topic = false;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        durableSubscriber = false;
+        doTest();
+        validateConsumerPrefetch(this.getDestinationString(), 0);
+    }
+
+    public Destination getDestination() {
+        if (destination == null) {
+            destination = createDestination();
+        }
+        return destination;
+    }
+    
+    private void validateConsumerPrefetch(String destination, long expectedCount) {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
+            if (dest.getName().equals(destination)) {
+                DestinationStatistics stats = dest.getDestinationStatistics();
+                LOG.info(">>>> inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
+                assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", 
+                        expectedCount, stats.getInflight().getCount());      
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java
new file mode 100644
index 0000000..a5f7984
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DestinationCursorConfigTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * 
+ */
+public class DestinationCursorConfigTest extends TestSupport {
+    protected BrokerService broker;
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        super.setUp();  
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker.stop();
+        super.tearDown();    
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/policy/cursor.xml"));
+        factory.afterPropertiesSet();
+        BrokerService answer = factory.getBroker();
+        return answer;
+    }
+
+    public void testQueueConfiguration() throws Exception {
+        super.topic = false;
+        ActiveMQDestination destination = (ActiveMQDestination) createDestination("org.apache.foo");
+        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
+        PendingQueueMessageStoragePolicy policy = entry.getPendingQueuePolicy();
+        assertNotNull(policy);
+        assertTrue("Policy is: " + policy, policy instanceof VMPendingQueueMessageStoragePolicy);
+    }
+
+    public void testTopicConfiguration() throws Exception {
+        super.topic = true;
+        ActiveMQDestination destination = (ActiveMQDestination) createDestination("org.apache.foo");
+        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
+        PendingSubscriberMessageStoragePolicy policy = entry.getPendingSubscriberPolicy();
+        assertNotNull(policy);
+        assertFalse(entry.isProducerFlowControl());
+        assertTrue(entry.getMemoryLimit()==(1024*1024));
+        assertTrue("subscriberPolicy is: " + policy, policy instanceof VMPendingSubscriberMessageStoragePolicy);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java
new file mode 100644
index 0000000..f43f886
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/DiscardingDeadLetterPolicyTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.Message;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.DiscardingDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class DiscardingDeadLetterPolicyTest extends DeadLetterTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class);
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy strategy = new DiscardingDeadLetterStrategy();
+        strategy.setProcessNonPersistent(true);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    @Override
+    protected void doTest() throws Exception {
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        makeDlqConsumer();
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        for (int i = 0; i < messageCount; i++) {
+            consumeAndRollback(i);
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(1000);
+            assertNull("Should not be a DLQ message for loop: " + i, msg);
+        }
+        session.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
new file mode 100644
index 0000000..a587be8
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.Enumeration;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class IndividualDeadLetterTest extends DeadLetterTest {
+    private static final Logger LOG = LoggerFactory.getLogger(IndividualDeadLetterTest.class);
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessNonPersistent(true);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    @Override
+    protected Destination createDlqDestination() {
+        String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
+        return new ActiveMQQueue(prefix + getClass().getName() + "." + getName());
+    }
+
+    public void testDLQBrowsing() throws Exception {
+        super.topic = false;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        durableSubscriber = false;
+        messageCount = 1;
+
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        for (int i = 0; i < rollbackCount; i++) {
+            makeConsumer();
+            Message message = consumer.receive(5000);
+            assertNotNull("No message received: ", message);
+
+            session.rollback();
+            LOG.info("Rolled back: " + rollbackCount + " times");
+            consumer.close();
+        }
+
+        makeDlqBrowser();
+        browseDlq();
+        dlqBrowser.close();
+        session.close();
+        Thread.sleep(1000);
+        session = connection.createSession(transactedMode, acknowledgeMode);
+        Queue testQueue = new ActiveMQQueue("ActiveMQ.DLQ.Queue.ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName());
+        MessageConsumer testConsumer = session.createConsumer(testQueue);
+        assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
+
+    }
+
+    protected void browseDlq() throws Exception {
+        Enumeration<?> messages = dlqBrowser.getEnumeration();
+        while (messages.hasMoreElements()) {
+            LOG.info("Browsing: " + messages.nextElement());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java
new file mode 100644
index 0000000..6dd87c2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterViaXmlTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ *
+ * 
+ */
+public class IndividualDeadLetterViaXmlTest extends DeadLetterTest {
+    private static final Logger LOG = LoggerFactory.getLogger(IndividualDeadLetterViaXmlTest.class);
+
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/policy/individual-dlq.xml"));
+        factory.afterPropertiesSet();
+        BrokerService answer = factory.getBroker();
+        return answer;
+    }
+
+    protected Destination createDlqDestination() {
+        String queueName = "Test.DLQ." + getClass().getName() + "." + getName();
+        LOG.info("Using queue name: " + queueName);
+        return new ActiveMQQueue(queueName);
+    }
+}


Mime
View raw message