activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [15/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:46 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java
new file mode 100644
index 0000000..1c34982
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.ConsumerThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4582Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4582Test.class);
+
+    BrokerService broker;
+    Connection connection;
+    Session session;
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
+
+    public static final int PRODUCER_COUNT = 10;
+    public static final int CONSUMER_COUNT = 10;
+    public static final int MESSAGE_COUNT = 1000;
+
+    final ConsumerThread[] consumers = new ConsumerThread[CONSUMER_COUNT];
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            try {
+                broker.stop();
+            } catch(Exception e) {}
+        }
+    }
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+    @Test
+    public void simpleTest() throws Exception {
+        thrown.expect(IOException.class);
+        thrown.expectMessage("enabledCipherSuites=BADSUITE");
+
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        try {
+            broker.addConnector(
+                "ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=BADSUITE");
+            broker.start();
+            broker.waitUntilStarted();
+        } catch (Exception e) {
+            LOG.info("BrokerService threw:", e);
+            throw e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
new file mode 100644
index 0000000..507e52e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Date;
+import java.util.Enumeration;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ4595Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class);
+
+    private BrokerService broker;
+    private URI connectUri;
+    private ActiveMQConnectionFactory factory;
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        TransportConnector connector = broker.addConnector("vm://localhost");
+        broker.deleteAllMessages();
+
+        //PolicyMap pMap = new PolicyMap();
+        //PolicyEntry policyEntry = new PolicyEntry();
+        //policyEntry.setMaxBrowsePageSize(10000);
+        //pMap.put(new ActiveMQQueue(">"), policyEntry);
+        // when no policy match, browserSub has maxMessages==0
+        //broker.setDestinationPolicy(pMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
+        broker.start();
+        broker.waitUntilStarted();
+        connectUri = connector.getConnectUri();
+        factory = new ActiveMQConnectionFactory(connectUri);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test(timeout=120000)
+    public void testBrowsingSmallBatch() throws JMSException {
+        doTestBrowsing(100);
+    }
+
+    @Test(timeout=160000)
+    public void testBrowsingMediumBatch() throws JMSException {
+        doTestBrowsing(1000);
+    }
+
+    @Test(timeout=300000)
+    public void testBrowsingLargeBatch() throws JMSException {
+        doTestBrowsing(10000);
+    }
+
+    private void doTestBrowsing(int messageToSend) throws JMSException {
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+
+        // Send the messages to the Queue.
+        ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
+        producerConnection.setUseAsyncSend(true);
+        producerConnection.start();
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        for (int i = 1; i <= messageToSend; i++) {
+            String msgStr = provideMessageText(i, 8192);
+            producer.send(producerSession.createTextMessage(msgStr));
+            if ((i % 1000) == 0) {
+                LOG.info("P&C: {}", msgStr.substring(0, 100));
+            }
+        }
+        producerConnection.close();
+
+        LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
+
+        // Browse the queue.
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+        QueueBrowser browser = session.createBrowser(queue);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int browsed = 0;
+        while (enumeration.hasMoreElements()) {
+            TextMessage m = (TextMessage) enumeration.nextElement();
+            browsed++;
+            if ((browsed % 1000) == 0) {
+                LOG.info("B[{}]: {}", browsed, m.getText().substring(0, 100));
+            }
+        }
+        browser.close();
+        session.close();
+        connection.close();
+
+        LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
+
+        // The number of messages browsed should be equal to the number of messages sent.
+        assertEquals(messageToSend, browsed);
+
+        browser.close();
+    }
+
+    public String provideMessageText(int messageNumber, int messageSize) {
+        StringBuilder buf = new StringBuilder();
+        buf.append("Message: ");
+        if (messageNumber > 0) {
+            buf.append(messageNumber);
+        }
+        buf.append(" sent at: ").append(new Date());
+
+        if (buf.length() > messageSize) {
+            return buf.substring(0, messageSize);
+        }
+        for (int i = buf.length(); i < messageSize; i++) {
+            buf.append(' ');
+        }
+        return buf.toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
new file mode 100644
index 0000000..265b692
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import junit.framework.Test;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class);
+
+    public static final int BROKER_COUNT = 3;
+    public static final int CONSUMER_COUNT = 1;
+    public static final int MESSAGE_COUNT = 0;
+    public static final boolean CONDUIT = true;
+    public static final int TIMEOUT = 20000;
+
+    public boolean duplex = true;
+    protected Map<String, MessageConsumer> consumerMap;
+    Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
+
+    private void assertNoUnhandeledExceptions() {
+        for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
+            LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
+        }
+        assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
+                unhandeledExceptions.isEmpty());
+    }
+
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        NetworkConnector networkConnector = bridgeBrokers(from, to, true, -1, CONDUIT);
+        networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(true);
+        networkConnector.setConsumerTTL(1);
+        networkConnector.setDuplex(duplex);
+        return networkConnector;
+    }
+
+    public static Test suite() {
+        return suite(AMQ4607Test.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testMigratingConsumer() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!duplex) bridge("Broker1", "Broker0");
+
+        bridge("Broker1", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker1");
+
+        bridge("Broker0", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker0");
+
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+        sendMessages("Broker0", dest, 1);
+
+        for (int i=0; i< BROKER_COUNT; i++) {
+            MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
+
+            for (int J = 0; J < BROKER_COUNT; J++) {
+                assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
+            }
+
+            assertNoUnhandeledExceptions();
+
+            assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
+
+            messageConsumer.close();
+            LOG.info("Check for no consumers..");
+            for (int J = 0; J < BROKER_COUNT; J++) {
+        	    assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
+            }
+        }
+
+        // now consume the message
+        final String brokerId = "Broker2";
+        MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
+            }
+        }));
+        messageConsumer.close();
+
+    }
+
+    public void testMigratingConsumerFullCircle() throws Exception {
+        bridge("Broker0", "Broker1");
+        if (!duplex) bridge("Broker1", "Broker0");
+
+        bridge("Broker1", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker1");
+
+        bridge("Broker0", "Broker2");
+        if (!duplex) bridge("Broker2", "Broker0");
+
+        // allow full loop, immediate replay back to 0 from 2
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
+        conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
+        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
+        brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+        startAllBrokers();
+        this.waitForBridgeFormation();
+
+        Destination dest = createDestination("TEST.FOO", false);
+
+        sendMessages("Broker0", dest, 1);
+
+        for (int i=0; i< BROKER_COUNT; i++) {
+            MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
+
+            for (int J = 0; J < BROKER_COUNT; J++) {
+                assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
+            }
+
+            assertNoUnhandeledExceptions();
+
+            // validate the message has been forwarded
+            assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
+
+            messageConsumer.close();
+            LOG.info("Check for no consumers..");
+            for (int J = 0; J < BROKER_COUNT; J++) {
+        	    assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
+            }
+        }
+
+        // now consume the message from the origin
+        LOG.info("Consume from origin...");
+        final String brokerId = "Broker0";
+        MessageConsumer messageConsumer = createConsumer(brokerId, dest);
+        assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
+            }
+        }));
+        messageConsumer.close();
+
+    }
+
+    protected void assertExactMessageCount(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
+        ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
+        final QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+        assertTrue("Excepected queue depth: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                long currentCount = queueViewMBean.getQueueSize();
+                LOG.info("On " + brokerName + " current queue size for " + queueViewMBean + ", " + currentCount);
+                if (count != currentCount) {
+                    LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
+                }
+                return currentCount == count;
+            }
+        }, timeout));
+    }
+
+    protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
+        final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
+        assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                try {
+                    QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+                    long currentCount = queueViewMBean.getConsumerCount();
+                    LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
+                    if (count != currentCount) {
+                        LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
+                    }
+                    return currentCount == count;
+                } catch (Exception e) {
+                    LOG.warn("Unexpected: " + e, e);
+                    return false;
+                }
+            }
+        }, timeout));
+    }
+
+    public void setUp() throws Exception {
+        super.setUp();
+
+        unhandeledExceptions.clear();
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        
+        // Setup n brokers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true"));
+        }
+
+        consumerMap = new LinkedHashMap<String, MessageConsumer>();
+    }
+
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        brokerService.setDestinationPolicy(policyMap);
+    }
+
+    public void uncaughtException(Thread t, Throwable e) {
+        synchronized(unhandeledExceptions) {
+            unhandeledExceptions.put(t,e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
new file mode 100644
index 0000000..014d86a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.CountDownLatch;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.fail;
+
+/**
+ * Testing how the broker reacts when a SQL Exception is thrown from
+ * org.apache.activemq.store.jdbc.TransactionContext.executeBatch().
+ * <p/>
+ * see https://issues.apache.org/jira/browse/AMQ-4636
+ */
+public class AMQ4636Test {
+
+    private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
+    private static final Logger LOG = LoggerFactory
+            .getLogger(AMQ4636Test.class);
+    private String transportUrl = "tcp://0.0.0.0:0";
+    private BrokerService broker;
+    EmbeddedDataSource embeddedDataSource;
+    CountDownLatch throwSQLException = new CountDownLatch(0);
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = createBroker();
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+        LOG.info("Broker started...");
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            LOG.info("Stopping broker...");
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        try {
+            if (embeddedDataSource != null) {
+                // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup
+                embeddedDataSource.setShutdownDatabase("shutdown");
+                embeddedDataSource.getConnection();
+            }
+        } catch (Exception ignored) {
+        } finally {
+            embeddedDataSource.setShutdownDatabase(null);
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+
+        embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+        embeddedDataSource.setCreateDatabase("create");
+        embeddedDataSource.getConnection().close();
+
+        //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
+        // method that can be configured to throw a SQL exception on demand
+        JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
+        jdbc.setDataSource(embeddedDataSource);
+
+        jdbc.setLockKeepAlivePeriod(1000l);
+        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+        leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+        jdbc.setLocker(leaseDatabaseLocker);
+
+        broker = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.setPersistenceAdapter(jdbc);
+
+        broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());
+
+        transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
+        return broker;
+    }
+
+    /**
+     * adding a TestTransactionContext (wrapper to TransactionContext) so an SQLException is triggered
+     * during TransactionContext.executeBatch() when called in the broker.
+     * <p/>
+     * Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the
+     * message. SQLException should NOT be returned to client
+     */
+    @Test
+    public void testProducerWithDBShutdown() throws Exception {
+
+        // failover but timeout in 1 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=1000";
+
+        this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
+
+        this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false);
+
+    }
+
+    @Test
+    public void testTransactedProducerCommitWithDBShutdown() throws Exception {
+
+        // failover but timeout in 1 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=1000";
+
+        this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
+
+        try {
+            this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true);
+            fail("Expect rollback after failover - inddoubt commit");
+        } catch (javax.jms.TransactionRolledBackException expectedInDoubt) {
+            LOG.info("Got rollback after failover failed commit", expectedInDoubt);
+        }
+    }
+
+    @Test
+    public void testTransactedProducerRollbackWithDBShutdown() throws Exception {
+
+        // failover but timeout in 1 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=1000";
+
+        this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
+
+        this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false);
+    }
+
+    public void createDurableConsumer(String topic,
+                                      String transportURL) throws JMSException {
+        Connection connection = null;
+        LOG.info("*** createDurableConsumer() called ...");
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            connection.setClientID("myconn1");
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(topic);
+
+            TopicSubscriber topicSubscriber = session.createDurableSubscriber(
+                    (Topic) destination, "MySub1");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit)
+            throws JMSException {
+        Connection connection = null;
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            Session session = connection.createSession(transacted,
+                    transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(topic);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            Message m = session.createTextMessage("testMessage");
+            LOG.info("*** send message to broker...");
+
+            // trigger SQL exception in transactionContext
+            throwSQLException = new CountDownLatch(1);
+            producer.send(m);
+
+            if (transacted) {
+                if (commit) {
+                    session.commit();
+                } else {
+                    session.rollback();
+                }
+            }
+
+            LOG.info("*** Finished send message to broker");
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+	/*
+     * Mock classes used for testing
+	 */
+
+    public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
+
+        public TransactionContext getTransactionContext() throws IOException {
+            return new TestTransactionContext(this);
+        }
+    }
+
+    public class TestTransactionContext extends TransactionContext {
+
+        public TestTransactionContext(
+                JDBCPersistenceAdapter jdbcPersistenceAdapter)
+                throws IOException {
+            super(jdbcPersistenceAdapter);
+        }
+
+        @Override
+        public void executeBatch() throws SQLException {
+            if (throwSQLException.getCount() > 0) {
+                // only throw exception once
+                throwSQLException.countDown();
+                throw new SQLException("TEST SQL EXCEPTION");
+            }
+            super.executeBatch();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java
new file mode 100644
index 0000000..fcdf23e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class AMQ4656Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4656Test.class);
+    private static BrokerService brokerService;
+    private static String BROKER_ADDRESS = "tcp://localhost:0";
+
+    private String connectionUri;
+
+    @Parameterized.Parameter
+    public PendingDurableSubscriberMessageStoragePolicy pendingDurableSubPolicy;
+
+    @Parameterized.Parameters(name="{0}")
+    public static Iterable<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{{new FilePendingDurableSubscriberMessageStoragePolicy()},{new StorePendingDurableSubscriberMessageStoragePolicy()}});
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPendingDurableSubscriberPolicy(pendingDurableSubPolicy);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test
+    public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+
+        Connection connection = connectionFactory.createConnection();
+        connection.setClientID(getClass().getName());
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic("DurableTopic");
+
+        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
+
+        BrokerView view = brokerService.getAdminView();
+        view.getDurableTopicSubscribers();
+
+        ObjectName subName = view.getDurableTopicSubscribers()[0];
+
+        DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+            brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+
+        assertEquals(0, sub.getEnqueueCounter());
+        assertEquals(0, sub.getDequeueCounter());
+        assertEquals(0, sub.getPendingQueueSize());
+        assertEquals(0, sub.getDispatchedCounter());
+        assertEquals(0, sub.getDispatchedQueueSize());
+
+        consumer.close();
+
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < 20; i++) {
+            producer.send(session.createMessage());
+        }
+        producer.close();
+
+        consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
+
+        Thread.sleep(1000);
+
+        assertEquals(20, sub.getEnqueueCounter());
+        assertEquals(0, sub.getDequeueCounter());
+        assertEquals(0, sub.getPendingQueueSize());
+        assertEquals(20, sub.getDispatchedCounter());
+        assertEquals(20, sub.getDispatchedQueueSize());
+
+        LOG.info("Pending Queue Size with no receives: {}", sub.getPendingQueueSize());
+
+        assertNotNull(consumer.receive(1000));
+        assertNotNull(consumer.receive(1000));
+
+        consumer.close();
+
+        Thread.sleep(2000);
+
+        LOG.info("Pending Queue Size with two receives: {}", sub.getPendingQueueSize());
+
+        assertEquals(20, sub.getEnqueueCounter());
+        assertEquals(2, sub.getDequeueCounter());
+        assertEquals(18, sub.getPendingQueueSize());
+        assertEquals(20, sub.getDispatchedCounter());
+        assertEquals(0, sub.getDispatchedQueueSize());
+
+        session.close();
+        connection.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java
new file mode 100644
index 0000000..b69ab47
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4671Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4671Test.class);
+    private static BrokerService brokerService;
+    private static String BROKER_ADDRESS = "tcp://localhost:0";
+
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+        connectionUri = connectionUri + "?trace=true";
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test
+    public void testNonDurableSubscriberInvalidUnsubscribe() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+
+        Connection connection = connectionFactory.createConnection();
+        connection.setClientID(getClass().getName());
+        connection.start();
+
+        try {
+            Session ts = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            try {
+                ts.unsubscribe("invalid-subscription-name");
+                fail("this should fail");
+            } catch (javax.jms.InvalidDestinationException e) {
+                LOG.info("Test caught correct invalid destination exception");
+            }
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java
new file mode 100644
index 0000000..fd80690
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.*;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.LevelDBStoreViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4677Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4677Test.class);
+    private static BrokerService brokerService;
+
+    @Rule public TestName name = new TestName();
+
+    private File dataDirFile;
+
+    @Before
+    public void setUp() throws Exception {
+
+        dataDirFile = new File("target/LevelDBCleanupTest");
+
+        brokerService = new BrokerService();
+        brokerService.setBrokerName("LevelDBBroker");
+        brokerService.setPersistent(true);
+        brokerService.setUseJmx(true);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setDataDirectoryFile(dataDirFile);
+
+        LevelDBStore persistenceFactory = new LevelDBStore();
+        persistenceFactory.setDirectory(dataDirFile);
+        brokerService.setPersistenceAdapter(persistenceFactory);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test
+    public void testSendAndReceiveAllMessages() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://LevelDBBroker");
+
+        Connection connection = connectionFactory.createConnection();
+        connection.setClientID(getClass().getName());
+        connection.start();
+
+        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.toString());
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        final LevelDBStoreViewMBean levelDBView = getLevelDBStoreMBean();
+        assertNotNull(levelDBView);
+        levelDBView.compact();
+
+        final int SIZE = 6 * 1024 * 5;
+        final int MSG_COUNT = 60000;
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        byte buffer[] = new byte[SIZE];
+        for (int i = 0; i < SIZE; ++i) {
+            buffer[i] = (byte) 128;
+        }
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(buffer);
+            producer.send(message);
+
+            if ((i % 1000) == 0) {
+                LOG.info("Sent message #{}", i);
+                session.commit();
+            }
+        }
+
+        session.commit();
+
+        LOG.info("Finished sending all messages.");
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                if ((done.getCount() % 1000) == 0) {
+                    try {
+                        LOG.info("Received message #{}", MSG_COUNT - done.getCount());
+                        session.commit();
+                    } catch (JMSException e) {
+                    }
+                }
+                done.countDown();
+            }
+        });
+
+        done.await(15, TimeUnit.MINUTES);
+        session.commit();
+        LOG.info("Finished receiving all messages.");
+
+        assertTrue("Should < 3 logfiles left.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                levelDBView.compact();
+                return countLogFiles() < 3;
+            }
+        }, TimeUnit.MINUTES.toMillis(5), (int)TimeUnit.SECONDS.toMillis(30)));
+
+        levelDBView.compact();
+        LOG.info("Current number of logs {}", countLogFiles());
+    }
+
+    protected long countLogFiles() {
+        String[] logFiles = dataDirFile.list(new FilenameFilter() {
+
+            @Override
+            public boolean accept(File dir, String name) {
+                if (name.endsWith("log")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        LOG.info("Current number of logs {}", logFiles.length);
+        return logFiles.length;
+    }
+
+    protected LevelDBStoreViewMBean getLevelDBStoreMBean() throws Exception {
+        ObjectName levelDbViewMBeanQuery = new ObjectName(
+            "org.apache.activemq:type=Broker,brokerName=LevelDBBroker,service=PersistenceAdapter,instanceName=LevelDB*");
+
+        Set<ObjectName> names = brokerService.getManagementContext().queryNames(null, levelDbViewMBeanQuery);
+        if (names.isEmpty() || names.size() > 1) {
+            throw new java.lang.IllegalStateException("Can't find levelDB store name.");
+        }
+
+        LevelDBStoreViewMBean proxy = (LevelDBStoreViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(names.iterator().next(), LevelDBStoreViewMBean.class, true);
+        return proxy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java
new file mode 100644
index 0000000..a347279
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4853Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4853Test.class);
+    private static BrokerService brokerService;
+    private static final String BROKER_ADDRESS = "tcp://localhost:0";
+    private static final ActiveMQQueue DESTINATION = new ActiveMQQueue("TEST.QUEUE");
+    private CountDownLatch cycleDoneLatch;
+
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    /**
+     * Test to shows the performance of the removing consumers while other stay active.
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void test() throws Exception {
+
+        // Create a stable set of consumers to fill in the advisory broker's consumer list.
+        ArrayList<Consumer> fixedConsumers = new ArrayList<Consumer>(100);
+        for (int i = 0; i < 200; ++i) {
+            fixedConsumers.add(new Consumer());
+        }
+
+        // Create a set of consumers that comes online for a short time and then
+        // goes offline again.  Cycles will repeat as each batch completes
+        final int fixedDelayConsumers = 300;
+        final int fixedDelayCycles = 25;
+
+        final CountDownLatch fixedDelayCycleLatch = new CountDownLatch(fixedDelayCycles);
+
+        // Update so done method can track state.
+        cycleDoneLatch = fixedDelayCycleLatch;
+
+        CyclicBarrier barrier = new CyclicBarrier(fixedDelayConsumers, new Runnable() {
+            @Override
+            public void run() {
+                LOG.info("Fixed delay consumers cycle {} completed.", fixedDelayCycleLatch.getCount());
+                fixedDelayCycleLatch.countDown();
+            }
+        });
+
+        for (int i = 0; i < fixedDelayConsumers; ++i) {
+            new Thread(new FixedDelyConsumer(barrier)).start();
+        }
+
+        fixedDelayCycleLatch.await(10, TimeUnit.MINUTES);
+
+        // Clean up.
+
+        for (Consumer consumer : fixedConsumers) {
+            consumer.close();
+        }
+        fixedConsumers.clear();
+    }
+
+    private ConnectionInfo createConnectionInfo() {
+        ConnectionId id = new ConnectionId();
+        id.setValue("ID:123456789:0:1");
+
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(id);
+        return info;
+    }
+
+    private SessionInfo createSessionInfo(ConnectionInfo connection) {
+        SessionId id = new SessionId(connection.getConnectionId(), 1);
+
+        SessionInfo info = new SessionInfo();
+        info.setSessionId(id);
+
+        return info;
+    }
+
+    public ConsumerInfo createConsumerInfo(SessionInfo session, int value, ActiveMQDestination destination) {
+        ConsumerId id = new ConsumerId();
+        id.setConnectionId(session.getSessionId().getConnectionId());
+        id.setSessionId(1);
+        id.setValue(value);
+
+        ConsumerInfo info = new ConsumerInfo();
+        info.setConsumerId(id);
+        info.setDestination(destination);
+        return info;
+    }
+
+    /**
+     * Test to shows the performance impact of removing consumers in various scenarios.
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void testPerformanceOfRemovals() throws Exception {
+        // setup
+        AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
+        ActiveMQDestination destination = new ActiveMQQueue("foo");
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
+        connectionContext.setBroker(brokerService.getBroker());
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+
+        long start = System.currentTimeMillis();
+
+        for (int i = 0; i < 200; ++i) {
+
+            for (int j = 1; j <= 500; j++) {
+                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
+                testObj.addConsumer(connectionContext, consumerInfo);
+            }
+
+            for (int j = 500; j > 0; j--) {
+                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
+                testObj.removeConsumer(connectionContext, consumerInfo);
+            }
+
+            for (int j = 1; j <= 500; j++) {
+                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
+                testObj.addConsumer(connectionContext, consumerInfo);
+            }
+
+            for (int j = 1; j <= 500; j++) {
+                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
+                testObj.removeConsumer(connectionContext, consumerInfo);
+            }
+        }
+
+        long finish = System.currentTimeMillis();
+
+        long totalTime = finish - start;
+
+        LOG.info("Total test time: {} seconds", TimeUnit.MILLISECONDS.toSeconds(totalTime));
+
+        assertEquals(0, testObj.getAdvisoryConsumers().size());
+    }
+
+    @Test
+    public void testEqualsNeeded() throws Exception {
+        // setup
+        AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
+        ActiveMQDestination destination = new ActiveMQQueue("foo");
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
+        connectionContext.setBroker(brokerService.getBroker());
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+
+        for (int j = 1; j <= 5; j++) {
+            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
+            testObj.addConsumer(connectionContext, consumerInfo);
+        }
+
+        for (int j = 1; j <= 5; j++) {
+            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
+            testObj.removeConsumer(connectionContext, consumerInfo);
+        }
+
+        assertEquals(0, testObj.getAdvisoryConsumers().size());
+    }
+
+    private boolean done() {
+        if (cycleDoneLatch == null) {
+            return true;
+        }
+        return cycleDoneLatch.getCount() == 0;
+    }
+
+    class Consumer implements MessageListener {
+
+        Connection connection;
+        Session session;
+        Destination destination;
+        MessageConsumer consumer;
+
+        Consumer() throws JMSException {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+            connection = factory.createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            consumer = session.createConsumer(DESTINATION);
+            consumer.setMessageListener(this);
+            connection.start();
+        }
+
+        @Override
+        public void onMessage(Message message) {
+        }
+
+        public void close() {
+            try {
+                connection.close();
+            } catch(Exception e) {
+            }
+
+            connection = null;
+            session = null;
+            consumer = null;
+        }
+    }
+
+    class FixedDelyConsumer implements Runnable {
+
+        private final CyclicBarrier barrier;
+        private final int sleepInterval;
+
+        public FixedDelyConsumer(CyclicBarrier barrier) {
+            this.barrier = barrier;
+            this.sleepInterval = 1000;
+        }
+
+        public FixedDelyConsumer(CyclicBarrier barrier, int sleepInterval) {
+            this.barrier = barrier;
+            this.sleepInterval = sleepInterval;
+        }
+
+        @Override
+        public void run() {
+            while (!done()) {
+
+                try {
+                    Consumer consumer = new Consumer();
+                    TimeUnit.MILLISECONDS.sleep(sleepInterval);
+                    consumer.close();
+                    barrier.await();
+                } catch (Exception ex) {
+                    return;
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java
new file mode 100644
index 0000000..cf33ece
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4887Test {
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4887Test.class);
+    private static final Integer ITERATIONS = 10;
+
+    @Rule
+    public TestName name = new TestName();
+
+    @Test
+    public void testBytesMessageSetPropertyBeforeCopy() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+        doTestBytesMessageSetPropertyBeforeCopy(connection);
+    }
+
+    @Test
+    public void testBytesMessageSetPropertyBeforeCopyCompressed() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        connectionFactory.setUseCompression(true);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+        doTestBytesMessageSetPropertyBeforeCopy(connection);
+    }
+
+    public void doTestBytesMessageSetPropertyBeforeCopy(Connection connection) throws Exception {
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.toString());
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        BytesMessage message = session.createBytesMessage();
+
+        for (int i=0; i < ITERATIONS; i++) {
+
+            long sendTime = System.currentTimeMillis();
+            message.setLongProperty("sendTime", sendTime);
+            producer.send(message);
+
+            LOG.debug("Receiving message " + i);
+            Message receivedMessage =  consumer.receive(5000);
+            assertNotNull("On message " + i, receivedMessage);
+            assertTrue("On message " + i, receivedMessage instanceof BytesMessage);
+
+            BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
+
+            int numElements = 0;
+            try {
+                while (true) {
+                    receivedBytesMessage.readBoolean();
+                    numElements++;
+                }
+            } catch (Exception ex) {
+            }
+
+            LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements);
+            assertEquals(i, numElements);
+
+            long receivedSendTime = receivedBytesMessage.getLongProperty("sendTime");
+            assertEquals("On message " + i, receivedSendTime, sendTime);
+
+            // Add a new bool value on each iteration.
+            message.writeBoolean(true);
+        }
+    }
+
+    @Test
+    public void testStreamMessageSetPropertyBeforeCopy() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+        doTestStreamMessageSetPropertyBeforeCopy(connection);
+    }
+
+    @Test
+    public void testStreamMessageSetPropertyBeforeCopyCompressed() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        connectionFactory.setUseCompression(true);
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+        doTestStreamMessageSetPropertyBeforeCopy(connection);
+    }
+
+    public void doTestStreamMessageSetPropertyBeforeCopy(Connection connection) throws Exception {
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(name.toString());
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        StreamMessage message = session.createStreamMessage();
+
+        for (int i=0; i < ITERATIONS; i++) {
+
+            long sendTime = System.currentTimeMillis();
+            message.setLongProperty("sendTime", sendTime);
+            producer.send(message);
+
+            LOG.debug("Receiving message " + i);
+            Message receivedMessage =  consumer.receive(5000);
+            assertNotNull("On message " + i, receivedMessage);
+            assertTrue("On message " + i, receivedMessage instanceof StreamMessage);
+
+            StreamMessage receivedStreamMessage = (StreamMessage) receivedMessage;
+
+            int numElements = 0;
+            try {
+                while (true) {
+                    receivedStreamMessage.readBoolean();
+                    numElements++;
+                }
+            } catch (Exception ex) {
+            }
+
+            LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements);
+            assertEquals(i, numElements);
+
+            long receivedSendTime = receivedStreamMessage.getLongProperty("sendTime");
+            assertEquals("On message " + i, receivedSendTime, sendTime);
+
+            // Add a new bool value on each iteration.
+            message.writeBoolean(true);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
new file mode 100644
index 0000000..026a4be
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4893Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4893Test.class);
+
+    @Test
+    public void testPropertiesInt() throws Exception {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setIntProperty("TestProp", 333);
+        fakeUnmarshal(message);
+        roundTripProperties(message);
+    }
+
+    @Test
+    public void testPropertiesString() throws Exception {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setStringProperty("TestProp", "Value");
+        fakeUnmarshal(message);
+        roundTripProperties(message);
+    }
+
+    @Test
+    public void testPropertiesObject() throws Exception {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setObjectProperty("TestProp", "Value");
+        fakeUnmarshal(message);
+        roundTripProperties(message);
+    }
+
+    @Test
+    public void testPropertiesObjectNoMarshalling() throws Exception {
+        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+        message.setObjectProperty("TestProp", "Value");
+        roundTripProperties(message);
+    }
+
+    private void roundTripProperties(ActiveMQObjectMessage message) throws IOException, JMSException {
+        ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
+        for (Map.Entry<String, Object> prop : message.getProperties().entrySet()) {
+            LOG.debug("{} -> {}", prop.getKey(), prop.getValue().getClass());
+            copy.setObjectProperty(prop.getKey(), prop.getValue());
+        }
+    }
+
+    private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException {
+        // we need to force the unmarshalled property field to be set so it
+        // gives us a hawtbuffer for the string
+        OpenWireFormat format = new OpenWireFormat();
+        message.beforeMarshall(format);
+        message.afterMarshall(format);
+
+        ByteSequence seq = message.getMarshalledProperties();
+        message.clearProperties();
+        message.setMarshalledProperties(seq);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
new file mode 100644
index 0000000..81140ce
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ4899Test {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class);
+    private static final String QUEUE_NAME="AMQ4899TestQueue";
+    private static final String CONSUMER_QUEUE="Consumer.Orders.VirtualOrders." + QUEUE_NAME;
+    private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME;
+
+    private static final Integer MESSAGE_LIMIT = 20;
+    public static final String CONSUMER_A_SELECTOR = "Order < " + 10;
+    public static  String CONSUMER_B_SELECTOR = "Order >= " + 10;
+    private CountDownLatch consumersStarted = new CountDownLatch(2);
+    private CountDownLatch consumerAtoConsumeCount= new CountDownLatch(10);
+    private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10);
+
+    private BrokerService broker;
+
+    @Before
+    public void setUp() {
+        setupBroker("broker://()/localhost?");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testVirtualTopicMultipleSelectors() throws Exception{
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue consumerQueue = session.createQueue(CONSUMER_QUEUE);
+
+        MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount);
+        MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
+        consumerA.setMessageListener(listenerA);
+
+        MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount);
+        MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
+        consumerB.setMessageListener(listenerB);
+
+        consumersStarted.await(10, TimeUnit.SECONDS);
+        assertEquals("Not all consumers started in time", 0, consumersStarted.getCount());
+
+        Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME);
+        MessageProducer producer = session.createProducer(producerDestination);
+        int messageIndex = 0;
+        for (int i=0; i < MESSAGE_LIMIT; i++) {
+            if (i==3) {
+                LOG.debug("Stopping consumerA");
+                consumerA.close();
+            }
+
+            if (i == 14) {
+                LOG.debug("Stopping consumer B");
+                consumerB.close();
+            }
+            String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString();
+            TextMessage message = session.createTextMessage(messageText);
+            message.setIntProperty("Order", i);
+            LOG.debug("Sending message [{}]", messageText);
+            producer.send(message);
+            Thread.sleep(100);
+        }
+        Thread.sleep(1 * 1000);
+
+        // restart consumerA
+        LOG.debug("Restarting consumerA");
+        consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
+        consumerA.setMessageListener(listenerA);
+
+        // restart consumerB
+        LOG.debug("restarting consumerB");
+        consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
+        consumerB.setMessageListener(listenerB);
+
+        consumerAtoConsumeCount.await(5, TimeUnit.SECONDS);
+        consumerBtoConsumeCount.await(5, TimeUnit.SECONDS);
+
+        LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount());
+
+        assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount());
+        assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount());
+
+        connection.close();
+    }
+
+    /**
+     * Setup broker with VirtualTopic configured
+     */
+    private void setupBroker(String uri) {
+        try {
+            broker = BrokerFactory.createBroker(uri);
+
+            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+            VirtualTopic virtualTopic = new VirtualTopic();
+            virtualTopic.setName("VirtualOrders.>");
+            virtualTopic.setSelectorAware(true);
+            VirtualDestination[] virtualDestinations = { virtualTopic };
+            interceptor.setVirtualDestinations(virtualDestinations);
+            broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+
+            SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
+            BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin};
+            broker.setPlugins(updatedPlugins);
+
+            broker.start();
+            broker.waitUntilStarted();
+        } catch (Exception e) {
+            LOG.error("Failed creating broker", e);
+        }
+    }
+}
+
+class AMQ4899Listener implements MessageListener {
+    Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class);
+    CountDownLatch toConsume;
+    String id;
+
+    public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) {
+        this.id = id;
+        this.toConsume = toConsume;
+        started.countDown();
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        toConsume.countDown();
+        try {
+            if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
+                LOG.debug("Listener {} received [{}]", id, textMessage.getText());
+            } else {
+                LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName());
+            }
+        } catch (JMSException e) {
+            LOG.error("Unexpected JMSException in Listener " + id, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
new file mode 100644
index 0000000..e65ad91
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4930Test extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
+    final int messageCount = 150;
+    final int messageSize = 1024*1024;
+    final int maxBrowsePageSize = 50;
+    final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+
+    protected void configureBroker() throws Exception {
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setAdvisorySupport(false);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
+
+        PolicyMap pMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        // disable expriy processing as this will call browse in parallel
+        policy.setExpireMessagesPeriod(0);
+        policy.setMaxPageSize(maxBrowsePageSize);
+        policy.setMaxBrowsePageSize(maxBrowsePageSize);
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+    }
+
+    public void testBrowsePendingNonPersistent() throws Exception {
+        doTestBrowsePending(DeliveryMode.NON_PERSISTENT);
+    }
+
+    public void testBrowsePendingPersistent() throws Exception {
+        doTestBrowsePending(DeliveryMode.PERSISTENT);
+    }
+
+    public void testWithStatsDisabled() throws Exception {
+        ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().setEnabled(false);
+        doTestBrowsePending(DeliveryMode.PERSISTENT);
+    }
+
+    public void doTestBrowsePending(int deliveryMode) throws Exception {
+
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(bigQueue);
+        producer.setDeliveryMode(deliveryMode);
+        BytesMessage bytesMessage = session.createBytesMessage();
+        bytesMessage.writeBytes(new byte[messageSize]);
+
+        for (int i = 0; i < messageCount; i++) {
+            producer.send(bigQueue, bytesMessage);
+        }
+
+        final QueueViewMBean queueViewMBean = (QueueViewMBean)
+                broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+
+        LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount());
+
+        connection.close();
+
+        assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled());
+
+        // ensure repeated browse does now blow mem
+
+        final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
+
+        // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
+        Message[] browsed = underTest.browse();
+        LOG.info("Browsed: " + browsed.length);
+        assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
+        browsed = underTest.browse();
+        LOG.info("Browsed: " + browsed.length);
+        assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
+        Runtime.getRuntime().gc();
+        long free = Runtime.getRuntime().freeMemory()/1024;
+        LOG.info("free at start of check: " + free);
+        // check for memory growth
+        for (int i=0; i<10; i++) {
+            LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
+            browsed = underTest.browse();
+            LOG.info("Browsed: " + browsed.length);
+            assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
+            Runtime.getRuntime().gc();
+            Runtime.getRuntime().gc();
+            assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024 + " >= " + (free - (free * 0.2)), Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.2)));
+        }
+    }
+
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setBrokerName("thisOne");
+        configureBroker();
+        broker.start();
+        factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker = null;
+        }
+    }
+
+}
\ No newline at end of file


Mime
View raw message