activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [27/38] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Thu, 18 Feb 2016 02:48:43 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
deleted file mode 100644
index ad12f71..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a test case for the issue reported at:
- * https://issues.apache.org/activemq/browse/AMQ-1866
- *
- * If you have a JMS producer sending messages to multiple fast consumers and
- * one slow consumer, eventually all consumers will run as slow as
- * the slowest consumer.
- */
-public class AMQ1866 extends TestCase {
-
-   private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
-   private BrokerService brokerService;
-   private ArrayList<Thread> threads = new ArrayList<>();
-
-   private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
-   private String ACTIVEMQ_BROKER_URI;
-
-   AtomicBoolean shutdown = new AtomicBoolean();
-   private ActiveMQQueue destination;
-
-   @Override
-   protected void setUp() throws Exception {
-      // Start an embedded broker up.
-      brokerService = new BrokerService();
-      LevelDBStore adaptor = new LevelDBStore();
-      brokerService.setPersistenceAdapter(adaptor);
-      brokerService.deleteAllMessages();
-
-      // A small max page size makes this issue occur faster.
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry pe = new PolicyEntry();
-      pe.setMaxPageSize(1);
-      policyMap.put(new ActiveMQQueue(">"), pe);
-      brokerService.setDestinationPolicy(policyMap);
-
-      brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
-      brokerService.start();
-
-      ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
-      destination = new ActiveMQQueue(getName());
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      // Stop any running threads.
-      shutdown.set(true);
-      for (Thread t : threads) {
-         t.interrupt();
-         t.join();
-      }
-      brokerService.stop();
-   }
-
-   public void testConsumerSlowDownPrefetch0() throws Exception {
-      ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
-      doTestConsumerSlowDown();
-   }
-
-   public void testConsumerSlowDownPrefetch10() throws Exception {
-      ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
-      doTestConsumerSlowDown();
-   }
-
-   public void testConsumerSlowDownDefaultPrefetch() throws Exception {
-      doTestConsumerSlowDown();
-   }
-
-   public void doTestConsumerSlowDown() throws Exception {
-
-      // Preload the queue.
-      produce(20000);
-
-      Thread producer = new Thread() {
-         @Override
-         public void run() {
-            try {
-               while (!shutdown.get()) {
-                  produce(1000);
-               }
-            }
-            catch (Exception e) {
-            }
-         }
-      };
-      threads.add(producer);
-      producer.start();
-
-      // This is the slow consumer.
-      ConsumerThread c1 = new ConsumerThread("Consumer-1");
-      threads.add(c1);
-      c1.start();
-
-      // Wait a bit so that the slow consumer gets assigned most of the messages.
-      Thread.sleep(500);
-      ConsumerThread c2 = new ConsumerThread("Consumer-2");
-      threads.add(c2);
-      c2.start();
-
-      int totalReceived = 0;
-      for (int i = 0; i < 30; i++) {
-         Thread.sleep(1000);
-         long c1Counter = c1.counter.getAndSet(0);
-         long c2Counter = c2.counter.getAndSet(0);
-         log.debug("c1: " + c1Counter + ", c2: " + c2Counter);
-         totalReceived += c1Counter;
-         totalReceived += c2Counter;
-
-         // Once message have been flowing for a few seconds, start asserting that c2 always gets messages.  It should be receiving about 100 / sec
-         if (i > 10) {
-            assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
-         }
-      }
-   }
-
-   public void produce(int count) throws Exception {
-      Connection connection = null;
-      try {
-         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
-         factory.setDispatchAsync(true);
-
-         connection = factory.createConnection();
-
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(destination);
-         connection.start();
-
-         for (int i = 0; i < count; i++) {
-            producer.send(session.createTextMessage(getName() + " Message " + (++i)));
-         }
-
-      }
-      finally {
-         try {
-            connection.close();
-         }
-         catch (Throwable e) {
-         }
-      }
-   }
-
-   public class ConsumerThread extends Thread {
-
-      final AtomicLong counter = new AtomicLong();
-
-      public ConsumerThread(String threadId) {
-         super(threadId);
-      }
-
-      @Override
-      public void run() {
-         Connection connection = null;
-         try {
-            log.debug(getName() + ": is running");
-
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
-            factory.setDispatchAsync(true);
-
-            connection = factory.createConnection();
-
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = session.createConsumer(destination);
-            connection.start();
-
-            while (!shutdown.get()) {
-               TextMessage msg = (TextMessage) consumer.receive(1000);
-               if (msg != null) {
-                  int sleepingTime;
-                  if (getName().equals("Consumer-1")) {
-                     sleepingTime = 1000 * 1000;
-                  }
-                  else {
-                     sleepingTime = 1;
-                  }
-                  counter.incrementAndGet();
-                  Thread.sleep(sleepingTime);
-               }
-            }
-
-         }
-         catch (Exception e) {
-         }
-         finally {
-            log.debug(getName() + ": is stopping");
-            try {
-               connection.close();
-            }
-            catch (Throwable e) {
-            }
-         }
-      }
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
deleted file mode 100644
index b9cb919..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class AMQ1893Test extends TestCase {
-
-   private static final Logger log = LoggerFactory.getLogger(AMQ1893Test.class);
-
-   static final String QUEUE_NAME = "TEST";
-
-   static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000;
-
-   static final int[] PRIORITIES = new int[]{0, 5, 10};
-
-   static final boolean debug = false;
-
-   private BrokerService brokerService;
-
-   private ActiveMQQueue destination;
-
-   @Override
-   protected void setUp() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.addConnector("tcp://localhost:0");
-      brokerService.start();
-      destination = new ActiveMQQueue(QUEUE_NAME);
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      // Stop any running threads.
-      brokerService.stop();
-   }
-
-   public void testProduceConsumeWithSelector() throws Exception {
-      new TestProducer().produceMessages();
-      new TestConsumer().consume();
-   }
-
-   class TestProducer {
-
-      public void produceMessages() throws Exception {
-         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
-         Connection connection = connectionFactory.createConnection();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Destination destination = session.createQueue(QUEUE_NAME);
-         MessageProducer producer = session.createProducer(destination);
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         long start = System.currentTimeMillis();
-
-         for (int priority : PRIORITIES) {
-
-            String name = null;
-            if (priority == 10) {
-               name = "high";
-            }
-            else if (priority == 5) {
-               name = "mid";
-            }
-            else {
-               name = "low";
-            }
-
-            for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) {
-
-               TextMessage message = session.createTextMessage(name + "_" + i);
-               message.setIntProperty("priority", priority);
-
-               producer.send(message);
-            }
-         }
-
-         long end = System.currentTimeMillis();
-
-         log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * 3) + " messages in " + (end - start) + " ms");
-
-         producer.close();
-         session.close();
-         connection.close();
-      }
-   }
-
-   class TestConsumer {
-
-      private CountDownLatch finishLatch = new CountDownLatch(1);
-
-      public void consume() throws Exception {
-         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
-
-         final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length;
-         final AtomicInteger counter = new AtomicInteger();
-         final MessageListener listener = new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-
-               if (debug) {
-                  try {
-                     log.info(((TextMessage) message).getText());
-                  }
-                  catch (JMSException e) {
-                     e.printStackTrace();
-                  }
-               }
-
-               if (counter.incrementAndGet() == totalMessageCount) {
-
-                  finishLatch.countDown();
-
-               }
-            }
-         };
-
-         int consumerCount = PRIORITIES.length;
-         Connection[] connections = new Connection[consumerCount];
-         Session[] sessions = new Session[consumerCount];
-         MessageConsumer[] consumers = new MessageConsumer[consumerCount];
-
-         for (int i = 0; i < consumerCount; i++) {
-            String selector = "priority = " + PRIORITIES[i];
-
-            connections[i] = connectionFactory.createConnection();
-            sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            consumers[i] = sessions[i].createConsumer(destination, selector);
-            consumers[i].setMessageListener(listener);
-         }
-
-         for (Connection connection : connections) {
-            connection.start();
-         }
-
-         log.info("received " + counter.get() + " messages");
-
-         assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS));
-
-         log.info("received " + counter.get() + " messages");
-
-         for (MessageConsumer consumer : consumers) {
-            consumer.close();
-         }
-
-         for (Session session : sessions) {
-            session.close();
-         }
-
-         for (Connection connection : connections) {
-            connection.close();
-         }
-      }
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
deleted file mode 100644
index a7eb699..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import junit.framework.TestCase;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-
-public class AMQ1917Test extends TestCase {
-
-   private static final int NUM_MESSAGES = 4000;
-   private static final int NUM_THREADS = 10;
-   private static final String REQUEST_QUEUE = "mock.in.queue";
-   private static final String REPLY_QUEUE = "mock.out.queue";
-
-   private Destination requestDestination = ActiveMQDestination.createDestination(REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
-   private Destination replyDestination = ActiveMQDestination.createDestination(REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
-
-   private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
-   private CountDownLatch errorLatch = new CountDownLatch(1);
-   private ThreadPoolExecutor tpe;
-   private final String BROKER_URL = "tcp://localhost:0";
-   private String connectionUri;
-   private BrokerService broker = null;
-   private boolean working = true;
-
-   // trival session/producer pool
-   final Session[] sessions = new Session[NUM_THREADS];
-   final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
-
-   @Override
-   public void setUp() throws Exception {
-      broker = new BrokerService();
-      broker.setPersistent(false);
-      broker.addConnector(BROKER_URL);
-      broker.start();
-
-      connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-
-      BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000);
-      tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000, TimeUnit.MILLISECONDS, queue);
-      ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
-      tpe.setThreadFactory(limitedthreadFactory);
-   }
-
-   @Override
-   public void tearDown() throws Exception {
-      broker.stop();
-      tpe.shutdown();
-   }
-
-   public void testLoadedSendReceiveWithCorrelationId() throws Exception {
-
-      ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
-      connectionFactory.setBrokerURL(connectionUri);
-      Connection connection = connectionFactory.createConnection();
-      setupReceiver(connection);
-
-      connection = connectionFactory.createConnection();
-      connection.start();
-
-      // trival session/producer pool
-      for (int i = 0; i < NUM_THREADS; i++) {
-         sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         producers[i] = sessions[i].createProducer(requestDestination);
-      }
-
-      for (int i = 0; i < NUM_MESSAGES; i++) {
-         MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination, replyDestination, "Test Message : " + i);
-         tpe.execute(msr);
-      }
-
-      while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
-         if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
-            fail("there was an error, check the console for thread or thread allocation failure");
-            break;
-         }
-      }
-      working = false;
-   }
-
-   private void setupReceiver(final Connection connection) throws Exception {
-
-      final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final MessageConsumer consumer = session.createConsumer(requestDestination);
-      final MessageProducer sender = session.createProducer(replyDestination);
-      connection.start();
-
-      new Thread() {
-         @Override
-         public void run() {
-            while (working) {
-               // wait for messages in infinitive loop
-               // time out is set to show the client is awaiting
-               try {
-                  TextMessage msg = (TextMessage) consumer.receive(20000);
-                  if (msg == null) {
-                     errorLatch.countDown();
-                     fail("Response timed out." + " latchCount=" + roundTripLatch.getCount());
-                  }
-                  else {
-                     String result = msg.getText();
-                     //System.out.println("Request:" + (i++)
-                     //        + ", msg=" + result + ", ID" + msg.getJMSMessageID());
-                     TextMessage response = session.createTextMessage();
-                     response.setJMSCorrelationID(msg.getJMSMessageID());
-                     response.setText(result);
-                     sender.send(response);
-                  }
-               }
-               catch (JMSException e) {
-                  if (working) {
-                     errorLatch.countDown();
-                     fail("Unexpected exception:" + e);
-                  }
-               }
-            }
-         }
-      }.start();
-   }
-
-   class MessageSenderReceiver implements Runnable {
-
-      Destination reqDest;
-      Destination replyDest;
-      String origMsg;
-
-      public MessageSenderReceiver(Destination reqDest, Destination replyDest, String msg) throws Exception {
-         this.replyDest = replyDest;
-         this.reqDest = reqDest;
-         this.origMsg = msg;
-      }
-
-      private int getIndexFromCurrentThread() {
-         String name = Thread.currentThread().getName();
-         String num = name.substring(name.lastIndexOf('-') + 1);
-         int idx = Integer.parseInt(num) - 1;
-         assertTrue("idx is in range: idx=" + idx, idx < NUM_THREADS);
-         return idx;
-      }
-
-      @Override
-      public void run() {
-         try {
-            // get thread session and producer from pool
-            int threadIndex = getIndexFromCurrentThread();
-            Session session = sessions[threadIndex];
-            MessageProducer producer = producers[threadIndex];
-
-            final Message sendJmsMsg = session.createTextMessage(origMsg);
-            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-            producer.send(sendJmsMsg);
-
-            String jmsId = sendJmsMsg.getJMSMessageID();
-            String selector = "JMSCorrelationID='" + jmsId + "'";
-
-            MessageConsumer consumer = session.createConsumer(replyDest, selector);
-            Message receiveJmsMsg = consumer.receive(2000);
-            consumer.close();
-            if (receiveJmsMsg == null) {
-               errorLatch.countDown();
-               fail("Unable to receive response for:" + origMsg + ", with selector=" + selector);
-            }
-            else {
-               //System.out.println("received response message :"
-               //        + ((TextMessage) receiveJmsMsg).getText()
-               //        + " with selector : " + selector);
-               roundTripLatch.countDown();
-            }
-         }
-         catch (JMSException e) {
-            fail("unexpected exception:" + e);
-         }
-      }
-   }
-
-   public class LimitedThreadFactory implements ThreadFactory {
-
-      int threadCount;
-      private ThreadFactory factory;
-
-      public LimitedThreadFactory(ThreadFactory threadFactory) {
-         this.factory = threadFactory;
-      }
-
-      @Override
-      public Thread newThread(Runnable arg0) {
-         if (++threadCount > NUM_THREADS) {
-            errorLatch.countDown();
-            fail("too many threads requested");
-         }
-         return factory.newThread(arg0);
-      }
-   }
-}
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
deleted file mode 100644
index 6e49550..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.NamingException;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.Wait;
-import org.apache.log4j.Logger;
-
-/**
- * AMQ1936Test
- */
-public class AMQ1936Test extends TestCase {
-
-   private final static Logger logger = Logger.getLogger(AMQ1936Test.class);
-   private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
-   // //--
-   //
-   private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use
-   //
-   // //--
-   private final static int CONSUMER_COUNT = 2; // The number of message receiver instances
-   private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be
-   // processed within a JMS transaction
-
-   private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CONSUMER_COUNT, CONSUMER_COUNT, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-   private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[CONSUMER_COUNT];
-   private BrokerService broker = null;
-   static QueueConnectionFactory connectionFactory = null;
-
-   @Override
-   protected void setUp() throws Exception {
-      super.setUp();
-
-      broker = new BrokerService();
-      broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
-      broker.setBrokerName("test");
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.start();
-      connectionFactory = new ActiveMQConnectionFactory("vm://test");
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-
-      if (threadPool != null) {
-         // signal receivers to stop
-         for (ThreadedMessageReceiver receiver : receivers) {
-            receiver.setShouldStop(true);
-         }
-
-         logger.info("Waiting for receivers to shutdown..");
-         if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-            logger.warn("Not all receivers completed shutdown.");
-         }
-         else {
-            logger.info("All receivers shutdown successfully..");
-         }
-      }
-
-      logger.debug("Stoping the broker.");
-
-      if (broker != null) {
-         broker.stop();
-      }
-   }
-
-   private void sendTextMessage(String queueName, int i) throws JMSException, NamingException {
-      QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
-      QueueConnection queueConnection = null;
-      QueueSession session = null;
-      QueueSender sender = null;
-      Queue queue = null;
-      TextMessage message = null;
-
-      try {
-
-         // Create the queue connection
-         queueConnection = connectionFactory.createQueueConnection();
-
-         session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-         queue = session.createQueue(TEST_QUEUE_NAME);
-         sender = session.createSender(queue);
-         sender.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         message = session.createTextMessage(String.valueOf(i));
-
-         // send the message
-         sender.send(message);
-
-         if (session.getTransacted()) {
-            session.commit();
-         }
-         if (i % 1000 == 0) {
-            logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:" + message.getText());
-         }
-      }
-      finally {
-         if (sender != null) {
-            sender.close();
-         }
-         if (session != null) {
-            session.close();
-         }
-         if (queueConnection != null) {
-            queueConnection.close();
-         }
-      }
-   }
-
-   public void testForDuplicateMessages() throws Exception {
-      final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<>();
-      final Object lock = new Object();
-      final CountDownLatch duplicateSignal = new CountDownLatch(1);
-      final AtomicInteger messageCount = new AtomicInteger(0);
-
-      // add 1/2 the number of our total messages
-      for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
-         if (duplicateSignal.getCount() == 0) {
-            fail("Duplicate message id detected");
-         }
-         sendTextMessage(TEST_QUEUE_NAME, i);
-      }
-
-      // create a number of consumers to read of the messages and start them with a handler which simply stores the
-      // message ids
-      // in a Map and checks for a duplicate
-      for (int i = 0; i < CONSUMER_COUNT; i++) {
-         receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() {
-
-            @Override
-            public void onMessage(Message message) throws Exception {
-               synchronized (lock) {
-                  int current = messageCount.incrementAndGet();
-                  if (current % 1000 == 0) {
-                     logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText());
-                  }
-                  if (messages.containsKey(message.getJMSMessageID())) {
-                     duplicateSignal.countDown();
-                     logger.fatal("duplicate message id detected:" + message.getJMSMessageID());
-                     fail("Duplicate message id detected:" + message.getJMSMessageID());
-                  }
-                  else {
-                     messages.put(message.getJMSMessageID(), message.getJMSMessageID());
-                  }
-               }
-            }
-         });
-         threadPool.submit(receivers[i]);
-      }
-
-      // starting adding the remaining messages
-      for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
-         if (duplicateSignal.getCount() == 0) {
-            fail("Duplicate message id detected");
-         }
-         sendTextMessage(TEST_QUEUE_NAME, i);
-      }
-
-      logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
-
-      // allow some time for messages to be delivered to receivers.
-      boolean ok = Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return TEST_MESSAGE_COUNT == messages.size();
-         }
-      }, TimeUnit.MINUTES.toMillis(7));
-      if (!ok) {
-         AutoFailTestSupport.dumpAllThreads("--STUCK?--");
-      }
-      assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size());
-      assertEquals(TEST_MESSAGE_COUNT, messageCount.get());
-   }
-
-   private final static class ThreadedMessageReceiver implements Runnable {
-
-      private IMessageHandler handler = null;
-      private final AtomicBoolean shouldStop = new AtomicBoolean(false);
-
-      public ThreadedMessageReceiver(String queueName, IMessageHandler handler) {
-         this.handler = handler;
-      }
-
-      @Override
-      public void run() {
-
-         QueueConnection queueConnection = null;
-         QueueSession session = null;
-         QueueReceiver receiver = null;
-         Queue queue = null;
-         Message message = null;
-         try {
-            try {
-
-               queueConnection = connectionFactory.createQueueConnection();
-               // create a transacted session
-               session = queueConnection.createQueueSession(TRANSACTED_RECEIVE, Session.AUTO_ACKNOWLEDGE);
-               queue = session.createQueue(TEST_QUEUE_NAME);
-               receiver = session.createReceiver(queue);
-
-               // start the connection
-               queueConnection.start();
-
-               logger.info("Receiver " + Thread.currentThread().getName() + " connected.");
-
-               // start receive loop
-               while (!(shouldStop.get() || Thread.currentThread().isInterrupted())) {
-                  try {
-                     message = receiver.receive(200);
-                  }
-                  catch (Exception e) {
-                     //
-                     // ignore interrupted exceptions
-                     //
-                     if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) {
-                                /* ignore */
-                     }
-                     else {
-                        throw e;
-                     }
-                  }
-
-                  if (message != null && this.handler != null) {
-                     this.handler.onMessage(message);
-                  }
-
-                  // commit session on successful handling of message
-                  if (session.getTransacted()) {
-                     session.commit();
-                  }
-               }
-
-               logger.info("Receiver " + Thread.currentThread().getName() + " shutting down.");
-
-            }
-            finally {
-               if (receiver != null) {
-                  try {
-                     receiver.close();
-                  }
-                  catch (JMSException e) {
-                     logger.warn(e);
-                  }
-               }
-               if (session != null) {
-                  try {
-                     session.close();
-                  }
-                  catch (JMSException e) {
-                     logger.warn(e);
-                  }
-               }
-               if (queueConnection != null) {
-                  queueConnection.close();
-               }
-            }
-         }
-         catch (JMSException e) {
-            logger.error(e);
-            e.printStackTrace();
-         }
-         catch (NamingException e) {
-            logger.error(e);
-         }
-         catch (Exception e) {
-            logger.error(e);
-            e.printStackTrace();
-         }
-      }
-
-      public void setShouldStop(Boolean shouldStop) {
-         this.shouldStop.set(shouldStop);
-      }
-   }
-
-   public interface IMessageHandler {
-
-      void onMessage(Message message) throws Exception;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
deleted file mode 100644
index 7236581..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a test case for the issue reported at: https://issues.apache.org/activemq/browse/AMQ-2021 Bug is modification
- * of inflight message properties so the failure can manifest itself in a bunch or ways, from message receipt with null
- * properties to marshall errors
- */
-public class AMQ2021Test implements ExceptionListener, UncaughtExceptionHandler {
-
-   private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class);
-   BrokerService brokerService;
-   ArrayList<Thread> threads = new ArrayList<>();
-   Vector<Throwable> exceptions;
-
-   @Rule
-   public TestName name = new TestName();
-
-   AMQ2021Test testCase;
-
-   private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
-   private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
-   private String PRODUCER_BROKER_URL;
-
-   private final int numMessages = 1000;
-   private final int numConsumers = 2;
-   private final int dlqMessages = numMessages / 2;
-
-   private CountDownLatch receivedLatch;
-   private ActiveMQTopic destination;
-   private CountDownLatch started;
-
-   @Before
-   public void setUp() throws Exception {
-      Thread.setDefaultUncaughtExceptionHandler(this);
-      testCase = this;
-
-      // Start an embedded broker up.
-      brokerService = new BrokerService();
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
-      brokerService.start();
-      destination = new ActiveMQTopic(name.getMethodName());
-      exceptions = new Vector<>();
-
-      CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
-      PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
-
-      receivedLatch = new CountDownLatch(numConsumers * (numMessages + dlqMessages));
-      started = new CountDownLatch(1);
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      for (Thread t : threads) {
-         t.interrupt();
-         t.join();
-      }
-      brokerService.stop();
-   }
-
-   @Test(timeout = 240000)
-   public void testConcurrentTopicResendToDLQ() throws Exception {
-
-      for (int i = 0; i < numConsumers; i++) {
-         ConsumerThread c1 = new ConsumerThread("Consumer-" + i);
-         threads.add(c1);
-         c1.start();
-      }
-
-      assertTrue(started.await(10, TimeUnit.SECONDS));
-
-      Thread producer = new Thread() {
-         @Override
-         public void run() {
-            try {
-               produce(numMessages);
-            }
-            catch (Exception e) {
-            }
-         }
-      };
-      threads.add(producer);
-      producer.start();
-
-      boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
-      for (Throwable t : exceptions) {
-         log.error("failing test with first exception", t);
-         fail("exception during test : " + t);
-      }
-      assertTrue("excepted messages received within time limit", allGood);
-
-      assertEquals(0, exceptions.size());
-
-      for (int i = 0; i < numConsumers; i++) {
-         // last recovery sends message to deq so is not received again
-         assertEquals(dlqMessages * 2, ((ConsumerThread) threads.get(i)).recoveries);
-         assertEquals(numMessages + dlqMessages, ((ConsumerThread) threads.get(i)).counter);
-      }
-
-      // half of the messages for each consumer should go to the dlq but duplicates will
-      // be suppressed
-      consumeFromDLQ(dlqMessages);
-
-   }
-
-   private void consumeFromDLQ(int messageCount) throws Exception {
-      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
-      Connection connection = connectionFactory.createConnection();
-      connection.start();
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
-      int count = 0;
-      for (int i = 0; i < messageCount; i++) {
-         if (dlqConsumer.receive(1000) == null) {
-            break;
-         }
-         count++;
-      }
-      assertEquals(messageCount, count);
-   }
-
-   public void produce(int count) throws Exception {
-      Connection connection = null;
-      try {
-         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
-         connection = factory.createConnection();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(destination);
-         producer.setTimeToLive(0);
-         connection.start();
-
-         for (int i = 0; i < count; i++) {
-            int id = i + 1;
-            TextMessage message = session.createTextMessage(name.getMethodName() + " Message " + id);
-            message.setIntProperty("MsgNumber", id);
-            producer.send(message);
-
-            if (id % 500 == 0) {
-               log.info("sent " + id + ", ith " + message);
-            }
-         }
-      }
-      catch (JMSException e) {
-         log.error("unexpected ex on produce", e);
-         exceptions.add(e);
-      }
-      finally {
-         try {
-            if (connection != null) {
-               connection.close();
-            }
-         }
-         catch (Throwable e) {
-         }
-      }
-   }
-
-   public class ConsumerThread extends Thread implements MessageListener {
-
-      public long counter = 0;
-      public long recoveries = 0;
-      private Session session;
-
-      public ConsumerThread(String threadId) {
-         super(threadId);
-      }
-
-      @Override
-      public void run() {
-         try {
-            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
-            Connection connection = connectionFactory.createConnection();
-            connection.setExceptionListener(testCase);
-            connection.setClientID(getName());
-            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-            MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
-            consumer.setMessageListener(this);
-            connection.start();
-
-            started.countDown();
-
-         }
-         catch (JMSException exception) {
-            log.error("unexpected ex in consumer run", exception);
-            exceptions.add(exception);
-         }
-      }
-
-      @Override
-      public void onMessage(Message message) {
-         try {
-            counter++;
-            int messageNumber = message.getIntProperty("MsgNumber");
-            if (messageNumber % 2 == 0) {
-               session.recover();
-               recoveries++;
-            }
-            else {
-               message.acknowledge();
-            }
-
-            if (counter % 200 == 0) {
-               log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
-            }
-            receivedLatch.countDown();
-         }
-         catch (Exception e) {
-            log.error("unexpected ex on onMessage", e);
-            exceptions.add(e);
-         }
-      }
-
-   }
-
-   @Override
-   public void onException(JMSException exception) {
-      log.info("Unexpected JMSException", exception);
-      exceptions.add(exception);
-   }
-
-   @Override
-   public void uncaughtException(Thread thread, Throwable exception) {
-      log.info("Unexpected exception from thread " + thread + ", ex: " + exception);
-      exceptions.add(exception);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
deleted file mode 100644
index de9f2b5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.naming.InitialContext;
-
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2084Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ2084Test.class);
-   BrokerService broker;
-   CountDownLatch qreceived;
-   String connectionUri;
-
-   @Before
-   public void startBroker() throws Exception {
-      broker = new BrokerService();
-      broker.setPersistent(false);
-      connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
-      broker.start();
-
-      qreceived = new CountDownLatch(1);
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (broker != null) {
-         broker.stop();
-      }
-   }
-
-   public void listenQueue(final String queueName, final String selectors) {
-      try {
-         Properties props = new Properties();
-         props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-         props.put("java.naming.provider.url", connectionUri);
-         props.put("queue.queueName", queueName);
-
-         javax.naming.Context ctx = new InitialContext(props);
-         QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
-         QueueConnection conn = factory.createQueueConnection();
-         final Queue queue = (Queue) ctx.lookup("queueName");
-         QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-         QueueReceiver receiver = session.createReceiver(queue, selectors);
-         System.out.println("Message Selector: " + receiver.getMessageSelector());
-         receiver.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-               try {
-                  if (message instanceof TextMessage) {
-                     TextMessage txtMsg = (TextMessage) message;
-                     String msg = txtMsg.getText();
-                     LOG.info("Queue Message Received: " + queueName + " - " + msg);
-                     qreceived.countDown();
-
-                  }
-                  message.acknowledge();
-               }
-               catch (Throwable e) {
-                  e.printStackTrace();
-               }
-            }
-         });
-         conn.start();
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   public void listenTopic(final String topicName, final String selectors) {
-      try {
-         Properties props = new Properties();
-         props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-         props.put("java.naming.provider.url", connectionUri);
-         props.put("topic.topicName", topicName);
-
-         javax.naming.Context ctx = new InitialContext(props);
-         TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
-         TopicConnection conn = factory.createTopicConnection();
-         final Topic topic = (Topic) ctx.lookup("topicName");
-         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-         TopicSubscriber receiver = session.createSubscriber(topic, selectors, false);
-
-         receiver.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-               try {
-                  if (message instanceof TextMessage) {
-                     TextMessage txtMsg = (TextMessage) message;
-                     String msg = txtMsg.getText();
-                     LOG.info("Topic Message Received: " + topicName + " - " + msg);
-                  }
-                  message.acknowledge();
-               }
-               catch (Exception e) {
-                  e.printStackTrace();
-               }
-            }
-         });
-         conn.start();
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   public void publish(String topicName, String message) {
-      try {
-         Properties props = new Properties();
-         props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-         props.put("java.naming.provider.url", connectionUri);
-         props.put("topic.topicName", topicName);
-         javax.naming.Context ctx = new InitialContext(props);
-         TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
-         TopicConnection conn = factory.createTopicConnection();
-         Topic topic = (Topic) ctx.lookup("topicName");
-         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-         TopicPublisher publisher = session.createPublisher(topic);
-         if (message != null) {
-            Message msg = session.createTextMessage(message);
-            publisher.send(msg);
-         }
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   @Test
-   public void tryXpathSelectorMatch() throws Exception {
-      String xPath = "XPATH '//books//book[@lang=''en'']'";
-      listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
-      publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
-      assertTrue("topic received: ", qreceived.await(20, TimeUnit.SECONDS));
-   }
-
-   @Test
-   public void tryXpathSelectorNoMatch() throws Exception {
-      String xPath = "XPATH '//books//book[@lang=''es'']'";
-      listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
-      publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
-      assertFalse("topic did not receive unmatched", qreceived.await(5, TimeUnit.SECONDS));
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
deleted file mode 100644
index 8067305..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.usecases.MyObject;
-
-public class AMQ2103Test extends BrokerTestSupport {
-
-   static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
-
-   static {
-      reduceMemoryFootprint.setReduceMemoryFootprint(true);
-   }
-
-   public PolicyEntry defaultPolicy = reduceMemoryFootprint;
-
-   @Override
-   protected PolicyEntry getDefaultPolicy() {
-      return defaultPolicy;
-   }
-
-   public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception {
-      addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});
-   }
-
-   public static Test suite() {
-      return suite(AMQ2103Test.class);
-   }
-
-   /**
-    * use mem persistence so no marshaling,
-    * reduceMemoryFootprint on/off that will reduce memory by whacking the marshaled state
-    * With vm transport and deferred serialisation and no persistence (mem persistence),
-    * we see the message as sent by the client so we can validate the contents against
-    * the policy
-    *
-    * @throws Exception
-    */
-   public void testVerifyMarshalledStateIsCleared() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-      factory.setOptimizedMessageDispatch(true);
-      factory.setObjectMessageSerializationDefered(true);
-      factory.setCopyMessageOnSend(false);
-
-      Connection connection = factory.createConnection();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      ActiveMQDestination destination = new ActiveMQQueue("testQ");
-      MessageConsumer consumer = session.createConsumer(destination);
-      connection.start();
-
-      MessageProducer producer = session.createProducer(destination);
-      final MyObject obj = new MyObject("A message");
-      ActiveMQObjectMessage m1 = (ActiveMQObjectMessage) session.createObjectMessage();
-      m1.setObject(obj);
-      producer.send(m1);
-
-      ActiveMQTextMessage m2 = new ActiveMQTextMessage();
-      m2.setText("Test Message Payload.");
-      producer.send(m2);
-
-      ActiveMQMapMessage m3 = new ActiveMQMapMessage();
-      m3.setString("text", "my message");
-      producer.send(m3);
-
-      Message m = consumer.receive(maxWait);
-      assertNotNull(m);
-      assertEquals(m1.getMessageId().toString(), m.getJMSMessageID());
-      assertTrue(m instanceof ActiveMQObjectMessage);
-
-      if (getDefaultPolicy() != null) {
-         assertNull("object data cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQObjectMessage) m).getObject());
-      }
-
-      // verify no serialisation via vm transport
-      assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
-      assertEquals("readObject called", 0, obj.getReadObjectCalled());
-      assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
-
-      m = consumer.receive(maxWait);
-      assertNotNull(m);
-      assertEquals(m2.getMessageId().toString(), m.getJMSMessageID());
-      assertTrue(m instanceof ActiveMQTextMessage);
-
-      if (getDefaultPolicy() != null) {
-         assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQTextMessage) m).getText());
-      }
-
-      m = consumer.receive(maxWait);
-      assertNotNull(m);
-      assertEquals(m3.getMessageId().toString(), m.getJMSMessageID());
-      assertTrue(m instanceof ActiveMQMapMessage);
-
-      if (getDefaultPolicy() != null) {
-         assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQMapMessage) m).getStringProperty("text"));
-      }
-
-      connection.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
deleted file mode 100644
index 8cda3ef..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-public class AMQ2149LevelDBTest extends AMQ2149Test {
-
-   @Override
-   protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
-      LevelDBStore persistenceFactory = new LevelDBStore();
-      persistenceFactory.setDirectory(dataDirFile);
-      brokerService.setPersistenceAdapter(persistenceFactory);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
deleted file mode 100644
index 19dbf0e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.lang.IllegalStateException;
-import java.util.HashSet;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
-import org.apache.activemq.AutoFailTestSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationStatistics;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.util.LoggingBrokerPlugin;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-interface Configurer {
-
-   public void configure(BrokerService broker) throws Exception;
-}
-
-public class AMQ2149Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class);
-   @Rule
-   public TestName testName = new TestName();
-
-   private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
-   private static final String DEFAULT_BROKER_URL = "failover:(" + BROKER_CONNECTOR + ")?maxReconnectDelay=1000&useExponentialBackOff=false";
-
-   private final String SEQ_NUM_PROPERTY = "seqNum";
-
-   final int MESSAGE_LENGTH_BYTES = 75 * 1024;
-   final long SLEEP_BETWEEN_SEND_MS = 25;
-   final int NUM_SENDERS_AND_RECEIVERS = 10;
-   final Object brokerLock = new Object();
-
-   private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000;
-   private static final long DEFAULT_NUM_TO_SEND = 1400;
-
-   long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
-   long numtoSend = DEFAULT_NUM_TO_SEND;
-   long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
-   String brokerURL = DEFAULT_BROKER_URL;
-
-   int numBrokerRestarts = 0;
-   final static int MAX_BROKER_RESTARTS = 4;
-   BrokerService broker;
-   Vector<Throwable> exceptions = new Vector<>();
-
-   protected File dataDirFile;
-   final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
-
-   public void createBroker(Configurer configurer) throws Exception {
-      broker = new BrokerService();
-      configurePersistenceAdapter(broker);
-
-      broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
-
-      broker.addConnector(BROKER_CONNECTOR);
-      broker.setBrokerName(testName.getMethodName());
-      broker.setDataDirectoryFile(dataDirFile);
-      if (configurer != null) {
-         configurer.configure(broker);
-      }
-      broker.start();
-   }
-
-   protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      LOG.debug("Starting test {}", testName.getMethodName());
-      dataDirFile = new File("target/" + testName.getMethodName());
-      numtoSend = DEFAULT_NUM_TO_SEND;
-      brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
-      sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
-      brokerURL = DEFAULT_BROKER_URL;
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      ExecutorService executor = Executors.newSingleThreadExecutor();
-      Future<Boolean> future = executor.submit(new TeardownTask(brokerLock, broker));
-      try {
-         LOG.debug("Teardown started.");
-         long start = System.currentTimeMillis();
-         Boolean result = future.get(30, TimeUnit.SECONDS);
-         long finish = System.currentTimeMillis();
-         LOG.debug("Result of teardown: {} after {} ms ", result, (finish - start));
-      }
-      catch (TimeoutException e) {
-         fail("Teardown timed out");
-         AutoFailTestSupport.dumpAllThreads(testName.getMethodName());
-      }
-      executor.shutdownNow();
-      exceptions.clear();
-   }
-
-   private String buildLongString() {
-      final StringBuilder stringBuilder = new StringBuilder(MESSAGE_LENGTH_BYTES);
-      for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
-         stringBuilder.append((int) (Math.random() * 10));
-      }
-      return stringBuilder.toString();
-   }
-
-   HashSet<Connection> connections = new HashSet<>();
-
-   private class Receiver implements MessageListener {
-
-      private final javax.jms.Destination dest;
-
-      private final Connection connection;
-
-      private final Session session;
-
-      private final MessageConsumer messageConsumer;
-
-      private AtomicLong nextExpectedSeqNum = new AtomicLong();
-
-      private final boolean transactional;
-
-      private String lastId = null;
-
-      public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException {
-         this.dest = dest;
-         this.transactional = transactional;
-         connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
-         connection.setClientID(dest.toString());
-         session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-         if (ActiveMQDestination.transform(dest).isTopic()) {
-            messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
-         }
-         else {
-            messageConsumer = session.createConsumer(dest);
-         }
-         messageConsumer.setMessageListener(this);
-         connection.start();
-         connections.add(connection);
-      }
-
-      public void close() throws JMSException {
-         connection.close();
-      }
-
-      public long getNextExpectedSeqNo() {
-         return nextExpectedSeqNum.get();
-      }
-
-      final int TRANSACITON_BATCH = 500;
-      boolean resumeOnNextOrPreviousIsOk = false;
-
-      @Override
-      public void onMessage(Message message) {
-         try {
-            final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
-            if ((seqNum % TRANSACITON_BATCH) == 0) {
-               LOG.info(dest + " received " + seqNum);
-
-               if (transactional) {
-                  LOG.info("committing..");
-                  session.commit();
-               }
-            }
-            if (resumeOnNextOrPreviousIsOk) {
-               // after an indoubt commit we need to accept what we get (within reason)
-               if (seqNum != nextExpectedSeqNum.get()) {
-                  final long l = nextExpectedSeqNum.get();
-                  if (seqNum == l - (TRANSACITON_BATCH - 1)) {
-                     nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH - 1));
-                     LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum);
-                  }
-               }
-               resumeOnNextOrPreviousIsOk = false;
-            }
-            if (seqNum != nextExpectedSeqNum.get()) {
-               LOG.warn(dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() + " expected " + nextExpectedSeqNum + ", lastId: " + lastId + ", message:" + message);
-               fail(dest + " received " + seqNum + " expected " + nextExpectedSeqNum);
-            }
-            nextExpectedSeqNum.incrementAndGet();
-            lastId = message.getJMSMessageID();
-         }
-         catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
-            LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
-            if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) {
-               // in doubt - either commit command or reply missing
-               // don't know if we will get a replay
-               resumeOnNextOrPreviousIsOk = true;
-               nextExpectedSeqNum.incrementAndGet();
-               LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
-            }
-            else {
-               resumeOnNextOrPreviousIsOk = false;
-               // batch will be replayed
-               nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1));
-            }
-
-         }
-         catch (Throwable e) {
-            LOG.error(dest + " onMessage error", e);
-            exceptions.add(e);
-         }
-      }
-
-   }
-
-   private class Sender implements Runnable {
-
-      private final javax.jms.Destination dest;
-
-      private final Connection connection;
-
-      private final Session session;
-
-      private final MessageProducer messageProducer;
-
-      private volatile long nextSequenceNumber = 0;
-      private final Object guard = new Object();
-
-      public Sender(javax.jms.Destination dest) throws JMSException {
-         this.dest = dest;
-         connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
-         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         messageProducer = session.createProducer(dest);
-         messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
-         connection.start();
-         connections.add(connection);
-      }
-
-      @Override
-      public void run() {
-         final String longString = buildLongString();
-         long nextSequenceNumber = this.nextSequenceNumber;
-         while (nextSequenceNumber < numtoSend) {
-            try {
-               final Message message = session.createTextMessage(longString);
-               message.setLongProperty(SEQ_NUM_PROPERTY, nextSequenceNumber);
-               synchronized (guard) {
-                  if (nextSequenceNumber == this.nextSequenceNumber) {
-                     this.nextSequenceNumber = nextSequenceNumber + 1;
-                     messageProducer.send(message);
-                  }
-                  else {
-                     continue;
-                  }
-               }
-
-               if ((nextSequenceNumber % 500) == 0) {
-                  LOG.info(dest + " sent " + nextSequenceNumber);
-               }
-
-            }
-            catch (javax.jms.IllegalStateException e) {
-               LOG.error(dest + " bailing on send error", e);
-               exceptions.add(e);
-               break;
-            }
-            catch (Exception e) {
-               LOG.error(dest + " send error", e);
-               exceptions.add(e);
-            }
-            if (sleepBetweenSend > 0) {
-               try {
-                  Thread.sleep(sleepBetweenSend);
-               }
-               catch (InterruptedException e) {
-                  LOG.warn(dest + " sleep interrupted", e);
-               }
-            }
-         }
-         try {
-            connection.close();
-         }
-         catch (JMSException ignored) {
-         }
-      }
-   }
-
-   // attempt to simply replicate leveldb failure. no joy yet
-   public void x_testRestartReReceive() throws Exception {
-      createBroker(new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-            broker.deleteAllMessages();
-         }
-      });
-
-      final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE);
-      Thread thread = new Thread(new Sender(destination));
-      thread.start();
-      thread.join();
-
-      Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
-      connection.setClientID(destination.toString());
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      MessageConsumer messageConsumer = session.createConsumer(destination);
-      connection.start();
-
-      int batch = 200;
-      long expectedSeq;
-
-      final TimerTask restartTask = scheduleRestartTask(null, new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-         }
-      });
-
-      expectedSeq = 0;
-      for (int s = 0; s < 4; s++) {
-         for (int i = 0; i < batch; i++) {
-            Message message = messageConsumer.receive(20000);
-            assertNotNull("s:" + s + ", i:" + i, message);
-            final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
-            assertEquals("expected order s:" + s, expectedSeq++, seqNum);
-
-            if (i > 0 && i % 600 == 0) {
-               LOG.info("Commit on %5");
-               //    session.commit();
-            }
-         }
-         restartTask.run();
-      }
-
-   }
-
-   // no need to run this unless there are some issues with the others
-   public void vanilaVerify_testOrder() throws Exception {
-
-      createBroker(new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-            broker.deleteAllMessages();
-         }
-      });
-
-      verifyOrderedMessageReceipt();
-      verifyStats(false);
-   }
-
-   @Test(timeout = 5 * 60 * 1000)
-   public void testOrderWithRestart() throws Exception {
-      createBroker(new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-            broker.deleteAllMessages();
-         }
-      });
-
-      final Timer timer = new Timer();
-      scheduleRestartTask(timer, new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-         }
-      });
-
-      try {
-         verifyOrderedMessageReceipt();
-      }
-      finally {
-         timer.cancel();
-      }
-
-      verifyStats(true);
-   }
-
-   @Test(timeout = 5 * 60 * 1000)
-   public void testTopicOrderWithRestart() throws Exception {
-      createBroker(new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-            broker.deleteAllMessages();
-         }
-      });
-
-      final Timer timer = new Timer();
-      scheduleRestartTask(timer, null);
-
-      try {
-         verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
-      }
-      finally {
-         timer.cancel();
-      }
-
-      verifyStats(true);
-   }
-
-   @Test(timeout = 5 * 60 * 1000)
-   public void testQueueTransactionalOrderWithRestart() throws Exception {
-      doTestTransactionalOrderWithRestart(ActiveMQDestination.QUEUE_TYPE);
-   }
-
-   @Test(timeout = 5 * 60 * 1000)
-   public void testTopicTransactionalOrderWithRestart() throws Exception {
-      doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE);
-   }
-
-   public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
-      numtoSend = 10000;
-      sleepBetweenSend = 3;
-      brokerStopPeriod = 10 * 1000;
-
-      createBroker(new Configurer() {
-         @Override
-         public void configure(BrokerService broker) throws Exception {
-            broker.deleteAllMessages();
-         }
-      });
-
-      final Timer timer = new Timer();
-      scheduleRestartTask(timer, null);
-
-      try {
-         verifyOrderedMessageReceipt(destinationType, 1, true);
-      }
-      finally {
-         timer.cancel();
-      }
-
-      verifyStats(true);
-   }
-
-   private void verifyStats(boolean brokerRestarts) throws Exception {
-      RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
-
-      for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
-         DestinationStatistics stats = dest.getDestinationStatistics();
-         if (brokerRestarts) {
-            // all bets are off w.r.t stats as there may be duplicate sends and duplicate
-            // dispatches, all of which will be suppressed - either by the reference store
-            // not allowing duplicate references or consumers acking duplicates
-            LOG.info("with restart: not asserting qneue/dequeue stat match for: " + dest.getName() + " " + stats.getEnqueues().getCount() + " <= " + stats.getDequeues().getCount());
-         }
-         else {
-            assertEquals("qneue/dequeue match for: " + dest.getName(), stats.getEnqueues().getCount(), stats.getDequeues().getCount());
-         }
-      }
-   }
-
-   private TimerTask scheduleRestartTask(final Timer timer, final Configurer configurer) {
-      class RestartTask extends TimerTask {
-
-         @Override
-         public void run() {
-            synchronized (brokerLock) {
-               LOG.info("stopping broker..");
-               try {
-                  broker.stop();
-                  broker.waitUntilStopped();
-               }
-               catch (Exception e) {
-                  LOG.error("ex on broker stop", e);
-                  exceptions.add(e);
-               }
-               LOG.info("restarting broker");
-               try {
-                  createBroker(configurer);
-                  broker.waitUntilStarted();
-               }
-               catch (Exception e) {
-                  LOG.error("ex on broker restart", e);
-                  exceptions.add(e);
-               }
-            }
-            if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) {
-               // do it again
-               try {
-                  timer.schedule(new RestartTask(), brokerStopPeriod);
-               }
-               catch (IllegalStateException ignore_alreadyCancelled) {
-               }
-            }
-            else {
-               LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
-            }
-         }
-      }
-      RestartTask task = new RestartTask();
-      if (timer != null) {
-         timer.schedule(task, brokerStopPeriod);
-      }
-      return task;
-   }
-
-   private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
-      verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false);
-   }
-
-   private void verifyOrderedMessageReceipt() throws Exception {
-      verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false);
-   }
-
-   private void verifyOrderedMessageReceipt(byte destinationType,
-                                            int concurrentPairs,
-                                            boolean transactional) throws Exception {
-
-      Vector<Thread> threads = new Vector<>();
-      Vector<Receiver> receivers = new Vector<>();
-
-      for (int i = 0; i < concurrentPairs; ++i) {
-         final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest." + i, destinationType);
-         receivers.add(new Receiver(destination, transactional));
-         Thread thread = new Thread(new Sender(destination));
-         thread.start();
-         threads.add(thread);
-      }
-
-      final long expiry = System.currentTimeMillis() + 1000 * 60 * 4;
-      while (!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
-         Thread sendThread = threads.firstElement();
-         sendThread.join(1000 * 30);
-         if (!sendThread.isAlive()) {
-            threads.remove(sendThread);
-         }
-         else {
-            AutoFailTestSupport.dumpAllThreads("Send blocked");
-         }
-      }
-      LOG.info("senders done..." + threads);
-
-      while (!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
-         Receiver receiver = receivers.firstElement();
-         if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) {
-            receiver.close();
-            receivers.remove(receiver);
-         }
-      }
-
-      for (Connection connection : connections) {
-         try {
-            connection.close();
-         }
-         catch (Exception ignored) {
-         }
-      }
-      connections.clear();
-
-      assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
-      if (!exceptions.isEmpty()) {
-         exceptions.get(0).printStackTrace();
-      }
-
-      LOG.info("Dangling threads: " + threads);
-      for (Thread dangling : threads) {
-         dangling.interrupt();
-         dangling.join(10 * 1000);
-      }
-
-      assertTrue("No exceptions", exceptions.isEmpty());
-   }
-
-}
-
-class TeardownTask implements Callable<Boolean> {
-
-   private final Object brokerLock;
-   private BrokerService broker;
-
-   public TeardownTask(Object brokerLock, BrokerService broker) {
-      this.brokerLock = brokerLock;
-      this.broker = broker;
-   }
-
-   @Override
-   public Boolean call() throws Exception {
-      synchronized (brokerLock) {
-         if (broker != null) {
-            broker.stop();
-            broker.waitUntilStopped();
-         }
-      }
-      return Boolean.TRUE;
-   }
-}


Mime
View raw message