activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [37/47] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Mon, 08 Feb 2016 17:11:24 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
deleted file mode 100644
index 362fa5c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsMultipleClientsTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
-
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-
-@RunWith(BlockJUnit4ClassRunner.class)
-public class AMQ2910Test extends JmsMultipleClientsTestSupport {
-
-   final int maxConcurrency = 60;
-   final int msgCount = 200;
-   final Vector<Throwable> exceptions = new Vector<>();
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      //persistent = true;
-      BrokerService broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.addConnector("tcp://localhost:0");
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-      defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
-      defaultEntry.setCursorMemoryHighWaterMark(50);
-      defaultEntry.setMemoryLimit(500 * 1024);
-      defaultEntry.setProducerFlowControl(false);
-      policyMap.setDefaultEntry(defaultEntry);
-      broker.setDestinationPolicy(policyMap);
-
-      broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024);
-
-      return broker;
-   }
-
-   @Test(timeout = 30 * 1000)
-   public void testConcurrentSendToPendingCursor() throws Exception {
-      final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
-      factory.setCloseTimeout(30000);
-      ExecutorService executor = Executors.newCachedThreadPool();
-      for (int i = 0; i < maxConcurrency; i++) {
-         final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
-         executor.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  sendMessages(factory.createConnection(), dest, msgCount);
-               }
-               catch (Throwable t) {
-                  exceptions.add(t);
-               }
-            }
-         });
-      }
-
-      executor.shutdown();
-
-      assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS));
-      assertNoExceptions();
-
-      executor = Executors.newCachedThreadPool();
-      for (int i = 0; i < maxConcurrency; i++) {
-         final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
-         executor.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  startConsumers(factory, dest);
-               }
-               catch (Throwable t) {
-                  exceptions.add(t);
-               }
-            }
-         });
-      }
-
-      executor.shutdown();
-      assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS));
-
-      allMessagesList.setMaximumDuration(120 * 1000);
-      final int numExpected = maxConcurrency * msgCount;
-      allMessagesList.waitForMessagesToArrive(numExpected);
-
-      if (allMessagesList.getMessageCount() != numExpected) {
-         dumpAllThreads(getName());
-
-      }
-      allMessagesList.assertMessagesReceivedNoWait(numExpected);
-
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-
-   }
-
-   private void assertNoExceptions() {
-      if (!exceptions.isEmpty()) {
-         for (Throwable t : exceptions) {
-            t.printStackTrace();
-         }
-      }
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
deleted file mode 100644
index 573cdd3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ2982Test {
-
-   private static final int MAX_MESSAGES = 500;
-
-   private static final String QUEUE_NAME = "test.queue";
-
-   private BrokerService broker;
-
-   private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
-
-   private CleanableKahaDBStore kahaDB;
-
-   private static class CleanableKahaDBStore extends KahaDBStore {
-
-      // make checkpoint cleanup accessible
-      public void forceCleanup() throws IOException {
-         checkpointCleanup(true);
-      }
-
-      public int getFileMapSize() throws IOException {
-         // ensure save memory publishing, use the right lock
-         indexLock.readLock().lock();
-         try {
-            return getJournal().getFileMap().size();
-         }
-         finally {
-            indexLock.readLock().unlock();
-         }
-      }
-   }
-
-   @Before
-   public void setup() throws Exception {
-
-      broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setPersistent(true);
-
-      kahaDB = new CleanableKahaDBStore();
-      kahaDB.setJournalMaxFileLength(256 * 1024);
-      broker.setPersistenceAdapter(kahaDB);
-
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   private Connection registerDLQMessageListener() throws Exception {
-      ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-      Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(session.createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            messageCountDown.countDown();
-         }
-      });
-
-      return connection;
-   }
-
-   class ConsumerThread extends Thread {
-
-      @Override
-      public void run() {
-         try {
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-
-            RedeliveryPolicy policy = new RedeliveryPolicy();
-            policy.setMaximumRedeliveries(0);
-            policy.setInitialRedeliveryDelay(100);
-            policy.setUseExponentialBackOff(false);
-
-            factory.setRedeliveryPolicy(policy);
-
-            Connection connection = factory.createConnection();
-            connection.start();
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-            do {
-               Message message = consumer.receive(300);
-               if (message != null) {
-                  session.rollback();
-               }
-            } while (messageCountDown.getCount() != 0);
-            consumer.close();
-            session.close();
-            connection.close();
-         }
-         catch (Exception e) {
-            Assert.fail(e.getMessage());
-         }
-      }
-   }
-
-   private void sendMessages() throws Exception {
-      ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-      Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
-      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      for (int i = 0; i < MAX_MESSAGES; i++) {
-         BytesMessage message = session.createBytesMessage();
-         message.writeBytes(new byte[1000]);
-         producer.send(message);
-      }
-      producer.close();
-      session.close();
-      connection.close();
-   }
-
-   @Test
-   public void testNoStickyKahaDbLogFilesOnLocalTransactionRollback() throws Exception {
-
-      Connection dlqConnection = registerDLQMessageListener();
-
-      ConsumerThread thread = new ConsumerThread();
-      thread.start();
-
-      sendMessages();
-
-      thread.join(60 * 1000);
-      assertFalse(thread.isAlive());
-
-      dlqConnection.close();
-
-      kahaDB.forceCleanup();
-
-      assertEquals("only one active KahaDB log file after cleanup is expected", 1, kahaDB.getFileMapSize());
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
deleted file mode 100644
index 8714477..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ2983Test {
-
-   private static final int MAX_CONSUMER = 10;
-
-   private static final int MAX_MESSAGES = 2000;
-
-   private static final String QUEUE_NAME = "test.queue";
-
-   private BrokerService broker;
-
-   private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
-
-   private CleanableKahaDBStore kahaDB;
-
-   private static class CleanableKahaDBStore extends KahaDBStore {
-
-      // make checkpoint cleanup accessible
-      public void forceCleanup() throws IOException {
-         checkpointCleanup(true);
-      }
-
-      public int getFileMapSize() throws IOException {
-         // ensure save memory publishing, use the right lock
-         indexLock.readLock().lock();
-         try {
-            return getJournal().getFileMap().size();
-         }
-         finally {
-            indexLock.readLock().unlock();
-         }
-      }
-   }
-
-   private class ConsumerThread extends Thread {
-
-      @Override
-      public void run() {
-         try {
-            ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-            Connection connection = factory.createConnection();
-            connection.start();
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-            do {
-               Message message = consumer.receive(200);
-               if (message != null) {
-                  session.commit();
-                  messageCountDown.countDown();
-               }
-            } while (messageCountDown.getCount() != 0);
-            consumer.close();
-            session.close();
-            connection.close();
-         }
-         catch (Exception e) {
-            Assert.fail(e.getMessage());
-         }
-      }
-   }
-
-   @Before
-   public void setup() throws Exception {
-
-      broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setPersistent(true);
-
-      kahaDB = new CleanableKahaDBStore();
-      kahaDB.setJournalMaxFileLength(256 * 1024);
-      broker.setPersistenceAdapter(kahaDB);
-
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-   @Test
-   public void testNoStickyKahaDbLogFilesOnConcurrentTransactionalConsumer() throws Exception {
-
-      List<Thread> consumerThreads = new ArrayList<>();
-      for (int i = 0; i < MAX_CONSUMER; i++) {
-         ConsumerThread thread = new ConsumerThread();
-         thread.start();
-         consumerThreads.add(thread);
-      }
-      sendMessages();
-
-      boolean allMessagesReceived = messageCountDown.await(60, TimeUnit.SECONDS);
-      assertTrue(allMessagesReceived);
-
-      for (Thread thread : consumerThreads) {
-         thread.join(TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS));
-         assertFalse(thread.isAlive());
-      }
-      kahaDB.forceCleanup();
-      assertEquals("Expect only one active KahaDB log file after cleanup", 1, kahaDB.getFileMapSize());
-   }
-
-   private void sendMessages() throws Exception {
-      ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-      Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
-      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      for (int i = 0; i < MAX_MESSAGES; i++) {
-         BytesMessage message = session.createBytesMessage();
-         message.writeBytes(new byte[200]);
-         producer.send(message);
-      }
-      producer.close();
-      session.close();
-      connection.close();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
deleted file mode 100644
index 1e3c737..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.Connection;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.thread.Task;
-import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.*;
-import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * This test involves the creation of a local and remote broker, both of which
- * communicate over VM and TCP. The local broker establishes a bridge to the
- * remote broker for the purposes of verifying that broker info is only
- * transferred once the local broker's ID is known to the bridge support.
- */
-public class AMQ3014Test {
-
-   // Change this URL to be an unused port.
-   private static final String BROKER_URL = "tcp://localhost:0";
-
-   private List<BrokerInfo> remoteBrokerInfos = Collections.synchronizedList(new ArrayList<BrokerInfo>());
-
-   private BrokerService localBroker = new BrokerService();
-
-   // Override the "remote" broker so that it records all (remote) BrokerInfos
-   // that it receives.
-   private BrokerService remoteBroker = new BrokerService() {
-      @Override
-      protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
-         TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
-         return new TransportConnector(transport) {
-            @Override
-            protected Connection createConnection(Transport transport) throws IOException {
-               Connection connection = super.createConnection(transport);
-               final TransportListener proxiedListener = transport.getTransportListener();
-               transport.setTransportListener(new TransportListener() {
-
-                  @Override
-                  public void onCommand(Object command) {
-                     if (command instanceof BrokerInfo) {
-                        remoteBrokerInfos.add((BrokerInfo) command);
-                     }
-                     proxiedListener.onCommand(command);
-                  }
-
-                  @Override
-                  public void onException(IOException error) {
-                     proxiedListener.onException(error);
-                  }
-
-                  @Override
-                  public void transportInterupted() {
-                     proxiedListener.transportInterupted();
-                  }
-
-                  @Override
-                  public void transportResumed() {
-                     proxiedListener.transportResumed();
-                  }
-               });
-               return connection;
-            }
-
-         };
-      }
-   };
-
-   @Before
-   public void init() throws Exception {
-      localBroker.setBrokerName("localBroker");
-      localBroker.setPersistent(false);
-      localBroker.setUseJmx(false);
-      localBroker.setSchedulerSupport(false);
-
-      remoteBroker.setBrokerName("remoteBroker");
-      remoteBroker.setPersistent(false);
-      remoteBroker.setUseJmx(false);
-      remoteBroker.addConnector(BROKER_URL);
-      remoteBroker.setSchedulerSupport(false);
-   }
-
-   @After
-   public void cleanup() throws Exception {
-      try {
-         localBroker.stop();
-      }
-      finally {
-         remoteBroker.stop();
-      }
-   }
-
-   /**
-    * This test verifies that the local broker's ID is typically known by the
-    * bridge support before the local broker's BrokerInfo is sent to the remote
-    * broker.
-    */
-   @Test
-   public void NormalCaseTest() throws Exception {
-      runTest(0, 3000);
-   }
-
-   /**
-    * This test verifies that timing can arise under which the local broker's
-    * ID is not known by the bridge support before the local broker's
-    * BrokerInfo is sent to the remote broker.
-    */
-   @Test
-   public void DelayedCaseTest() throws Exception {
-      runTest(500, 3000);
-   }
-
-   private void runTest(final long taskRunnerDelay, long timeout) throws Exception {
-      // Add a network connector to the local broker that will create a bridge
-      // to the remote broker.
-      DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
-      SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
-      da.setServices(remoteBroker.getTransportConnectors().get(0).getPublishableConnectString());
-      dnc.setDiscoveryAgent(da);
-      localBroker.addNetworkConnector(dnc);
-
-      // Before starting the local broker, intercept the task runner factory
-      // so that the
-      // local VMTransport dispatcher is artificially delayed.
-      final TaskRunnerFactory realTaskRunnerFactory = localBroker.getTaskRunnerFactory();
-      localBroker.setTaskRunnerFactory(new TaskRunnerFactory() {
-         @Override
-         public TaskRunner createTaskRunner(Task task, String name) {
-            final TaskRunner realTaskRunner = realTaskRunnerFactory.createTaskRunner(task, name);
-            if (name.startsWith("ActiveMQ Connection Dispatcher: ")) {
-               return new TaskRunner() {
-                  @Override
-                  public void shutdown() throws InterruptedException {
-                     realTaskRunner.shutdown();
-                  }
-
-                  @Override
-                  public void shutdown(long timeout) throws InterruptedException {
-                     realTaskRunner.shutdown(timeout);
-                  }
-
-                  @Override
-                  public void wakeup() throws InterruptedException {
-                     Thread.sleep(taskRunnerDelay);
-                     realTaskRunner.wakeup();
-                  }
-               };
-            }
-            else {
-               return realTaskRunnerFactory.createTaskRunner(task, name);
-            }
-         }
-      });
-
-      // Start the brokers and wait for the bridge to be created; the remote
-      // broker is started first to ensure it is available for the local
-      // broker to connect to.
-      remoteBroker.start();
-      localBroker.start();
-
-      // Wait for the remote broker to receive the local broker's BrokerInfo
-      // and then verify the local broker's ID is known.
-      long startTimeMillis = System.currentTimeMillis();
-      while (remoteBrokerInfos.isEmpty() && (System.currentTimeMillis() - startTimeMillis) < timeout) {
-         Thread.sleep(100);
-      }
-
-      Assert.assertFalse("Timed out waiting for bridge to form.", remoteBrokerInfos.isEmpty());
-      Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get(0).getBrokerId());
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
deleted file mode 100644
index 88a0db8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.ConsumerThread;
-import org.apache.activemq.util.ProducerThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import javax.jms.*;
-
-import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-
-public class AMQ3120Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ3120Test.class);
-
-   BrokerService broker = null;
-   File kahaDbDir = null;
-   private final Destination destination = new ActiveMQQueue("AMQ3120Test");
-   final String payload = new String(new byte[1024]);
-
-   protected void startBroker(boolean delete) throws Exception {
-      broker = new BrokerService();
-
-      //Start with a clean directory
-      kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
-      deleteDir(kahaDbDir);
-
-      broker.setSchedulerSupport(false);
-      broker.setDeleteAllMessagesOnStartup(delete);
-      broker.setPersistent(true);
-      broker.setUseJmx(false);
-      broker.addConnector("tcp://localhost:0");
-
-      PolicyMap map = new PolicyMap();
-      PolicyEntry entry = new PolicyEntry();
-      entry.setUseCache(false);
-      map.setDefaultEntry(entry);
-      broker.setDestinationPolicy(map);
-
-      configurePersistence(broker, delete);
-
-      broker.start();
-      LOG.info("Starting broker..");
-   }
-
-   protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
-      KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-
-      // ensure there are a bunch of data files but multiple entries in each
-      adapter.setJournalMaxFileLength(1024 * 20);
-
-      // speed up the test case, checkpoint and cleanup early and often
-      adapter.setCheckpointInterval(500);
-      adapter.setCleanupInterval(500);
-
-      if (!deleteAllOnStart) {
-         adapter.setForceRecoverIndex(true);
-      }
-
-   }
-
-   private boolean deleteDir(File dir) {
-      if (dir.isDirectory()) {
-         String[] children = dir.list();
-         for (int i = 0; i < children.length; i++) {
-            boolean success = deleteDir(new File(dir, children[i]));
-            if (!success) {
-               return false;
-            }
-         }
-      }
-
-      return dir.delete();
-   }
-
-   private int getFileCount(File dir) {
-      if (dir.isDirectory()) {
-         String[] children = dir.list();
-         return children.length;
-      }
-
-      return 0;
-   }
-
-   @Test
-   public void testCleanupOfFiles() throws Exception {
-      final int messageCount = 500;
-      startBroker(true);
-      int fileCount = getFileCount(kahaDbDir);
-      assertEquals(4, fileCount);
-
-      Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
-      connection.start();
-      Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      ProducerThread producer = new ProducerThread(producerSess, destination) {
-         @Override
-         protected Message createMessage(int i) throws Exception {
-            return sess.createTextMessage(payload + "::" + i);
-         }
-      };
-      producer.setSleep(650);
-      producer.setMessageCount(messageCount);
-      ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
-      consumer.setBreakOnNull(false);
-      consumer.setMessageCount(messageCount);
-
-      producer.start();
-      consumer.start();
-
-      producer.join();
-      consumer.join();
-
-      assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
-
-      broker.stop();
-      broker.waitUntilStopped();
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
deleted file mode 100644
index 621b421..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3140Test {
-
-   private static final int MESSAGES_PER_THREAD = 100;
-
-   private static final int THREAD_COUNT = 10;
-
-   private BrokerService broker;
-
-   private static final String QUEUE_NAME = "test";
-
-   private static class Sender extends Thread {
-
-      private static final int DELAY = 3000;
-
-      @Override
-      public void run() {
-         try {
-            ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-            Connection connection = cf.createConnection();
-            connection.start();
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
-            Message message = session.createTextMessage("test");
-            for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
-               message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY);
-               producer.send(message);
-            }
-            session.close();
-            connection.close();
-         }
-         catch (JMSException e) {
-            fail(e.getMessage());
-         }
-      }
-   }
-
-   @Before
-   public void setup() throws Exception {
-      File schedulerDirectory = new File("target/test/ScheduledDB");
-
-      IOHelper.mkdirs(schedulerDirectory);
-      IOHelper.deleteChildren(schedulerDirectory);
-
-      broker = new BrokerService();
-      broker.setSchedulerSupport(true);
-      broker.setPersistent(true);
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setDataDirectory("target");
-      broker.setSchedulerDirectoryFile(schedulerDirectory);
-      broker.setUseJmx(false);
-      broker.addConnector("vm://localhost");
-
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-   @Test
-   public void noMessageLostOnConcurrentScheduling() throws JMSException, InterruptedException {
-
-      final AtomicLong receiveCounter = new AtomicLong();
-
-      ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-      Connection connection = cf.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            receiveCounter.incrementAndGet();
-         }
-      });
-
-      List<Sender> senderThreads = new ArrayList<>();
-      for (int i = 0; i < THREAD_COUNT; i++) {
-         Sender sender = new Sender();
-         senderThreads.add(sender);
-      }
-      for (Sender sender : senderThreads) {
-         sender.start();
-      }
-      for (Sender sender : senderThreads) {
-         sender.join();
-      }
-
-      // wait until all scheduled messages has been received
-      TimeUnit.MINUTES.sleep(2);
-
-      session.close();
-      connection.close();
-
-      assertEquals(MESSAGES_PER_THREAD * THREAD_COUNT, receiveCounter.get());
-   }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
deleted file mode 100644
index 49db143..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3141Test {
-
-   private static final int MAX_MESSAGES = 100;
-
-   private static final long DELAY_IN_MS = 100;
-
-   private static final String QUEUE_NAME = "target.queue";
-
-   private BrokerService broker;
-
-   private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
-
-   private ConnectionFactory factory;
-
-   @Before
-   public void setup() throws Exception {
-
-      broker = new BrokerService();
-      broker.setPersistent(true);
-      broker.setSchedulerSupport(true);
-      broker.setDataDirectory("target");
-      broker.setUseJmx(false);
-      broker.addConnector("vm://localhost");
-
-      File schedulerDirectory = new File("target/test/ScheduledDB");
-      IOHelper.mkdirs(schedulerDirectory);
-      IOHelper.deleteChildren(schedulerDirectory);
-      broker.setSchedulerDirectoryFile(schedulerDirectory);
-
-      broker.start();
-      broker.waitUntilStarted();
-
-      factory = new ActiveMQConnectionFactory("vm://localhost");
-   }
-
-   private void sendMessages() throws Exception {
-      Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
-      for (int i = 0; i < MAX_MESSAGES; i++) {
-         Message message = session.createTextMessage();
-         message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY_IN_MS);
-         producer.send(message);
-      }
-      connection.close();
-   }
-
-   @Test
-   public void testNoMissingMessagesOnShortScheduleDelay() throws Exception {
-
-      Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-
-      consumer.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            messageCountDown.countDown();
-         }
-      });
-      sendMessages();
-
-      boolean receiveComplete = messageCountDown.await(5, TimeUnit.SECONDS);
-
-      connection.close();
-
-      assertTrue("expect all messages received but " + messageCountDown.getCount() + " are missing", receiveComplete);
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
deleted file mode 100644
index 7e7c959..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3145Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ3145Test.class);
-   private final String MESSAGE_TEXT = new String(new byte[1024]);
-   BrokerService broker;
-   ConnectionFactory factory;
-   Connection connection;
-   Session session;
-   Queue queue;
-   MessageConsumer consumer;
-
-   @Before
-   public void createBroker() throws Exception {
-      createBroker(true);
-   }
-
-   public void createBroker(boolean deleteAll) throws Exception {
-      broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(deleteAll);
-      broker.setDataDirectory("target/AMQ3145Test");
-      broker.setUseJmx(true);
-      broker.getManagementContext().setCreateConnector(false);
-      broker.addConnector("tcp://localhost:0");
-      broker.start();
-      broker.waitUntilStarted();
-      factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
-      connection = factory.createConnection();
-      connection.start();
-      session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      if (consumer != null) {
-         consumer.close();
-      }
-      session.close();
-      connection.stop();
-      connection.close();
-      broker.stop();
-   }
-
-   @Test
-   public void testCacheDisableReEnable() throws Exception {
-      createProducerAndSendMessages(1);
-      QueueViewMBean proxy = getProxyToQueueViewMBean();
-      assertTrue("cache is enabled", proxy.isCacheEnabled());
-      tearDown();
-      createBroker(false);
-      proxy = getProxyToQueueViewMBean();
-      assertEquals("one pending message", 1, proxy.getQueueSize());
-      assertTrue("cache is disabled when there is a pending message", !proxy.isCacheEnabled());
-
-      createConsumer(1);
-      createProducerAndSendMessages(1);
-      assertTrue("cache is enabled again on next send when there are no messages", proxy.isCacheEnabled());
-   }
-
-   private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException {
-      ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + queue.getQueueName() + ",type=Broker,brokerName=localhost");
-      QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
-      return proxy;
-   }
-
-   private void createProducerAndSendMessages(int numToSend) throws Exception {
-      queue = session.createQueue("test1");
-      MessageProducer producer = session.createProducer(queue);
-      for (int i = 0; i < numToSend; i++) {
-         TextMessage message = session.createTextMessage(MESSAGE_TEXT + i);
-         if (i != 0 && i % 50000 == 0) {
-            LOG.info("sent: " + i);
-         }
-         producer.send(message);
-      }
-      producer.close();
-   }
-
-   private void createConsumer(int numToConsume) throws Exception {
-      consumer = session.createConsumer(queue);
-      // wait for buffer fill out
-      for (int i = 0; i < numToConsume; ++i) {
-         Message message = consumer.receive(2000);
-         message.acknowledge();
-      }
-      consumer.close();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
deleted file mode 100644
index 34b1909..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3157Test extends EmbeddedBrokerTestSupport {
-
-   private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3157Test.class);
-   private Connection connection;
-
-   public void testInactiveMirroredQueueIsCleanedUp() throws Exception {
-
-      if (connection == null) {
-         connection = createConnection();
-      }
-      connection.start();
-
-      ConsumerBean messageList = new ConsumerBean();
-      messageList.setVerbose(true);
-
-      ActiveMQDestination consumeDestination = createConsumeDestination();
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      LOG.info("Consuming from: " + consumeDestination);
-
-      MessageConsumer c1 = session.createConsumer(consumeDestination);
-      c1.setMessageListener(messageList);
-
-      // create topic producer
-      ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
-      LOG.info("Sending to: " + sendDestination);
-
-      MessageProducer producer = session.createProducer(sendDestination);
-      assertNotNull(producer);
-
-      final int total = 10;
-      for (int i = 0; i < total; i++) {
-         producer.send(session.createTextMessage("message: " + i));
-      }
-
-      messageList.assertMessagesArrived(total);
-      LOG.info("Received: " + messageList);
-      messageList.flushMessages();
-
-      MessageConsumer c2 = session.createConsumer(sendDestination);
-      c2.setMessageListener(messageList);
-      messageList.assertMessagesArrived(total);
-      LOG.info("Q Received: " + messageList);
-
-      connection.close();
-
-      List<ObjectName> topics = Arrays.asList(broker.getAdminView().getTopics());
-      assertTrue(topics.contains(createObjectName(consumeDestination)));
-      List<ObjectName> queues = Arrays.asList(broker.getAdminView().getQueues());
-      assertTrue(queues.contains(createObjectName(sendDestination)));
-
-      Thread.sleep(TimeUnit.SECONDS.toMillis(10));
-
-      topics = Arrays.asList(broker.getAdminView().getTopics());
-      if (topics != null) {
-         assertFalse("Virtual Topic Desination did not get cleaned up.", topics.contains(createObjectName(consumeDestination)));
-      }
-      queues = Arrays.asList(broker.getAdminView().getQueues());
-      if (queues != null) {
-         assertFalse("Mirrored Queue Desination did not get cleaned up.", queues.contains(createObjectName(sendDestination)));
-      }
-   }
-
-   protected ActiveMQDestination createConsumeDestination() {
-      return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
-   }
-
-   protected String getQueueName() {
-      return "My.Queue";
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseMirroredQueues(true);
-      answer.setPersistent(isPersistent());
-      answer.setSchedulePeriodForDestinationPurge(1000);
-
-      PolicyEntry entry = new PolicyEntry();
-      entry.setGcInactiveDestinations(true);
-      entry.setInactiveTimeoutBeforeGC(5000);
-      entry.setProducerFlowControl(true);
-      PolicyMap map = new PolicyMap();
-      map.setDefaultEntry(entry);
-
-      MirroredQueue mirrorQ = new MirroredQueue();
-      mirrorQ.setCopyMessage(true);
-      DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
-      answer.setDestinationInterceptors(destinationInterceptors);
-
-      answer.setDestinationPolicy(map);
-      answer.addConnector(bindAddress);
-
-      return answer;
-   }
-
-   protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
-      String domain = "org.apache.activemq";
-      ObjectName name;
-      if (destination.isQueue()) {
-         name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName());
-      }
-      else {
-         name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName());
-      }
-      return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
-   }
-
-   protected ObjectName createObjectName(ActiveMQDestination destination) throws Exception {
-      String domain = "org.apache.activemq";
-      ObjectName name;
-      if (destination.isQueue()) {
-         name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
-                                  "destinationType=Queue,destinationName=" + destination.getPhysicalName());
-      }
-      else {
-         name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
-                                  "destinationType=Topic,destinationName=" + destination.getPhysicalName());
-      }
-
-      return name;
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      if (connection != null) {
-         connection.close();
-      }
-      super.tearDown();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032317ba/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
deleted file mode 100644
index 6fd81b2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2.
- * <br>
- * Symptoms: - 1 record is lost "early" in the stream. - no more records lost.
- * <br>
- * Test Configuration: - Broker Settings: - Destination Policy - Occurs with "Destination Policy" using Store Cursor and
- * a memory limit - Not reproduced without "Destination Policy" defined - Persistence Adapter - Memory: Does not occur.
- * - KahaDB: Occurs. - Messages - Occurs with TextMessage and BinaryMessage - Persistent messages.
- * <br>
- * Notes: - Lower memory limits increase the rate of occurrence. - Higher memory limits may prevent the problem
- * (probably because memory limits not reached). - Producers sending a number of messages before consumers come online
- * increases rate of occurrence.
- */
-
-public class AMQ3167Test {
-
-   protected BrokerService embeddedBroker;
-
-   protected static final int MEMORY_LIMIT = 16 * 1024;
-
-   protected static boolean Debug_f = false;
-
-   protected long Producer_stop_time = 0;
-   protected long Consumer_stop_time = 0;
-   protected long Consumer_startup_delay_ms = 2000;
-   protected boolean Stop_after_error = true;
-
-   protected Connection JMS_conn;
-   protected long Num_error = 0;
-
-   // // ////
-   // // UTILITIES ////
-   // // ////
-
-   /**
-    * Create a new, unsecured, client connection to the test broker using the given username and password. This
-    * connection bypasses all security.
-    * <br>
-    * Don't forget to start the connection or no messages will be received by consumers even though producers will work
-    * fine.
-    *
-    * @username name of the JMS user for the connection; may be null.
-    * @password Password for the JMS user; may be null.
-    */
-
-   protected Connection createUnsecuredConnection(String username, String password) throws javax.jms.JMSException {
-      ActiveMQConnectionFactory conn_fact;
-
-      conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
-
-      return conn_fact.createConnection(username, password);
-   }
-
-   // // ////
-   // // TEST FUNCTIONALITY ////
-   // // ////
-
-   @Before
-   public void testPrep() throws Exception {
-      embeddedBroker = new BrokerService();
-      configureBroker(embeddedBroker);
-      embeddedBroker.start();
-      embeddedBroker.waitUntilStarted();
-
-      // Prepare the connection
-      JMS_conn = createUnsecuredConnection(null, null);
-      JMS_conn.start();
-   }
-
-   @After
-   public void testCleanup() throws java.lang.Exception {
-      JMS_conn.stop();
-      embeddedBroker.stop();
-   }
-
-   protected void configureBroker(BrokerService broker_svc) throws Exception {
-
-      broker_svc.setBrokerName("testbroker1");
-
-      broker_svc.setUseJmx(false);
-      broker_svc.setPersistent(true);
-      broker_svc.setDataDirectory("target/AMQ3167Test");
-      configureDestinationPolicy(broker_svc);
-   }
-
-   /**
-    * NOTE: overrides any prior policy map defined for the broker service.
-    */
-
-   protected void configureDestinationPolicy(BrokerService broker_svc) {
-      PolicyMap pol_map;
-      PolicyEntry pol_ent;
-      ArrayList<PolicyEntry> ent_list;
-
-      ent_list = new ArrayList<>();
-
-      //
-      // QUEUES
-      //
-
-      pol_ent = new PolicyEntry();
-      pol_ent.setQueue(">");
-      pol_ent.setMemoryLimit(MEMORY_LIMIT);
-      pol_ent.setProducerFlowControl(false);
-      ent_list.add(pol_ent);
-
-      //
-      // COMPLETE POLICY MAP
-      //
-
-      pol_map = new PolicyMap();
-      pol_map.setPolicyEntries(ent_list);
-
-      broker_svc.setDestinationPolicy(pol_map);
-   }
-
-   // // ////
-   // // TEST ////
-   // // ////
-
-   @Test
-   public void testQueueLostMessage() throws Exception {
-      Destination dest;
-
-      dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
-
-      // 10 seconds from now
-      Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L);
-
-      // 15 seconds from now
-      Consumer_stop_time = Producer_stop_time + (5L * 1000000000L);
-
-      runLostMsgTest(dest, 1000000, 1, 1, false);
-
-      // Make sure failures in the threads are thoroughly reported in the JUnit framework.
-      assertTrue(Num_error == 0);
-   }
-
-   /**
-    *
-    */
-
-   protected static void log(String msg) {
-      if (Debug_f)
-         java.lang.System.err.println(msg);
-   }
-
-   /**
-    * Main body of the lost-message test.
-    */
-
-   protected void runLostMsgTest(Destination dest,
-                                 int num_msg,
-                                 int num_send_per_sess,
-                                 int num_recv_per_sess,
-                                 boolean topic_f) throws Exception {
-      Thread prod_thread;
-      Thread cons_thread;
-      String tag;
-      Session sess;
-      MessageProducer prod;
-      MessageConsumer cons;
-      int ack_mode;
-
-      //
-      // Start the producer
-      //
-
-      tag = "prod";
-      log(">> Starting producer " + tag);
-
-      sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE);
-      prod = sess.createProducer(dest);
-
-      prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess);
-      prod_thread.start();
-      log("Started producer " + tag);
-
-      //
-      // Delay before starting consumers
-      //
-
-      log("Waiting before starting consumers");
-      java.lang.Thread.sleep(Consumer_startup_delay_ms);
-
-      //
-      // Now create and start the consumer
-      //
-
-      tag = "cons";
-      log(">> Starting consumer");
-
-      if (num_recv_per_sess > 1)
-         ack_mode = Session.CLIENT_ACKNOWLEDGE;
-      else
-         ack_mode = Session.AUTO_ACKNOWLEDGE;
-
-      sess = JMS_conn.createSession(false, ack_mode);
-      cons = sess.createConsumer(dest);
-
-      cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess);
-      cons_thread.start();
-      log("Started consumer " + tag);
-
-      //
-      // Wait for the producer and consumer to finish.
-      //
-
-      log("< waiting for producer.");
-      prod_thread.join();
-
-      log("< waiting for consumer.");
-      cons_thread.join();
-
-      log("Shutting down");
-   }
-
-   // // ////
-   // // INTERNAL CLASSES ////
-   // // ////
-
-   /**
-    * Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop time is
-    * reached, or a test error is detected.
-    */
-
-   protected class producerThread extends Thread {
-
-      protected Session msgSess;
-      protected MessageProducer msgProd;
-      protected String producerTag;
-      protected int numMsg;
-      protected int numPerSess;
-      protected long producer_stop_time;
-
-      producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) {
-         super();
-
-         producer_stop_time = 0;
-         msgSess = sess;
-         msgProd = prod;
-         producerTag = tag;
-         numMsg = num_msg;
-         numPerSess = sess_size;
-      }
-
-      public void execTest() throws Exception {
-         Message msg;
-         int sess_start;
-         int cur;
-
-         sess_start = 0;
-         cur = 0;
-         while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
-            msg = msgSess.createTextMessage("test message from " + producerTag);
-            msg.setStringProperty("testprodtag", producerTag);
-            msg.setIntProperty("seq", cur);
-
-            if (msg instanceof ActiveMQMessage) {
-               ((ActiveMQMessage) msg).setResponseRequired(true);
-            }
-
-            //
-            // Send the message.
-            //
-
-            msgProd.send(msg);
-            cur++;
-
-            //
-            // Commit if the number of messages per session has been reached, and
-            // transactions are being used (only when > 1 msg per sess).
-            //
-
-            if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
-               msgSess.commit();
-               sess_start = cur;
-            }
-         }
-
-         // Make sure to send the final commit, if there were sends since the last commit.
-         if ((numPerSess > 1) && ((cur - sess_start) > 0))
-            msgSess.commit();
-
-         if (cur < numMsg)
-            log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() + " (stop time " + producer_stop_time + ")");
-      }
-
-      /**
-       * Check whether it is time for the producer to terminate.
-       */
-
-      protected boolean didTimeOut() {
-         if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time))
-            return true;
-
-         return false;
-      }
-
-      /**
-       * Run the producer.
-       */
-
-      @Override
-      public void run() {
-         try {
-            log("- running producer " + producerTag);
-            execTest();
-            log("- finished running producer " + producerTag);
-         }
-         catch (Throwable thrown) {
-            Num_error++;
-            fail("producer " + producerTag + " failed: " + thrown.getMessage());
-            throw new Error("producer " + producerTag + " failed", thrown);
-         }
-      }
-
-      @Override
-      public String toString() {
-         return producerTag;
-      }
-   }
-
-   /**
-    * Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop time
-    * is reached, or a test error is detected.
-    */
-
-   protected class consumerThread extends Thread {
-
-      protected Session msgSess;
-      protected MessageConsumer msgCons;
-      protected String consumerTag;
-      protected int numMsg;
-      protected int numPerSess;
-
-      consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) {
-         super();
-
-         msgSess = sess;
-         msgCons = cons;
-         consumerTag = tag;
-         numMsg = num_msg;
-         numPerSess = sess_size;
-      }
-
-      public void execTest() throws Exception {
-         Message msg;
-         int sess_start;
-         int cur;
-
-         msg = null;
-         sess_start = 0;
-         cur = 0;
-
-         while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
-            //
-            // Use a timeout of 1 second to periodically check the consumer timeout.
-            //
-            msg = msgCons.receive(1000);
-            if (msg != null) {
-               checkMessage(msg, cur);
-               cur++;
-
-               if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
-                  msg.acknowledge();
-                  sess_start = cur;
-               }
-            }
-         }
-
-         // Acknowledge the last messages, if they were not yet acknowledged.
-         if ((numPerSess > 1) && ((cur - sess_start) > 0))
-            msg.acknowledge();
-
-         if (cur < numMsg)
-            log("* Consumer " + consumerTag + " timed out");
-      }
-
-      /**
-       * Check whether it is time for the consumer to terminate.
-       */
-
-      protected boolean didTimeOut() {
-         if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time))
-            return true;
-
-         return false;
-      }
-
-      /**
-       * Verify the message received. Sequence numbers are checked and are expected to exactly match the message
-       * number (starting at 0).
-       */
-
-      protected void checkMessage(Message msg, int exp_seq) throws javax.jms.JMSException {
-         int seq;
-
-         seq = msg.getIntProperty("seq");
-
-         if (exp_seq != seq) {
-            Num_error++;
-            fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq);
-         }
-      }
-
-      /**
-       * Run the consumer.
-       */
-
-      @Override
-      public void run() {
-         try {
-            log("- running consumer " + consumerTag);
-            execTest();
-            log("- running consumer " + consumerTag);
-         }
-         catch (Throwable thrown) {
-            Num_error++;
-            fail("consumer " + consumerTag + " failed: " + thrown.getMessage());
-            throw new Error("consumer " + consumerTag + " failed", thrown);
-         }
-      }
-
-      @Override
-      public String toString() {
-         return consumerTag;
-      }
-   }
-}
\ No newline at end of file


Mime
View raw message