activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [34/48] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
Date Tue, 16 Feb 2016 20:10:48 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
deleted file mode 100644
index 3504c1f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.disk.journal.DataFile;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4212Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);
-
-   private BrokerService service;
-   private String connectionUri;
-   private ActiveMQConnectionFactory cf;
-
-   private final int MSG_COUNT = 256;
-
-   @Before
-   public void setUp() throws IOException, Exception {
-      createBroker(true, false);
-   }
-
-   public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
-      service = new BrokerService();
-      service.setBrokerName("InactiveSubTest");
-      service.setDeleteAllMessagesOnStartup(deleteAllMessages);
-      service.setAdvisorySupport(false);
-      service.setPersistent(true);
-      service.setUseJmx(true);
-      service.setKeepDurableSubsActive(false);
-
-      KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter();
-      File dataFile = new File("KahaDB");
-      pa.setDirectory(dataFile);
-      pa.setJournalMaxFileLength(10 * 1024);
-      pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
-      pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
-      pa.setForceRecoverIndex(recover);
-
-      service.setPersistenceAdapter(pa);
-      service.start();
-      service.waitUntilStarted();
-
-      connectionUri = "vm://InactiveSubTest?create=false";
-      cf = new ActiveMQConnectionFactory(connectionUri);
-   }
-
-   private void restartBroker() throws Exception {
-      stopBroker();
-      createBroker(false, false);
-   }
-
-   private void recoverBroker() throws Exception {
-      stopBroker();
-      createBroker(false, true);
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (service != null) {
-         service.stop();
-         service.waitUntilStopped();
-         service = null;
-      }
-   }
-
-   @Test
-   public void testDirableSubPrefetchRecovered() throws Exception {
-
-      ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
-      ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
-
-      // Send to a Queue to create some journal files
-      sendMessages(queue);
-
-      LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-      createInactiveDurableSub(topic);
-
-      assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-            return subs != null && subs.length == 1 ? true : false;
-         }
-      }));
-
-      // Now send some more to the queue to create even more files.
-      sendMessages(queue);
-
-      LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-      assertTrue(getNumberOfJournalFiles() > 1);
-
-      LOG.info("Restarting the broker.");
-      restartBroker();
-      LOG.info("Restarted the broker.");
-
-      LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-      assertTrue(getNumberOfJournalFiles() > 1);
-
-      assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-            return subs != null && subs.length == 1 ? true : false;
-         }
-      }));
-
-      // Clear out all queue data
-      service.getAdminView().removeQueue(queue.getQueueName());
-
-      assertTrue("Less than two journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return getNumberOfJournalFiles() <= 2;
-         }
-      }, TimeUnit.MINUTES.toMillis(2)));
-
-      LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
-      // Send some messages to the inactive destination
-      sendMessages(topic);
-
-      LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
-      assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));
-
-      LOG.info("Recovering the broker.");
-      recoverBroker();
-      LOG.info("Recovering the broker.");
-
-      assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-            return subs != null && subs.length == 1 ? true : false;
-         }
-      }));
-   }
-
-   @Test
-   public void testDurableAcksNotDropped() throws Exception {
-
-      ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
-      ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
-
-      // Create durable sub in first data file.
-      createInactiveDurableSub(topic);
-
-      assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-            return subs != null && subs.length == 1 ? true : false;
-         }
-      }));
-
-      // Send to a Topic
-      sendMessages(topic, 1);
-
-      // Send to a Queue to create some journal files
-      sendMessages(queue);
-
-      LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-      // Consume all the Messages leaving acks behind.
-      consumeDurableMessages(topic, 1);
-
-      LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-      // Now send some more to the queue to create even more files.
-      sendMessages(queue);
-
-      LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
-      assertTrue(getNumberOfJournalFiles() > 1);
-
-      LOG.info("Restarting the broker.");
-      restartBroker();
-      LOG.info("Restarted the broker.");
-
-      LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-      assertTrue(getNumberOfJournalFiles() > 1);
-
-      assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-            return subs != null && subs.length == 1 ? true : false;
-         }
-      }));
-
-      // Clear out all queue data
-      service.getAdminView().removeQueue(queue.getQueueName());
-
-      assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return getNumberOfJournalFiles() <= 3;
-         }
-      }, TimeUnit.MINUTES.toMillis(3)));
-
-      // See if we receive any message they should all be acked.
-      tryConsumeExpectNone(topic);
-
-      LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-      LOG.info("Recovering the broker.");
-      recoverBroker();
-      LOG.info("Recovering the broker.");
-
-      LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-      assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-            return subs != null && subs.length == 1 ? true : false;
-         }
-      }));
-
-      // See if we receive any message they should all be acked.
-      tryConsumeExpectNone(topic);
-
-      assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return getNumberOfJournalFiles() == 1;
-         }
-      }, TimeUnit.MINUTES.toMillis(1)));
-   }
-
-   private int getNumberOfJournalFiles() throws IOException {
-      Collection<DataFile> files = ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
-      int reality = 0;
-      for (DataFile file : files) {
-         if (file != null) {
-            reality++;
-         }
-      }
-
-      return reality;
-   }
-
-   private void createInactiveDurableSub(Topic topic) throws Exception {
-      Connection connection = cf.createConnection();
-      connection.setClientID("Inactive");
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-      consumer.close();
-      connection.close();
-   }
-
-   private void consumeDurableMessages(Topic topic, int count) throws Exception {
-      Connection connection = cf.createConnection();
-      connection.setClientID("Inactive");
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-      connection.start();
-      for (int i = 0; i < count; ++i) {
-         if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
-            fail("should have received a message");
-         }
-      }
-      consumer.close();
-      connection.close();
-   }
-
-   private void tryConsumeExpectNone(Topic topic) throws Exception {
-      Connection connection = cf.createConnection();
-      connection.setClientID("Inactive");
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-      connection.start();
-      if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
-         fail("Should be no messages for this durable.");
-      }
-      consumer.close();
-      connection.close();
-   }
-
-   private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
-      Connection connection = cf.createConnection();
-      connection.setClientID("Inactive");
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-
-      int count = 0;
-
-      while (consumer.receive(10000) != null) {
-         count++;
-      }
-
-      consumer.close();
-      connection.close();
-
-      return count;
-   }
-
-   private void sendMessages(Destination destination) throws Exception {
-      sendMessages(destination, MSG_COUNT);
-   }
-
-   private void sendMessages(Destination destination, int count) throws Exception {
-      Connection connection = cf.createConnection();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(destination);
-      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      for (int i = 0; i < count; ++i) {
-         TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination);
-         producer.send(message);
-      }
-      connection.close();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
deleted file mode 100644
index c033e97..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.fail;
-
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ProducerInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4213Test {
-
-   private static BrokerService brokerService;
-   private static String BROKER_ADDRESS = "tcp://localhost:0";
-   private static String TEST_QUEUE = "testQueue";
-   private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
-
-   private String connectionUri;
-
-   @SuppressWarnings("unchecked")
-   @Before
-   public void setUp() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setPersistent(false);
-      brokerService.setUseJmx(true);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
-
-      brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-
-         @Override
-         public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
-            throw new javax.jms.JMSSecurityException(connectionUri);
-         }
-      }});
-
-      brokerService.start();
-      brokerService.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-   }
-
-   @Test
-   public void testExceptionOnProducerCreateThrows() throws Exception {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      try {
-         session.createProducer(queue);
-         fail("Should not be able to create this producer.");
-      }
-      catch (JMSException ex) {
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
deleted file mode 100644
index 7433b18..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4220Test {
-
-   static final Logger LOG = LoggerFactory.getLogger(AMQ4220Test.class);
-   private final static int maxFileLength = 1024 * 1024 * 32;
-   private final static String destinationName = "TEST.QUEUE";
-   BrokerService broker;
-
-   @Before
-   public void setUp() throws Exception {
-      prepareBrokerWithMultiStore(true);
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-   protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setUseJmx(true);
-      broker.setBrokerName("localhost");
-      broker.setPersistenceAdapter(kaha);
-      return broker;
-   }
-
-   @Test
-   public void testRestartAfterQueueDelete() throws Exception {
-
-      // Ensure we have an Admin View.
-      assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return (broker.getAdminView()) != null;
-         }
-      }));
-
-      LOG.info("Adding initial destination: {}", destinationName);
-
-      broker.getAdminView().addQueue(destinationName);
-
-      assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName)));
-
-      LOG.info("Removing initial destination: {}", destinationName);
-
-      broker.getAdminView().removeQueue(destinationName);
-
-      LOG.info("Adding back destination: {}", destinationName);
-
-      broker.getAdminView().addQueue(destinationName);
-
-      assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName)));
-   }
-
-   protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
-      KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-      kaha.setJournalMaxFileLength(maxFileLength);
-      kaha.setCleanupInterval(5000);
-      if (delete) {
-         kaha.deleteAllMessages();
-      }
-      return kaha;
-   }
-
-   public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-
-      MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
-      if (deleteAllMessages) {
-         multiKahaDBPersistenceAdapter.deleteAllMessages();
-      }
-      ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
-      FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
-      template.setPersistenceAdapter(createStore(deleteAllMessages));
-      template.setPerDestination(true);
-      adapters.add(template);
-
-      multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-      broker = createBroker(multiKahaDBPersistenceAdapter);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
deleted file mode 100644
index 0e9c488..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.HashSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.DestinationStatistics;
-import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.spi.LoggingEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4221Test extends TestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class);
-   public int PAYLOAD_SIZE_BYTES = 4 * 1024;
-   public int NUM_TO_SEND = 60000;
-   public int NUM_CONCURRENT_PRODUCERS = 20;
-   public int QUEUE_COUNT = 1;
-   public int TMP_JOURNAL_MAX_FILE_SIZE = 10 * 1024 * 1024;
-
-   public int DLQ_PURGE_INTERVAL = 30000;
-
-   public int MESSAGE_TIME_TO_LIVE = 20000;
-   public int EXPIRE_SWEEP_PERIOD = 200;
-   public int TMP_JOURNAL_GC_PERIOD = 50;
-   public int RECEIVE_POLL_PERIOD = 4000;
-   private int RECEIVE_BATCH = 5000;
-
-   final byte[] payload = new byte[PAYLOAD_SIZE_BYTES];
-   final AtomicInteger counter = new AtomicInteger(0);
-   final HashSet<Throwable> exceptions = new HashSet<>();
-   BrokerService brokerService;
-   private String brokerUrlString;
-   ExecutorService executorService = Executors.newCachedThreadPool();
-   final AtomicBoolean done = new AtomicBoolean(false);
-
-   public static Test suite() {
-      return suite(AMQ4221Test.class);
-   }
-
-   @Override
-   public void setUp() throws Exception {
-
-      LogManager.getRootLogger().addAppender(new DefaultTestAppender() {
-
-         @Override
-         public void doAppend(LoggingEvent event) {
-            if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
-               System.err.println("exit on error: " + event.getMessage());
-               done.set(true);
-               new Thread() {
-                  @Override
-                  public void run() {
-                     System.exit(787);
-                  }
-               }.start();
-            }
-         }
-      });
-
-      done.set(false);
-      brokerService = new BrokerService();
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")});
-
-      PolicyEntry defaultPolicy = new PolicyEntry();
-      defaultPolicy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
-      defaultPolicy.setExpireMessagesPeriod(EXPIRE_SWEEP_PERIOD);
-      defaultPolicy.setProducerFlowControl(false);
-      defaultPolicy.setMemoryLimit(50 * 1024 * 1024);
-
-      brokerService.getSystemUsage().getMemoryUsage().setLimit(50 * 1024 * 1024);
-
-      PolicyMap destinationPolicyMap = new PolicyMap();
-      destinationPolicyMap.setDefaultEntry(defaultPolicy);
-      brokerService.setDestinationPolicy(destinationPolicyMap);
-
-      PListStoreImpl tempDataStore = new PListStoreImpl();
-      tempDataStore.setDirectory(brokerService.getTmpDataDirectory());
-      tempDataStore.setJournalMaxFileLength(TMP_JOURNAL_MAX_FILE_SIZE);
-      tempDataStore.setCleanupInterval(TMP_JOURNAL_GC_PERIOD);
-      tempDataStore.setIndexPageSize(200);
-      tempDataStore.setIndexEnablePageCaching(false);
-
-      brokerService.setTempDataStore(tempDataStore);
-      brokerService.setAdvisorySupport(false);
-      TransportConnector tcp = brokerService.addConnector("tcp://localhost:0");
-      brokerService.start();
-      brokerUrlString = tcp.getPublishableConnectString();
-   }
-
-   @Override
-   public void tearDown() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-      executorService.shutdownNow();
-   }
-
-   public void testProduceConsumeExpireHalf() throws Exception {
-
-      final org.apache.activemq.broker.region.Queue dlq = (org.apache.activemq.broker.region.Queue) getDestination(brokerService, new ActiveMQQueue("ActiveMQ.DLQ"));
-
-      if (DLQ_PURGE_INTERVAL > 0) {
-         executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-               while (!done.get()) {
-                  try {
-                     Thread.sleep(DLQ_PURGE_INTERVAL);
-                     LOG.info("Purge DLQ, current size: " + dlq.getDestinationStatistics().getMessages().getCount());
-                     dlq.purge();
-                  }
-                  catch (InterruptedException allDone) {
-                  }
-                  catch (Throwable e) {
-                     e.printStackTrace();
-                     exceptions.add(e);
-                  }
-               }
-            }
-         });
-
-      }
-
-      final CountDownLatch latch = new CountDownLatch(QUEUE_COUNT);
-      for (int i = 0; i < QUEUE_COUNT; i++) {
-         final int id = i;
-         executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  doProduceConsumeExpireHalf(id, latch);
-               }
-               catch (Throwable e) {
-                  e.printStackTrace();
-                  exceptions.add(e);
-               }
-            }
-         });
-      }
-
-      while (!done.get()) {
-         done.set(latch.await(5, TimeUnit.SECONDS));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(5, TimeUnit.MINUTES);
-
-      assertTrue("no exceptions:" + exceptions, exceptions.isEmpty());
-
-   }
-
-   public void doProduceConsumeExpireHalf(int id, CountDownLatch latch) throws Exception {
-
-      final ActiveMQQueue queue = new ActiveMQQueue("Q" + id);
-
-      final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString);
-      ActiveMQPrefetchPolicy prefecthPolicy = new ActiveMQPrefetchPolicy();
-      prefecthPolicy.setAll(0);
-      factory.setPrefetchPolicy(prefecthPolicy);
-      Connection connection = factory.createConnection();
-      connection.start();
-      final MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue, "on = 'true'");
-
-      executorService.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               while (!done.get()) {
-                  Thread.sleep(RECEIVE_POLL_PERIOD);
-                  for (int i = 0; i < RECEIVE_BATCH && !done.get(); i++) {
-
-                     Message message = consumer.receive(1000);
-                     if (message != null) {
-                        counter.incrementAndGet();
-                        if (counter.get() > 0 && counter.get() % 500 == 0) {
-                           LOG.info("received: " + counter.get() + ", " + message.getJMSDestination().toString());
-                        }
-                     }
-                  }
-               }
-            }
-            catch (JMSException ignored) {
-
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               exceptions.add(e);
-            }
-         }
-      });
-
-      final AtomicInteger accumulator = new AtomicInteger(0);
-      final CountDownLatch producersDone = new CountDownLatch(NUM_CONCURRENT_PRODUCERS);
-
-      for (int i = 0; i < NUM_CONCURRENT_PRODUCERS; i++) {
-         executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  Connection sendConnection = factory.createConnection();
-                  sendConnection.start();
-                  Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                  MessageProducer producer = sendSession.createProducer(queue);
-                  producer.setTimeToLive(MESSAGE_TIME_TO_LIVE);
-                  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
-                  while (accumulator.incrementAndGet() < NUM_TO_SEND && !done.get()) {
-                     BytesMessage message = sendSession.createBytesMessage();
-                     message.writeBytes(payload);
-                     message.setStringProperty("on", String.valueOf(accumulator.get() % 2 == 0));
-                     producer.send(message);
-
-                  }
-                  producersDone.countDown();
-               }
-               catch (Exception e) {
-                  e.printStackTrace();
-                  exceptions.add(e);
-               }
-            }
-         });
-      }
-
-      producersDone.await(10, TimeUnit.MINUTES);
-
-      final DestinationStatistics view = getDestinationStatistics(brokerService, queue);
-      LOG.info("total expired so far " + view.getExpired().getCount() + ", " + queue.getQueueName());
-      latch.countDown();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
deleted file mode 100644
index adaf0e5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.transport.vm.VMTransportFactory;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
- */
-public class AMQ4222Test extends TestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4222Test.class);
-
-   protected BrokerService brokerService;
-
-   @Override
-   protected void setUp() throws Exception {
-      super.setUp();
-      topic = false;
-      brokerService = createBroker();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-   }
-
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
-      broker.start();
-      broker.waitUntilStarted();
-      return broker;
-   }
-
-   @Override
-   protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-      return new ActiveMQConnectionFactory("vm://localhost");
-   }
-
-   public void testTempQueueCleanedUp() throws Exception {
-
-      Destination requestQueue = createDestination();
-
-      Connection producerConnection = createConnection();
-      producerConnection.start();
-      Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageProducer producer = producerSession.createProducer(requestQueue);
-      Destination replyTo = producerSession.createTemporaryQueue();
-      MessageConsumer producerSessionConsumer = producerSession.createConsumer(replyTo);
-
-      final CountDownLatch countDownLatch = new CountDownLatch(1);
-      // let's listen to the response on the queue
-      producerSessionConsumer.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               if (message instanceof TextMessage) {
-                  LOG.info("You got a message: " + ((TextMessage) message).getText());
-                  countDownLatch.countDown();
-               }
-            }
-            catch (JMSException e) {
-               e.printStackTrace();
-            }
-         }
-      });
-
-      producer.send(createRequest(producerSession, replyTo));
-
-      Connection consumerConnection = createConnection();
-      consumerConnection.start();
-      Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = consumerSession.createConsumer(requestQueue);
-      final MessageProducer consumerProducer = consumerSession.createProducer(null);
-
-      consumer.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               consumerProducer.send(message.getJMSReplyTo(), message);
-            }
-            catch (JMSException e) {
-               LOG.error("error sending a response on the temp queue");
-               e.printStackTrace();
-            }
-         }
-      });
-
-      countDownLatch.await(2, TimeUnit.SECONDS);
-
-      // producer has not gone away yet...
-      org.apache.activemq.broker.region.Destination tempDestination = getDestination(brokerService, (ActiveMQDestination) replyTo);
-      assertNotNull(tempDestination);
-
-      // clean up
-      producer.close();
-      producerSession.close();
-      producerConnection.close();
-
-      // producer has gone away.. so the temp queue should not exist anymore... let's see..
-      // producer has not gone away yet...
-      tempDestination = getDestination(brokerService, (ActiveMQDestination) replyTo);
-      assertNull(tempDestination);
-
-      // now.. the connection on the broker side for the dude producing to the temp dest will
-      // still have a reference in his producerBrokerExchange.. this will keep the destination
-      // from being reclaimed by GC if there is never another send that producer makes...
-      // let's see if that reference is there...
-      final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
-      assertNotNull(connector);
-      assertTrue(Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return connector.getConnections().size() == 1;
-         }
-      }));
-      TransportConnection transportConnection = connector.getConnections().get(0);
-      Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
-      assertEquals(1, exchanges.size());
-      ProducerBrokerExchange exchange = exchanges.values().iterator().next();
-
-      // so this is the reason for the test... we don't want these exchanges to hold a reference
-      // to a region destination.. after a send is completed, the destination is not used anymore on
-      // a producer exchange
-      assertNull(exchange.getRegionDestination());
-      assertNull(exchange.getRegion());
-
-   }
-
-   @SuppressWarnings("unchecked")
-   private Map<ProducerId, ProducerBrokerExchange> getProducerExchangeFromConn(TransportConnection transportConnection) throws NoSuchFieldException, IllegalAccessException {
-      Field f = TransportConnection.class.getDeclaredField("producerExchanges");
-      f.setAccessible(true);
-      Map<ProducerId, ProducerBrokerExchange> producerExchanges = (Map<ProducerId, ProducerBrokerExchange>) f.get(transportConnection);
-      return producerExchanges;
-   }
-
-   private Message createRequest(Session session, Destination replyTo) throws JMSException {
-      Message message = session.createTextMessage("Payload");
-      message.setJMSReplyTo(replyTo);
-      return message;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
deleted file mode 100644
index 415dad6..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.ConsumerThread;
-import org.apache.activemq.util.ProducerThread;
-import org.apache.activemq.util.Wait;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AMQ4323Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class);
-
-   BrokerService broker = null;
-   File kahaDbDir = null;
-   private final Destination destination = new ActiveMQQueue("q");
-   final String payload = new String(new byte[1024]);
-
-   protected void startBroker(boolean delete) throws Exception {
-      broker = new BrokerService();
-
-      //Start with a clean directory
-      kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
-      deleteDir(kahaDbDir);
-
-      broker.setSchedulerSupport(false);
-      broker.setDeleteAllMessagesOnStartup(delete);
-      broker.setPersistent(true);
-      broker.setUseJmx(false);
-      broker.addConnector("tcp://localhost:0");
-
-      PolicyMap map = new PolicyMap();
-      PolicyEntry entry = new PolicyEntry();
-      entry.setUseCache(false);
-      map.setDefaultEntry(entry);
-      broker.setDestinationPolicy(map);
-
-      configurePersistence(broker, delete);
-
-      broker.start();
-      LOG.info("Starting broker..");
-   }
-
-   protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
-      KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-
-      // ensure there are a bunch of data files but multiple entries in each
-      adapter.setJournalMaxFileLength(1024 * 20);
-
-      // speed up the test case, checkpoint and cleanup early and often
-      adapter.setCheckpointInterval(500);
-      adapter.setCleanupInterval(500);
-
-      if (!deleteAllOnStart) {
-         adapter.setForceRecoverIndex(true);
-      }
-
-   }
-
-   private boolean deleteDir(File dir) {
-      if (dir.isDirectory()) {
-         String[] children = dir.list();
-         for (int i = 0; i < children.length; i++) {
-            boolean success = deleteDir(new File(dir, children[i]));
-            if (!success) {
-               return false;
-            }
-         }
-      }
-
-      return dir.delete();
-   }
-
-   private int getFileCount(File dir) {
-      if (dir.isDirectory()) {
-         String[] children = dir.list();
-         return children.length;
-      }
-
-      return 0;
-   }
-
-   @Test
-   public void testCleanupOfFiles() throws Exception {
-      final int messageCount = 500;
-      startBroker(true);
-      int fileCount = getFileCount(kahaDbDir);
-      assertEquals(4, fileCount);
-
-      Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
-      connection.start();
-      Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      ProducerThread producer = new ProducerThread(producerSess, destination) {
-         @Override
-         protected Message createMessage(int i) throws Exception {
-            return sess.createTextMessage(payload + "::" + i);
-         }
-      };
-      producer.setMessageCount(messageCount);
-      ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
-      consumer.setBreakOnNull(false);
-      consumer.setMessageCount(messageCount);
-
-      producer.start();
-      producer.join();
-
-      consumer.start();
-      consumer.join();
-
-      assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
-
-      // verify cleanup
-      assertTrue("gc worked", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            int fileCount = getFileCount(kahaDbDir);
-            LOG.info("current filecount:" + fileCount);
-            return 4 == fileCount;
-         }
-      }));
-
-      broker.stop();
-      broker.waitUntilStopped();
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
deleted file mode 100644
index 3d4ec84..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4356Test {
-
-   private static BrokerService brokerService;
-   private static String BROKER_ADDRESS = "tcp://localhost:0";
-
-   private String connectionUri;
-   private ActiveMQConnectionFactory cf;
-   private final String CLIENT_ID = "AMQ4356Test";
-   private final String SUBSCRIPTION_NAME = "AMQ4356Test";
-
-   private void createBroker(boolean deleteOnStart) throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setUseJmx(true);
-      brokerService.setDeleteAllMessagesOnStartup(deleteOnStart);
-      connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
-      brokerService.start();
-      brokerService.waitUntilStarted();
-
-   }
-
-   private void startBroker() throws Exception {
-      createBroker(true);
-   }
-
-   private void restartBroker() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-      createBroker(false);
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      startBroker();
-      cf = new ActiveMQConnectionFactory(connectionUri);
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-   }
-
-   @Test
-   public void testVirtualTopicUnsubDurable() throws Exception {
-      Connection connection = cf.createConnection();
-      connection.setClientID(CLIENT_ID);
-      connection.start();
-
-      // create consumer 'cluster'
-      ActiveMQQueue queue1 = new ActiveMQQueue(getVirtualTopicConsumerName());
-      ActiveMQQueue queue2 = new ActiveMQQueue(getVirtualTopicConsumerName());
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer c1 = session.createConsumer(queue1);
-      c1.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-         }
-      });
-      MessageConsumer c2 = session.createConsumer(queue2);
-      c2.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-         }
-      });
-
-      ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
-      MessageConsumer c3 = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
-
-      assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
-      assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-
-      c3.close();
-
-      // create topic producer
-      MessageProducer producer = session.createProducer(topic);
-      assertNotNull(producer);
-
-      int total = 10;
-      for (int i = 0; i < total; i++) {
-         producer.send(session.createTextMessage("message: " + i));
-      }
-
-      assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
-      assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-
-      session.unsubscribe(SUBSCRIPTION_NAME);
-      connection.close();
-
-      assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
-      assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-
-      restartBroker();
-
-      assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
-      assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-   }
-
-   protected String getVirtualTopicName() {
-      return "VirtualTopic.TEST";
-   }
-
-   protected String getVirtualTopicConsumerName() {
-      return "Consumer.A.VirtualTopic.TEST";
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
deleted file mode 100644
index 27c4f64..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4361Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4361Test.class);
-
-   private BrokerService service;
-   private String brokerUrlString;
-
-   @Before
-   public void setUp() throws Exception {
-      service = new BrokerService();
-      service.setDeleteAllMessagesOnStartup(true);
-      service.setUseJmx(false);
-
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry policy = new PolicyEntry();
-      policy.setMemoryLimit(1);
-      policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
-      policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
-      policy.setProducerFlowControl(true);
-      policyMap.setDefaultEntry(policy);
-      service.setDestinationPolicy(policyMap);
-
-      service.setAdvisorySupport(false);
-      brokerUrlString = service.addConnector("tcp://localhost:0").getPublishableConnectString();
-      service.start();
-      service.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      if (service != null) {
-         service.stop();
-         service.waitUntilStopped();
-      }
-   }
-
-   @Test
-   public void testCloseWhenHunk() throws Exception {
-
-      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrlString);
-      connectionFactory.setProducerWindowSize(1024);
-
-      // TINY QUEUE is flow controlled after 1024 bytes
-      final ActiveMQDestination destination = ActiveMQDestination.createDestination("queue://TINY_QUEUE", (byte) 0xff);
-
-      Connection connection = connectionFactory.createConnection();
-      connection.start();
-      final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final MessageProducer producer = session.createProducer(destination);
-      producer.setTimeToLive(0);
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
-      final AtomicReference<Exception> publishException = new AtomicReference<>(null);
-      final AtomicReference<Exception> closeException = new AtomicReference<>(null);
-      final AtomicLong lastLoop = new AtomicLong(System.currentTimeMillis() + 100);
-
-      Thread pubThread = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               byte[] data = new byte[1000];
-               new Random(0xdeadbeef).nextBytes(data);
-               for (int i = 0; i < 10000; i++) {
-                  lastLoop.set(System.currentTimeMillis());
-                  ObjectMessage objMsg = session.createObjectMessage();
-                  objMsg.setObject(data);
-                  producer.send(destination, objMsg);
-               }
-            }
-            catch (Exception e) {
-               publishException.set(e);
-            }
-         }
-      }, "PublishingThread");
-      pubThread.start();
-
-      // wait for publisher to deadlock
-      while (System.currentTimeMillis() - lastLoop.get() < 2000) {
-         Thread.sleep(100);
-      }
-      LOG.info("Publisher deadlock detected.");
-
-      Thread closeThread = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               LOG.info("Attempting close..");
-               producer.close();
-            }
-            catch (Exception e) {
-               closeException.set(e);
-            }
-         }
-      }, "ClosingThread");
-      closeThread.start();
-
-      try {
-         closeThread.join(30000);
-      }
-      catch (InterruptedException ie) {
-         assertFalse("Closing thread didn't complete in 10 seconds", true);
-      }
-
-      try {
-         pubThread.join(30000);
-      }
-      catch (InterruptedException ie) {
-         assertFalse("Publishing thread didn't complete in 10 seconds", true);
-      }
-
-      assertNull(closeException.get());
-      assertNotNull(publishException.get());
-   }
-}
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
deleted file mode 100644
index ef53a0a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4368Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class);
-
-   private BrokerService broker;
-   private ActiveMQConnectionFactory connectionFactory;
-   private final Destination destination = new ActiveMQQueue("large_message_queue");
-   private String connectionUri;
-
-   @Before
-   public void setUp() throws Exception {
-      broker = createBroker();
-      connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
-      broker.start();
-      connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-      broker.waitUntilStopped();
-   }
-
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = new BrokerService();
-
-      PolicyEntry policy = new PolicyEntry();
-      policy.setUseCache(false);
-      broker.setDestinationPolicy(new PolicyMap());
-      broker.getDestinationPolicy().setDefaultEntry(policy);
-
-      KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
-      kahadb.setCheckForCorruptJournalFiles(true);
-      kahadb.setCleanupInterval(1000);
-
-      kahadb.deleteAllMessages();
-      broker.setPersistenceAdapter(kahadb);
-      broker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 100);
-      broker.setUseJmx(false);
-
-      return broker;
-   }
-
-   abstract class Client implements Runnable {
-
-      private final String name;
-      final AtomicBoolean done = new AtomicBoolean();
-      CountDownLatch startedLatch;
-      CountDownLatch doneLatch = new CountDownLatch(1);
-      Connection connection;
-      Session session;
-      final AtomicLong size = new AtomicLong();
-
-      Client(String name, CountDownLatch startedLatch) {
-         this.name = name;
-         this.startedLatch = startedLatch;
-      }
-
-      public void start() {
-         LOG.info("Starting: " + name);
-         new Thread(this, name).start();
-      }
-
-      public void stopAsync() {
-         done.set(true);
-      }
-
-      public void stop() throws InterruptedException {
-         stopAsync();
-         if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) {
-            try {
-               connection.close();
-               doneLatch.await();
-            }
-            catch (Exception e) {
-            }
-         }
-      }
-
-      @Override
-      public void run() {
-         try {
-            connection = createConnection();
-            connection.start();
-            try {
-               session = createSession();
-               work();
-            }
-            finally {
-               try {
-                  connection.close();
-               }
-               catch (JMSException ignore) {
-               }
-               LOG.info("Stopped: " + name);
-            }
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-            done.set(true);
-         }
-         finally {
-            doneLatch.countDown();
-         }
-      }
-
-      protected Session createSession() throws JMSException {
-         return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-
-      protected Connection createConnection() throws JMSException {
-         return connectionFactory.createConnection();
-      }
-
-      abstract protected void work() throws Exception;
-   }
-
-   class ProducingClient extends Client {
-
-      ProducingClient(String name, CountDownLatch startedLatch) {
-         super(name, startedLatch);
-      }
-
-      private String createMessage() {
-         StringBuffer stringBuffer = new StringBuffer();
-         for (long i = 0; i < 1000000; i++) {
-            stringBuffer.append("1234567890");
-         }
-         return stringBuffer.toString();
-      }
-
-      @Override
-      protected void work() throws Exception {
-         String data = createMessage();
-         MessageProducer producer = session.createProducer(destination);
-         startedLatch.countDown();
-         while (!done.get()) {
-            producer.send(session.createTextMessage(data));
-            long i = size.incrementAndGet();
-            if ((i % 1000) == 0) {
-               LOG.info("produced " + i + ".");
-            }
-         }
-      }
-   }
-
-   class ConsumingClient extends Client {
-
-      public ConsumingClient(String name, CountDownLatch startedLatch) {
-         super(name, startedLatch);
-      }
-
-      @Override
-      protected void work() throws Exception {
-         MessageConsumer consumer = session.createConsumer(destination);
-         startedLatch.countDown();
-         while (!done.get()) {
-            Message msg = consumer.receive(100);
-            if (msg != null) {
-               size.incrementAndGet();
-            }
-         }
-      }
-   }
-
-   @Test
-   public void testENTMQ220() throws Exception {
-      LOG.info("Start test.");
-      CountDownLatch producer1Started = new CountDownLatch(1);
-      CountDownLatch producer2Started = new CountDownLatch(1);
-      CountDownLatch listener1Started = new CountDownLatch(1);
-
-      final ProducingClient producer1 = new ProducingClient("1", producer1Started);
-      final ProducingClient producer2 = new ProducingClient("2", producer2Started);
-      final ConsumingClient listener1 = new ConsumingClient("subscriber-1", listener1Started);
-      final AtomicLong lastSize = new AtomicLong();
-
-      try {
-
-         producer1.start();
-         producer2.start();
-         listener1.start();
-
-         producer1Started.await(15, TimeUnit.SECONDS);
-         producer2Started.await(15, TimeUnit.SECONDS);
-         listener1Started.await(15, TimeUnit.SECONDS);
-
-         lastSize.set(listener1.size.get());
-         for (int i = 0; i < 10; i++) {
-            Wait.waitFor(new Wait.Condition() {
-
-               @Override
-               public boolean isSatisified() throws Exception {
-                  return listener1.size.get() > lastSize.get();
-               }
-            });
-            long size = listener1.size.get();
-            LOG.info("Listener 1: consumed: " + (size - lastSize.get()));
-            assertTrue("No messages received on iteration: " + i, size > lastSize.get());
-            lastSize.set(size);
-         }
-      }
-      finally {
-         LOG.info("Stopping clients");
-         producer1.stop();
-         producer2.stop();
-         listener1.stop();
-      }
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
deleted file mode 100644
index 38a9398..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4407Test {
-
-   static final Logger LOG = LoggerFactory.getLogger(AMQ4407Test.class);
-   private final static int maxFileLength = 1024 * 1024 * 32;
-
-   private final static String PREFIX_DESTINATION_NAME = "queue";
-
-   private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
-   private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
-   private final static String DESTINATION_NAME_3 = PREFIX_DESTINATION_NAME + "3.test";
-
-   BrokerService broker;
-
-   @Before
-   public void setUp() throws Exception {
-      prepareBrokerWithMultiStore(true);
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-   protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setUseJmx(true);
-      broker.setBrokerName("localhost");
-      broker.setPersistenceAdapter(kaha);
-      return broker;
-   }
-
-   @Test
-   public void testRestartAfterQueueDelete() throws Exception {
-
-      // Ensure we have an Admin View.
-      assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return (broker.getAdminView()) != null;
-         }
-      }));
-
-      LOG.info("Adding destinations: {}, {}, {}", new Object[]{DESTINATION_NAME, DESTINATION_NAME_3, DESTINATION_NAME_3});
-      sendMessage(DESTINATION_NAME, "test 1");
-      sendMessage(DESTINATION_NAME_2, "test 1");
-      sendMessage(DESTINATION_NAME_3, "test 1");
-
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3)));
-
-      LOG.info("Removing destination: {}", DESTINATION_NAME_2);
-      broker.getAdminView().removeQueue(DESTINATION_NAME_2);
-
-      LOG.info("Recreating destination: {}", DESTINATION_NAME_2);
-      sendMessage(DESTINATION_NAME_2, "test 1");
-
-      Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
-      assertNotNull(destination2);
-      assertEquals(1, destination2.getMessageStore().getMessageCount());
-   }
-
-   protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
-      KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-      kaha.setJournalMaxFileLength(maxFileLength);
-      kaha.setCleanupInterval(5000);
-      if (delete) {
-         kaha.deleteAllMessages();
-      }
-      return kaha;
-   }
-
-   public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-
-      MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
-      if (deleteAllMessages) {
-         multiKahaDBPersistenceAdapter.deleteAllMessages();
-      }
-      ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
-      adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
-      adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
-      adapters.add(createFilteredKahaDBByDestinationPrefix(null, deleteAllMessages));
-
-      multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-      broker = createBroker(multiKahaDBPersistenceAdapter);
-   }
-
-   /**
-    * Create filtered KahaDB adapter by destination prefix.
-    *
-    * @param destinationPrefix
-    * @param deleteAllMessages
-    * @return
-    * @throws IOException
-    */
-   private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix,
-                                                                                    boolean deleteAllMessages) throws IOException {
-      FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
-      template.setPersistenceAdapter(createStore(deleteAllMessages));
-      if (destinationPrefix != null) {
-         template.setQueue(destinationPrefix + ".>");
-      }
-      return template;
-   }
-
-   /**
-    * Send message to particular destination.
-    *
-    * @param destinationName
-    * @param message
-    * @throws JMSException
-    */
-   private void sendMessage(String destinationName, String message) throws JMSException {
-      ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
-      f.setAlwaysSyncSend(true);
-      Connection c = f.createConnection();
-      c.start();
-      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName));
-      producer.send(s.createTextMessage(message));
-      producer.close();
-      s.close();
-      c.stop();
-   }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/740d40bb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
deleted file mode 100644
index cd3ed95..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class AMQ4413Test {
-
-   static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class);
-
-   final String brokerUrl = "tcp://localhost:0";
-   private String connectionUri;
-   final int numMsgsTriggeringReconnection = 2;
-   final int numMsgs = 30;
-   final int numTests = 75;
-   final ExecutorService threadPool = Executors.newCachedThreadPool();
-
-   @Test
-   public void testDurableSubMessageLoss() throws Exception {
-      // start embedded broker
-      BrokerService brokerService = new BrokerService();
-      connectionUri = brokerService.addConnector(brokerUrl).getPublishableConnectString();
-      brokerService.setPersistent(false);
-      brokerService.setUseJmx(false);
-      brokerService.setKeepDurableSubsActive(true);
-      brokerService.setAdvisorySupport(false);
-      brokerService.start();
-      LOG.info("##### broker started");
-
-      // repeat test 50 times
-      try {
-         for (int i = 0; i < numTests; ++i) {
-            LOG.info("##### test " + i + " started");
-            test();
-         }
-
-         LOG.info("##### tests are done");
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-         LOG.info("##### tests failed!");
-      }
-      finally {
-         threadPool.shutdown();
-         brokerService.stop();
-         LOG.info("##### broker stopped");
-      }
-   }
-
-   private void test() throws Exception {
-
-      final String topicName = "topic-" + UUID.randomUUID();
-      final String clientId = "client-" + UUID.randomUUID();
-      final String subName = "sub-" + UUID.randomUUID();
-
-      // create (and only create) subscription first
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      factory.setWatchTopicAdvisories(false);
-      Connection connection = factory.createConnection();
-      connection.setClientID(clientId);
-      connection.start();
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Topic topic = session.createTopic(topicName);
-      TopicSubscriber durableSubscriptionCreator = session.createDurableSubscriber(topic, subName);
-
-      connection.stop();
-      durableSubscriptionCreator.close();
-      session.close();
-      connection.close();
-
-      // publisher task
-      Callable<Boolean> publisher = new Callable<Boolean>() {
-         @Override
-         public Boolean call() throws Exception {
-            Connection connection = null;
-
-            try {
-               ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-               factory.setWatchTopicAdvisories(false);
-               connection = factory.createConnection();
-               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               Topic topic = session.createTopic(topicName);
-
-               MessageProducer producer = session.createProducer(topic);
-               producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-               producer.setPriority(Message.DEFAULT_PRIORITY);
-               producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-
-               for (int seq = 1; seq <= numMsgs; ++seq) {
-                  TextMessage msg = session.createTextMessage(String.valueOf(seq));
-                  producer.send(msg);
-                  LOG.info("pub sent msg: " + seq);
-                  Thread.sleep(1L);
-               }
-
-               LOG.info("pub is done");
-            }
-            finally {
-               if (connection != null) {
-                  try {
-                     connection.close();
-                  }
-                  catch (JMSException e) {
-                     e.printStackTrace();
-                  }
-               }
-            }
-            return Boolean.TRUE;
-         }
-      };
-
-      // subscriber task
-      Callable<Boolean> durableSubscriber = new Callable<Boolean>() {
-         ActiveMQConnectionFactory factory;
-         Connection connection;
-         Session session;
-         Topic topic;
-         TopicSubscriber consumer;
-
-         @Override
-         public Boolean call() throws Exception {
-            factory = new ActiveMQConnectionFactory(connectionUri);
-            factory.setWatchTopicAdvisories(false);
-
-            try {
-               connect();
-
-               for (int seqExpected = 1; seqExpected <= numMsgs; ++seqExpected) {
-                  TextMessage msg = (TextMessage) consumer.receive(3000L);
-                  if (msg == null) {
-                     LOG.info("expected: " + seqExpected + ", actual: timed out", msg);
-                     return Boolean.FALSE;
-                  }
-
-                  int seq = Integer.parseInt(msg.getText());
-
-                  LOG.info("sub received msg: " + seq);
-
-                  if (seqExpected != seq) {
-                     LOG.info("expected: " + seqExpected + ", actual: " + seq);
-                     return Boolean.FALSE;
-                  }
-
-                  if (seq % numMsgsTriggeringReconnection == 0) {
-                     close(false);
-                     connect();
-
-                     LOG.info("sub reconnected");
-                  }
-               }
-
-               LOG.info("sub is done");
-            }
-            finally {
-               try {
-                  close(true);
-               }
-               catch (Exception e) {
-                  e.printStackTrace();
-               }
-            }
-
-            return Boolean.TRUE;
-         }
-
-         void connect() throws Exception {
-            connection = factory.createConnection();
-            connection.setClientID(clientId);
-            connection.start();
-
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            topic = session.createTopic(topicName);
-            consumer = session.createDurableSubscriber(topic, subName);
-         }
-
-         void close(boolean unsubscribe) throws Exception {
-            if (connection != null) {
-               connection.stop();
-            }
-
-            if (consumer != null) {
-               consumer.close();
-            }
-
-            if (session != null) {
-               if (unsubscribe) {
-                  session.unsubscribe(subName);
-               }
-               session.close();
-            }
-
-            if (connection != null) {
-               connection.close();
-            }
-         }
-      };
-
-      ArrayList<Future<Boolean>> results = new ArrayList<>();
-      results.add(threadPool.submit(publisher));
-      results.add(threadPool.submit(durableSubscriber));
-
-      for (Future<Boolean> result : results) {
-         assertTrue(result.get());
-      }
-   }
-}


Mime
View raw message