activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [10/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:41 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
new file mode 100644
index 0000000..066fb07
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TempStorageBlockedBrokerTest extends TestSupport {
+
+    // Delivery mode applied to the producer in testRunProducerWithHungConsumer.
+    public int deliveryMode = DeliveryMode.PERSISTENT;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class);
+    // Number of messages the producing thread sends in testRunProducerWithHungConsumer.
+    private static final int MESSAGES_COUNT = 1000;
+    // 4 KiB buffer whose string form is prepended to every message body the tests send.
+    private static byte[] buf = new byte[4 * 1024];
+    // Broker under test; created and started in setUp(), stopped in tearDown().
+    private BrokerService broker;
+    // Running totals of messages sent and received, compared by the assertions at the end of each test.
+    AtomicInteger messagesSent = new AtomicInteger(0);
+    AtomicInteger messagesConsumed = new AtomicInteger(0);
+
+    // Timeout in milliseconds passed to consumer.receive(); a null return ends the consume loops.
+    protected long messageReceiveTimeout = 10000L;
+
+    // Default destination; testFillTempAndConsume replaces it with the queue "Foo".
+    Destination destination = new ActiveMQTopic("FooTwo");
+
+    // Connect string of the broker's first transport connector, resolved in setUp().
+    private String connectionUri;
+
+    public void testRunProducerWithHungConsumer() throws Exception {
+
+        final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        // ensure messages are spooled to disk for this consumer
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setTopicPrefetch(10);
+        factory.setPrefetchPolicy(prefetch);
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+        final Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+
+        final CountDownLatch producerHasSentTenMessages = new CountDownLatch(10);
+        Thread producingThread = new Thread("Producing thread") {
+            @Override
+            public void run() {
+                try {
+                    Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+                        Message message = session.createTextMessage(new String(buf) + idx);
+
+                        producer.send(message);
+                        messagesSent.incrementAndGet();
+                        producerHasSentTenMessages.countDown();
+                        Thread.sleep(10);
+                        if (idx != 0 && idx%100 == 0) {
+                            LOG.info("Sent Message " + idx);
+                            LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+                        }
+                    }
+                    producer.close();
+                    session.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        producingThread.start();
+
+        assertTrue("producer has sent 10 in a reasonable time", producerHasSentTenMessages.await(30, TimeUnit.SECONDS));
+
+        int count = 0;
+
+        Message m = null;
+        while ((m = consumer.receive(messageReceiveTimeout)) != null) {
+            count++;
+            if (count != 0 && count%10 == 0) {
+                LOG.info("Recieved Message (" + count + "):" + m);
+            }
+            messagesConsumed.incrementAndGet();
+            try {
+                Thread.sleep(100);
+            } catch (Exception e) {
+                LOG.info("error sleeping");
+            }
+        }
+
+        LOG.info("Connection Timeout: Retrying.. count: " + count);
+
+        while ((m = consumer.receive(messageReceiveTimeout)) != null) {
+            count++;
+            if (count != 0 && count%100 == 0) {
+                LOG.info("Recieved Message (" + count + "):" + m);
+            }
+            messagesConsumed.incrementAndGet();
+            try {
+                Thread.sleep(100);
+            } catch (Exception e) {
+                LOG.info("error sleeping");
+            }
+        }
+
+        LOG.info("consumer session closing: consumed count: " + count);
+
+        consumerSession.close();
+
+        producingThread.join();
+
+        final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
+        LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
+
+        producerConnection.close();
+        consumerConnection.close();
+
+        LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+                + broker.getSystemUsage().getTempUsage().getUsage());
+
+        // do a cleanup
+        ((PListStoreImpl)broker.getTempDataStore()).run();
+        LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+                        + broker.getSystemUsage().getTempUsage().getUsage());
+
+        assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT);
+        assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(),
+                MESSAGES_COUNT);
+    }
+
+    public void testFillTempAndConsume() throws Exception {
+
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        destination = new ActiveMQQueue("Foo");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
+        // so we can easily catch the ResourceAllocationException on send
+        producerConnection.setAlwaysSyncSend(true);
+        producerConnection.start();
+
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        try {
+            while (true) {
+                Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
+                producer.send(message);
+                messagesSent.incrementAndGet();
+                if (messagesSent.get() % 100 == 0) {
+                    LOG.info("Sent Message " + messagesSent.get());
+                    LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+                }
+            }
+        } catch (ResourceAllocationException ex) {
+            LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
+        }
+
+        // consume all sent
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+
+        while (consumer.receive(messageReceiveTimeout) != null) {
+            messagesConsumed.incrementAndGet();
+            if (messagesConsumed.get() % 1000 == 0) {
+                LOG.info("received Message " + messagesConsumed.get());
+                LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+            }
+        }
+
+        assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(),
+                messagesSent.get());
+    }
+
+    /**
+     * Creates and starts the broker used by the tests: persistent, JMX enabled, advisories off,
+     * store wiped on startup, a 1 MiB memory limit, 38 MiB store and temp limits, and a default
+     * destination policy with producer flow control disabled and a 10 KiB memory limit.
+     * The connect string of the first transport connector is kept in {@code connectionUri}.
+     */
+    @Override
+    public void setUp() throws Exception {
+        final int mebibyte = 1024 * 1024;
+
+        // Default policy for every destination; no destination filter is set on the entry.
+        final PolicyEntry policy = new PolicyEntry();
+        policy.setProducerFlowControl(false);
+        policy.setMemoryLimit(10 * 1024);
+        final PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(policy);
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        setDefaultPersistenceAdapter(broker);
+
+        final SystemUsage usage = broker.getSystemUsage();
+
+        final MemoryUsage memoryLimit = new MemoryUsage();
+        memoryLimit.setLimit(mebibyte);
+        final StoreUsage storeLimit = new StoreUsage();
+        storeLimit.setLimit(mebibyte * 38);
+        final TempUsage tempLimit = new TempUsage();
+        tempLimit.setLimit(mebibyte * 38);
+
+        usage.setMemoryUsage(memoryLimit);
+        usage.setStoreUsage(storeLimit);
+        usage.setTempUsage(tempLimit);
+
+        broker.setDestinationPolicy(policies);
+        broker.setSystemUsage(usage);
+
+        // Port 0 is requested, so the actual connect string is read back after start().
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    /** Stops the broker, if setUp() got far enough to create one. */
+    @Override
+    public void tearDown() throws Exception {
+        if (broker == null) {
+            return;
+        }
+        broker.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
new file mode 100644
index 0000000..1061346
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that when a small temp store limit is configured, the journal file size must
+ * also be smaller than that limit; if it is not, the broker will still send a
+ * ResourceAllocationException when sendFailIfNoSpace is enabled.
+ */
+public class TempStorageConfigBrokerTest {
+
+    // NOTE(review): not read by any method of this class in the visible source.
+    public int deliveryMode = DeliveryMode.PERSISTENT;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TempStorageConfigBrokerTest.class);
+    // 4 KiB buffer whose string form is prepended to every message body the tests send.
+    private static byte[] buf = new byte[4 * 1024];
+    // Broker under test; created by one of the createBrokerWith...Config() helpers, stopped in tearDown().
+    private BrokerService broker;
+    // Running totals of messages sent and received by the current test.
+    private AtomicInteger messagesSent = new AtomicInteger(0);
+    private AtomicInteger messagesConsumed = new AtomicInteger(0);
+
+    // Connect string of the broker's first transport connector, set when the broker is created.
+    private String brokerUri;
+    // Timeout in milliseconds passed to consumer.receive(); a null return ends the consume loop.
+    private long messageReceiveTimeout = 10000L;
+    // Default destination; both tests replace it with the queue "Foo".
+    private Destination destination = new ActiveMQTopic("FooTwo");
+
+    /**
+     * Starts the broker built by {@code createBrokerWithInvalidTempStoreConfig()} and sends
+     * non-persistent messages to the queue "Foo" until a send is rejected with a
+     * {@link ResourceAllocationException}; fewer than 100 messages must have been accepted by then.
+     *
+     * @throws Exception if a JMS or broker call fails
+     */
+    @Test(timeout=360000)
+    @Ignore("blocks in hudson, needs investigation")
+    public void testFillTempAndConsumeWithBadTempStoreConfig() throws Exception {
+
+        createBrokerWithInvalidTempStoreConfig();
+
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        destination = new ActiveMQQueue("Foo");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+        final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
+        try {
+            // so we can easily catch the ResourceAllocationException on send
+            producerConnection.setAlwaysSyncSend(true);
+            producerConnection.start();
+
+            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                // Loop exits only through the ResourceAllocationException caught below
+                // (or through the test timeout).
+                while (true) {
+                    Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
+                    producer.send(message);
+                    messagesSent.incrementAndGet();
+                    if (messagesSent.get() % 100 == 0) {
+                        LOG.info("Sent Message " + messagesSent.get());
+                        LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+                    }
+                }
+            } catch (ResourceAllocationException ex) {
+                assertTrue("Should not be able to send 100 messages: ", messagesSent.get() < 100);
+                LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
+            }
+        } finally {
+            // Release the client connection even when the assertion or a JMS call fails.
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Starts the broker built by {@code createBrokerWithValidTempStoreConfig()}, sends
+     * non-persistent messages to the queue "Foo" until a send is rejected with a
+     * {@link ResourceAllocationException} (more than 200 must have been accepted by then),
+     * then consumes from the queue and checks that as many messages were received as were sent.
+     *
+     * @throws Exception if a JMS or broker call fails
+     */
+    @Test(timeout=360000)
+    @Ignore("blocks in hudson, needs investigation")
+    public void testFillTempAndConsumeWithGoodTempStoreConfig() throws Exception {
+
+        createBrokerWithValidTempStoreConfig();
+
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        destination = new ActiveMQQueue("Foo");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+        final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
+        Connection consumerConnection = null;
+        try {
+            // so we can easily catch the ResourceAllocationException on send
+            producerConnection.setAlwaysSyncSend(true);
+            producerConnection.start();
+
+            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            try {
+                // Loop exits only through the ResourceAllocationException caught below
+                // (or through the test timeout).
+                while (true) {
+                    Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
+                    producer.send(message);
+                    messagesSent.incrementAndGet();
+                    if (messagesSent.get() % 100 == 0) {
+                        LOG.info("Sent Message " + messagesSent.get());
+                        LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+                    }
+                }
+            } catch (ResourceAllocationException ex) {
+                assertTrue("Should be able to send at least 200 messages but was: " + messagesSent.get(),
+                           messagesSent.get() > 200);
+                LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
+            }
+
+            // consume all sent
+            consumerConnection = factory.createConnection();
+            consumerConnection.start();
+
+            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+            while (consumer.receive(messageReceiveTimeout) != null) {
+                messagesConsumed.incrementAndGet();
+                if (messagesConsumed.get() % 1000 == 0) {
+                    LOG.info("received Message " + messagesConsumed.get());
+                    LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+                }
+            }
+
+            // JUnit's signature is (message, expected, actual): the sent count is the expectation.
+            assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(),
+                         messagesSent.get(), messagesConsumed.get());
+        } finally {
+            // Release both client connections even when an assertion or JMS call fails.
+            try {
+                if (consumerConnection != null) {
+                    consumerConnection.close();
+                }
+            } finally {
+                producerConnection.close();
+            }
+        }
+    }
+
+    private void createBrokerWithValidTempStoreConfig() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
+        broker.getSystemUsage().getTempUsage().setLimit(2*1048576);
+        ((PListStoreImpl)broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576);
+        broker.getSystemUsage().getStoreUsage().setLimit(20*1048576);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setProducerFlowControl(false);
+        defaultPolicy.setMemoryLimit(10 * 1024);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+
+        broker.setDestinationPolicy(policyMap);
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+
+        brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    private void createBrokerWithInvalidTempStoreConfig() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
+        broker.getSystemUsage().getTempUsage().setLimit(2*1048576);
+        broker.getSystemUsage().getStoreUsage().setLimit(2*1048576);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setProducerFlowControl(false);
+        defaultPolicy.setMemoryLimit(10 * 1024);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+
+        broker.setDestinationPolicy(policyMap);
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+
+        brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    /** Stops the broker, if the test created one. */
+    @After
+    public void tearDown() throws Exception {
+        if (broker == null) {
+            return;
+        }
+        broker.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
new file mode 100644
index 0000000..34df4a3
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TempStoreDataCleanupTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class);
+    // Base queue name; testIt() appends the iteration index to it.
+    private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue";
+
+    // Alphabet from which generateBody() draws the characters of each message body.
+    private final String str = new String(
+        "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
+
+    // Broker under test; created and started in setUp(), stopped in tearDown().
+    private BrokerService broker;
+    // Connect string of the broker's first transport connector, resolved in setUp().
+    private String connectionUri;
+    // Fixed pool of 10 threads (created in setUp()) that runs the sender and receiver tasks.
+    private ExecutorService pool;
+    // Queue used by the current testIt() iteration; read by the sender/receiver tasks and destroyQueue().
+    private String queueName;
+    // Random source used by generateBody().
+    private Random r = new Random();
+
+    /**
+     * Creates and starts a persistent broker whose default destination policy applies to all
+     * queues (optimized dispatch, a shared dead-letter strategy that skips expired and
+     * non-persistent messages, 9,000,000-byte memory limit), with a 300,000,000-byte system
+     * memory limit, and prepares the 10-thread worker pool.
+     */
+    @Before
+    public void setUp() throws Exception {
+
+        final SharedDeadLetterStrategy deadLetters = new SharedDeadLetterStrategy();
+        deadLetters.setProcessExpired(false);
+        deadLetters.setProcessNonPersistent(false);
+
+        final PolicyEntry queuePolicy = new PolicyEntry();
+        queuePolicy.setQueue(">");
+        queuePolicy.setOptimizedDispatch(true);
+        queuePolicy.setDeadLetterStrategy(deadLetters);
+        queuePolicy.setMemoryLimit(9000000);
+
+        final PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(queuePolicy);
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDedicatedTaskRunner(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setDestinationPolicy(policies);
+        broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
+
+        // Port 0 is requested, so the actual connect string is read back once the broker is up.
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        pool = Executors.newFixedThreadPool(10);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+        if (pool != null) {
+            pool.shutdown();
+        }
+    }
+
+    /**
+     * Runs two rounds of one receiver task plus ten sender tasks against a fresh queue, removes
+     * the queue after each round, then waits for the temp store journal to shrink to a single
+     * file and checks that the broker's memory usage percentage is back at its starting value.
+     *
+     * @throws Exception if a broker call fails or the wait is interrupted
+     */
+    @Test
+    public void testIt() throws Exception {
+
+        int startPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUsage at test start = " + startPercentage);
+
+        for (int i = 0; i < 2; i++) {
+            // Assign before logging so the log line names this iteration's queue, not null or the previous one.
+            queueName = QUEUE_NAME + i;
+            LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
+            // One count for the receiver task plus one for each of the ten sender tasks.
+            final CountDownLatch latch = new CountDownLatch(11);
+
+            pool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    receiveAndDiscard100messages(latch);
+                }
+            });
+
+            for (int j = 0; j < 10; j++) {
+                pool.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        send10000messages(latch);
+                    }
+                });
+            }
+
+            LOG.info("Waiting on the send / receive latch");
+            if (!latch.await(5, TimeUnit.MINUTES)) {
+                // Keep going as before, but make the timeout visible when diagnosing a later failure.
+                LOG.warn("Timed out waiting on the send / receive latch, remaining count: " + latch.getCount());
+            }
+            LOG.info("Resumed");
+
+            destroyQueue();
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
+
+        final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
+        assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
+            Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return pa.getJournal().getFileMap().size() == 1;
+                }
+            }, TimeUnit.MINUTES.toMillis(3))
+        );
+
+        int endPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test end = " + endPercentage);
+
+        assertEquals(startPercentage, endPercentage);
+    }
+
+    /**
+     * Removes the queue named by {@code queueName} from the broker unless the broker reports
+     * itself stopped. Any failure is logged as a warning and not rethrown.
+     */
+    public void destroyQueue() {
+        try {
+            final Broker activeBroker = broker.getBroker();
+            if (activeBroker.isStopped()) {
+                return;
+            }
+            LOG.info("Removing: " + queueName);
+            activeBroker.removeDestination(broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
+        } catch (Exception e) {
+            LOG.warn("Got an error while removing the test queue", e);
+        }
+    }
+
+    /**
+     * Sends up to 10000 non-persistent text messages with 1000-character random bodies to the
+     * queue named by {@code queueName}, pausing 10 ms between sends. JMS errors are logged and
+     * end the run early; an interrupt also ends it, with the interrupt status preserved.
+     *
+     * @param latch counted down exactly once when this method finishes, however it finishes
+     */
+    private void send10000messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session
+                    .createQueue(queueName));
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            activeMQConnection.start();
+            for (int i = 0; i < 10000; i++) {
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText(generateBody(1000));
+                textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                producer.send(textMessage);
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt for the pool and stop sending instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            producer.close();
+        } catch (JMSException e) {
+            LOG.warn("Got an error while sending the messages", e);
+        } finally {
+            try {
+                if (activeMQConnection != null) {
+                    try {
+                        activeMQConnection.close();
+                    } catch (JMSException ignored) {
+                        // Best-effort close: the sender is finished either way.
+                    }
+                }
+            } finally {
+                // Count down on every exit path so the test never waits on a task that died.
+                latch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Opens a consumer on the queue named by {@code queueName}, makes 100 blocking
+     * {@code receive()} calls whose results are discarded, and closes the consumer.
+     * JMS errors are logged and end the run early.
+     *
+     * @param latch counted down exactly once when this method finishes, however it finishes
+     */
+    private void receiveAndDiscard100messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(
+                    session.createQueue(queueName));
+            activeMQConnection.start();
+            for (int i = 0; i < 100; i++) {
+                messageConsumer.receive();
+            }
+            messageConsumer.close();
+            LOG.info("Created and disconnected");
+        } catch (JMSException e) {
+            LOG.warn("Got an error while receiving the messages", e);
+        } finally {
+            try {
+                if (activeMQConnection != null) {
+                    try {
+                        activeMQConnection.close();
+                    } catch (JMSException ignored) {
+                        // Best-effort close: the receiver is finished either way.
+                    }
+                }
+            } finally {
+                // Count down on every exit path so the test never waits on a task that died.
+                latch.countDown();
+            }
+        }
+    }
+
+    private ActiveMQConnection createConnection(String id) throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        if (id != null) {
+            factory.setClientID(id);
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        return connection;
+    }
+
+    private String generateBody(int length) {
+
+        StringBuilder sb = new StringBuilder();
+        int te = 0;
+        for (int i = 1; i <= length; i++) {
+            te = r.nextInt(62);
+            sb.append(str.charAt(te));
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
new file mode 100644
index 0000000..3d32867
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertTrue;
+
+// https://issues.apache.org/jira/browse/AMQ-4262
+public class TransactedStoreUsageSuspendResumeTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);
+
+    private static final int MAX_MESSAGES = 10000;
+
+    private static final String QUEUE_NAME = "test.queue";
+
+    private BrokerService broker;
+
+    private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES);
+    private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES);
+    private final CountDownLatch consumerStartLatch = new CountDownLatch(1);
+
+    private class ConsumerThread extends Thread {
+
+        @Override
+        public void run() {
+            try {
+
+                consumerStartLatch.await(30, TimeUnit.SECONDS);
+
+                ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+                Connection connection = factory.createConnection();
+                connection.start();
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+                // wait for producer to stop
+                long currentSendCount;
+                do {
+                    currentSendCount = messagesSentCountDown.getCount();
+                    TimeUnit.SECONDS.sleep(5);
+                } while (currentSendCount != messagesSentCountDown.getCount());
+
+                LOG.info("Starting consumer at: " + currentSendCount);
+
+                MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+
+                do {
+                    Message message = consumer.receive(5000);
+                    if (message != null) {
+                        session.commit();
+                        messagesReceivedCountDown.countDown();
+                    }
+                    if (messagesReceivedCountDown.getCount() % 500 == 0) {
+                        LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
+                    }
+                } while (messagesReceivedCountDown.getCount() != 0);
+                consumer.close();
+                session.close();
+                connection.close();
+            } catch (Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+
+        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
+        kahaDB.setJournalMaxFileLength(500 * 1024);
+        kahaDB.setCleanupInterval(10*1000);
+        broker.setPersistenceAdapter(kahaDB);
+
+        broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void testTransactedStoreUsageSuspendResume() throws Exception {
+
+        ConsumerThread thread = new ConsumerThread();
+        thread.start();
+        ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
+        sendExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    sendMessages();
+                } catch (Exception ignored) {
+                }
+            }
+        });
+        sendExecutor.shutdown();
+        sendExecutor.awaitTermination(5, TimeUnit.MINUTES);
+
+        boolean allMessagesReceived = messagesReceivedCountDown.await(10, TimeUnit.MINUTES);
+        if (!allMessagesReceived) {
+            TestSupport.dumpAllThreads("StuckConsumer!");
+        }
+        assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived);
+
+        // give consumers a chance to exit gracefully
+        TimeUnit.SECONDS.sleep(2);
+    }
+
+    private void sendMessages() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.setAlwaysSyncSend(true);
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination queue = session.createQueue(QUEUE_NAME);
+        Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain");
+        MessageProducer producer = session.createProducer(null);
+
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        BytesMessage message = session.createBytesMessage();
+        message.writeBytes(new byte[10]);
+
+        for (int i=0; i<4240; i++) {
+            // mostly fill the store with retained messages
+            // so consumer only has a small bit of store usage to work with
+            producer.send(retainQueue, message);
+            session.commit();
+        }
+
+        consumerStartLatch.countDown();
+        for (int i = 0; i < MAX_MESSAGES; i++) {
+            producer.send(queue,  message);
+            if (i>0 && i%20 == 0) {
+                session.commit();
+            }
+            messagesSentCountDown.countDown();
+            if (i>0 && i%500 == 0) {
+                LOG.info("Sent : " + i);
+            }
+
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        connection.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
new file mode 100644
index 0000000..1baff9a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * simulate message flow which cause the following exception in the broker
+ * (exception logged by client) <p/> 2007-07-24 13:51:23,624
+ * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
+ * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
+ * has not been started. at
+ * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
+ * This appears to be consistent in a MacBook. Haven't been able to replicate it
+ * on Windows though
+ */
+public class TransactionNotStartedErrorTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);
+
+    private static final int counter = 500;
+
+    private static int hectorToHaloCtr;
+    private static int xenaToHaloCtr;
+    private static int troyToHaloCtr;
+
+    private static int haloToHectorCtr;
+    private static int haloToXenaCtr;
+    private static int haloToTroyCtr;
+
+    private final String hectorToHalo = "hectorToHalo";
+    private final String xenaToHalo = "xenaToHalo";
+    private final String troyToHalo = "troyToHalo";
+
+    private final String haloToHector = "haloToHector";
+    private final String haloToXena = "haloToXena";
+    private final String haloToTroy = "haloToTroy";
+
+    private BrokerService broker;
+
+    private Connection hectorConnection;
+    private Connection xenaConnection;
+    private Connection troyConnection;
+    private Connection haloConnection;
+
+    private final Object lock = new Object();
+
+    public Connection createConnection() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+            broker.getTransportConnectors().get(0).getPublishableConnectString());
+        return factory.createConnection();
+    }
+
+    public Session createSession(Connection connection, boolean transacted) throws JMSException {
+        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    public void tearDown() throws Exception {
+        hectorConnection.close();
+        xenaConnection.close();
+        troyConnection.close();
+        haloConnection.close();
+        broker.stop();
+    }
+
+    public void testTransactionNotStartedError() throws Exception {
+        startBroker();
+        hectorConnection = createConnection();
+        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
+        Receiver hHectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToHectorCtr++;
+                if (haloToHectorCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
+
+        troyConnection = createConnection();
+        Thread troyThread = buildProducer(troyConnection, troyToHalo);
+        Receiver hTroyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToTroyCtr++;
+                if (haloToTroyCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
+
+        xenaConnection = createConnection();
+        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
+        Receiver hXenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToXenaCtr++;
+                if (haloToXenaCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
+
+        haloConnection = createConnection();
+        final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
+        final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
+        final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
+        Receiver hectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                hectorToHaloCtr++;
+                troySender.send("halo to troy because of hector");
+                if (hectorToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        Receiver xenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                xenaToHaloCtr++;
+                hectorSender.send("halo to hector because of xena");
+                if (xenaToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        Receiver troyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                troyToHaloCtr++;
+                xenaSender.send("halo to xena because of troy");
+                if (troyToHaloCtr >= counter) {
+                    synchronized (lock) {
+                        lock.notifyAll();
+                    }
+                }
+            }
+        };
+        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
+        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
+        buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
+
+        haloConnection.start();
+
+        troyConnection.start();
+        troyThread.start();
+
+        xenaConnection.start();
+        xenaThread.start();
+
+        hectorConnection.start();
+        hectorThread.start();
+        waitForMessagesToBeDelivered();
+        // number of messages received should match messages sent
+        assertEquals(hectorToHaloCtr, counter);
+        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
+        assertEquals(xenaToHaloCtr, counter);
+        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
+        assertEquals(troyToHaloCtr, counter);
+        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
+        assertEquals(haloToHectorCtr, counter);
+        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
+        assertEquals(haloToXenaCtr, counter);
+        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
+        assertEquals(haloToTroyCtr, counter);
+        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
+
+    }
+
+    protected void waitForMessagesToBeDelivered() {
+        // let's give the listeners enough time to read all messages
+        long maxWaitTime = counter * 3000;
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            boolean hasMessages = true;
+            while (hasMessages && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    LOG.error(e.toString());
+                }
+                // check if all messages have been received
+                hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter
+                              || haloToTroyCtr < counter;
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+    public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
+        return new MessageSender(queueName, connection, true, false);
+    }
+
+    public Thread buildProducer(Connection connection, final String queueName) throws Exception {
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageSender producer = new MessageSender(queueName, connection, false, false);
+        Thread thread = new Thread() {
+
+            public synchronized void run() {
+                for (int i = 0; i < counter; i++) {
+                    try {
+                        producer.send(queueName);
+                        if (session.getTransacted()) {
+                            session.commit();
+                        }
+
+                    } catch (Exception e) {
+                        throw new RuntimeException("on " + queueName + " send", e);
+                    }
+                }
+            }
+        };
+        return thread;
+    }
+
+    public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver) throws Exception {
+        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
+        MessageListener messageListener = new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    ObjectMessage objectMessage = (ObjectMessage)message;
+                    String s = (String)objectMessage.getObject();
+                    receiver.receive(s);
+                    if (session.getTransacted()) {
+                        session.commit();
+                    }
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        inputMessageConsumer.setMessageListener(messageListener);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
new file mode 100644
index 0000000..688d066
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test to demonstrate a message trapped in the JDBC store and not
+ * delivered to the consumer.
+ *
+ * The test issues the commit to the DB but then throws
+ * an exception back to the broker. This scenario could happen when a network
+ * cable is disconnected - message is committed to DB but broker does not know.
+ *
+ *
+ */
+
+public class TrapMessageInJDBCStoreTest extends TestCase {
+
+    private static final String MY_TEST_Q = "MY_TEST_Q";
+    private static final Logger LOG = LoggerFactory
+            .getLogger(TrapMessageInJDBCStoreTest.class);
+    private String transportUrl = "tcp://127.0.0.1:0";
+    private BrokerService broker;
+    private TestTransactionContext testTransactionContext;
+    private TestJDBCPersistenceAdapter jdbc;
+
+    protected BrokerService createBroker(boolean withJMX) throws Exception {
+        BrokerService broker = new BrokerService();
+
+        broker.setUseJmx(withJMX);
+
+        EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+        embeddedDataSource.setCreateDatabase("create");
+
+        //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
+        // method that can be configured to throw a SQL exception on demand
+        jdbc = new TestJDBCPersistenceAdapter();
+        jdbc.setDataSource(embeddedDataSource);
+        jdbc.setCleanupPeriod(0);
+        testTransactionContext = new TestTransactionContext(jdbc);
+
+        jdbc.setLockKeepAlivePeriod(1000l);
+        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+        leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+        jdbc.setLocker(leaseDatabaseLocker);
+
+        broker.setPersistenceAdapter(jdbc);
+
+        broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());
+
+        transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
+        return broker;
+    }
+
+    /**
+     *
+     * sends 3 messages to the queue. When the second message is being committed to the JDBCStore, $
+     * it throws a dummy SQL exception - the message has been committed to the embedded DB before the exception
+     * is thrown
+     *
+     * Excepted correct outcome: receive 3 messages and the DB should contain no messages
+     *
+     * @throws Exception
+     */
+
+    public void testDBCommitException() throws Exception {
+
+        broker = this.createBroker(false);
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+
+        LOG.info("***Broker started...");
+
+        // failover but timeout in 5 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=5000";
+
+
+        sendMessage(MY_TEST_Q, failoverTransportURL);
+
+        //check db contents
+        ArrayList<Long> dbSeq = dbMessageCount();
+        LOG.info("*** after send: db contains message seq " +dbSeq );
+
+        List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL);
+
+        assertEquals("number of consumed messages",3,consumedMessages.size());
+
+        //check db contents
+        dbSeq = dbMessageCount();
+        LOG.info("*** after consume - db contains message seq " + dbSeq);
+
+        assertEquals("number of messages in DB after test",0,dbSeq.size());
+
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+
+
+    public List<TextMessage> consumeMessages(String queue,
+                                      String transportURL) throws JMSException {
+        Connection connection = null;
+        LOG.debug("*** consumeMessages() called ...");
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            connection.start();
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(queue);
+
+            ArrayList<TextMessage> consumedMessages = new ArrayList<TextMessage>();
+
+            MessageConsumer messageConsumer = session.createConsumer(destination);
+
+            while(true){
+                TextMessage textMessage= (TextMessage) messageConsumer.receive(4000);
+                LOG.debug("*** consumed Messages :"+textMessage);
+
+                if(textMessage==null){
+                    return consumedMessages;
+                }
+                consumedMessages.add(textMessage);
+            }
+
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    public void sendMessage(String queue, String transportURL)
+            throws Exception {
+        Connection connection = null;
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(queue);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            TextMessage m = session.createTextMessage("1");
+
+            LOG.debug("*** send message 1 to broker...");
+            producer.send(m);
+
+            // trigger SQL exception in transactionContext
+            LOG.debug("***  send message 2 to broker");
+            m.setText("2");
+            producer.send(m);
+
+            //check db contents
+            ArrayList<Long> dbSeq = dbMessageCount();
+            LOG.info("*** after send 2 - db contains message seq " + dbSeq);
+            assertEquals("number of messages in DB after send 2",2,dbSeq.size());
+
+            LOG.debug("***  send  message 3 to broker");
+            m.setText("3");
+            producer.send(m);
+            LOG.debug("*** Finished sending messages to broker");
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     *  query the DB to see what messages are left in the store
+     * @return
+     * @throws SQLException
+     * @throws IOException
+     */
+    private ArrayList<Long> dbMessageCount() throws SQLException, IOException {
+        java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
+        PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
+
+        try{
+
+            ResultSet result = statement.executeQuery();
+            ArrayList<Long> dbSeq = new ArrayList<Long>();
+
+            while (result.next()){
+                dbSeq.add(result.getLong(1));
+            }
+
+            return dbSeq;
+
+        }finally{
+            statement.close();
+            conn.close();
+
+        }
+
+    }
+
+	/*
+     * Mock classes used for testing
+	 */
+
+    public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
+        public TransactionContext getTransactionContext() throws IOException {
+            return testTransactionContext;
+        }
+    }
+
+    public class TestTransactionContext extends TransactionContext {
+
+        private int count;
+
+        public TestTransactionContext(
+                JDBCPersistenceAdapter jdbcPersistenceAdapter)
+                throws IOException {
+            super(jdbcPersistenceAdapter);
+        }
+
+        public void executeBatch() throws SQLException {
+            super.executeBatch();
+            count++;
+            LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch"));
+
+            // throw on second add message
+            if (count == 16){
+                throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count);
+            }
+        }
+
+
+
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
new file mode 100644
index 0000000..6a96a14
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class VMTransportClosureTest extends EmbeddedBrokerTestSupport {
+	private static final Log LOG = LogFactory
+			.getLog(VMTransportClosureTest.class);
+	private static final long MAX_TEST_TIME_MILLIS = 300000; // 5min
+	private static final int NUM_ATTEMPTS = 100000;
+
+	public void setUp() throws Exception {
+		setAutoFail(true);
+		setMaxTestTime(MAX_TEST_TIME_MILLIS);
+		super.setUp();
+	}
+
+	/**
+	 * EmbeddedBrokerTestSupport.createBroker() binds the broker to a VM
+	 * transport address, which results in a call to
+	 * VMTransportFactory.doBind(location):
+	 * <p>
+	 * <code>
+	 *     public TransportServer doBind(URI location) throws IOException {
+	 *        return bind(location, false);
+	 *}
+	 *</code>
+	 * </p>
+	 * As a result, VMTransportServer.disposeOnDisconnect is <code>false</code>.
+	 * To expose the bug, we need to have VMTransportServer.disposeOnDisconnect
+	 * <code>true</code>, which is the case when the VMTransportServer is not
+	 * already bound when the first connection is made.
+	 */
+	@Override
+	protected BrokerService createBroker() throws Exception {
+		BrokerService answer = new BrokerService();
+		answer.setPersistent(isPersistent());
+		// answer.addConnector(bindAddress);
+		return answer;
+	}
+
+	/**
+	 * This test demonstrates how the "disposeOnDisonnect" feature of
+	 * VMTransportServer can incorrectly close all VM connections to the local
+	 * broker.
+	 */
+	public void testPrematureClosure() throws Exception {
+
+		// Open a persistent connection to the local broker. The persistent
+		// connection is maintained through the test and should prevent the
+		// VMTransportServer from stopping itself when the local transport is
+		// closed.
+		ActiveMQConnection persistentConn = (ActiveMQConnection) createConnection();
+		persistentConn.start();
+		Session session = persistentConn.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageProducer producer = session.createProducer(destination);
+
+		for (int i = 0; i < NUM_ATTEMPTS; i++) {
+			LOG.info("Attempt: " + i);
+
+			// Open and close a local transport connection. As is done by by
+			// most users of the transport, ensure that the transport is stopped
+			// when closed by the peer (via ShutdownInfo). Closing the local
+			// transport should not affect the persistent connection.
+			final Transport localTransport = TransportFactory.connect(broker
+					.getVmConnectorURI());
+			localTransport.setTransportListener(new TransportListener() {
+				public void onCommand(Object command) {
+					if (command instanceof ShutdownInfo) {
+						try {
+							localTransport.stop();
+						} catch (Exception ex) {
+							throw new RuntimeException(ex);
+						}
+					}
+				}
+
+				public void onException(IOException error) {
+					// ignore
+				}
+
+				public void transportInterupted() {
+					// ignore
+				}
+
+				public void transportResumed() {
+					// ignore
+				}
+			});
+
+			localTransport.start();
+			localTransport.stop();
+
+			// Ensure that the persistent connection is still usable.
+			producer.send(session.createMessage());
+			session.rollback();
+		}
+
+		persistentConn.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
new file mode 100644
index 0000000..7939453
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import java.io.File;
+import java.text.DateFormat;
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Measures how long individual sends to a KahaDB-backed broker take when
+ * several producer threads send concurrently, and reports every send that
+ * exceeds a fixed time budget as an "SLA violation".
+ */
+public class VerifySteadyEnqueueRate extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VerifySteadyEnqueueRate.class);
+
+    private static int max_messages = 1000000;
+    // Assigned in setUp(). A field initialiser would run during construction,
+    // before the JUnit 3 runner has set the test name, and would therefore
+    // always produce "null_Queue".
+    private String destinationName;
+    private BrokerService broker;
+    final boolean useTopic = false;
+
+    protected static final String payload = new String(new byte[24]);
+
+    @Override
+    public void setUp() throws Exception {
+        destinationName = getName() + "_Queue";
+        startBroker();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        // broker is null when startBroker() failed before assigning it.
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Currently disabled: the method returns before running the measurement.
+     */
+    @SuppressWarnings("unused")
+    public void testEnqueueRateCanMeetSLA() throws Exception {
+        if (true) {
+            return;
+        }
+        doTestEnqueue(false);
+    }
+
+    /**
+     * Starts {@code numThreads} producer threads that each send
+     * {@code max_messages} messages, timing every send. Sends slower than
+     * {@code min} milliseconds are counted and printed to {@code System.err}.
+     * The method only reports; it does not assert on the number of violations.
+     *
+     * @param transacted passed through to each {@code MessageSender}
+     */
+    private void doTestEnqueue(final boolean transacted) throws Exception {
+        final long min = 100;
+        final AtomicLong total = new AtomicLong(0);
+        final AtomicLong slaViolations = new AtomicLong(0);
+        final AtomicLong max = new AtomicLong(0);
+        final int numThreads = 6;
+
+        Runnable runner = new Runnable() {
+
+            @Override
+            public void run() {
+                Connection connection = null;
+                try {
+                    connection = createConnection();
+                    MessageSender producer = new MessageSender(destinationName,
+                            connection, transacted, useTopic);
+
+                    for (int i = 0; i < max_messages; i++) {
+                        long startT = System.currentTimeMillis();
+                        producer.send(payload);
+                        long endT = System.currentTimeMillis();
+                        long duration = endT - startT;
+
+                        total.incrementAndGet();
+
+                        // Compare-and-set loop: a plain get()/set() pair could
+                        // overwrite a larger maximum stored by another thread.
+                        long observedMax = max.get();
+                        while (duration > observedMax
+                                && !max.compareAndSet(observedMax, duration)) {
+                            observedMax = max.get();
+                        }
+
+                        if (duration > min) {
+                            slaViolations.incrementAndGet();
+                            System.err.println("SLA violation @ "+Thread.currentThread().getName()
+                                    + " "
+                                    + DateFormat.getTimeInstance().format(
+                                            new Date(startT)) + " at message "
+                                    + i + " send time=" + duration
+                                    + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
+                        }
+                    }
+
+                } catch (Exception e) {
+                    LOG.error("Producer thread failed", e);
+                } finally {
+                    // Each producer thread owns its connection; release it
+                    // whether or not the send loop completed.
+                    if (connection != null) {
+                        try {
+                            connection.close();
+                        } catch (Exception e) {
+                            LOG.warn("Failed to close producer connection", e);
+                        }
+                    }
+                }
+                System.out.println("Max Violation = " + max + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
+            }
+        };
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        for (int i = 0; i < numThreads; i++) {
+            executor.execute(runner);
+        }
+
+        executor.shutdown();
+        while(!executor.isTerminated()) {
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Creates a connection to the broker's first transport connector.
+     */
+    private Connection createConnection() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getConnectUri());
+        return factory.createConnection();
+    }
+
+    /**
+     * Creates and starts a persistent, JMX-enabled broker backed by KahaDB
+     * with a TCP connector on an ephemeral port.
+     */
+    private void startBroker() throws Exception {
+        broker = new BrokerService();
+        //broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
+        // what happens if the index is updated but a journal update is lost.
+        // The index is going to be inconsistent, but can it be repaired?
+        kaha.setEnableJournalDiskSyncs(false);
+        // Using a bigger journal file size makes it take fewer spikes as it is not switching files as often.
+        kaha.setJournalMaxFileLength(1024*1024*100);
+
+        // small batch means more frequent and smaller writes
+        kaha.setIndexWriteBatchSize(100);
+        // do the index write in a separate thread
+        kaha.setEnableIndexWriteAsync(true);
+
+        broker.setPersistenceAdapter(kaha);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        LOG.info("Starting broker..");
+        broker.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
new file mode 100644
index 0000000..01ecdb1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
@@ -0,0 +1,166 @@
+/* ====================================================================
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================== */
+
+package org.apache.activemq.bugs.amq1095;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * <p>
+ * Common functionality for ActiveMQ test cases.
+ * </p>
+ *
+ * @author Rainer Klute <a
+ *         href="mailto:rainer.klute@dp-itsolutions.de">&lt;rainer.klute@dp-itsolutions.de&gt;</a>
+ * @since 2007-08-10
+ * @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $
+ */
+public class ActiveMQTestCase extends TestCase
+{
+    private Context context;
+    private BrokerService broker;
+    protected Connection connection;
+    protected Destination destination;
+    /* Consumers registered via registerToBeEmptiedOnShutdown(); drained in tearDown(). */
+    private final List<MessageConsumer> consumersToEmpty = new LinkedList<MessageConsumer>();
+    /* Timeout in milliseconds passed to MessageConsumer.receive() while draining. */
+    protected final long RECEIVE_TIMEOUT = 500;
+
+
+    /** <p>Constructor</p> */
+    public ActiveMQTestCase()
+    {}
+
+    /** <p>Constructor</p>
+     * @param name the test case's name
+     */
+    public ActiveMQTestCase(final String name)
+    {
+        super(name);
+    }
+
+    /**
+     * <p>Sets up the JUnit testing environment: builds the JNDI context from
+     * system properties, starts the broker described by the xbean
+     * configuration, and creates a connection with a fixed client ID.</p>
+     */
+    @Override
+    protected void setUp()
+    {
+        try
+        {
+            /* Copy all system properties whose names start with "java.naming.",
+             * "topic." or "queue." to the initial context. stringPropertyNames()
+             * only yields entries whose key and value are both strings, so no
+             * unchecked casts are needed. */
+            final Properties systemProperties = System.getProperties();
+            final Properties jndiProperties = new Properties();
+            for (final String key : systemProperties.stringPropertyNames())
+            {
+                if (key.startsWith("java.naming.") || key.startsWith("topic.") ||
+                    key.startsWith("queue."))
+                {
+                    jndiProperties.put(key, systemProperties.getProperty(key));
+                }
+            }
+            context = new InitialContext(jndiProperties);
+            final URI uri = new URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml");
+            broker = BrokerFactory.createBroker(uri);
+            broker.start();
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException(ex);
+        }
+
+        final ConnectionFactory connectionFactory;
+        try
+        {
+            /* Lookup the connection factory. */
+            connectionFactory = (ConnectionFactory) context.lookup("TopicConnectionFactory");
+
+            destination = new ActiveMQTopic("TestTopic");
+
+            /* Create a connection: */
+            connection = connectionFactory.createConnection();
+            connection.setClientID("sampleClientID");
+        }
+        catch (JMSException ex1)
+        {
+            ex1.printStackTrace();
+            fail(ex1.toString());
+        }
+        catch (NamingException ex2)
+        {
+            ex2.printStackTrace();
+            fail(ex2.toString());
+        }
+        catch (Throwable ex3)
+        {
+            ex3.printStackTrace();
+            fail(ex3.toString());
+        }
+    }
+
+
+    /**
+     * <p>
+     * Tear down the testing environment by receiving any messages that might be
+     * left in the topic after a failure and shutting down the broker properly.
+     * This is quite important for subsequent test cases that assume the topic
+     * to be empty.
+     * </p>
+     * <p>
+     * The connection is closed and the broker stopped even if an earlier
+     * clean-up step throws.
+     * </p>
+     */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            drainRegisteredConsumers();
+        }
+        finally
+        {
+            try
+            {
+                if (connection != null)
+                {
+                    /* close() rather than stop(): stop() only pauses delivery
+                     * and leaves the connection's resources allocated. */
+                    connection.close();
+                }
+            }
+            finally
+            {
+                /* broker is null when setUp() failed before creating it. */
+                if (broker != null)
+                {
+                    broker.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Receives from every registered consumer until a receive times out.
+     * Draining is best effort: the first failure ends it without failing
+     * the tear-down.
+     */
+    private void drainRegisteredConsumers()
+    {
+        try
+        {
+            for (final MessageConsumer consumer : consumersToEmpty)
+            {
+                if (consumer != null)
+                {
+                    /* The message content is irrelevant here, so the result is
+                     * only compared against null and never cast. */
+                    while (consumer.receive(RECEIVE_TIMEOUT) != null)
+                    {
+                        /* discard */
+                    }
+                }
+            }
+        }
+        catch (Exception ignored)
+        {
+            /* Best effort only; a consumer may already be closed or broken
+             * after a failed test. */
+        }
+    }
+
+    /**
+     * Registers a consumer whose pending messages are received and discarded
+     * during tearDown().
+     *
+     * @param consumer the consumer to drain on shutdown
+     */
+    protected void registerToBeEmptiedOnShutdown(final MessageConsumer consumer)
+    {
+        consumersToEmpty.add(consumer);
+    }
+}


Mime
View raw message