activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [18/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:49 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
new file mode 100644
index 0000000..4d6d39c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.Socket;
+import java.net.URI;
+
+import javax.management.ObjectName;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises the SSL connectors of a broker built from
+ * {@code JaasStompSSLBroker.xml}: STOMP and OpenWire clients connect over the
+ * plain-SSL and NIO-SSL connectors, with and without explicit credentials, and
+ * a queue is added and removed through the broker admin view.
+ */
+public class AMQ4126Test {
+
+    protected BrokerService broker;
+
+    protected String java_security_auth_login_config = "java.security.auth.login.config";
+    protected String xbean = "xbean:";
+    protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126";
+    protected String certBase = "src/test/resources/org/apache/activemq/security";
+    protected String JaasStompSSLBroker_xml = "JaasStompSSLBroker.xml";
+    protected StompConnection stompConnection = new StompConnection();
+    private final static String destinationName = "TEST.QUEUE";
+    /** Value of the JAAS login-config property before the test, or null if it was unset. */
+    protected String oldLoginConf = null;
+
+    /**
+     * Points the JAAS login-config system property at this test's
+     * {@code login.config} and starts a JMX-enabled broker from the XML
+     * configuration.
+     */
+    @Before
+    public void before() throws Exception {
+        oldLoginConf = System.getProperty(java_security_auth_login_config);
+        System.setProperty(java_security_auth_login_config, confBase + "/login.config");
+        broker = BrokerFactory.createBroker(xbean + confBase + "/" + JaasStompSSLBroker_xml);
+
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setUseJmx(true);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker and puts the JAAS login-config system property back
+     * the way it was found, clearing it when it was not set beforehand.
+     */
+    @After
+    public void after() throws Exception {
+        try {
+            // broker stays null when before() failed while creating it.
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } finally {
+            if (oldLoginConf != null) {
+                System.setProperty(java_security_auth_login_config, oldLoginConf);
+            } else {
+                System.clearProperty(java_security_auth_login_config);
+            }
+        }
+    }
+
+    /**
+     * Opens a socket from the JVM-default SSL socket factory after setting the
+     * {@code javax.net.ssl.*} trust-store and key-store system properties to
+     * the test key stores under {@link #certBase}.
+     *
+     * @param host host to connect to
+     * @param port port to connect to
+     * @return the connected socket
+     */
+    public Socket createSocket(String host, int port) throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", certBase + "/broker1.ks");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", certBase + "/client.ks");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket(host, port);
+    }
+
+    /**
+     * Sends a STOMP CONNECT frame to the named connector and asserts that the
+     * broker answers with a CONNECTED frame.
+     *
+     * @param connectorName name of the broker transport connector to use
+     * @param extraHeaders  header lines to place in the CONNECT frame, each
+     *                      terminated by a newline, or null for none
+     */
+    public void stompConnectTo(String connectorName, String extraHeaders) throws Exception {
+        URI connectUri = broker.getConnectorByName(connectorName).getConnectUri();
+        stompConnection.open(createSocket(connectUri.getHost(), connectUri.getPort()));
+        try {
+            String extra = extraHeaders != null ? extraHeaders : "\n";
+            stompConnection.sendFrame("CONNECT\n" + extra + "\n" + Stomp.NULL);
+
+            StompFrame f = stompConnection.receive();
+            // The frame body is used as the failure message so that a broker
+            // ERROR frame explains itself.
+            TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction());
+        } finally {
+            // Close even when the assertion fails so the socket is not leaked.
+            stompConnection.close();
+        }
+    }
+
+    @Test
+    public void testStompSSLWithUsernameAndPassword() throws Exception {
+        stompConnectTo("stomp+ssl", "login:system\n" + "passcode:manager\n");
+    }
+
+    @Test
+    public void testStompSSLWithCertificate() throws Exception {
+        stompConnectTo("stomp+ssl", null);
+    }
+
+    @Test
+    public void testStompNIOSSLWithUsernameAndPassword() throws Exception {
+        stompConnectTo("stomp+nio+ssl", "login:system\n" + "passcode:manager\n");
+    }
+
+    @Test
+    public void testStompNIOSSLWithCertificate() throws Exception {
+        stompConnectTo("stomp+nio+ssl", null);
+    }
+
+    /**
+     * Creates an OpenWire connection over SSL to the named connector, starts
+     * it and closes it again.
+     *
+     * @param connectorName name of the broker transport connector to use
+     * @param username      user name, or null to connect without credentials
+     * @param password      password, or null to connect without credentials
+     */
+    public void openwireConnectTo(String connectorName, String username, String password) throws Exception {
+        URI brokerURI = broker.getConnectorByName(connectorName).getConnectUri();
+        String uri = "ssl://" + brokerURI.getHost() + ":" + brokerURI.getPort();
+        ActiveMQSslConnectionFactory cf = new ActiveMQSslConnectionFactory(uri);
+        cf.setTrustStore("org/apache/activemq/security/broker1.ks");
+        cf.setTrustStorePassword("password");
+        cf.setKeyStore("org/apache/activemq/security/client.ks");
+        cf.setKeyStorePassword("password");
+
+        ActiveMQConnection connection;
+        if (username != null || password != null) {
+            connection = (ActiveMQConnection) cf.createConnection(username, password);
+        } else {
+            connection = (ActiveMQConnection) cf.createConnection();
+        }
+        TestCase.assertNotNull(connection);
+        try {
+            connection.start();
+        } finally {
+            // close() rather than stop(): stop() would leave the connection open.
+            connection.close();
+        }
+    }
+
+    @Test
+    public void testOpenwireSSLWithUsernameAndPassword() throws Exception {
+        openwireConnectTo("openwire+ssl", "system", "manager");
+    }
+
+    @Test
+    public void testOpenwireSSLWithCertificate() throws Exception {
+        openwireConnectTo("openwire+ssl", null, null);
+    }
+
+    @Test
+    public void testOpenwireNIOSSLWithUsernameAndPassword() throws Exception {
+        // Same credentials as the other username/password tests in this class.
+        openwireConnectTo("openwire+nio+ssl", "system", "manager");
+    }
+
+    @Test
+    public void testOpenwireNIOSSLWithCertificate() throws Exception {
+        openwireConnectTo("openwire+nio+ssl", null, null);
+    }
+
+    /**
+     * Adds and removes a queue through the broker admin view and checks that
+     * the queue list reflects each step.
+     */
+    @Test
+    public void testJmx() throws Exception {
+        TestCase.assertFalse(findDestination(destinationName));
+        broker.getAdminView().addQueue(destinationName);
+        TestCase.assertTrue(findDestination(destinationName));
+        broker.getAdminView().removeQueue(destinationName);
+        TestCase.assertFalse(findDestination(destinationName));
+    }
+
+    /**
+     * Reports whether any queue object name returned by the admin view
+     * contains the given text.
+     *
+     * @param name text to look for in the queue object names
+     * @return true if at least one queue object name contains {@code name}
+     */
+    private boolean findDestination(String name) throws Exception {
+        ObjectName[] destinations = broker.getAdminView().getQueues();
+        for (ObjectName destination : destinations) {
+            if (destination.toString().contains(name)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
new file mode 100644
index 0000000..9ca08bb
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Connects a STOMP client over SSL to each of the four STOMP connectors
+ * declared in {@code InconsistentConnectorPropertiesBehaviour.xml} and asserts
+ * that every one of them answers with a CONNECTED frame.
+ */
+public class AMQ4133Test {
+
+    protected String java_security_auth_login_config = "java.security.auth.login.config";
+    protected String xbean = "xbean:";
+    protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126";
+    protected String certBase = "src/test/resources/org/apache/activemq/security";
+    protected String activemqXml = "InconsistentConnectorPropertiesBehaviour.xml";
+    protected BrokerService broker;
+
+    /** Value of the JAAS login-config property before the test, or null if it was unset. */
+    protected String oldLoginConf = null;
+
+    /**
+     * Points the JAAS login-config system property at the test
+     * {@code login.config} and starts a broker from the XML configuration.
+     */
+    @Before
+    public void before() throws Exception {
+        oldLoginConf = System.getProperty(java_security_auth_login_config);
+        System.setProperty(java_security_auth_login_config, confBase + "/" + "login.config");
+        broker = BrokerFactory.createBroker(xbean + confBase + "/" + activemqXml);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker and puts the JAAS login-config system property back
+     * the way it was found, clearing it when it was not set beforehand.
+     */
+    @After
+    public void after() throws Exception {
+        try {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } finally {
+            // Without this the property set in before() leaks into later tests.
+            if (oldLoginConf != null) {
+                System.setProperty(java_security_auth_login_config, oldLoginConf);
+            } else {
+                System.clearProperty(java_security_auth_login_config);
+            }
+        }
+    }
+
+    @Test
+    public void stompSSLTransportNeedClientAuthTrue() throws Exception {
+        stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl").getConnectUri().getPort());
+    }
+
+    @Test
+    public void stompSSLNeedClientAuthTrue() throws Exception {
+        stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl+special").getConnectUri().getPort());
+    }
+
+    @Test
+    public void stompNIOSSLTransportNeedClientAuthTrue() throws Exception {
+        stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl").getConnectUri().getPort());
+    }
+
+    @Test
+    public void stompNIOSSLNeedClientAuthTrue() throws Exception {
+        stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl+special").getConnectUri().getPort());
+    }
+
+    /**
+     * Opens a socket from the JVM-default SSL socket factory after setting the
+     * {@code javax.net.ssl.*} trust-store and key-store system properties to
+     * the test key stores under {@link #certBase}.
+     *
+     * @param host host to connect to
+     * @param port port to connect to
+     * @return the connected socket
+     */
+    public Socket createSocket(String host, int port) throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", certBase + "/" + "broker1.ks");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", certBase + "/" + "client.ks");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket(host, port);
+    }
+
+    /**
+     * Sends a header-less STOMP CONNECT frame to the given endpoint and
+     * asserts that the reply is a CONNECTED frame.
+     *
+     * @param host host of the STOMP connector
+     * @param port port of the STOMP connector
+     */
+    public void stompConnectTo(String host, int port) throws Exception {
+        StompConnection stompConnection = new StompConnection();
+        stompConnection.open(createSocket(host, port));
+        try {
+            stompConnection.sendFrame("CONNECT\n" + "\n" + Stomp.NULL);
+            StompFrame f = stompConnection.receive();
+            // The frame body is used as the failure message so that a broker
+            // ERROR frame explains itself.
+            TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction());
+        } finally {
+            // Close even when the assertion fails so the socket is not leaked.
+            stompConnection.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
new file mode 100644
index 0000000..cf7ca45
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.concurrent.Semaphore;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
+
+/**
+ * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when
+ * bridges are VM-to-VM. Specifically, memory usage from the local broker is
+ * manipulated by the remote broker.
+ */
+public class AMQ4147Test extends JmsMultipleBrokersTestSupport {
+    /**
+     * This test demonstrates the bug: namely, when a message is bridged over
+     * the VMTransport, its memory usage continues to refer to the originating
+     * broker. As a result, memory usage is never accounted for on the remote
+     * broker, and the local broker's memory usage is only decreased once the
+     * message is consumed on the remote broker.
+     */
+    public void testVMTransportRemoteMemoryUsage() throws Exception {
+        verifyRemoteMemoryUsage("broker:(vm://broker2)/broker2?persistent=false");
+    }
+
+    /**
+     * This test demonstrates that the bug is VMTransport-specific and does not
+     * occur when bridges occur using other protocols.
+     */
+    public void testTcpTransportRemoteMemoryUsage() throws Exception {
+        verifyRemoteMemoryUsage("broker:(tcp://localhost:61616)/broker2?persistent=false");
+    }
+
+    /**
+     * Bridges a VM-transport broker1 to a broker2 created from the given URI,
+     * sends one message to broker1 and checks that the queue memory usage
+     * moves from broker1 to broker2 while the message is held by broker2's
+     * consumer, and that both queues report zero usage once it is consumed.
+     *
+     * @param broker2Uri broker URI used to create broker2
+     */
+    private void verifyRemoteMemoryUsage(String broker2Uri) throws Exception {
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+
+        BrokerService broker2 = createBroker(new URI(broker2Uri));
+
+        startAllBrokers();
+
+        // Forward messages from broker1 to broker2.
+        bridgeBrokers("broker1", "broker2").start();
+
+        // Verify that broker1 and broker2's test queues have no memory usage.
+        ActiveMQDestination testQueue = createDestination(
+                AMQ4147Test.class.getSimpleName() + ".queue", false);
+        final Destination broker1TestQueue = broker1.getDestination(testQueue);
+        final Destination broker2TestQueue = broker2.getDestination(testQueue);
+
+        assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
+        assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+        // Produce a message to broker1's test queue and verify that broker1's
+        // memory usage has increased, but broker2 still has no memory usage.
+        sendMessages("broker1", testQueue, 1);
+        assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
+        assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+        // Create a consumer on broker2 that is synchronized to allow detection
+        // of "in flight" messages to the consumer.
+        MessageIdList broker2Messages = getBrokerMessages("broker2");
+        final Semaphore consumerReady = new Semaphore(0);
+        final Semaphore consumerProceed = new Semaphore(0);
+
+        broker2Messages.setParent(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                // Signal arrival, then hold the message until the test has
+                // inspected the memory usage of both queues.
+                consumerReady.release();
+                try {
+                    consumerProceed.acquire();
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        createConsumer("broker2", testQueue);
+
+        // Verify that when broker2's consumer receives the message, the memory
+        // usage has moved from broker1 to broker2. Over the VM transport the
+        // first assertion is expected to fail due to the bug; the try/finally
+        // ensures the consumer is released prior to failure so that the
+        // broker can shut down.
+        consumerReady.acquire();
+
+        try {
+            assertMemoryUsageDrains(broker1TestQueue);
+            assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
+        } finally {
+            // Consume the message and verify that there is no more memory
+            // usage.
+            consumerProceed.release();
+        }
+
+        // Wait for the ACK to be processed on both brokers.
+        assertMemoryUsageDrains(broker1TestQueue);
+        assertMemoryUsageDrains(broker2TestQueue);
+    }
+
+    /**
+     * Waits, using the default {@link Wait#waitFor(Wait.Condition)} timeout,
+     * for the destination's memory usage to reach zero and fails otherwise.
+     *
+     * @param queue destination whose memory usage is polled
+     */
+    private void assertMemoryUsageDrains(final Destination queue) throws Exception {
+        assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queue.getMemoryUsage().getUsage() == 0;
+            }
+        }));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
new file mode 100644
index 0000000..906131e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.junit.Assert;
+
+/**
+ * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby
+ * a static subscription from broker1 to broker2 is forwarded to broker3 even
+ * though the network TTL is 1. This results in duplicate subscriptions on
+ * broker3.
+ */
+public class AMQ4148Test extends JmsMultipleBrokersTestSupport {
+
+    /**
+     * Builds a hub with four duplex, TTL-1 spokes that statically include the
+     * test queue, then checks that the hub sees one consumer per spoke and
+     * that every spoke sees exactly one consumer.
+     */
+    public void test() throws Exception {
+        // Create a hub-and-spoke network where each hub-spoke pair share
+        // messages on a test queue.
+        BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false"));
+
+        final BrokerService[] spokes = new BrokerService[4];
+        for (int i = 0; i < spokes.length; i++) {
+            spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false"));
+        }
+        startAllBrokers();
+
+        ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false);
+
+        NetworkConnector[] ncs = new NetworkConnector[spokes.length];
+        for (int i = 0; i < spokes.length; i++) {
+            NetworkConnector nc = bridgeBrokers("hub", "spoke" + i);
+            nc.setNetworkTTL(1);
+            nc.setDuplex(true);
+            nc.setConduitSubscriptions(false);
+            nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue));
+            nc.start();
+
+            ncs[i] = nc;
+        }
+
+        try {
+            waitForBridgeFormation();
+
+            // Pause to allow subscriptions to be created.
+            TimeUnit.SECONDS.sleep(5);
+
+            // Verify that the hub has a subscription from each spoke, but that each
+            // spoke has a single subscription from the hub (since the network TTL is 1).
+            final Destination hubTestQueue = hub.getDestination(testQueue);
+            boolean hubHasAllSpokes = Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return spokes.length == hubTestQueue.getConsumers().size();
+                }
+            });
+            // Build the message after waiting so it reports the final count,
+            // not the count sampled before the wait began.
+            assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}",
+                hubHasAllSpokes);
+
+            // Now check each spoke has exactly one consumer on the Queue.
+            for (int i = 0; i < spokes.length; i++) {
+                Destination spokeTestQueue = spokes[i].getDestination(testQueue);
+                Assert.assertEquals("consumers on spoke" + i, 1, spokeTestQueue.getConsumers().size());
+            }
+        } finally {
+            // Stop the connectors even when an assertion above has failed.
+            for (NetworkConnector nc : ncs) {
+                nc.stop();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
new file mode 100644
index 0000000..d29ec08
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionControl;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Publishes persistent messages from several producers while many
+ * transacted consumers each receive one message and roll it back, then
+ * restarts the broker and checks that exactly {@link #toSend} messages can
+ * still be consumed.
+ */
+public class AMQ4157Test {
+    static final Logger LOG = LoggerFactory.getLogger(AMQ4157Test.class);
+    private BrokerService broker;
+    private ActiveMQConnectionFactory connectionFactory;
+    private final Destination destination = new ActiveMQQueue("Test");
+    private final String payloadString = new String(new byte[8*1024]);
+    private final boolean useBytesMessage= true;
+    private final int parallelProducer = 20;
+    private final int parallelConsumer = 100;
+
+    // Failures raised on worker threads, collected for the final assertion.
+    private final Vector<Exception> exceptions = new Vector<Exception>();
+    long toSend = 1000;
+
+    @Test
+    public void testPublishCountsWithRollbackConsumer() throws Exception {
+
+        startBroker(true);
+
+        // Shared countdown so the producers together send toSend messages.
+        final AtomicLong sharedCount = new AtomicLong(toSend);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+        for (int i=0; i< parallelConsumer; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        consumeOneAndRollback();
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+
+        for (int i=0; i< parallelProducer; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        publishMessages(sharedCount, 0);
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        executorService.awaitTermination(30, TimeUnit.MINUTES);
+        assertTrue("Producers done in time", executorService.isTerminated());
+        assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
+
+        restartBroker(500);
+
+        LOG.info("Attempting consume of {} messages", toSend);
+
+        consumeMessages(toSend);
+    }
+
+    /**
+     * Receives a single message in a transacted session, polling until one
+     * arrives, then rolls the transaction back and closes the connection.
+     */
+    private void consumeOneAndRollback() throws Exception {
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(destination);
+            Message message = null;
+            while (message == null) {
+                message = consumer.receive(1000);
+            }
+            session.rollback();
+        } finally {
+            // Close on every path so a failure does not leak the connection.
+            connection.close();
+        }
+    }
+
+    /**
+     * Consumes {@code count} messages with auto-acknowledge, asserting that
+     * each one arrives and that none is left afterwards.
+     *
+     * @param count number of messages expected on the destination
+     */
+    private void consumeMessages(long count) throws Exception {
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            for (int i=0; i<count; i++) {
+                assertNotNull("got message "+ i, consumer.receive(20000));
+            }
+            assertNull("none left over", consumer.receive(2000));
+        } finally {
+            // The original never closed this connection.
+            connection.close();
+        }
+    }
+
+    /**
+     * Stops the broker, sleeps, and starts a new broker without deleting the
+     * stored messages.
+     *
+     * @param restartDelay pause between stop and start, in milliseconds
+     */
+    private void restartBroker(int restartDelay) throws Exception {
+        stopBroker();
+        TimeUnit.MILLISECONDS.sleep(restartDelay);
+        startBroker(false);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Sends persistent messages until the shared counter is exhausted, then
+     * issues a synchronous {@link ConnectionControl} packet before closing.
+     *
+     * @param count  shared countdown of messages still to be sent
+     * @param expiry time-to-live passed to each send
+     */
+    private void publishMessages(AtomicLong count, int expiry) throws Exception {
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        try {
+            connection.setWatchTopicAdvisories(false);
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer producer = session.createProducer(destination);
+            while ( (count.getAndDecrement()) > 0) {
+                Message message = null;
+                if (useBytesMessage) {
+                    message = session.createBytesMessage();
+                    ((BytesMessage) message).writeBytes(payloadString.getBytes());
+                } else {
+                    message = session.createTextMessage(payloadString);
+                }
+                producer.send(message, DeliveryMode.PERSISTENT, 5, expiry);
+            }
+            // NOTE(review): presumably a round trip that lets the preceding
+            // async sends reach the broker before close - confirm.
+            connection.syncSendPacket(new ConnectionControl());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Starts a broker on an ephemeral TCP port and builds a connection
+     * factory for it with the client options this test runs under.
+     *
+     * @param deleteAllMessages whether the broker deletes stored messages on startup
+     */
+    public void startBroker(boolean deleteAllMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+
+        String options = "?jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=1000&jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
+        connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
new file mode 100644
index 0000000..4867f28
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
+import org.apache.activemq.network.NetworkBridgeListener;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.junit.Assert;
+
+/**
+ * This test demonstrates a number of race conditions in
+ * {@link DiscoveryNetworkConnector} that can result in an active bridge no
+ * longer being reported as active and, vice versa, in an inactive bridge
+ * still being reported as active.
+ */
+public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
+    // Two minutes in milliseconds; used both as the overall per-test limit
+    // (see setUp) and as the timeout passed to waitForBridge.
+    final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2);
+
+    /**
+     * Since these tests involve wait conditions, protect against indefinite
+     * waits (due to unanticipated issues) by enabling auto-fail with a
+     * maximum test time of {@link #MAX_TEST_TIME} before delegating to the
+     * superclass set-up.
+     *
+     * @throws Exception if the superclass set-up fails
+     */
+    @Override
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        setMaxTestTime(MAX_TEST_TIME);
+        super.setUp();
+    }
+
+    /**
+     * This test demonstrates how concurrent attempts to establish a bridge to
+     * the same remote broker are allowed to occur. Connection uniqueness will
+     * cause whichever bridge creation attempt is second to fail. However, this
+     * failure erases the entry in
+     * {@link DiscoveryNetworkConnector#activeBridges()} that represents the
+     * successful first bridge creation attempt.
+     *
+     * @throws Exception if the brokers or the network connector fail to start
+     */
+    public void testLostActiveBridge() throws Exception {
+        final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15);
+
+        // Start two brokers with a bridge from broker1 to broker2.
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+        final BrokerService broker2 = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        // Allow the concurrent local bridge connections to be made even though
+        // they are duplicated; this prevents both of the bridge attempts from
+        // failing in the case that the local and remote bridges are established
+        // out-of-order.
+        BrokerPlugin ignoreAddConnectionPlugin = new BrokerPlugin() {
+            @Override
+            public Broker installPlugin(Broker broker) throws Exception {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public void addConnection(ConnectionContext context,
+                            ConnectionInfo info) throws Exception {
+                        // ignore
+                    }
+                };
+            }
+        };
+
+        broker1.setPlugins(new BrokerPlugin[] { ignoreAddConnectionPlugin });
+
+        startAllBrokers();
+
+        // Start a bridge from broker1 to broker2. The discovery agent attempts
+        // to create the bridge concurrently with two threads, and the
+        // synchronization in createBridge ensures that pre-patch both threads
+        // actually attempt to start bridges. Post-patch, only one thread is
+        // allowed to start the bridge.
+        final CountDownLatch attemptLatch = new CountDownLatch(2);
+        final CountDownLatch createLatch = new CountDownLatch(2);
+
+        DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
+            @Override
+            public void onServiceAdd(DiscoveryEvent event) {
+                // Pre-and-post patch, two threads attempt to establish a bridge
+                // to the same remote broker.
+                attemptLatch.countDown();
+                super.onServiceAdd(event);
+            }
+
+            @Override
+            protected NetworkBridge createBridge(Transport localTransport,
+                    Transport remoteTransport, final DiscoveryEvent event) {
+                // Pre-patch, the two threads are allowed to create the bridge.
+                // Post-patch, only the first thread is allowed. Wait a
+                // reasonable delay once both attempts are detected to allow
+                // the two bridge creations to occur concurrently (pre-patch).
+                // Post-patch, the wait will timeout and allow the first (and
+                // only) bridge creation to occur.
+                try {
+                    attemptLatch.await();
+                    createLatch.countDown();
+                    createLatch.await(ATTEMPT_TO_CREATE_DELAY,
+                            TimeUnit.MILLISECONDS);
+                    return super.createBridge(localTransport, remoteTransport,
+                            event);
+                } catch (InterruptedException e) {
+                    // The exception already cleared the interrupt flag;
+                    // set it again so the calling thread can still observe
+                    // that it was interrupted.
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+            }
+        };
+
+        nc.setDiscoveryAgent(new DiscoveryAgent() {
+            TaskRunnerFactory taskRunner = new TaskRunnerFactory();
+            DiscoveryListener listener;
+
+            @Override
+            public void start() throws Exception {
+                taskRunner.init();
+                // Announce broker2 twice through the task runner so that two
+                // bridge creation attempts for the same service are issued.
+                final Runnable announceBroker2 = new Runnable() {
+                    @Override
+                    public void run() {
+                        listener.onServiceAdd(new DiscoveryEvent(broker2
+                                .getVmConnectorURI().toString()));
+                    }
+                };
+                taskRunner.execute(announceBroker2);
+                taskRunner.execute(announceBroker2);
+            }
+
+            @Override
+            public void stop() throws Exception {
+                taskRunner.shutdown();
+            }
+
+            @Override
+            public void setDiscoveryListener(DiscoveryListener listener) {
+                this.listener = listener;
+            }
+
+            @Override
+            public void registerService(String name) throws IOException {
+            }
+
+            @Override
+            public void serviceFailed(DiscoveryEvent event) throws IOException {
+                listener.onServiceRemove(event);
+            }
+        });
+
+        broker1.addNetworkConnector(nc);
+        nc.start();
+
+        // Wait for the bridge to be formed by the first attempt.
+        waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
+                MAX_TEST_TIME, TimeUnit.MILLISECONDS);
+
+        // Pre-patch, the second bridge creation attempt fails and removes the
+        // first (successful) bridge creation attempt from the
+        // list of active bridges. Post-patch, the second bridge creation
+        // attempt is prevented, so the first bridge creation attempt
+        // remains "active". This assertion is expected to fail pre-patch and
+        // pass post-patch.
+        Assert.assertFalse(nc.activeBridges().isEmpty());
+    }
+
+    /**
+     * This test demonstrates a race condition where a failed bridge can be
+     * removed from the list of active bridges in
+     * {@link DiscoveryNetworkConnector} before it has been added. Eventually,
+     * the failed bridge is added, but never removed, which causes subsequent
+     * bridge creation attempts to be ignored. The result is a network connector
+     * that thinks it has an active bridge, when in fact it doesn't.
+     *
+     * @throws Exception if the brokers or the network connector fail to start
+     */
+    public void testInactiveBridgStillActive() throws Exception {
+        // Start two brokers with a bridge from broker1 to broker2.
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+        final BrokerService broker2 = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        // Force bridge failure by having broker1 disallow connections.
+        BrokerPlugin disallowAddConnectionPlugin = new BrokerPlugin() {
+            @Override
+            public Broker installPlugin(Broker broker) throws Exception {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public void addConnection(ConnectionContext context,
+                            ConnectionInfo info) throws Exception {
+                        throw new Exception(
+                                "Test exception to force bridge failure");
+                    }
+                };
+            }
+        };
+
+        broker1.setPlugins(new BrokerPlugin[] { disallowAddConnectionPlugin });
+
+        startAllBrokers();
+
+        // Start a bridge from broker1 to broker2. The bridge delays returning
+        // from start until after the bridge failure has been processed;
+        // this leaves the first bridge creation attempt recorded as active,
+        // even though it failed.
+        final SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+        da.setServices(new URI[] { broker2.getVmConnectorURI() });
+
+        final CountDownLatch attemptLatch = new CountDownLatch(3);
+        final CountDownLatch removedLatch = new CountDownLatch(1);
+
+        DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
+            @Override
+            public void onServiceAdd(DiscoveryEvent event) {
+                attemptLatch.countDown();
+                super.onServiceAdd(event);
+            }
+
+            @Override
+            public void onServiceRemove(DiscoveryEvent event) {
+                super.onServiceRemove(event);
+                removedLatch.countDown();
+            }
+
+            @Override
+            protected NetworkBridge createBridge(Transport localTransport,
+                    Transport remoteTransport, final DiscoveryEvent event) {
+                final NetworkBridge next = super.createBridge(localTransport,
+                        remoteTransport, event);
+                // Pure delegate around the real bridge; only start() adds
+                // behavior (it blocks until the service removal was seen).
+                return new NetworkBridge() {
+
+                    @Override
+                    public void start() throws Exception {
+                        next.start();
+                        // Delay returning until the failed service has been
+                        // removed.
+                        removedLatch.await();
+                    }
+
+                    @Override
+                    public void stop() throws Exception {
+                        next.stop();
+                    }
+
+                    @Override
+                    public void serviceRemoteException(Throwable error) {
+                        next.serviceRemoteException(error);
+                    }
+
+                    @Override
+                    public void serviceLocalException(Throwable error) {
+                        next.serviceLocalException(error);
+                    }
+
+                    @Override
+                    public void setNetworkBridgeListener(
+                            NetworkBridgeListener listener) {
+                        next.setNetworkBridgeListener(listener);
+                    }
+
+                    @Override
+                    public String getRemoteAddress() {
+                        return next.getRemoteAddress();
+                    }
+
+                    @Override
+                    public String getRemoteBrokerName() {
+                        return next.getRemoteBrokerName();
+                    }
+
+                    @Override
+                    public String getRemoteBrokerId() {
+                        return next.getRemoteBrokerId();
+                    }
+
+                    @Override
+                    public String getLocalAddress() {
+                        return next.getLocalAddress();
+                    }
+
+                    @Override
+                    public String getLocalBrokerName() {
+                        return next.getLocalBrokerName();
+                    }
+
+                    @Override
+                    public long getEnqueueCounter() {
+                        return next.getEnqueueCounter();
+                    }
+
+                    @Override
+                    public long getDequeueCounter() {
+                        return next.getDequeueCounter();
+                    }
+
+                    @Override
+                    public void setMbeanObjectName(ObjectName objectName) {
+                        next.setMbeanObjectName(objectName);
+                    }
+
+                    @Override
+                    public ObjectName getMbeanObjectName() {
+                        return next.getMbeanObjectName();
+                    }
+
+                    @Override
+                    public void resetStats() {
+                        next.resetStats();
+                    }
+                };
+            }
+        };
+        nc.setDiscoveryAgent(da);
+
+        broker1.addNetworkConnector(nc);
+        nc.start();
+
+        // All bridge attempts should fail, so the attempt latch should get
+        // triggered. However, because of the race condition, the first attempt
+        // is considered successful and causes further attempts to stop.
+        // Therefore, this wait will time out and cause the test to fail.
+        Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Verifies that stopping and then restarting a network connector allows
+     * the bridges that were active at the time of the stop to be
+     * re-established (i.e., the "active events" data structure in
+     * {@link DiscoveryNetworkConnector} is reset).
+     */
+    public void testAllowAttemptsAfterRestart() throws Exception {
+        final long pauseAfterStopMillis = TimeUnit.SECONDS.toMillis(10);
+
+        // Two non-persistent VM brokers; the bridge goes from broker1 to
+        // broker2.
+        BrokerService bridgeSource = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+        final BrokerService bridgeTarget = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        startAllBrokers();
+
+        final String sourceName = bridgeSource.getBrokerName();
+        final String targetName = bridgeTarget.getBrokerName();
+
+        // First start: the bridge must come up.
+        NetworkConnector connector = bridgeBrokers(sourceName, targetName);
+        connector.start();
+        waitForBridge(sourceName, targetName, MAX_TEST_TIME,
+                TimeUnit.MILLISECONDS);
+
+        // Stop, pause to account for the asynchronous closure, then start
+        // again and expect the bridge to be re-established.
+        connector.stop();
+        Thread.sleep(pauseAfterStopMillis);
+        connector.start();
+        waitForBridge(sourceName, targetName, MAX_TEST_TIME,
+                TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
new file mode 100644
index 0000000..141a881
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4212Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);
+
+    // Broker under test; reassigned by every createBroker() call and set to
+    // null by stopBroker().
+    private BrokerService service;
+    // VM transport URI assigned in createBroker().
+    private String connectionUri;
+    // Connection factory built from connectionUri; used by all helpers below.
+    private ActiveMQConnectionFactory cf;
+
+    // Number of messages sent by the single-argument sendMessages().
+    private final int MSG_COUNT = 256;
+
+    /**
+     * Starts a broker with all previously stored messages deleted and
+     * without forcing an index recovery.
+     *
+     * @throws Exception if the broker cannot be created or started
+     */
+    @Before
+    public void setUp() throws Exception {
+        createBroker(true, false);
+    }
+
+    /**
+     * Builds and starts the "InactiveSubTest" broker backed by a KahaDB
+     * store in the relative directory "KahaDB", then points {@code cf} at it
+     * through the VM transport.
+     *
+     * @param deleteAllMessages passed to
+     *        {@code setDeleteAllMessagesOnStartup}
+     * @param recover passed to the persistence adapter's
+     *        {@code setForceRecoverIndex}
+     * @throws Exception if the broker fails to start
+     */
+    public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
+        service = new BrokerService();
+        service.setBrokerName("InactiveSubTest");
+        service.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        service.setAdvisorySupport(false);
+        service.setPersistent(true);
+        service.setUseJmx(true);
+        service.setKeepDurableSubsActive(false);
+
+        // Journal files are capped at 10 KiB; checkpoint and cleanup both
+        // run every five seconds.
+        final long fiveSecondsMillis = TimeUnit.SECONDS.toMillis(5);
+        final KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(new File("KahaDB"));
+        adapter.setJournalMaxFileLength(10 * 1024);
+        adapter.setCheckpointInterval(fiveSecondsMillis);
+        adapter.setCleanupInterval(fiveSecondsMillis);
+        adapter.setForceRecoverIndex(recover);
+        service.setPersistenceAdapter(adapter);
+
+        service.start();
+        service.waitUntilStarted();
+
+        connectionUri = "vm://InactiveSubTest?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    /**
+     * Stops the current broker and creates a new one that keeps stored
+     * messages (deleteAllMessages=false) and does not force an index
+     * recovery (recover=false).
+     */
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false, false);
+    }
+
+    /**
+     * Stops the current broker and creates a new one that keeps stored
+     * messages (deleteAllMessages=false) and forces an index recovery
+     * (recover=true).
+     */
+    private void recoverBroker() throws Exception {
+        stopBroker();
+        createBroker(false, true);
+    }
+
+    /**
+     * Stops the broker, waits until it has stopped, and clears the
+     * {@code service} field. Does nothing when no broker is set.
+     */
+    @After
+    public void stopBroker() throws Exception {
+        if (service == null) {
+            return;
+        }
+        service.stop();
+        service.waitUntilStopped();
+        service = null;
+    }
+
+    /**
+     * Creates an inactive durable subscription between two batches of queue
+     * messages, then checks that the subscription is still reported after a
+     * broker restart, that journal files are cleaned up once the queue is
+     * removed, that the subscription receives all messages sent to the topic,
+     * and that it is still reported after a forced index recovery.
+     *
+     * @throws Exception if the broker or any JMS operation fails
+     */
+    @Test
+    public void testDirableSubPrefetchRecovered() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // Reads the 'service' field on every poll, so the same condition can
+        // be reused after the broker has been restarted or recovered.
+        final Wait.Condition oneInactiveDurableSub = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1;
+            }
+        };
+
+        // Send to a Queue to create some journal files
+        sendMessages(queue);
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        createInactiveDurableSub(topic);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
+
+        // Now send some more to the queue to create even more files.
+        sendMessages(queue);
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        LOG.info("Restarting the broker.");
+        restartBroker();
+        LOG.info("Restarted the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
+
+        // Clear out all queue data
+        service.getAdminView().removeQueue(queue.getQueueName());
+
+        // Wait first and build the failure message afterwards, so that it
+        // reports the file count seen after the wait rather than before it.
+        boolean journalCleanedUp = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 2;
+            }
+        }, TimeUnit.MINUTES.toMillis(2));
+        assertTrue("No more than two journal files expected, was " + getNumberOfJournalFiles(), journalCleanedUp);
+
+        LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
+        // Send some messages to the inactive destination
+        sendMessages(topic);
+
+        LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
+        assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));
+
+        LOG.info("Recovering the broker.");
+        recoverBroker();
+        LOG.info("Recovered the broker.");
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
+    }
+
+    /**
+     * Creates an inactive durable subscription, sends one topic message and
+     * consumes it, surrounds that with queue traffic to roll the journal, and
+     * then checks after a restart, after removing the queue, and after a
+     * forced index recovery that the subscription still exists and that the
+     * already consumed message is not delivered again.
+     *
+     * @throws Exception if the broker or any JMS operation fails
+     */
+    @Test
+    public void testDurableAcksNotDropped() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // Reads the 'service' field on every poll, so the same condition can
+        // be reused after the broker has been restarted or recovered.
+        final Wait.Condition oneInactiveDurableSub = new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1;
+            }
+        };
+
+        // Create durable sub in first data file.
+        createInactiveDurableSub(topic);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
+
+        // Send to a Topic
+        sendMessages(topic, 1);
+
+        // Send to a Queue to create some journal files
+        sendMessages(queue);
+
+        LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        // Consume all the Messages leaving acks behind.
+        consumeDurableMessages(topic, 1);
+
+        LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        // Now send some more to the queue to create even more files.
+        sendMessages(queue);
+
+        LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        LOG.info("Restarting the broker.");
+        restartBroker();
+        LOG.info("Restarted the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
+
+        // Clear out all queue data
+        service.getAdminView().removeQueue(queue.getQueueName());
+
+        // Wait first and build the failure message afterwards, so that it
+        // reports the file count seen after the wait rather than before it.
+        boolean atMostThreeFiles = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 3;
+            }
+        }, TimeUnit.MINUTES.toMillis(3));
+        assertTrue("No more than three journal files expected, was " + getNumberOfJournalFiles(), atMostThreeFiles);
+
+        // See if we receive any message they should all be acked.
+        tryConsumeExpectNone(topic);
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        LOG.info("Recovering the broker.");
+        recoverBroker();
+        LOG.info("Recovered the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
+
+        // See if we receive any message they should all be acked.
+        tryConsumeExpectNone(topic);
+
+        boolean exactlyOneFile = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(1));
+        assertTrue("Exactly one journal file expected, was " + getNumberOfJournalFiles(), exactlyOneFile);
+    }
+
+    /**
+     * Counts the non-null entries in the file map of the KahaDB journal that
+     * belongs to the current broker's persistence adapter.
+     *
+     * @return the number of non-null journal data files
+     */
+    private int getNumberOfJournalFiles() throws IOException {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) service.getPersistenceAdapter();
+        Collection<DataFile> journalFiles = adapter.getStore().getJournal().getFileMap().values();
+
+        int nonNullFiles = 0;
+        for (DataFile journalFile : journalFiles) {
+            if (journalFile == null) {
+                continue;
+            }
+            nonNullFiles++;
+        }
+        return nonNullFiles;
+    }
+
+    /**
+     * Registers the durable subscription "Inactive" (client id "Inactive")
+     * on the given topic and immediately closes the subscriber and the
+     * connection.
+     *
+     * @param topic the topic to subscribe to
+     * @throws Exception if any JMS operation fails
+     */
+    private void createInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        try {
+            connection.setClientID("Inactive");
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+            consumer.close();
+        } finally {
+            // Always release the connection, even when subscribing failed.
+            connection.close();
+        }
+    }
+
+    /**
+     * Re-attaches to the durable subscription "Inactive" and receives
+     * {@code count} messages, failing the test if any receive returns
+     * nothing within ten seconds.
+     *
+     * @param topic the topic of the durable subscription
+     * @param count the number of messages that must be received
+     * @throws Exception if any JMS operation fails
+     */
+    private void consumeDurableMessages(Topic topic, int count) throws Exception {
+        Connection connection = cf.createConnection();
+        try {
+            connection.setClientID("Inactive");
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+            connection.start();
+            for (int i = 0; i < count; ++i) {
+                if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
+                    fail("should have received a message");
+                }
+            }
+            consumer.close();
+        } finally {
+            // fail() throws, so close here to avoid leaving the connection
+            // open when the test fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Re-attaches to the durable subscription "Inactive" and fails the test
+     * if a message arrives within ten seconds.
+     *
+     * @param topic the topic of the durable subscription
+     * @throws Exception if any JMS operation fails
+     */
+    private void tryConsumeExpectNone(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        try {
+            connection.setClientID("Inactive");
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+            connection.start();
+            if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
+                fail("Should be no messages for this durable.");
+            }
+            consumer.close();
+        } finally {
+            // fail() throws, so close here to avoid leaving the connection
+            // open when the test fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Re-attaches to the durable subscription "Inactive" and receives
+     * messages until a receive returns nothing within ten seconds.
+     *
+     * @param topic the topic of the durable subscription
+     * @return the number of messages received
+     * @throws Exception if any JMS operation fails
+     */
+    private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        try {
+            connection.setClientID("Inactive");
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+
+            int count = 0;
+
+            while (consumer.receive(10000) != null) {
+                count++;
+            }
+
+            consumer.close();
+            return count;
+        } finally {
+            // Always release the connection, even when a JMS call failed.
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends MSG_COUNT messages to the given destination by delegating to
+     * the two-argument overload.
+     */
+    private void sendMessages(Destination destination) throws Exception {
+        sendMessages(destination, MSG_COUNT);
+    }
+
+    /**
+     * Sends {@code count} persistent text messages to the given destination
+     * over a new connection.
+     *
+     * @param destination the queue or topic to send to
+     * @param count the number of messages to send
+     * @throws Exception if any JMS operation fails
+     */
+    private void sendMessages(Destination destination, int count) throws Exception {
+        Connection connection = cf.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < count; ++i) {
+                TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination);
+                producer.send(message);
+            }
+        } finally {
+            // Always release the connection, even when a send failed.
+            connection.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
new file mode 100644
index 0000000..fddb6b1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.fail;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ProducerInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that an exception raised by the broker while a producer is being
+ * registered reaches the client as a {@link JMSException} thrown from
+ * {@code Session.createProducer}.
+ */
+public class AMQ4213Test {
+
+    private static final String BROKER_ADDRESS = "tcp://localhost:0";
+    private static final String TEST_QUEUE = "testQueue";
+    private static final ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
+
+    // Created anew for every test in setUp() and stopped in tearDown().
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    /**
+     * Starts a non-persistent broker whose only plugin rejects every producer
+     * registration with a {@code JMSSecurityException}.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+
+        brokerService.setPlugins(new BrokerPlugin[]{
+            new BrokerPluginSupport() {
+
+                @Override
+                public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
+                    throw new javax.jms.JMSSecurityException(connectionUri);
+                }
+            }
+        });
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker started by {@link #setUp()}, if one was created.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void tearDown() throws Exception {
+        // setUp() may have failed before the broker was assigned.
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Creating a producer must fail with a {@link JMSException} because the broker
+     * plugin installed in {@link #setUp()} throws from {@code addProducer}.
+     *
+     * @throws Exception if the connection or session cannot be created
+     */
+    @Test
+    public void testExceptionOnProducerCreateThrows() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+
+        try {
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            connection.start();
+
+            try {
+                session.createProducer(queue);
+                fail("Should not be able to create this producer.");
+            } catch (JMSException expected) {
+                // Expected: the broker plugin rejects the producer registration.
+            }
+        } finally {
+            // Release the client connection instead of leaving it to the broker shutdown.
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
new file mode 100644
index 0000000..7084bde
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a queue can be added, removed and added again through the admin view of
+ * a broker whose persistence is a {@link MultiKahaDBPersistenceAdapter} configured with
+ * a single per-destination {@link FilteredKahaDBPersistenceAdapter}.
+ */
+public class AMQ4220Test {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ4220Test.class);
+    private static final int MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    private static final String DESTINATION_NAME = "TEST.QUEUE";
+    BrokerService broker;
+
+    /**
+     * Builds the multi-store broker with all existing messages deleted and starts it.
+     *
+     * @throws Exception if the broker cannot be created or started
+     */
+    @Before
+    public void setUp() throws Exception {
+        prepareBrokerWithMultiStore(true);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker, if one was created, and waits until it has stopped.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void tearDown() throws Exception {
+        // setUp() may have failed before the broker was assigned.
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Creates (but does not start) a JMX-enabled broker named "localhost" that uses
+     * the given persistence adapter.
+     *
+     * @param kaha the persistence adapter to install on the broker
+     * @return the configured, not yet started broker
+     * @throws Exception if the broker cannot be configured
+     */
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+        // Named differently from the 'broker' field to avoid shadowing it.
+        BrokerService newBroker = new BrokerService();
+        newBroker.setUseJmx(true);
+        newBroker.setBrokerName("localhost");
+        newBroker.setPersistenceAdapter(kaha);
+        return newBroker;
+    }
+
+    /**
+     * Adds the test queue, removes it, adds it back, and asserts after each addition
+     * that the broker can resolve the destination.
+     *
+     * @throws Exception if an admin operation or the destination lookup fails
+     */
+    @Test
+    public void testRestartAfterQueueDelete() throws Exception {
+
+        // Ensure we have an Admin View.
+        assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView() != null;
+            }
+        }));
+
+
+        LOG.info("Adding initial destination: {}", DESTINATION_NAME);
+
+        broker.getAdminView().addQueue(DESTINATION_NAME);
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+
+        LOG.info("Removing initial destination: {}", DESTINATION_NAME);
+
+        broker.getAdminView().removeQueue(DESTINATION_NAME);
+
+        LOG.info("Adding back destination: {}", DESTINATION_NAME);
+
+        broker.getAdminView().addQueue(DESTINATION_NAME);
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+    }
+
+    /**
+     * Creates a KahaDB adapter with the test's journal file length and a cleanup
+     * interval of 5000.
+     *
+     * @param delete whether to delete all messages held by the new adapter
+     * @return the configured adapter
+     * @throws IOException if deleting the existing messages fails
+     */
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(MAX_FILE_LENGTH);
+        kaha.setCleanupInterval(5000);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    /**
+     * Assigns {@link #broker} a new, not yet started broker backed by a
+     * {@link MultiKahaDBPersistenceAdapter} with one per-destination filtered adapter.
+     *
+     * @param deleteAllMessages whether to delete all messages from the multi-store
+     *                          and from the template store before use
+     * @throws Exception if a store or the broker cannot be created
+     */
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        template.setPerDestination(true);
+        adapters.add(template);
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
new file mode 100644
index 0000000..55e8027
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4221Test extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class);
+    public int PAYLOAD_SIZE_BYTES = 4 * 1024;
+    public int NUM_TO_SEND = 60000;
+    public int NUM_CONCURRENT_PRODUCERS = 20;
+    public int QUEUE_COUNT = 1;
+    public int TMP_JOURNAL_MAX_FILE_SIZE = 10 * 1024 * 1024;
+
+    public int DLQ_PURGE_INTERVAL = 30000;
+
+    public int MESSAGE_TIME_TO_LIVE = 20000;
+    public int EXPIRE_SWEEP_PERIOD = 200;
+    public int TMP_JOURNAL_GC_PERIOD = 50;
+    public int RECEIVE_POLL_PERIOD = 4000;
+    private int RECEIVE_BATCH = 5000;
+
+    final byte[] payload = new byte[PAYLOAD_SIZE_BYTES];
+    final AtomicInteger counter = new AtomicInteger(0);
+    final HashSet<Throwable> exceptions = new HashSet<Throwable>();
+    BrokerService brokerService;
+    private String brokerUrlString;
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    final AtomicBoolean done = new AtomicBoolean(false);
+
+    public static Test suite() {
+        return suite(AMQ4221Test.class);
+    }
+
+    @Override
+    public void setUp() throws Exception {
+
+        LogManager.getRootLogger().addAppender(new DefaultTestAppender() {
+
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
+                    System.err.println("exit on error: " + event.getMessage());
+                    done.set(true);
+                    new Thread() {
+                        public void run() {
+                            System.exit(787);
+                        }
+                    }.start();
+                }
+            }
+        });
+
+        done.set(false);
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")});
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+        defaultPolicy.setExpireMessagesPeriod(EXPIRE_SWEEP_PERIOD);
+        defaultPolicy.setProducerFlowControl(false);
+        defaultPolicy.setMemoryLimit(50 * 1024 * 1024);
+
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(50 * 1024 * 1024);
+
+
+        PolicyMap destinationPolicyMap = new PolicyMap();
+        destinationPolicyMap.setDefaultEntry(defaultPolicy);
+        brokerService.setDestinationPolicy(destinationPolicyMap);
+
+
+        PListStoreImpl tempDataStore = new PListStoreImpl();
+        tempDataStore.setDirectory(brokerService.getTmpDataDirectory());
+        tempDataStore.setJournalMaxFileLength(TMP_JOURNAL_MAX_FILE_SIZE);
+        tempDataStore.setCleanupInterval(TMP_JOURNAL_GC_PERIOD);
+        tempDataStore.setIndexPageSize(200);
+        tempDataStore.setIndexEnablePageCaching(false);
+
+        brokerService.setTempDataStore(tempDataStore);
+        brokerService.setAdvisorySupport(false);
+        TransportConnector tcp = brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        brokerUrlString = tcp.getPublishableConnectString();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        executorService.shutdownNow();
+    }
+
+    public void testProduceConsumeExpireHalf() throws Exception {
+
+        final org.apache.activemq.broker.region.Queue dlq =
+                (org.apache.activemq.broker.region.Queue) getDestination(brokerService, new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        if (DLQ_PURGE_INTERVAL > 0) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    while (!done.get()) {
+                        try {
+                            Thread.sleep(DLQ_PURGE_INTERVAL);
+                            LOG.info("Purge DLQ, current size: " + dlq.getDestinationStatistics().getMessages().getCount());
+                            dlq.purge();
+                        } catch (InterruptedException allDone) {
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                            exceptions.add(e);
+                        }
+                    }
+                }
+            });
+
+        }
+
+        final CountDownLatch latch = new CountDownLatch(QUEUE_COUNT);
+        for (int i = 0; i < QUEUE_COUNT; i++) {
+            final int id = i;
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        doProduceConsumeExpireHalf(id, latch);
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+
+        while (!done.get()) {
+            done.set(latch.await(5, TimeUnit.SECONDS));
+        }
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.MINUTES);
+
+        assertTrue("no exceptions:" + exceptions, exceptions.isEmpty());
+
+    }
+
+    public void doProduceConsumeExpireHalf(int id, CountDownLatch latch) throws Exception {
+
+        final ActiveMQQueue queue = new ActiveMQQueue("Q" + id);
+
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString);
+        ActiveMQPrefetchPolicy prefecthPolicy = new ActiveMQPrefetchPolicy();
+        prefecthPolicy.setAll(0);
+        factory.setPrefetchPolicy(prefecthPolicy);
+        Connection connection = factory.createConnection();
+        connection.start();
+        final MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue, "on = 'true'");
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (!done.get()) {
+                        Thread.sleep(RECEIVE_POLL_PERIOD);
+                        for (int i = 0; i < RECEIVE_BATCH && !done.get(); i++) {
+
+                            Message message = consumer.receive(1000);
+                            if (message != null) {
+                                counter.incrementAndGet();
+                                if (counter.get() > 0 && counter.get() % 500 == 0) {
+                                    LOG.info("received: " + counter.get() + ", " + message.getJMSDestination().toString());
+                                }
+                            }
+                        }
+                    }
+                } catch (JMSException ignored) {
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions.add(e);
+                }
+            }
+        });
+
+        final AtomicInteger accumulator = new AtomicInteger(0);
+        final CountDownLatch producersDone = new CountDownLatch(NUM_CONCURRENT_PRODUCERS);
+
+        for (int i = 0; i < NUM_CONCURRENT_PRODUCERS; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Connection sendConnection = factory.createConnection();
+                        sendConnection.start();
+                        Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageProducer producer = sendSession.createProducer(queue);
+                        producer.setTimeToLive(MESSAGE_TIME_TO_LIVE);
+                        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+                        while (accumulator.incrementAndGet() < NUM_TO_SEND && !done.get()) {
+                            BytesMessage message = sendSession.createBytesMessage();
+                            message.writeBytes(payload);
+                            message.setStringProperty("on", String.valueOf(accumulator.get() % 2 == 0));
+                            producer.send(message);
+
+                        }
+                        producersDone.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+
+        producersDone.await(10, TimeUnit.MINUTES);
+
+        final DestinationStatistics view = getDestinationStatistics(brokerService, queue);
+        LOG.info("total expired so far " + view.getExpired().getCount() + ", " + queue.getQueueName());
+        latch.countDown();
+    }
+}


Mime
View raw message