activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [42/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:13 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
new file mode 100644
index 0000000..d624d36
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Destination;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author rnewson
+ */
+public final class LargeStreamletTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class);
+    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
+    private static final int BUFFER_SIZE = 1 * 1024;
+    private static final int MESSAGE_COUNT = 10 * 1024;
+
+    protected Exception writerException;
+    protected Exception readerException;
+
+    private final AtomicInteger totalRead = new AtomicInteger();
+    private final AtomicInteger totalWritten = new AtomicInteger();
+    private final AtomicBoolean stopThreads = new AtomicBoolean(false);
+
+    public void testStreamlets() throws Exception {
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
+
+        final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
+        connection.start();
+        try {
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            try {
+                final Destination destination = session.createQueue("wibble");
+                final Thread readerThread = new Thread(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        totalRead.set(0);
+                        try {
+                            final InputStream inputStream = connection.createInputStream(destination);
+                            try {
+                                int read;
+                                final byte[] buf = new byte[BUFFER_SIZE];
+                                while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) {
+                                    totalRead.addAndGet(read);
+                                }
+                            } finally {
+                                inputStream.close();
+                            }
+                        } catch (Exception e) {
+                            readerException = e;
+                            e.printStackTrace();
+                        } finally {
+                            LOG.info(totalRead + " total bytes read.");
+                        }
+                    }
+                });
+
+                final Thread writerThread = new Thread(new Runnable() {
+                    private final Random random = new Random();
+
+                    @Override
+                    public void run() {
+                        totalWritten.set(0);
+                        int count = MESSAGE_COUNT;
+                        try {
+                            final OutputStream outputStream = connection.createOutputStream(destination);
+                            try {
+                                final byte[] buf = new byte[BUFFER_SIZE];
+                                random.nextBytes(buf);
+                                while (count > 0 && !stopThreads.get()) {
+                                    outputStream.write(buf);
+                                    totalWritten.addAndGet(buf.length);
+                                    count--;
+                                }
+                            } finally {
+                                outputStream.close();
+                            }
+                        } catch (Exception e) {
+                            writerException = e;
+                            e.printStackTrace();
+                        } finally {
+                            LOG.info(totalWritten + " total bytes written.");
+                        }
+                    }
+                });
+
+                readerThread.start();
+                writerThread.start();
+
+                // Wait till reader is has finished receiving all the messages
+                // or he has stopped
+                // receiving messages.
+                Thread.sleep(1000);
+                int lastRead = totalRead.get();
+                while (readerThread.isAlive()) {
+                    readerThread.join(1000);
+                    // No progress?? then stop waiting..
+                    if (lastRead == totalRead.get()) {
+                        break;
+                    }
+                    lastRead = totalRead.get();
+                }
+
+                stopThreads.set(true);
+
+                assertTrue("Should not have received a reader exception", readerException == null);
+                assertTrue("Should not have received a writer exception", writerException == null);
+
+                assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get());
+
+            } finally {
+                session.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java
new file mode 100644
index 0000000..4462844
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import junit.framework.Test;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Small burn test moves sends a moderate amount of messages through the broker,
+ * to checking to make sure that the broker does not lock up after a while of
+ * sustained messaging.
+ * 
+ * 
+ */
+public class LoadTestBurnIn extends JmsTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(LoadTestBurnIn.class);
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public byte destinationType;
+    public boolean durableConsumer;
+    public int messageCount = 50000;
+    public int messageSize = 1024;
+
+    public static Test suite() {
+        return suite(LoadTestBurnIn.class);
+    }
+
+    protected void setUp() throws Exception {
+        LOG.info("Start: " + getName());
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } catch (Throwable e) {
+            e.printStackTrace(System.out);
+        } finally {
+            LOG.info("End: " + getName());
+        }
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?useJmx=true"));
+        // return BrokerFactory.createBroker(new
+        // URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        return new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0))
+            .getServer().getConnectURI());
+    }
+
+    public void initCombosForTestSendReceive() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+        addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE});
+        addCombinationValues("messageSize", new Object[] {Integer.valueOf(101), Integer.valueOf(102),
+                                                          Integer.valueOf(103), Integer.valueOf(104),
+                                                          Integer.valueOf(105), Integer.valueOf(106),
+                                                          Integer.valueOf(107), Integer.valueOf(108)});
+    }
+
+    public void testSendReceive() throws Exception {
+
+        // Durable consumer combination is only valid with topics
+        if (durableConsumer && destinationType != ActiveMQDestination.TOPIC_TYPE) {
+            return;
+        }
+
+        connection.setClientID(getName());
+        connection.getPrefetchPolicy().setAll(1000);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer;
+        if (durableConsumer) {
+            consumer = session.createDurableSubscriber((Topic)destination, "sub1:"
+                                                                           + System.currentTimeMillis());
+        } else {
+            consumer = session.createConsumer(destination);
+        }
+        profilerPause("Ready: ");
+
+        final CountDownLatch producerDoneLatch = new CountDownLatch(1);
+
+        // Send the messages, async
+        new Thread() {
+            public void run() {
+                Connection connection2 = null;
+                try {
+                    connection2 = factory.createConnection();
+                    Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int i = 0; i < messageCount; i++) {
+                        BytesMessage m = session.createBytesMessage();
+                        m.writeBytes(new byte[messageSize]);
+                        producer.send(m);
+                    }
+                    producer.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                } finally {
+                    safeClose(connection2);
+                    producerDoneLatch.countDown();
+                }
+
+            }
+        }.start();
+
+        // Make sure all the messages were delivered.
+        Message message = null;
+        for (int i = 0; i < messageCount; i++) {
+            message = consumer.receive(5000);
+            assertNotNull("Did not get message: " + i, message);
+        }
+
+        profilerPause("Done: ");
+
+        assertNull(consumer.receiveNoWait());
+        message.acknowledge();
+
+        // Make sure the producer thread finishes.
+        assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java
new file mode 100644
index 0000000..b079070
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageEvictionTest {
+    static final Logger LOG = LoggerFactory.getLogger(MessageEvictionTest.class);
+    private BrokerService broker;
+    private ConnectionFactory connectionFactory;
+    Connection connection;
+    private Session session;
+    private Topic destination;
+    private final String destinationName = "verifyEvection";
+    protected int numMessages = 2000;
+    protected String payload = new String(new byte[1024*2]);
+
+    public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
+        broker = createBroker(pendingSubscriberPolicy);
+        broker.start();
+        connectionFactory = createConnectionFactory();
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createTopic(destinationName);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        connection.stop();
+        broker.stop();
+    }
+
+    @Test
+    public void testMessageEvictionMemoryUsageFileCursor() throws Exception {
+        setUp(new FilePendingSubscriberMessageStoragePolicy());
+        doTestMessageEvictionMemoryUsage();
+    }
+
+    @Test
+    public void testMessageEvictionMemoryUsageVmCursor() throws Exception {
+        setUp(new VMPendingSubscriberMessageStoragePolicy());
+        doTestMessageEvictionMemoryUsage();
+    }
+
+    @Test
+    public void testMessageEvictionDiscardedAdvisory() throws Exception {
+        setUp(new VMPendingSubscriberMessageStoragePolicy());
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final CountDownLatch consumerRegistered = new CountDownLatch(1);
+        final CountDownLatch gotAdvisory = new CountDownLatch(1);
+        final CountDownLatch advisoryIsGood = new CountDownLatch(1);
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ActiveMQTopic discardedAdvisoryDestination =
+                        AdvisorySupport.getMessageDiscardedAdvisoryTopic(destination);
+                    // use separate session rather than asyncDispatch on consumer session
+                    // as we want consumer session to block
+                    Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    final MessageConsumer consumer = advisorySession.createConsumer(discardedAdvisoryDestination);
+                    consumer.setMessageListener(new MessageListener() {
+                        int advisoriesReceived = 0;
+                        @Override
+                        public void onMessage(Message message) {
+                            try {
+                                LOG.info("advisory:" + message);
+                                ActiveMQMessage activeMQMessage = (ActiveMQMessage) message;
+                                assertNotNull(activeMQMessage.getStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID));
+                                assertEquals(++advisoriesReceived, activeMQMessage.getIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT));
+                                message.acknowledge();
+                                advisoryIsGood.countDown();
+                            } catch (JMSException e) {
+                                e.printStackTrace();
+                                fail(e.toString());
+                            } finally {
+                                gotAdvisory.countDown();
+                            }
+                        }
+                    });
+                    consumerRegistered.countDown();
+                    gotAdvisory.await(120, TimeUnit.SECONDS);
+                    consumer.close();
+                    advisorySession.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }
+            }
+        });
+        assertTrue("we have an advisory consumer", consumerRegistered.await(60, TimeUnit.SECONDS));
+        doTestMessageEvictionMemoryUsage();
+        assertTrue("got an advisory for discarded", gotAdvisory.await(0, TimeUnit.SECONDS));
+        assertTrue("advisory is good",advisoryIsGood.await(0, TimeUnit.SECONDS));
+    }
+
+    public void doTestMessageEvictionMemoryUsage() throws Exception {
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final CountDownLatch doAck = new CountDownLatch(1);
+        final CountDownLatch ackDone = new CountDownLatch(1);
+        final CountDownLatch consumerRegistered = new CountDownLatch(1);
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final MessageConsumer consumer = session.createConsumer(destination);
+                    consumer.setMessageListener(new MessageListener() {
+                        @Override
+                        public void onMessage(Message message) {
+                            try {
+                                // very slow, only ack once
+                                doAck.await(60, TimeUnit.SECONDS);
+                                LOG.info("acking: " + message.getJMSMessageID());
+                                message.acknowledge();
+                                ackDone.countDown();
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                fail(e.toString());
+                            } finally {
+                                consumerRegistered.countDown();
+                                ackDone.countDown();
+                            }
+                        }
+                    });
+                    consumerRegistered.countDown();
+                    ackDone.await(60, TimeUnit.SECONDS);
+                    consumer.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }
+            }
+        });
+
+        assertTrue("we have a consumer", consumerRegistered.await(10, TimeUnit.SECONDS));
+
+        final AtomicInteger sent = new AtomicInteger(0);
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+               MessageProducer producer;
+               try {
+                   producer = session.createProducer(destination);
+                   for (int i=0; i< numMessages; i++) {
+                       producer.send(session.createTextMessage(payload));
+                       sent.incrementAndGet();
+                       TimeUnit.MILLISECONDS.sleep(10);
+                   }
+                   producer.close();
+                   sendDone.countDown();
+               } catch (Exception e) {
+                   sendDone.countDown();
+                   e.printStackTrace();
+                   fail(e.toString());
+               }
+            }
+        });
+
+        assertTrue("messages sending done", sendDone.await(180, TimeUnit.SECONDS));
+        assertEquals("all message were sent", numMessages, sent.get());
+
+        doAck.countDown();
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+
+        assertTrue("usage goes to 0 once consumer goes away", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == TestSupport.getDestination(broker,
+                        ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage();
+            }
+        }));
+    }
+
+    BrokerService createBroker(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setUseJmx(false);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+
+        // spooling to disk early so topic memory limit is not reached
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(500*1024);
+
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setTopic(">");
+
+        entry.setAdvisoryForDiscardingMessages(true);
+
+        // so consumer does not get over run while blocked limit the prefetch
+        entry.setTopicPrefetch(50);
+
+
+        entry.setPendingSubscriberPolicy(pendingSubscriberPolicy);
+
+        // limit the number of outstanding messages, large enough to use the file store
+        // or small enough not to blow memory limit
+        int pendingMessageLimit = 50;
+        if (pendingSubscriberPolicy instanceof FilePendingSubscriberMessageStoragePolicy) {
+            pendingMessageLimit = 500;
+        }
+        ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
+        pendingMessageLimitStrategy.setLimit(pendingMessageLimit);
+        entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy);
+
+        // to keep the limit in check and up to date rather than just the first few, evict some
+        OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
+        // whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test
+        //messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000);
+        entry.setMessageEvictionStrategy(messageEvictionStrategy);
+
+        // let evicted messaged disappear
+        entry.setDeadLetterStrategy(null);
+        policyEntries.add(entry);
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        brokerService.setDestinationPolicy(policyMap);
+
+        return brokerService;
+    }
+
+    ConnectionFactory createConnectionFactory() throws Exception {
+        String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
+        ActiveMQConnectionFactory factory =  new ActiveMQConnectionFactory(url);
+        factory.setWatchTopicAdvisories(false);
+        return factory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
new file mode 100644
index 0000000..0fb9728
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageListenerRedeliveryTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);
+
+    private Connection connection;
+
+    @Override
+    protected void setUp() throws Exception {
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+
+    protected RedeliveryPolicy getRedeliveryPolicy() {
+        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        redeliveryPolicy.setInitialRedeliveryDelay(0);
+        redeliveryPolicy.setRedeliveryDelay(1000);
+        redeliveryPolicy.setMaximumRedeliveries(3);
+        redeliveryPolicy.setBackOffMultiplier((short)2);
+        redeliveryPolicy.setUseExponentialBackOff(true);
+        return redeliveryPolicy;
+    }
+
+    protected Connection createConnection() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&marshal=true");
+        factory.setRedeliveryPolicy(getRedeliveryPolicy());
+        return factory.createConnection();
+    }
+
+    private class TestMessageListener implements MessageListener {
+
+        public int counter;
+        private final Session session;
+
+        public TestMessageListener(Session session) {
+            this.session = session;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                LOG.info("Message Received: " + message);
+                counter++;
+                if (counter <= 4) {
+                    LOG.info("Message Rollback.");
+                    session.rollback();
+                } else {
+                    LOG.info("Message Commit.");
+                    message.acknowledge();
+                    session.commit();
+                }
+            } catch (JMSException e) {
+                LOG.error("Error when rolling back transaction");
+            }
+        }
+    }
+
+    public void testQueueRollbackConsumerListener() throws JMSException {
+        connection.start();
+
+        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session);
+        producer.send(message);
+        session.commit();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
+        mc.setRedeliveryPolicy(getRedeliveryPolicy());
+
+        TestMessageListener listener = new TestMessageListener(session);
+        consumer.setMessageListener(listener);
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+        }
+
+        // first try.. should get 2 since there is no delay on the
+        // first redeliver..
+        assertEquals(2, listener.counter);
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        // 2nd redeliver (redelivery after 1 sec)
+        assertEquals(3, listener.counter);
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+        }
+        // 3rd redeliver (redelivery after 2 seconds) - it should give up after
+        // that
+        assertEquals(4, listener.counter);
+
+        // create new message
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+        }
+        // it should be committed, so no redelivery
+        assertEquals(5, listener.counter);
+
+        try {
+            Thread.sleep(1500);
+        } catch (InterruptedException e) {
+        }
+        // no redelivery, counter should still be 5
+        assertEquals(5, listener.counter);
+
+        session.close();
+    }
+
+    public void testQueueRollbackSessionListener() throws JMSException {
+        connection.start();
+
+        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session);
+        producer.send(message);
+        session.commit();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
+        mc.setRedeliveryPolicy(getRedeliveryPolicy());
+
+        TestMessageListener listener = new TestMessageListener(session);
+        consumer.setMessageListener(listener);
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+
+        }
+        // first try
+        assertEquals(2, listener.counter);
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+
+        }
+        // second try (redelivery after 1 sec)
+        assertEquals(3, listener.counter);
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+
+        }
+        // third try (redelivery after 2 seconds) - it should give up after that
+        assertEquals(4, listener.counter);
+
+        // create new message
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        // it should be committed, so no redelivery
+        assertEquals(5, listener.counter);
+
+        try {
+            Thread.sleep(1500);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        // no redelivery, counter should still be 5
+        assertEquals(5, listener.counter);
+
+        session.close();
+    }
+
+    public void testQueueSessionListenerExceptionRetry() throws  Exception {
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session, "1");
+        producer.send(message);
+        message = createTextMessage(session, "2");
+        producer.send(message);
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final CountDownLatch gotMessage = new CountDownLatch(2);
+        final AtomicInteger count  = new AtomicInteger(0);
+        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
+        final ArrayList<String> received = new ArrayList<String>();
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Message Received: " + message);
+                try {
+                    received.add(((TextMessage) message).getText());
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }
+                if (count.incrementAndGet() < maxDeliveries) {
+                    throw new RuntimeException(getName() + " force a redelivery");
+                }
+                // reset the delivery count before the next message
+                count.set(0);
+                gotMessage.countDown();
+            }
+        });
+
+        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
+
+        for (int i=0; i<maxDeliveries; i++) {
+            assertEquals("got first redelivered: " + i, "1", received.get(i));
+        }
+        for (int i=maxDeliveries; i<maxDeliveries*2; i++) {
+            assertEquals("got first redelivered: " + i, "2", received.get(i));
+        }
+        session.close();
+    }
+
+    public void testQueueSessionListenerExceptionDlq() throws  Exception {
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageProducer producer = createProducer(session, queue);
+        Message message = createTextMessage(session);
+        producer.send(message);
+
+        final Message[] dlqMessage = new Message[1];
+        ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
+        MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
+        final CountDownLatch gotDlqMessage = new CountDownLatch(1);
+        dlqConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("DLQ Message Received: " + message);
+                dlqMessage[0] = message;
+                gotDlqMessage.countDown();
+            }
+        });
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
+        final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Message Received: " + message);
+                gotMessage.countDown();
+                throw new RuntimeException(getName() + " force a redelivery");
+            }
+        });
+
+        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
+
+        // check DLQ
+        assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
+
+        // check DLQ message cause is captured
+        message = dlqMessage[0];
+        assertNotNull("dlq message captured", message);
+        String cause = message.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+
+        LOG.info("DLQ'd message cause reported as: {}", cause);
+
+        assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException"));
+        assertTrue("is correct exception", cause.contains(getName()));
+        assertTrue("cause exception is remembered", cause.contains("Throwable"));
+        assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
+
+        session.close();
+    }
+
+    private TextMessage createTextMessage(Session session, String text) throws JMSException {
+        return session.createTextMessage(text);
+    }
+    private TextMessage createTextMessage(Session session) throws JMSException {
+        return session.createTextMessage("Hello");
+    }
+
+    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(getDeliveryMode());
+        return producer;
+    }
+
+    protected int getDeliveryMode() {
+        return DeliveryMode.PERSISTENT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java
new file mode 100644
index 0000000..b4efd32
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+
+public class MessageTransformationTest extends TestCase {
+
+    /**
+     * Sets up the resources of the unit test.
+     * 
+     * @throws Exception
+     */
+    protected void setUp() throws Exception {
+    }
+
+    /**
+     * Clears up the resources used in the unit test.
+     */
+    protected void tearDown() throws Exception {
+    }
+
+    /**
+     * Tests transforming destinations into ActiveMQ's destination
+     * implementation.
+     */
+    public void testTransformDestination() throws Exception {
+        assertTrue("Transforming a TempQueue destination to an ActiveMQTempQueue",
+                   ActiveMQMessageTransformation.transformDestination((TemporaryQueue)new ActiveMQTempQueue()) instanceof ActiveMQTempQueue);
+
+        assertTrue("Transforming a TempTopic destination to an ActiveMQTempTopic",
+                   ActiveMQMessageTransformation.transformDestination((TemporaryTopic)new ActiveMQTempTopic()) instanceof ActiveMQTempTopic);
+
+        assertTrue("Transforming a Queue destination to an ActiveMQQueue", ActiveMQMessageTransformation.transformDestination((Queue)new ActiveMQQueue()) instanceof ActiveMQQueue);
+
+        assertTrue("Transforming a Topic destination to an ActiveMQTopic", ActiveMQMessageTransformation.transformDestination((Topic)new ActiveMQTopic()) instanceof ActiveMQTopic);
+
+        assertTrue("Transforming a Destination to an ActiveMQDestination",
+                   ActiveMQMessageTransformation.transformDestination((ActiveMQDestination)new ActiveMQTopic()) instanceof ActiveMQDestination);
+    }
+
+    /**
+     * Tests transforming messages into ActiveMQ's message implementation.
+     */
+    public void testTransformMessage() throws Exception {
+        assertTrue("Transforming a BytesMessage message into an ActiveMQBytesMessage", ActiveMQMessageTransformation.transformMessage((BytesMessage)new ActiveMQBytesMessage(),
+                                                                                                                                      null) instanceof ActiveMQBytesMessage);
+
+        assertTrue("Transforming a MapMessage message to an ActiveMQMapMessage",
+                   ActiveMQMessageTransformation.transformMessage((MapMessage)new ActiveMQMapMessage(), null) instanceof ActiveMQMapMessage);
+
+        assertTrue("Transforming an ObjectMessage message to an ActiveMQObjectMessage", ActiveMQMessageTransformation.transformMessage((ObjectMessage)new ActiveMQObjectMessage(),
+                                                                                                                                       null) instanceof ActiveMQObjectMessage);
+
+        assertTrue("Transforming a StreamMessage message to an ActiveMQStreamMessage", ActiveMQMessageTransformation.transformMessage((StreamMessage)new ActiveMQStreamMessage(),
+                                                                                                                                      null) instanceof ActiveMQStreamMessage);
+
+        assertTrue("Transforming a TextMessage message to an ActiveMQTextMessage",
+                   ActiveMQMessageTransformation.transformMessage((TextMessage)new ActiveMQTextMessage(), null) instanceof ActiveMQTextMessage);
+
+        assertTrue("Transforming an ActiveMQMessage message to an ActiveMQMessage",
+                   ActiveMQMessageTransformation.transformMessage(new ActiveMQMessage(), null) instanceof ActiveMQMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
new file mode 100644
index 0000000..0851198
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// see: https://issues.apache.org/activemq/browse/AMQ-2651
+@RunWith(BlockJUnit4ClassRunner.class)
+public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class);
+
+    private Connection connection;
+    private ConnectionConsumer connectionConsumer;
+    private Queue queue;
+    private final AtomicBoolean completed = new AtomicBoolean();
+    private final AtomicBoolean success = new AtomicBoolean();
+
+    @Ignore("https://issues.apache.org/jira/browse/AMQ-5126")
+    @Test(timeout = 60 * 1000)
+    public void testPrefetchExtension() throws Exception {
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+
+        // when Msg1 is acked, the PrefetchSubscription will (incorrectly?) increment its prefetchExtension
+        producer.send(session.createTextMessage("Msg1"));
+
+        // Msg2 will exhaust the ServerSessionPool (since it only has 1 ServerSession)
+        producer.send(session.createTextMessage("Msg2"));
+
+        // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from
+        // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the
+        // PrefetchSubscription
+        producer.send(session.createTextMessage("Msg3"));
+
+        session.commit();
+
+        assertTrue("test completed on time", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return completed.get();
+            }
+        }));
+
+        assertTrue("Attempted to retrieve more than one ServerSession at a time", success.get());
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+
+        connection = createConnection();
+        queue = createQueue();
+        // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription
+        connectionConsumer = connection.createConnectionConsumer(queue, null, new TestServerSessionPool(connection), 1);
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        connectionConsumer.close();
+        connection.close();
+        super.tearDown();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = super.createBroker();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        // ensure prefetch is exact: only deliver the next message when the current one is acked
+        defaultEntry.setUsePrefetchExtension(false);
+        policyMap.setDefaultEntry(defaultEntry);
+        answer.setDestinationPolicy(policyMap);
+        return answer;
+    }
+
+    protected Queue createQueue() {
+        return new ActiveMQQueue(getDestinationString());
+    }
+
+    // simulates a ServerSessionPool with only 1 ServerSession
+    private class TestServerSessionPool implements ServerSessionPool {
+        Connection connection;
+        TestServerSession serverSession;
+        boolean serverSessionInUse = false;
+
+        public TestServerSessionPool(Connection connection) throws JMSException {
+            this.connection = connection;
+            this.serverSession = new TestServerSession(this);
+        }
+
+        @Override
+        public ServerSession getServerSession() throws JMSException {
+            synchronized (this) {
+                if (serverSessionInUse) {
+                    LOG.info("asked for session while in use, not serialised delivery");
+                    success.set(false);
+                    completed.set(true);
+                }
+                serverSessionInUse = true;
+                return serverSession;
+            }
+        }
+    }
+
+    private class TestServerSession implements ServerSession {
+        TestServerSessionPool pool;
+        Session session;
+
+        public TestServerSession(TestServerSessionPool pool) throws JMSException {
+            this.pool = pool;
+            session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            session.setMessageListener(new TestMessageListener());
+        }
+
+        @Override
+        public Session getSession() throws JMSException {
+            return session;
+        }
+
+        @Override
+        public void start() throws JMSException {
+            // use a separate thread to process the message asynchronously
+            new Thread() {
+                @Override
+                public void run() {
+                    // let the session deliver the message
+                    session.run();
+
+                    // commit the tx and return ServerSession to pool
+                    LOG.debug("Waiting on pool");
+                    synchronized (pool) {
+                        try {
+                            LOG.debug("About to call session.commit");
+                            session.commit();
+                            LOG.debug("Commit completed");
+                        } catch (JMSException e) {
+                            LOG.error("In start", e);
+                        }
+                        pool.serverSessionInUse = false;
+                    }
+                }
+            }.start();
+        }
+    }
+
+    private class TestMessageListener implements MessageListener {
+        @Override
+        public void onMessage(Message message) {
+            try {
+                String text = ((TextMessage) message).getText();
+                LOG.info("got message: " + text);
+                if (text.equals("Msg3")) {
+                    // if we get here, Exception in getServerSession() was not thrown, so the test is
+                    // successful. This obviously doesn't happen now; need to fix prefetchExtension
+                    // computation logic in PrefetchSubscription to get here
+                    success.set(true);
+                    completed.set(true);
+                } else if (text.equals("Msg2")) {
+                    // simulate long message processing so that Msg3 comes when Msg2 is still being
+                    // processed and thus the single ServerSession is in use
+                    TimeUnit.SECONDS.sleep(4);
+                }
+            } catch (JMSException e) {
+                LOG.error("in onMessage", e);
+            } catch (InterruptedException e) {
+                LOG.error("in onMessage",e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
new file mode 100644
index 0000000..3899361
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the broker-side inflight count seen for a consumer on a connection that has
+ * optimized acknowledgement enabled and a prefetch of 10.
+ */
+public class OptimizedAckTest extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class);
+
+    /** Prefetch applied in {@link #setUp()}; also the number of messages every test sends. */
+    private static final int PREFETCH = 10;
+    /** Number of receives during which the tests expect the whole prefetch to stay inflight. */
+    private static final int RECEIVES_BEFORE_ACK = 6;
+    /** Inflight count the tests wait for after each of the remaining receives. */
+    private static final int INFLIGHT_AFTER_ACK = 3;
+
+    private ActiveMQConnection connection;
+
+    /** Creates the connection with optimized acknowledgement on and every prefetch set to 10. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = (ActiveMQConnection) createConnection();
+        connection.setOptimizeAcknowledge(true);
+        connection.setOptimizeAcknowledgeTimeOut(0);
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(PREFETCH);
+        connection.setPrefetchPolicy(prefetchPolicy);
+    }
+
+    /** Closes the connection (when one was created) and always runs the parent teardown. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            // Must run even when close() fails, otherwise the parent's cleanup is skipped.
+            super.tearDown();
+        }
+    }
+
+    public void testReceivedMessageStillInflight() throws Exception {
+        connection.start();
+        sendAndReceiveAll(0);
+    }
+
+    public void testVerySlowReceivedMessageStillInflight() throws Exception {
+        connection.start();
+        // Same scenario, but with a 400 ms pause in front of every receive.
+        sendAndReceiveAll(400);
+    }
+
+    public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
+        connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10));
+        connection.start();
+        RegionBroker regionBroker = sendAndReceiveAll(0);
+
+        assertTrue("After delay the scheduled ack should ack all inflight.",
+                Wait.waitFor(inflightIs(regionBroker, 0, true)));
+    }
+
+    /**
+     * Runs the sequence shared by all tests: sends 10 messages to queue "test", waits until the
+     * broker reports 10 inflight, receives 6 messages asserting that 10 stay inflight, then
+     * receives the last 4, waiting after each one for the inflight count to be 3.
+     *
+     * @param pauseBeforeReceiveMillis time to sleep before each receive; no sleep when 0
+     * @return the region broker whose destination statistics were checked
+     * @throws Exception on any JMS failure or when the thread is interrupted while sleeping
+     */
+    private RegionBroker sendAndReceiveAll(long pauseBeforeReceiveMillis) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < PREFETCH; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+        }
+
+        final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        assertTrue("prefetch full", Wait.waitFor(inflightIs(regionBroker, PREFETCH, true)));
+
+        for (int i = 0; i < RECEIVES_BEFORE_ACK; i++) {
+            receiveOne(consumer, pauseBeforeReceiveMillis);
+            assertEquals("all prefetch is still in flight: " + i, PREFETCH, inflightCount(regionBroker));
+        }
+
+        for (int i = RECEIVES_BEFORE_ACK; i < PREFETCH; i++) {
+            receiveOne(consumer, pauseBeforeReceiveMillis);
+            assertTrue("most are acked but 3 remain",
+                    Wait.waitFor(inflightIs(regionBroker, INFLIGHT_AFTER_ACK, false)));
+        }
+        return regionBroker;
+    }
+
+    /** Optionally sleeps, then receives one message (4 s timeout) and asserts it is not null. */
+    private void receiveOne(MessageConsumer consumer, long pauseBeforeReceiveMillis) throws Exception {
+        if (pauseBeforeReceiveMillis > 0) {
+            Thread.sleep(pauseBeforeReceiveMillis);
+        }
+        javax.jms.Message msg = consumer.receive(4000);
+        assertNotNull(msg);
+    }
+
+    /** Reads the inflight count from the broker's destination statistics. */
+    private static long inflightCount(RegionBroker regionBroker) {
+        return regionBroker.getDestinationStatistics().getInflight().getCount();
+    }
+
+    /**
+     * Builds a wait condition that is satisfied once the broker's inflight count equals
+     * {@code expected}.
+     *
+     * @param logCount whether each evaluation logs the count that was read
+     */
+    private Wait.Condition inflightIs(final RegionBroker regionBroker, final long expected, final boolean logCount) {
+        return new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                long count = inflightCount(regionBroker);
+                if (logCount) {
+                    LOG.info("inflight count: " + count);
+                }
+                return expected == count;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
new file mode 100644
index 0000000..83d9179
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// see: https://issues.apache.org/activemq/browse/AMQ-2668
+public class PerDestinationStoreLimitTest extends JmsTestSupport {
+    static final Logger LOG = LoggerFactory.getLogger(PerDestinationStoreLimitTest.class);
+    final String oneKb = new String(new byte[1024]);
+    
+    ActiveMQDestination queueDest = new ActiveMQQueue("PerDestinationStoreLimitTest.Queue");
+    ActiveMQDestination topicDest = new ActiveMQTopic("PerDestinationStoreLimitTest.Topic");
+
+    protected TransportConnector connector;
+    protected ActiveMQConnection connection;
+    
+    public void testDLQAfterBlockTopic() throws Exception {
+        doTestDLQAfterBlock(topicDest);
+    }
+    
+    public void testDLQAfterBlockQueue() throws Exception {
+        doTestDLQAfterBlock(queueDest);
+    }
+    
+    public void doTestDLQAfterBlock(ActiveMQDestination destination) throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        // Immediately sent to the DLQ on rollback, no redelivery
+        redeliveryPolicy.setMaximumRedeliveries(0);
+        factory.setRedeliveryPolicy(redeliveryPolicy);
+        
+        // Separate connection for consumer so it will not be blocked by filler thread
+        // sending when it blocks
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.setClientID("someId");
+        connection.start();
+
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);      
+        MessageConsumer consumer = destination.isQueue() ?
+                    consumerSession.createConsumer(destination) :
+                    consumerSession.createDurableSubscriber((Topic) destination, "Durable");
+        
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final MessageProducer producer = session.createProducer(destination);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        final CountDownLatch fillerStarted = new CountDownLatch(1);
+
+        final AtomicLong sent = new AtomicLong(0);
+        Thread thread = new Thread("Filler") {
+            int i;
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    fillerStarted.countDown();
+                    try {
+                        producer.send(session.createTextMessage(oneKb + ++i));
+                        if (i%10 == 0) {
+                            session.commit();
+                            sent.getAndAdd(10);
+                            LOG.info("committed/sent: " + sent.get());
+                        }
+                        LOG.info("sent: " + i);
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        thread.start();
+		
+        assertTrue("filler started..", fillerStarted.await(20, TimeUnit.SECONDS));
+        waitForBlocked(done);
+
+        // consume and rollback some so message gets to DLQ
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+        TextMessage msg;
+        int received = 0;
+        for (;received < sent.get(); ++received) {
+        	msg = (TextMessage) consumer.receive(4000);
+        	if (msg == null) {
+        	    LOG.info("received null on count: " + received);
+        	    break;
+        	}
+        	LOG.info("received: " + received + ", msg: " + msg.getJMSMessageID());
+        	if (received%5==0) {
+        	    if (received%3==0) {
+        	        // force the use of the DLQ which will use some more store
+        	        LOG.info("rollback on : " + received);
+        	        consumerSession.rollback();
+        	    } else {
+        	        LOG.info("commit on : " + received);
+        	        consumerSession.commit();
+        	    }
+        	}
+        }
+        LOG.info("Done:: sent: " + sent.get() + ", received: " + received);
+        keepGoing.set(false);
+        assertTrue("some were sent:", sent.get() > 0);
+        assertEquals("received what was committed", sent.get(), received);	
+    }
+
+    protected void waitForBlocked(final AtomicBoolean done)
+            throws InterruptedException {
+        while (true) {
+            Thread.sleep(1000);
+            // the producer is blocked once the done flag stays true
+            if (done.get()) {
+                LOG.info("Blocked....");
+                break;
+            }
+            done.set(true);
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        
+        service.setUseJmx(false);
+
+        service.getSystemUsage().getStoreUsage().setLimit(200*1024);
+        
+        // allow destination to use 50% of store, leaving 50% for DLQ.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setStoreUsageHighWaterMark(50);
+        policyMap.put(queueDest, policy);
+        policyMap.put(topicDest, policy);
+        service.setDestinationPolicy(policyMap);
+
+        connector = service.addConnector("tcp://localhost:0");
+        return service;
+    }
+
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+    
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+            t.getTransportListener().onException(new IOException("Disposed."));
+            connection.getTransport().stop();
+            super.tearDown();
+        }
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connector.getConnectUri());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
new file mode 100644
index 0000000..c18eccd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+
+        // Setup a destination policy where it takes only 1 message at a time.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(1);
+        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policy.setProducerFlowControl(true);
+        policyMap.setDefaultEntry(policy);
+        service.setDestinationPolicy(policyMap);
+        
+        service.getSystemUsage().setSendFailIfNoSpace(true);
+
+        connector = service.addConnector("tcp://localhost:0");
+        return service;
+    }
+    
+    @Override
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        // with sendFailIfNoSpace set, there is no blocking of the connection
+    }
+    
+    @Override
+    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
+        // sendFail means no flowControllwindow as there is no producer ack, just an exception
+    }
+    
+    @Override
+    public void testPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        // with sendFail, there must be no flowControllwindow
+        // sendFail is an alternative flow control mechanism that does not block
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+   
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                        if (gotResourceException.get()) {
+                            // do not flood the broker with requests when full as we are sending async and they 
+                            // will be limited by the network buffers
+                            Thread.sleep(200);
+                        }
+                    } catch (Exception e) {
+                        // with async send, there will be no exceptions
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+        // resourceException on second message, resumption if we
+        // can receive 10
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 10; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            if (msg != null) {
+                msg.acknowledge();
+            }
+        }
+        keepGoing.set(false);
+    }
+
+    public void testPubisherRecoverAfterBlockWithSyncSend() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setExceptionListener(null);
+        factory.setUseAsyncSend(false);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        final AtomicInteger exceptionCount = new AtomicInteger(0);
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                    } catch (JMSException arg0) {
+                        if (arg0 instanceof ResourceAllocationException) {
+                            gotResourceException.set(true);
+                            exceptionCount.incrementAndGet();
+                        }
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+        // resourceException on second message, resumption if we
+        // can receive 10
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 10; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            if (msg != null) {
+                msg.acknowledge();
+            }
+        }
+        assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
+        keepGoing.set(false);
+    }
+    
+	@Override
+	protected ConnectionFactory createConnectionFactory() throws Exception {
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
+		connectionFactory.setExceptionListener(new ExceptionListener() {
+				public void onException(JMSException arg0) {
+					if (arg0 instanceof ResourceAllocationException) {
+						gotResourceException.set(true);
+					}
+				}
+	        });
+		return connectionFactory;
+	}
+}


Mime
View raw message