activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [14/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:36:45 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
new file mode 100644
index 0000000..acfc0f6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerRestartTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.transport.failover.FailoverTransport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test for AMQ-4950.
+ * Simulates an error during XA prepare call.
+ */
+public class AMQ4950Test extends BrokerRestartTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4950Test.class);
+    protected static final String simulatedExceptionMessage = "Simulating error inside tx prepare().";
+    public boolean prioritySupport = false;
+    protected String connectionUri = null;
+
+    /**
+     * Configures the broker under test: applies {@code policyMap}, deletes all
+     * messages on startup, disables JMX, adds a TCP connector on localhost
+     * port 0 (remembering its publishable URI in {@code connectionUri}) and
+     * installs a plugin whose {@code prepareTransaction} delegates to the next
+     * broker and then always throws an {@link XAException}.
+     */
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        broker.setDestinationPolicy(policyMap);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setUseJmx(false);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        BrokerPlugin failingPreparePlugin = new BrokerPluginSupport() {
+
+            @Override
+            public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+                // let the rest of the broker chain prepare first, then report a failure
+                getNext().prepareTransaction(context, xid);
+                LOG.debug("BrokerPlugin.prepareTransaction() will throw an exception.");
+                throw new XAException(simulatedExceptionMessage);
+            }
+
+            @Override
+            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+                LOG.debug("BrokerPlugin.commitTransaction().");
+                super.commitTransaction(context, xid, onePhase);
+            }
+        };
+        broker.setPlugins(new BrokerPlugin[] { failingPreparePlugin });
+    }
+
+    /**
+     * Creates an XA transaction and invokes XA prepare().
+     * Because of the plugin registered in {@code configureBroker}, prepare is
+     * handled by the broker but then fails with an exception.
+     * Prior to fixing AMQ-4950, this resulted in a ClassCastException
+     * in ConnectionStateTracker.PrepareReadonlyTransactionAction.onResponse()
+     * causing the failover transport to reconnect and replay the XA prepare().
+     * <p>
+     * The connection is closed in a {@code finally} block so that a failed
+     * assertion or XA call does not leave it open.
+     *
+     * @throws Exception if any JMS/XA call fails unexpectedly
+     */
+    public void testXAPrepareFailure() throws Exception {
+
+        assertNotNull(connectionUri);
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + connectionUri + ")");
+        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) cf.createConnection();
+        try {
+            xaConnection.start();
+            XASession session = xaConnection.createXASession();
+            XAResource resource = session.getXAResource();
+            Xid tid = createXid();
+            resource.start(tid, XAResource.TMNOFLAGS);
+
+            MessageProducer producer = session.createProducer(session.createQueue(this.getClass().getName()));
+            Message message = session.createTextMessage("Sample Message");
+            producer.send(message);
+            resource.end(tid, XAResource.TMSUCCESS);
+            try {
+                LOG.debug("Calling XA prepare(), expecting an exception");
+                int ret = resource.prepare(tid);
+                if (XAResource.XA_OK == ret) {
+                    resource.commit(tid, false);
+                }
+            } catch (XAException xae) {
+                LOG.info("Received excpected XAException: {}", xae.getMessage());
+                LOG.info("Rolling back transaction {}", tid);
+
+                // with bug AMQ-4950 the thrown error reads "Cannot call prepare now"
+                // we check that we receive the original exception message as
+                // thrown by the BrokerPlugin
+                assertEquals(simulatedExceptionMessage, xae.getMessage());
+                resource.rollback(tid);
+            }
+            // the transaction must be gone from the broker, from the
+            // broker-side connection and from the client's failover state
+            assertTransactionGoneFromBroker(tid);
+            assertTransactionGoneFromConnection(broker.getBrokerName(), xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
+            assertTransactionGoneFromFailoverState(xaConnection, tid);
+
+            // cleanup
+            producer.close();
+            session.close();
+        } finally {
+            // always release the connection, even when an assertion above failed
+            xaConnection.close();
+        }
+        LOG.debug("testXAPrepareFailure() finished.");
+    }
+
+
+    /**
+     * Builds a new {@link Xid} with format id 86 whose global transaction id
+     * and branch qualifier are both the bytes written by
+     * {@link DataOutputStream#writeLong(long)} for the next
+     * {@code txGenerator} value.
+     *
+     * @return the newly created Xid
+     * @throws IOException if encoding the counter fails
+     */
+    public Xid createXid() throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer);
+        out.writeLong(++txGenerator);
+        out.close();
+        final byte[] idBytes = buffer.toByteArray();
+
+        return new Xid() {
+            @Override
+            public int getFormatId() {
+                return 86;
+            }
+
+            @Override
+            public byte[] getGlobalTransactionId() {
+                return idBytes;
+            }
+
+            @Override
+            public byte[] getBranchQualifier() {
+                return idBytes;
+            }
+        };
+    }
+
+
+    /**
+     * Asserts that the state tracker of the connection's failover transport
+     * returns {@code null} when asked to process a one-phase commit for
+     * {@code tid}.
+     */
+    private void assertTransactionGoneFromFailoverState(
+            ActiveMQXAConnection connection1, Xid tid) throws Exception {
+
+        FailoverTransport failover = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
+        ConnectionId ownerId = connection1.getConnectionInfo().getConnectionId();
+        TransactionInfo onePhaseCommit = new TransactionInfo(ownerId, new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
+        assertNull("transaction should not exist in the state tracker",
+                failover.getStateTracker().processCommitTransactionOnePhase(onePhaseCommit));
+    }
+
+
+    /**
+     * Asserts that looking up {@code tid} on the {@link TransactionBroker} of
+     * the broker registered as "localhost" throws an {@link XAException}.
+     */
+    private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
+        BrokerService registered = BrokerRegistry.getInstance().lookup("localhost");
+        TransactionBroker txBroker = (TransactionBroker) registered.getBroker().getAdaptor(TransactionBroker.class);
+
+        boolean lookupFailed = false;
+        try {
+            txBroker.getTransaction(null, new XATransactionId(tid), false);
+        } catch (XAException expectedOnNotFound) {
+            lookupFailed = true;
+        }
+        if (!lookupFailed) {
+            fail("expected exception on tx not found");
+        }
+    }
+
+
+    /**
+     * For every connection on the first transport connector of the broker
+     * registered as {@code brokerName} whose connection id equals
+     * {@code clientId}, asserts that preparing {@code tid} fails with an
+     * {@link IllegalStateException}.
+     * <p>
+     * NOTE(review): the filter compares the transport connection's id with the
+     * value the caller passes as {@code clientId} (the client id of the XA
+     * connection). If those two values never coincide, no connection is
+     * checked and this method asserts nothing - confirm whether
+     * {@code connectionId} was the intended key.
+     */
+    private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception {
+        BrokerService registered = BrokerRegistry.getInstance().lookup(brokerName);
+        CopyOnWriteArrayList<TransportConnection> connections = registered.getTransportConnectors().get(0).getConnections();
+        for (TransportConnection candidate : connections) {
+            if (!candidate.getConnectionId().equals(clientId)) {
+                continue;
+            }
+            try {
+                candidate.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
+                fail("did not get expected excepton on missing transaction, it must be still there in error!");
+            } catch (IllegalStateException expectedOnNoTransaction) {
+                // expected outcome: prepare is rejected for this transaction
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
new file mode 100644
index 0000000..6a52e46
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.sql.DataSource;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.Wait;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test creates a broker network with two brokers - producerBroker (with a
+ * message producer attached) and consumerBroker (with consumer attached)
+ * <p/>
+ * Simulates a duplicate network message by stopping and restarting the
+ * consumerBroker after a message (with message ID ending in 120) is persisted to
+ * the consumerBroker store BUT BEFORE the ack is sent to the producerBroker over
+ * the network connection. When the network connection is reestablished the
+ * producerBroker resends the message (with message ID ending in 120).
+ * <p/>
+ * Expectation:
+ * <p/>
+ * With the following policy entries set, would expect the duplicate message to
+ * be read from the store and dispatched to the consumer - where the duplicate
+ * could be detected by consumer.
+ * <p/>
+ * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">");
+ * policy.setEnableAudit(false); policy.setUseCache(false);
+ * policy.setExpireMessagesPeriod(0);
+ * <p/>
+ * <p/>
+ * Note 1: The network needs to use replayWhenNoConsumers, so enabling the
+ * networkAudit to avoid this scenario is not feasible.
+ * <p/>
+ * NOTE 2: Added a custom plugin to the consumerBroker so that the
+ * consumerBroker shutdown will occur after a message has been persisted to
+ * consumerBroker store but before an ACK is sent back to ProducerBroker. This
+ * is just a hack to ensure producerBroker will resend the message after
+ * shutdown.
+ */
+
+@RunWith(value = Parameterized.class)
+public class AMQ4952Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class);
+
+    protected static final int MESSAGE_COUNT = 1;
+
+    protected BrokerService consumerBroker;
+    protected BrokerService producerBroker;
+
+    protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store");
+
+    private final CountDownLatch stopConsumerBroker = new CountDownLatch(1);
+    private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1);
+    private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1);
+
+    private EmbeddedDataSource localDataSource;
+
+    @Parameterized.Parameter(0)
+    public boolean enableCursorAudit;
+
+    /**
+     * Supplies the two parameter sets for this test: one with
+     * {@code enableCursorAudit} set to true and one with it set to false.
+     */
+    @Parameterized.Parameters(name = "enableAudit={0}")
+    public static Iterable<Object[]> getTestParameters() {
+        Object[] auditOn = { Boolean.TRUE };
+        Object[] auditOff = { Boolean.FALSE };
+        return Arrays.asList(auditOn, auditOff);
+    }
+
+    /**
+     * Sends a message through the producer broker while a background task
+     * stops and restarts the consumer broker, then verifies that the consumer
+     * received two messages, that the consumer broker's JDBC store still holds
+     * data and that the stored content mentions "DLQ".
+     * <p>
+     * Two tasks run on a two-thread pool: one consumes from localhost:2006 and
+     * returns the number of messages received; the other waits for the
+     * {@code stopConsumerBroker} latch, restarts the consumer broker and
+     * signals once the producer broker reports a total message count of zero.
+     * The pool is shut down in a {@code finally} block so its threads do not
+     * outlive the test.
+     *
+     * @throws Exception if producing, consuming or querying the store fails
+     */
+    @Test
+    public void testConsumerBrokerRestart() throws Exception {
+
+        Callable<Integer> consumeMessageTask = new Callable<Integer>() {
+            @Override
+            public Integer call() throws Exception {
+
+                int receivedMessageCount = 0;
+
+                ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false");
+                Connection consumerConnection = consumerFactory.createConnection();
+
+                try {
+
+                    consumerConnection.setClientID("consumer");
+                    consumerConnection.start();
+
+                    Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                    MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME);
+
+                    while (true) {
+                        TextMessage textMsg = (TextMessage) messageConsumer.receive(5000);
+
+                        // nothing arrived within the receive timeout: report the tally
+                        if (textMsg == null) {
+                            return receivedMessageCount;
+                        }
+
+                        receivedMessageCount++;
+                        LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID());
+
+                        // on first delivery ensure the message is pending an
+                        // ack when it is resent from the producer broker
+                        if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) {
+                            LOG.info("Waiting for restart...");
+                            if (!consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS)) {
+                                LOG.warn("Timed out waiting for consumer broker restart and message forward");
+                            }
+                        }
+
+                        textMsg.acknowledge();
+                    }
+                } finally {
+                    consumerConnection.close();
+                }
+            }
+        };
+
+        Runnable consumerBrokerResetTask = new Runnable() {
+            @Override
+            public void run() {
+
+                try {
+                    // wait for signal
+                    stopConsumerBroker.await();
+
+                    LOG.info("********* STOPPING CONSUMER BROKER");
+
+                    consumerBroker.stop();
+                    consumerBroker.waitUntilStopped();
+
+                    LOG.info("***** STARTING CONSUMER BROKER");
+                    // do not delete messages on startup
+                    consumerBroker = createConsumerBroker(false);
+
+                    LOG.info("***** CONSUMER BROKER STARTED!!");
+                    consumerBrokerRestarted.countDown();
+
+                    assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition() {
+                        @Override
+                        public boolean isSatisified() throws Exception {
+                            LOG.info("ProducerBroker totalMessageCount: " + producerBroker.getAdminView().getTotalMessageCount());
+                            return producerBroker.getAdminView().getTotalMessageCount() == 0;
+                        }
+                    }));
+                    consumerRestartedAndMessageForwarded.countDown();
+
+                } catch (Exception e) {
+                    LOG.error("Exception when stopping/starting the consumerBroker ", e);
+                }
+
+            }
+        };
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        int totalMessagesConsumed;
+        try {
+            // start consumerBroker start/stop task
+            executor.execute(consumerBrokerResetTask);
+
+            // start consuming messages
+            Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask);
+
+            produceMessages();
+
+            // Wait for consumer to finish
+            totalMessagesConsumed = numberOfConsumedMessage.get();
+        } finally {
+            // interrupts a reset task that is still waiting and releases the pool threads
+            executor.shutdownNow();
+        }
+
+        StringBuffer contents = new StringBuffer();
+        boolean messageInStore = isMessageInJDBCStore(localDataSource, contents);
+        LOG.debug("****number of messages received " + totalMessagesConsumed);
+
+        assertEquals("number of messages received", 2, totalMessagesConsumed);
+        assertEquals("messages left in store", true, messageInStore);
+        assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ"));
+    }
+
+    /**
+     * Connects to the producer broker on localhost:2003 with client id
+     * "producer" and sends {@code MESSAGE_COUNT} text messages to
+     * {@code QUEUE_NAME}; the connection is always closed afterwards.
+     *
+     * @throws JMSException if connecting or sending fails
+     */
+    private void produceMessages() throws JMSException {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false");
+        Connection connection = factory.createConnection();
+
+        try {
+            connection.setClientID("producer");
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer queueProducer = session.createProducer(QUEUE_NAME);
+
+            for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
+                TextMessage msg = session.createTextMessage("test msg " + sent);
+                queueProducer.send(msg);
+            }
+
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Logs the current {@code enableCursorAudit} parameter and delegates to {@link #doSetUp()}. */
+    @Before
+    public void setUp() throws Exception {
+        LOG.debug("Running with enableCursorAudit set to {}", enableCursorAudit);
+        doSetUp();
+    }
+
+    /** Delegates to {@link #doTearDown()} after each test run. */
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    /**
+     * Stops the producer broker and then the consumer broker. Each stop is
+     * attempted independently; a failure is logged instead of propagated so
+     * that one broker failing to stop does not prevent stopping the other.
+     *
+     * @throws Exception declared for subclasses; this implementation does not propagate stop failures
+     */
+    protected void doTearDown() throws Exception {
+        stopQuietly("producerBroker", producerBroker);
+        stopQuietly("consumerBroker", consumerBroker);
+    }
+
+    /** Stops {@code broker} if it is non-null, logging rather than rethrowing any failure. */
+    private void stopQuietly(String role, BrokerService broker) {
+        if (broker == null) {
+            // the broker was never created, so there is nothing to stop
+            return;
+        }
+        try {
+            broker.stop();
+        } catch (Exception ex) {
+            LOG.warn("Failed to stop {}", role, ex);
+        }
+    }
+
+    /**
+     * Creates the producer broker first and then the consumer broker; the
+     * consumer broker is created with message deletion on startup enabled.
+     */
+    protected void doSetUp() throws Exception {
+        this.producerBroker = createProducerBroker();
+        this.consumerBroker = createConsumerBroker(true);
+    }
+
+    /**
+     * Creates and starts the producer broker ("BP"). It listens on
+     * localhost:2003, has a network connector to the consumerBroker on
+     * localhost:2006, and persists to the embedded Derby database
+     * target/derbyDBRemoteBroker.
+     *
+     * @return the started broker
+     * @throws Exception if the broker cannot be configured or started
+     */
+    protected BrokerService createProducerBroker() throws Exception {
+
+        HashMap<String, String> networkProps = new HashMap<String, String>();
+
+        networkProps.put("networkTTL", "10");
+        networkProps.put("conduitSubscriptions", "true");
+        networkProps.put("decreaseNetworkConsumerPriority", "true");
+        networkProps.put("dynamicOnly", "true");
+
+        BrokerService broker = new BrokerService();
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setBrokerName("BP");
+        broker.setAdvisorySupport(false);
+
+        // lazy init listener on broker start
+        TransportConnector transportConnector = new TransportConnector();
+        transportConnector.setUri(new URI("tcp://localhost:2003"));
+        List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+        transportConnectors.add(transportConnector);
+        broker.setTransportConnectors(transportConnectors);
+
+        // network to consumerBroker
+        NetworkConnector nc = broker.addNetworkConnector("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
+        IntrospectionSupport.setProperties(nc, networkProps);
+        nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination> asList(new ActiveMQQueue[] { QUEUE_NAME }));
+
+        // Persistence adapter
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
+        remoteDataSource.setDatabaseName("target/derbyDBRemoteBroker");
+        remoteDataSource.setCreateDatabase("create");
+        jdbc.setDataSource(remoteDataSource);
+        broker.setPersistenceAdapter(jdbc);
+
+        // set Policy entries
+        PolicyEntry policy = new PolicyEntry();
+
+        policy.setQueue(">");
+        policy.setEnableAudit(false);
+        policy.setUseCache(false);
+        policy.setExpireMessagesPeriod(0);
+
+        // set replay with no consumers
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
+        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
+        policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        return broker;
+    }
+
+    /**
+     * Creates and starts the consumerBroker ("BC"), which listens on
+     * localhost:2006 and persists to the embedded Derby database
+     * target/derbyDBLocalBroker. The data source is also stored in the
+     * {@code localDataSource} field.
+     *
+     * @param deleteMessages
+     *            - drop messages when the broker instance is created; the
+     *            {@code MyTestPlugin} is installed only when this is true
+     * @return the started broker
+     * @throws Exception if the broker cannot be configured or started
+     */
+    protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception {
+
+        String protocol = "tcp";
+        String port = "2006";
+
+        BrokerService consumer = new BrokerService();
+        consumer.getManagementContext().setCreateConnector(false);
+        consumer.setDeleteAllMessagesOnStartup(deleteMessages);
+        consumer.setBrokerName("BC");
+
+        // lazy init listener on broker start
+        TransportConnector listener = new TransportConnector();
+        listener.setUri(new URI(protocol + "://localhost:" + port));
+        List<TransportConnector> listeners = new ArrayList<TransportConnector>();
+        listeners.add(listener);
+        consumer.setTransportConnectors(listeners);
+
+        // policy entries
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setQueue(">");
+        defaultPolicy.setEnableAudit(enableCursorAudit);
+        defaultPolicy.setExpireMessagesPeriod(0);
+
+        // set replay with no consumers
+        ConditionalNetworkBridgeFilterFactory bridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
+        bridgeFilterFactory.setReplayWhenNoConsumers(true);
+        defaultPolicy.setNetworkBridgeFilterFactory(bridgeFilterFactory);
+
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(defaultPolicy);
+        consumer.setDestinationPolicy(policies);
+
+        // Persistence adapter
+        JDBCPersistenceAdapter jdbcAdapter = new JDBCPersistenceAdapter();
+        EmbeddedDataSource derbyDataSource = new EmbeddedDataSource();
+        derbyDataSource.setDatabaseName("target/derbyDBLocalBroker");
+        derbyDataSource.setCreateDatabase("create");
+        jdbcAdapter.setDataSource(derbyDataSource);
+        consumer.setPersistenceAdapter(jdbcAdapter);
+
+        if (deleteMessages) {
+            // no plugin on restart
+            consumer.setPlugins(new BrokerPlugin[] { new MyTestPlugin() });
+        }
+
+        // keep the data source reachable for the JDBC store check
+        this.localDataSource = derbyDataSource;
+
+        consumer.start();
+        consumer.waitUntilStarted();
+
+        return consumer;
+    }
+
+    /**
+     * Query JDBC Store to see if messages are left.
+     * <p>
+     * Runs {@code select * from ACTIVEMQ_MSGS}, logs the column names, and
+     * appends every row, pipe-delimited, to {@code stringBuffer} (logging the
+     * accumulated buffer after each row).
+     *
+     * @param dataSource data source from which the JDBC connection is obtained
+     * @param stringBuffer receives the rendered rows
+     * @return {@code true} if the table contains at least one row
+     * @throws SQLException if connecting, querying or closing the connection fails
+     */
+    private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
+
+        boolean tableHasData = false;
+        String query = "select * from ACTIVEMQ_MSGS";
+
+        java.sql.Connection conn = dataSource.getConnection();
+        try {
+            // prepared inside the try so the connection is closed even if this throws
+            PreparedStatement s = conn.prepareStatement(query);
+            try {
+                ResultSet set = s.executeQuery();
+                try {
+                    ResultSetMetaData metaData = set.getMetaData();
+                    int columnCount = metaData.getColumnCount();
+
+                    StringBuffer headers = new StringBuffer();
+                    for (int i = 1; i <= columnCount; i++) {
+                        if (i == 1) {
+                            headers.append("||");
+                        }
+                        headers.append(metaData.getColumnName(i) + "||");
+                    }
+                    LOG.error(headers.toString());
+
+                    while (set.next()) {
+                        tableHasData = true;
+
+                        for (int i = 1; i <= columnCount; i++) {
+                            if (i == 1) {
+                                stringBuffer.append("|");
+                            }
+                            stringBuffer.append(set.getString(i) + "|");
+                        }
+                        LOG.error(stringBuffer.toString());
+                    }
+                } finally {
+                    try {
+                        set.close();
+                    } catch (SQLException ignored) {
+                        // best effort: a failed close must not hide the query outcome
+                    }
+                }
+            } finally {
+                try {
+                    s.close();
+                } catch (SQLException ignored) {
+                    // best effort: a failed close must not hide the query outcome
+                }
+            }
+        } finally {
+            conn.close();
+        }
+
+        return tableHasData;
+    }
+
+    /**
+     * Plugin used to ensure the consumerBroker is restarted before the network
+     * message from the producerBroker is acked. It wraps the broker it is
+     * installed on in a {@link MyTestBroker}.
+     */
+    class MyTestPlugin implements BrokerPlugin {
+
+        @Override
+        public Broker installPlugin(Broker broker) throws Exception {
+            Broker wrapped = new MyTestBroker(broker);
+            return wrapped;
+        }
+    }
+
+    /**
+     * Broker filter that, after passing each send on to the wrapped broker,
+     * counts down the {@code stopConsumerBroker} latch and calls
+     * {@code setDontSendReponse(true)} on the producer's connection context.
+     */
+    class MyTestBroker extends BrokerFilter {
+
+        public MyTestBroker(Broker next) {
+            super(next);
+        }
+
+        @Override
+        public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
+            // hand the message to the wrapped broker before signalling the restart
+            super.send(producerExchange, messageSend);
+
+            LOG.error("Stopping broker on send:  " + messageSend.getMessageId().getProducerSequenceId());
+            stopConsumerBroker.countDown();
+
+            producerExchange.getConnectionContext().setDontSendReponse(true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
new file mode 100644
index 0000000..13ddd30
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ5035Test {
+
+    private static final String CLIENT_ID = "amq-test-client-id";
+    private static final String DURABLE_SUB_NAME = "testDurable";
+
+    private final String xbean = "xbean:";
+    private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035";
+
+    private static BrokerService brokerService;
+    private String connectionUri;
+
+    /**
+     * Creates the broker from the xbean configuration under {@code confBase},
+     * records the publishable URI of its tcp connector, enables deletion of
+     * all messages on startup, and starts the broker.
+     */
+    @Before
+    public void setUp() throws Exception {
+        String brokerConfig = xbean + confBase + "/activemq.xml";
+        brokerService = BrokerFactory.createBroker(brokerConfig);
+        connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    /** Stops the broker and waits until it reports that it has stopped. */
+    @After
+    public void tearDown() throws Exception {
+        BrokerService running = brokerService;
+        running.stop();
+        running.waitUntilStopped();
+    }
+
+    /**
+     * Creates a durable subscriber on "Test.Topic", closes its consumer, and
+     * then destroys the subscription through the broker's JMX view.
+     * <p>
+     * The connection is closed in a {@code finally} block so it is released
+     * even when the JMX call fails.
+     *
+     * @throws Exception if any JMS or JMX call fails
+     */
+    @Test
+    public void testFoo() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        Connection connection = factory.createConnection();
+        try {
+            connection.setClientID(CLIENT_ID);
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTopic("Test.Topic");
+            MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME);
+            consumer.close();
+
+            BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME);
+            brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Returns a JMX proxy for the broker registered as
+     * "org.apache.activemq:type=Broker,brokerName=localhost", asserting that
+     * the proxy is not null.
+     *
+     * @param testDurable not used by this method
+     * @return the broker view proxy
+     * @throws MalformedObjectNameException if the object name cannot be parsed
+     */
+    private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException {
+        ObjectName brokerObjectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
+        Object proxy = brokerService.getManagementContext().newProxyInstance(brokerObjectName, BrokerViewMBean.class, true);
+        BrokerViewMBean brokerView = (BrokerViewMBean) proxy;
+        assertNotNull(brokerView);
+        return brokerView;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java
new file mode 100644
index 0000000..c2cb11e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that the broker's memory usage is back at zero percent after a series
+ * of transacted sends to a topic, whether each transaction is committed or
+ * rolled back.
+ */
+public class AMQ5136Test {
+
+    // Size in bytes of each message body sent by the tests.
+    private static final int PAYLOAD_SIZE = 100000;
+
+    BrokerService brokerService;
+
+    /** Starts a non-persistent broker before each test. */
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    /** Stops the broker after each test; tolerates a broker that was never created. */
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    /** Runs the send loop, committing the session after every message. */
+    @Test
+    public void memoryUsageOnCommit() throws Exception {
+        sendMessagesAndAssertMemoryUsage(new TransactionHandler() {
+            @Override
+            public void finishTransaction(Session session) throws JMSException {
+                session.commit();
+            }
+        });
+    }
+
+    /** Runs the send loop, rolling the session back after every message. */
+    @Test
+    public void memoryUsageOnRollback() throws Exception {
+        sendMessagesAndAssertMemoryUsage(new TransactionHandler() {
+            @Override
+            public void finishTransaction(Session session) throws JMSException {
+                session.rollback();
+            }
+        });
+    }
+
+    /**
+     * Sends 100 bytes messages to a topic over a transacted session, ending the
+     * transaction after each send, then asserts that broker memory usage is 0%.
+     *
+     * @param transactionHandler decides how each transaction is ended
+     * @throws Exception if any JMS operation fails
+     */
+    private void sendMessagesAndAssertMemoryUsage(TransactionHandler transactionHandler) throws Exception {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Topic destination = session.createTopic("ActiveMQBug");
+            MessageProducer producer = session.createProducer(destination);
+            // The payload is identical for every message, so it is built once.
+            byte[] payload = generateBytes();
+            for (int i = 0; i < 100; i++) {
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(payload);
+                producer.send(message);
+                transactionHandler.finishTransaction(session);
+            }
+        } finally {
+            // Release the connection even when a send or transaction step fails.
+            connection.close();
+        }
+        org.junit.Assert.assertEquals(0, BrokerRegistry.getInstance().findFirst().getSystemUsage().getMemoryUsage().getPercentUsage());
+    }
+
+    /**
+     * Builds the message payload.
+     *
+     * @return an array of {@code PAYLOAD_SIZE} bytes where element i holds the low byte of i
+     */
+    private byte[] generateBytes() {
+        byte[] bytes = new byte[PAYLOAD_SIZE];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = (byte) i;
+        }
+        return bytes;
+    }
+
+    /** Strategy for ending the current transaction of a session. */
+    private static interface TransactionHandler {
+        void finishTransaction(Session session) throws JMSException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
new file mode 100644
index 0000000..4c07655
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Sends each message twice with the same message id and checks how the broker
+ * handles the duplicate, with the KahaDB
+ * {@code concurrentStoreAndDispatchQueues} option both enabled and disabled.
+ */
+@RunWith(value = Parameterized.class)
+public class AMQ5212Test {
+
+    BrokerService brokerService;
+
+    @Parameterized.Parameter(0)
+    public boolean concurrentStoreAndDispatchQ = true;
+
+    @Parameterized.Parameters(name = "concurrentStoreAndDispatch={0}")
+    public static Iterable<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        start(true);
+    }
+
+    /**
+     * Creates and starts a new broker with a TCP connector on an ephemeral port.
+     *
+     * @param deleteAllMessages whether to delete all stored messages before starting
+     */
+    public void start(boolean deleteAllMessages) throws Exception {
+        brokerService = new BrokerService();
+        if (deleteAllMessages) {
+            brokerService.deleteAllMessages();
+        }
+        ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQ);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setAdvisorySupport(false);
+        brokerService.start();
+    }
+
+    /** Stops the broker; tolerates a broker that was never created. */
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * Builds a connection factory for the broker's first transport connector,
+     * with copy-on-send and topic advisory watching switched off.
+     * The connector address is read on every call.
+     */
+    private ActiveMQConnectionFactory newConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setWatchTopicAdvisories(false);
+        return connectionFactory;
+    }
+
+    @Test
+    public void verifyDuplicateSuppressionWithConsumer() throws Exception {
+        doVerifyDuplicateSuppression(100, 100, true);
+    }
+
+    @Test
+    public void verifyDuplicateSuppression() throws Exception {
+        doVerifyDuplicateSuppression(100, 100, false);
+    }
+
+    /**
+     * Sends {@code numToSend} messages from concurrent workers, each to its own
+     * queue and each followed by a resend of the same message, then asserts the
+     * broker's total enqueue count.
+     *
+     * @param numToSend            number of distinct messages (and queues)
+     * @param expectedTotalEnqueue enqueue count the broker must report
+     * @param demand               whether to create a consumer on each queue before sending
+     */
+    public void doVerifyDuplicateSuppression(final int numToSend, final int expectedTotalEnqueue, final boolean demand) throws Exception {
+        final ActiveMQConnectionFactory connectionFactory = newConnectionFactory();
+
+        final int concurrency = 40;
+        final AtomicInteger workCount = new AtomicInteger(numToSend);
+        ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
+        for (int i = 0; i < concurrency; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        int work;
+                        while ((work = workCount.getAndDecrement()) > 0) {
+                            ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
+                            try {
+                                activeMQConnection.start();
+                                ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                                ActiveMQQueue dest = new ActiveMQQueue("queue-" + work + "-"
+                                        + AMQ5212Test.class.getSimpleName());
+                                ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest);
+                                if (demand) {
+                                    // create demand so page in will happen
+                                    activeMQSession.createConsumer(dest);
+                                }
+                                ActiveMQTextMessage message = new ActiveMQTextMessage();
+                                message.setDestination(dest);
+                                activeMQMessageProducer.send(message, null);
+
+                                // send a duplicate
+                                activeMQConnection.syncSendPacket(message);
+                            } finally {
+                                // Release the connection even when a send fails.
+                                activeMQConnection.close();
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+        TimeUnit.SECONDS.sleep(1);
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.MINUTES);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return expectedTotalEnqueue == brokerService.getAdminView().getTotalEnqueueCount();
+            }
+        });
+        assertEquals("total enqueue as expected", expectedTotalEnqueue, brokerService.getAdminView().getTotalEnqueueCount());
+    }
+
+    /**
+     * Sends a message and a duplicate of it, restarts the broker, and checks
+     * that the message can then be consumed.
+     */
+    @Test
+    public void verifyConsumptionOnDuplicate() throws Exception {
+
+        ActiveMQQueue dest = new ActiveMQQueue("Q");
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setDestination(dest);
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) newConnectionFactory().createConnection();
+        try {
+            activeMQConnection.start();
+            ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest);
+            activeMQMessageProducer.send(message, null);
+
+            // send a duplicate
+            activeMQConnection.syncSendPacket(message);
+        } finally {
+            activeMQConnection.close();
+        }
+
+        // verify original can be consumed after restart
+        brokerService.stop();
+        brokerService.start(false);
+
+        // A fresh factory is built after the restart so the connector address is re-read.
+        activeMQConnection = (ActiveMQConnection) newConnectionFactory().createConnection();
+        try {
+            activeMQConnection.start();
+            ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer messageConsumer = activeMQSession.createConsumer(dest);
+            Message received = messageConsumer.receive(4000);
+            assertNotNull("Got message", received);
+            assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID());
+        } finally {
+            activeMQConnection.close();
+        }
+    }
+
+    /**
+     * With a client-acknowledge consumer, sends a message and a duplicate,
+     * receives the message without acknowledging it, then receives it again on
+     * a new consumer and acknowledges it.
+     */
+    @Test
+    public void verifyClientAckConsumptionOnDuplicate() throws Exception {
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) newConnectionFactory().createConnection();
+        try {
+            activeMQConnection.start();
+            ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            ActiveMQQueue dest = new ActiveMQQueue("Q");
+
+            MessageConsumer messageConsumer = activeMQSession.createConsumer(dest);
+
+            ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest);
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setDestination(dest);
+            activeMQMessageProducer.send(message, null);
+
+            // send a duplicate
+            activeMQConnection.syncSendPacket(message);
+
+            Message received = messageConsumer.receive(4000);
+            assertNotNull("Got message", received);
+            assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID());
+            // Closed without acknowledging the received message.
+            messageConsumer.close();
+
+            messageConsumer = activeMQSession.createConsumer(dest);
+            received = messageConsumer.receive(4000);
+            assertNotNull("Got message", received);
+            assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID());
+            received.acknowledge();
+        } finally {
+            activeMQConnection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
new file mode 100644
index 0000000..0d7f44b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -0,0 +1,602 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ Non transactional concurrent producer/consumer to single dest
+ */
+@RunWith(Parameterized.class)
+public class AMQ5266SingleDestTest {
+    static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
+    String activemqURL;
+    BrokerService brokerService;
+
+    public int numDests = 1;
+    public int messageSize = 10*1000;
+
+    @Parameterized.Parameter(0)
+    public int publisherMessagesPerThread = 1000;
+
+    @Parameterized.Parameter(1)
+    public int publisherThreadCount = 20;
+
+    @Parameterized.Parameter(2)
+    public int consumerThreadsPerQueue = 5;
+
+    @Parameterized.Parameter(3)
+    public int destMemoryLimit = 50 * 1024;
+
+    @Parameterized.Parameter(4)
+    public boolean useCache = true;
+
+    @Parameterized.Parameter(5)
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
+
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+
+    /**
+     * Supplies one parameter row per persistence adapter choice; every other
+     * value is the same in each row.
+     */
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+    public static Iterable<Object[]> parameters() {
+        final TestSupport.PersistenceAdapterChoice[] stores = {
+            TestSupport.PersistenceAdapterChoice.KahaDB,
+            TestSupport.PersistenceAdapterChoice.LevelDB,
+            TestSupport.PersistenceAdapterChoice.JDBC,
+        };
+        final Object[][] rows = new Object[stores.length][];
+        for (int row = 0; row < stores.length; row++) {
+            rows[row] = new Object[]{1000, 40, 40, 1024 * 1024 * 1, true, stores[row], false};
+        }
+        return Arrays.asList(rows);
+    }
+
+    public int consumerBatchSize = 25;
+
+    /** Sets the {@code derby.system.durability} system property to {@code test} once for the class. */
+    @BeforeClass
+    public static void derbyTestMode() throws Exception {
+        final String durabilityKey = "derby.system.durability";
+        System.setProperty(durabilityKey, "test");
+    }
+
+    /**
+     * Creates and starts a broker configured from the test parameters, then
+     * stores the client URL of its TCP connector in {@code activemqURL}.
+     */
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+
+        TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+
+        // Default destination policy, driven by the parameterized fields.
+        final PolicyEntry destinationDefaults = new PolicyEntry();
+        destinationDefaults.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
+        destinationDefaults.setMaxProducersToAudit(publisherThreadCount);
+        destinationDefaults.setEnableAudit(true);
+        destinationDefaults.setUseCache(useCache);
+        destinationDefaults.setMaxPageSize(1000);
+        destinationDefaults.setOptimizedDispatch(optimizeDispatch);
+        destinationDefaults.setMemoryLimit(destMemoryLimit);
+        destinationDefaults.setExpireMessagesPeriod(0);
+
+        final PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(destinationDefaults);
+        brokerService.setDestinationPolicy(policies);
+
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024);
+
+        final TransportConnector tcpConnector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+        // ensure all messages are queue or dlq messages
+        activemqURL = tcpConnector.getPublishableConnectString() + "?jms.watchTopicAdvisories=false";
+    }
+
+    /** Stops the broker, if one was created. */
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService == null) {
+            return;
+        }
+        brokerService.stop();
+    }
+
+    /**
+     * Runs concurrent publishers and consumers against the queue(s), then checks
+     * that every distinct published ID was consumed from each queue and that the
+     * broker reports no remaining messages.
+     */
+    @Test
+    public void test() throws Exception {
+
+        // Comma-separated queue list: "activemq", then "activemq1", "activemq2", ...
+        StringBuilder queueNames = new StringBuilder("activemq");
+        for (int i = 1; i < numDests; i++) {
+            queueNames.append(",activemq").append(i);
+        }
+        String activemqQueues = queueNames.toString();
+
+        int consumerWaitForConsumption = 5 * 60 * 1000;
+
+        LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
+        LOG.info("\nBuilding Publisher...");
+
+        ExportQueuePublisher publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
+
+        LOG.info("Building Consumer...");
+
+        ExportQueueConsumer consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
+
+        long totalStart = System.currentTimeMillis();
+
+        LOG.info("Starting Publisher...");
+
+        publisher.start();
+
+        LOG.info("Starting Consumer...");
+
+        consumer.start();
+
+        LOG.info("Waiting For Publisher Completion...");
+
+        publisher.waitForCompletion();
+
+        List<String> publishedIds = publisher.getIDs();
+        int distinctPublishedCount = new TreeSet<String>(publishedIds).size();
+
+        LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
+        LOG.info("Publisher duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
+
+
+        long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
+        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
+            long secs = TimeUnit.MILLISECONDS.toSeconds(endWait - System.currentTimeMillis());
+            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
+            // An interrupt propagates and fails the test instead of being swallowed.
+            Thread.sleep(1000);
+        }
+
+        LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down.");
+
+        LOG.info("Total duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
+
+        consumer.shutdown();
+
+        TimeUnit.SECONDS.sleep(2);
+
+        LOG.info("Consumer Stats:");
+
+        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
+
+            List<String> idList = entry.getValue();
+
+            int distinctConsumed = new TreeSet<String>(idList).size();
+
+            StringBuilder sb = new StringBuilder();
+            sb.append("   Queue: " + entry.getKey() +
+                    " -> Total Messages Consumed: " + idList.size() +
+                    ", Distinct IDs Consumed: " + distinctConsumed);
+
+            int diff = distinctPublishedCount - distinctConsumed;
+            sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
+            LOG.info(sb.toString());
+
+            assertEquals("expect to get all messages!", 0, diff);
+
+        }
+
+        // verify empty dlq
+        assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+    }
+
+    public class ExportQueuePublisher {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        // Collection of distinct IDs that the publisher has published.
+        // After a message is published, its UUID will be written to this list for tracking.
+        // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
+        //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
+        private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
+        private List<PublisherThread> threads;
+
+        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+
+            threads = new ArrayList<PublisherThread>();
+
+            // Build the threads and tell them how many messages to publish
+            for (int i = 0; i < threadCount; i++) {
+                PublisherThread pt = new PublisherThread(messagesPerThread);
+                threads.add(pt);
+            }
+        }
+
+        public List<String> getIDs() {
+            return ids;
+        }
+
+        // Kick off threads
+        public void start() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.start();
+            }
+        }
+
+        // Wait for threads to complete. They will complete once they've published all of their messages.
+        public void waitForCompletion() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.join();
+                pt.close();
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages will start dropping
+            // after the queue has had a certain number of failures (default is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        private class PublisherThread extends Thread {
+
+            private int count;
+            private QueueConnection qc;
+            private Session session;
+            private MessageProducer mp;
+
+            private PublisherThread(int count) throws Exception {
+
+                this.count = count;
+
+                // Each Thread has its own Connection and Session, so no sync worries
+                qc = newQueueConnection();
+                session = newSession(qc);
+
+                // In our code, when publishing to multiple queues,
+                // we're using composite destinations like below
+                Queue q = new ActiveMQQueue(activemqQueues);
+                mp = session.createProducer(q);
+            }
+
+            public void run() {
+
+                try {
+
+                    // Loop until we've published enough messages
+                    while (count-- > 0) {
+
+                        TextMessage tm = session.createTextMessage(getMessageText());
+                        String id = UUID.randomUUID().toString();
+                        tm.setStringProperty("KEY", id);
+                        ids.add(id);                            // keep track of the key to compare against consumer
+
+                        mp.send(tm);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            // Called by waitForCompletion
+            public void close() {
+
+                try {
+                    mp.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+
+    }
+
+    // volatile so that the double-checked lazy initialisation in getMessageText()
+    // publishes the value safely to the other threads that call it
+    volatile String messageText;
+
+    /**
+     * Returns the shared message body: {@code messageSize} repetitions of the
+     * character 'X', built on first use and cached afterwards.
+     */
+    private String getMessageText() {
+
+        // Read the volatile field once; the common path needs no lock.
+        String text = messageText;
+        if (text == null) {
+
+            synchronized (this) {
+
+                text = messageText;
+                if (text == null) {
+
+                    char[] body = new char[messageSize];
+                    Arrays.fill(body, 'X');
+                    text = new String(body);
+                    messageText = text;
+                }
+            }
+        }
+
+        return text;
+    }
+
+
+    public class ExportQueueConsumer {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private final int totalToExpect;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        private String[] queues = null;
+        // Map of IDs that were consumed, keyed by queue name.
+        // We'll compare these against what was published to know if any got stuck or dropped.
+        private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
+        private Map<String, List<ConsumerThread>> threads;
+
+        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+            this.totalToExpect = totalToExpect;
+
+            queues = this.activemqQueues.split(",");
+
+            for (int i = 0; i < queues.length; i++) {
+                queues[i] = queues[i].trim();
+            }
+
+            threads = new HashMap<String, List<ConsumerThread>>();
+
+            // For each queue, create a list of threads and set up the list of ids
+            for (String q : queues) {
+
+                List<ConsumerThread> list = new ArrayList<ConsumerThread>();
+
+                idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
+
+                for (int i = 0; i < threadsPerQueue; i++) {
+                    list.add(new ConsumerThread(q, batchSize));
+                }
+
+                threads.put(q, list);
+            }
+        }
+
+        /** Returns the map of consumed IDs, keyed by queue name. */
+        public Map<String, List<String>> getIDs() {
+            return this.idsByQueue;
+        }
+
+        /** Starts every consumer thread of every queue. */
+        public void start() throws Exception {
+            for (List<ConsumerThread> queueThreads : threads.values()) {
+                for (int idx = 0; idx < queueThreads.size(); idx++) {
+                    queueThreads.get(idx).start();
+                }
+            }
+        }
+
+        /**
+         * Tells all consumer threads to stop and, only after every thread has
+         * been told, waits for each of them to finish.
+         */
+        public void shutdown() throws Exception {
+
+            // Flatten the per-queue lists once so both passes use the same order.
+            final List<ConsumerThread> workers = new ArrayList<ConsumerThread>();
+            for (List<ConsumerThread> queueThreads : threads.values()) {
+                workers.addAll(queueThreads);
+            }
+
+            for (ConsumerThread worker : workers) {
+                worker.shutdown();
+            }
+
+            for (ConsumerThread worker : workers) {
+                worker.join();
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        /**
+         * Creates and starts a new queue connection, lazily creating the shared
+         * connection factory on first use.
+         *
+         * If starting the connection fails, the connection is closed before the
+         * failure is rethrown so that it is not leaked.
+         *
+         * @return a started queue connection
+         * @throws Exception if the connection cannot be created or started
+         */
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages will start dropping
+            // after the queue has had a certain number of failures (default is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            try {
+                amqConnection.start();
+            } catch (Exception e) {
+                // The connection already exists at this point; release it before reporting the failure
+                try {
+                    amqConnection.close();
+                } catch (Exception ignored) {
+                    // best-effort cleanup; the start failure below is the error worth reporting
+                }
+                throw e;
+            }
+            return amqConnection;
+        }
+
+        /**
+         * Reports whether every consumer thread has finished.
+         *
+         * @return {@code false} as soon as a live thread is found (its queue name is logged),
+         *         {@code true} when no thread is alive
+         */
+        public boolean completed() {
+            for (List<ConsumerThread> perQueue : threads.values()) {
+                for (ConsumerThread consumer : perQueue) {
+                    if (!consumer.isAlive()) {
+                        continue;
+                    }
+                    LOG.info("thread for {} is still alive.", consumer.qName);
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        private class ConsumerThread extends Thread {
+
+            private int batchSize;
+            private QueueConnection qc;
+            private Session session;
+            private MessageConsumer mc;
+            private List<String> idList;
+            private boolean shutdown = false;
+            private String qName;
+
+            private ConsumerThread(String queueName, int batchSize) throws Exception {
+
+                this.batchSize = batchSize;
+
+                // Each thread has its own connection and session
+                qName = queueName;
+                qc = newQueueConnection();
+                session = newSession(qc);
+                Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
+                mc = session.createConsumer(q);
+
+                idList = idsByQueue.get(queueName);
+            }
+
+            public void run() {
+
+                try {
+
+                    int count = 0;
+
+                    // Keep reading as long as it hasn't been told to shutdown
+                    while (!shutdown) {
+
+                        if (idList.size() >= totalToExpect) {
+                            LOG.info("Got {} for q: {}", +idList.size(), qName);
+                            break;
+                        }
+                        Message m = mc.receive(4000);
+
+                        if (m != null) {
+
+                            // We received a non-null message, add the ID to our list
+
+                            idList.add(m.getStringProperty("KEY"));
+
+                            count++;
+
+                            // If we've reached our batch size, commit the batch and reset the count
+
+                            if (count == batchSize) {
+                                count = 0;
+                            }
+                        } else {
+
+                            // We didn't receive anything this time, commit any current batch and reset the count
+
+                            count = 0;
+
+                            // Sleep a little before trying to read after not getting a message
+
+                            try {
+                                if (idList.size() < totalToExpect) {
+                                    LOG.info("did not receive on {}, current count: {}", qName, idList.size());
+                                }
+                                //sleep(3000);
+                            } catch (Exception e) {
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+
+                    // Once we exit, close everything
+                    close();
+                }
+            }
+
+            public void shutdown() {
+                shutdown = true;
+            }
+
+            public void close() {
+
+                try {
+                    mc.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+
+                }
+            }
+        }
+    }
+}


Mime
View raw message