Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B4E018C04 for ; Mon, 8 Feb 2016 22:42:38 +0000 (UTC) Received: (qmail 67865 invoked by uid 500); 8 Feb 2016 22:42:37 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 67782 invoked by uid 500); 8 Feb 2016 22:42:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 66488 invoked by uid 99); 8 Feb 2016 22:42:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Feb 2016 22:42:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A2E01E031B; Mon, 8 Feb 2016 22:42:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 08 Feb 2016 22:43:04 -0000 Message-Id: In-Reply-To: <925558cab14542fa849e9cf367654a3e@git.apache.org> References: <925558cab14542fa849e9cf367654a3e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/39] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java deleted file mode 100644 index 362fa5c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsMultipleClientsTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; - -import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertTrue; - -@RunWith(BlockJUnit4ClassRunner.class) -public class AMQ2910Test extends JmsMultipleClientsTestSupport { - - final int maxConcurrency = 60; - final int msgCount = 200; - final Vector exceptions = new Vector<>(); - - @Override - protected BrokerService createBroker() throws Exception { - //persistent = true; - BrokerService broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector("tcp://localhost:0"); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); - defaultEntry.setCursorMemoryHighWaterMark(50); - defaultEntry.setMemoryLimit(500 * 1024); - defaultEntry.setProducerFlowControl(false); - policyMap.setDefaultEntry(defaultEntry); - broker.setDestinationPolicy(policyMap); - - broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024); - - return broker; - } - - @Test(timeout = 30 * 1000) - public void testConcurrentSendToPendingCursor() throws Exception { - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); - factory.setCloseTimeout(30000); - ExecutorService executor = Executors.newCachedThreadPool(); - for (int i = 0; i < maxConcurrency; i++) { - final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i); - executor.execute(new Runnable() { - @Override - public void run() { - try { - sendMessages(factory.createConnection(), dest, msgCount); - } - catch (Throwable t) { - exceptions.add(t); - } - } - }); - } - - executor.shutdown(); - - assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS)); - assertNoExceptions(); - - executor = Executors.newCachedThreadPool(); - for (int i = 0; i < maxConcurrency; i++) { - final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i); - executor.execute(new Runnable() { - @Override - public void run() { - try { - startConsumers(factory, dest); - } - catch (Throwable t) { - exceptions.add(t); - } - } - }); - } - - executor.shutdown(); - assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS)); - - allMessagesList.setMaximumDuration(120 * 1000); - final int numExpected = maxConcurrency * msgCount; - allMessagesList.waitForMessagesToArrive(numExpected); - - if (allMessagesList.getMessageCount() != numExpected) { - dumpAllThreads(getName()); - - } - allMessagesList.assertMessagesReceivedNoWait(numExpected); - - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - - } - - private void assertNoExceptions() { - if (!exceptions.isEmpty()) { - for (Throwable t : exceptions) { - t.printStackTrace(); - } - } - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java deleted file mode 100644 index 573cdd3..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class AMQ2982Test { - - private static final int MAX_MESSAGES = 500; - - private static final String QUEUE_NAME = "test.queue"; - - private BrokerService broker; - - private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES); - - private CleanableKahaDBStore kahaDB; - - private static class CleanableKahaDBStore extends KahaDBStore { - - // make checkpoint cleanup accessible - public void forceCleanup() throws IOException { - checkpointCleanup(true); - } - - public int getFileMapSize() throws IOException { - // ensure save memory publishing, use the right lock - indexLock.readLock().lock(); - try { - return getJournal().getFileMap().size(); - } - finally { - indexLock.readLock().unlock(); - } - } - } - - @Before - public void setup() throws Exception { - - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - - kahaDB = new CleanableKahaDBStore(); - kahaDB.setJournalMaxFileLength(256 * 1024); - broker.setPersistenceAdapter(kahaDB); - - broker.start(); - broker.waitUntilStarted(); - } - - private Connection registerDLQMessageListener() throws Exception { - ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - messageCountDown.countDown(); - } - }); - - return connection; - } - - class ConsumerThread extends Thread { - - @Override - public void run() { - try { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - - RedeliveryPolicy policy = new RedeliveryPolicy(); - policy.setMaximumRedeliveries(0); - policy.setInitialRedeliveryDelay(100); - policy.setUseExponentialBackOff(false); - - factory.setRedeliveryPolicy(policy); - - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); - do { - Message message = consumer.receive(300); - if (message != null) { - session.rollback(); - } - } while (messageCountDown.getCount() != 0); - consumer.close(); - session.close(); - connection.close(); - } - catch (Exception e) { - Assert.fail(e.getMessage()); - } - } - } - - private void sendMessages() throws Exception { - ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < MAX_MESSAGES; i++) { - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[1000]); - producer.send(message); - } - producer.close(); - session.close(); - connection.close(); - } - - @Test - public void testNoStickyKahaDbLogFilesOnLocalTransactionRollback() throws Exception { - - Connection dlqConnection = registerDLQMessageListener(); - - ConsumerThread thread = new ConsumerThread(); - thread.start(); - - sendMessages(); - - thread.join(60 * 1000); - assertFalse(thread.isAlive()); - - dlqConnection.close(); - - kahaDB.forceCleanup(); - - assertEquals("only one active KahaDB log file after cleanup is expected", 1, kahaDB.getFileMapSize()); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java deleted file mode 100644 index 8714477..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class AMQ2983Test { - - private static final int MAX_CONSUMER = 10; - - private static final int MAX_MESSAGES = 2000; - - private static final String QUEUE_NAME = "test.queue"; - - private BrokerService broker; - - private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES); - - private CleanableKahaDBStore kahaDB; - - private static class CleanableKahaDBStore extends KahaDBStore { - - // make checkpoint cleanup accessible - public void forceCleanup() throws IOException { - checkpointCleanup(true); - } - - public int getFileMapSize() throws IOException { - // ensure save memory publishing, use the right lock - indexLock.readLock().lock(); - try { - return getJournal().getFileMap().size(); - } - finally { - indexLock.readLock().unlock(); - } - } - } - - private class ConsumerThread extends Thread { - - @Override - public void run() { - try { - ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); - do { - Message message = consumer.receive(200); - if (message != null) { - session.commit(); - messageCountDown.countDown(); - } - } while (messageCountDown.getCount() != 0); - consumer.close(); - session.close(); - connection.close(); - } - catch (Exception e) { - Assert.fail(e.getMessage()); - } - } - } - - @Before - public void setup() throws Exception { - - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - - kahaDB = new CleanableKahaDBStore(); - kahaDB.setJournalMaxFileLength(256 * 1024); - broker.setPersistenceAdapter(kahaDB); - - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - @Test - public void testNoStickyKahaDbLogFilesOnConcurrentTransactionalConsumer() throws Exception { - - List consumerThreads = new ArrayList<>(); - for (int i = 0; i < MAX_CONSUMER; i++) { - ConsumerThread thread = new ConsumerThread(); - thread.start(); - consumerThreads.add(thread); - } - sendMessages(); - - boolean allMessagesReceived = messageCountDown.await(60, TimeUnit.SECONDS); - assertTrue(allMessagesReceived); - - for (Thread thread : consumerThreads) { - thread.join(TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS)); - assertFalse(thread.isAlive()); - } - kahaDB.forceCleanup(); - assertEquals("Expect only one active KahaDB log file after cleanup", 1, kahaDB.getFileMapSize()); - } - - private void sendMessages() throws Exception { - ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < MAX_MESSAGES; i++) { - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[200]); - producer.send(message); - } - producer.close(); - session.close(); - connection.close(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java deleted file mode 100644 index 1e3c737..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.Connection; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.thread.Task; -import org.apache.activemq.thread.TaskRunner; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.*; -import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * This test involves the creation of a local and remote broker, both of which - * communicate over VM and TCP. The local broker establishes a bridge to the - * remote broker for the purposes of verifying that broker info is only - * transferred once the local broker's ID is known to the bridge support. - */ -public class AMQ3014Test { - - // Change this URL to be an unused port. - private static final String BROKER_URL = "tcp://localhost:0"; - - private List remoteBrokerInfos = Collections.synchronizedList(new ArrayList()); - - private BrokerService localBroker = new BrokerService(); - - // Override the "remote" broker so that it records all (remote) BrokerInfos - // that it receives. - private BrokerService remoteBroker = new BrokerService() { - @Override - protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { - TransportServer transport = TransportFactorySupport.bind(this, brokerURI); - return new TransportConnector(transport) { - @Override - protected Connection createConnection(Transport transport) throws IOException { - Connection connection = super.createConnection(transport); - final TransportListener proxiedListener = transport.getTransportListener(); - transport.setTransportListener(new TransportListener() { - - @Override - public void onCommand(Object command) { - if (command instanceof BrokerInfo) { - remoteBrokerInfos.add((BrokerInfo) command); - } - proxiedListener.onCommand(command); - } - - @Override - public void onException(IOException error) { - proxiedListener.onException(error); - } - - @Override - public void transportInterupted() { - proxiedListener.transportInterupted(); - } - - @Override - public void transportResumed() { - proxiedListener.transportResumed(); - } - }); - return connection; - } - - }; - } - }; - - @Before - public void init() throws Exception { - localBroker.setBrokerName("localBroker"); - localBroker.setPersistent(false); - localBroker.setUseJmx(false); - localBroker.setSchedulerSupport(false); - - remoteBroker.setBrokerName("remoteBroker"); - remoteBroker.setPersistent(false); - remoteBroker.setUseJmx(false); - remoteBroker.addConnector(BROKER_URL); - remoteBroker.setSchedulerSupport(false); - } - - @After - public void cleanup() throws Exception { - try { - localBroker.stop(); - } - finally { - remoteBroker.stop(); - } - } - - /** - * This test verifies that the local broker's ID is typically known by the - * bridge support before the local broker's BrokerInfo is sent to the remote - * broker. - */ - @Test - public void NormalCaseTest() throws Exception { - runTest(0, 3000); - } - - /** - * This test verifies that timing can arise under which the local broker's - * ID is not known by the bridge support before the local broker's - * BrokerInfo is sent to the remote broker. - */ - @Test - public void DelayedCaseTest() throws Exception { - runTest(500, 3000); - } - - private void runTest(final long taskRunnerDelay, long timeout) throws Exception { - // Add a network connector to the local broker that will create a bridge - // to the remote broker. - DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector(); - SimpleDiscoveryAgent da = new SimpleDiscoveryAgent(); - da.setServices(remoteBroker.getTransportConnectors().get(0).getPublishableConnectString()); - dnc.setDiscoveryAgent(da); - localBroker.addNetworkConnector(dnc); - - // Before starting the local broker, intercept the task runner factory - // so that the - // local VMTransport dispatcher is artificially delayed. - final TaskRunnerFactory realTaskRunnerFactory = localBroker.getTaskRunnerFactory(); - localBroker.setTaskRunnerFactory(new TaskRunnerFactory() { - @Override - public TaskRunner createTaskRunner(Task task, String name) { - final TaskRunner realTaskRunner = realTaskRunnerFactory.createTaskRunner(task, name); - if (name.startsWith("ActiveMQ Connection Dispatcher: ")) { - return new TaskRunner() { - @Override - public void shutdown() throws InterruptedException { - realTaskRunner.shutdown(); - } - - @Override - public void shutdown(long timeout) throws InterruptedException { - realTaskRunner.shutdown(timeout); - } - - @Override - public void wakeup() throws InterruptedException { - Thread.sleep(taskRunnerDelay); - realTaskRunner.wakeup(); - } - }; - } - else { - return realTaskRunnerFactory.createTaskRunner(task, name); - } - } - }); - - // Start the brokers and wait for the bridge to be created; the remote - // broker is started first to ensure it is available for the local - // broker to connect to. - remoteBroker.start(); - localBroker.start(); - - // Wait for the remote broker to receive the local broker's BrokerInfo - // and then verify the local broker's ID is known. - long startTimeMillis = System.currentTimeMillis(); - while (remoteBrokerInfos.isEmpty() && (System.currentTimeMillis() - startTimeMillis) < timeout) { - Thread.sleep(100); - } - - Assert.assertFalse("Timed out waiting for bridge to form.", remoteBrokerInfos.isEmpty()); - Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get(0).getBrokerId()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java deleted file mode 100644 index 88a0db8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.ConsumerThread; -import org.apache.activemq.util.ProducerThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Test; - -import javax.jms.*; - -import java.io.File; - -import static org.junit.Assert.assertEquals; - -public class AMQ3120Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ3120Test.class); - - BrokerService broker = null; - File kahaDbDir = null; - private final Destination destination = new ActiveMQQueue("AMQ3120Test"); - final String payload = new String(new byte[1024]); - - protected void startBroker(boolean delete) throws Exception { - broker = new BrokerService(); - - //Start with a clean directory - kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB"); - deleteDir(kahaDbDir); - - broker.setSchedulerSupport(false); - broker.setDeleteAllMessagesOnStartup(delete); - broker.setPersistent(true); - broker.setUseJmx(false); - broker.addConnector("tcp://localhost:0"); - - PolicyMap map = new PolicyMap(); - PolicyEntry entry = new PolicyEntry(); - entry.setUseCache(false); - map.setDefaultEntry(entry); - broker.setDestinationPolicy(map); - - configurePersistence(broker, delete); - - broker.start(); - LOG.info("Starting broker.."); - } - - protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { - KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - - // ensure there are a bunch of data files but multiple entries in each - adapter.setJournalMaxFileLength(1024 * 20); - - // speed up the test case, checkpoint and cleanup early and often - adapter.setCheckpointInterval(500); - adapter.setCleanupInterval(500); - - if (!deleteAllOnStart) { - adapter.setForceRecoverIndex(true); - } - - } - - private boolean deleteDir(File dir) { - if (dir.isDirectory()) { - String[] children = dir.list(); - for (int i = 0; i < children.length; i++) { - boolean success = deleteDir(new File(dir, children[i])); - if (!success) { - return false; - } - } - } - - return dir.delete(); - } - - private int getFileCount(File dir) { - if (dir.isDirectory()) { - String[] children = dir.list(); - return children.length; - } - - return 0; - } - - @Test - public void testCleanupOfFiles() throws Exception { - final int messageCount = 500; - startBroker(true); - int fileCount = getFileCount(kahaDbDir); - assertEquals(4, fileCount); - - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); - connection.start(); - Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ProducerThread producer = new ProducerThread(producerSess, destination) { - @Override - protected Message createMessage(int i) throws Exception { - return sess.createTextMessage(payload + "::" + i); - } - }; - producer.setSleep(650); - producer.setMessageCount(messageCount); - ConsumerThread consumer = new ConsumerThread(consumerSess, destination); - consumer.setBreakOnNull(false); - consumer.setMessageCount(messageCount); - - producer.start(); - consumer.start(); - - producer.join(); - consumer.join(); - - assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived()); - - broker.stop(); - broker.waitUntilStopped(); - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java deleted file mode 100644 index 621b421..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3140Test { - - private static final int MESSAGES_PER_THREAD = 100; - - private static final int THREAD_COUNT = 10; - - private BrokerService broker; - - private static final String QUEUE_NAME = "test"; - - private static class Sender extends Thread { - - private static final int DELAY = 3000; - - @Override - public void run() { - try { - ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); - Message message = session.createTextMessage("test"); - for (int i = 0; i < MESSAGES_PER_THREAD; i++) { - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY); - producer.send(message); - } - session.close(); - connection.close(); - } - catch (JMSException e) { - fail(e.getMessage()); - } - } - } - - @Before - public void setup() throws Exception { - File schedulerDirectory = new File("target/test/ScheduledDB"); - - IOHelper.mkdirs(schedulerDirectory); - IOHelper.deleteChildren(schedulerDirectory); - - broker = new BrokerService(); - broker.setSchedulerSupport(true); - broker.setPersistent(true); - broker.setDeleteAllMessagesOnStartup(true); - broker.setDataDirectory("target"); - broker.setSchedulerDirectoryFile(schedulerDirectory); - broker.setUseJmx(false); - broker.addConnector("vm://localhost"); - - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - - @Test - public void noMessageLostOnConcurrentScheduling() throws JMSException, InterruptedException { - - final AtomicLong receiveCounter = new AtomicLong(); - - ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); - consumer.setMessageListener(new MessageListener() { - - @Override - public void onMessage(Message message) { - receiveCounter.incrementAndGet(); - } - }); - - List senderThreads = new ArrayList<>(); - for (int i = 0; i < THREAD_COUNT; i++) { - Sender sender = new Sender(); - senderThreads.add(sender); - } - for (Sender sender : senderThreads) { - sender.start(); - } - for (Sender sender : senderThreads) { - sender.join(); - } - - // wait until all scheduled messages has been received - TimeUnit.MINUTES.sleep(2); - - session.close(); - connection.close(); - - assertEquals(MESSAGES_PER_THREAD * THREAD_COUNT, receiveCounter.get()); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java deleted file mode 100644 index 49db143..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ScheduledMessage; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ3141Test { - - private static final int MAX_MESSAGES = 100; - - private static final long DELAY_IN_MS = 100; - - private static final String QUEUE_NAME = "target.queue"; - - private BrokerService broker; - - private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES); - - private ConnectionFactory factory; - - @Before - public void setup() throws Exception { - - broker = new BrokerService(); - broker.setPersistent(true); - broker.setSchedulerSupport(true); - broker.setDataDirectory("target"); - broker.setUseJmx(false); - broker.addConnector("vm://localhost"); - - File schedulerDirectory = new File("target/test/ScheduledDB"); - IOHelper.mkdirs(schedulerDirectory); - IOHelper.deleteChildren(schedulerDirectory); - broker.setSchedulerDirectoryFile(schedulerDirectory); - - broker.start(); - broker.waitUntilStarted(); - - factory = new ActiveMQConnectionFactory("vm://localhost"); - } - - private void sendMessages() throws Exception { - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); - for (int i = 0; i < MAX_MESSAGES; i++) { - Message message = session.createTextMessage(); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY_IN_MS); - producer.send(message); - } - connection.close(); - } - - @Test - public void testNoMissingMessagesOnShortScheduleDelay() throws Exception { - - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - messageCountDown.countDown(); - } - }); - sendMessages(); - - boolean receiveComplete = messageCountDown.await(5, TimeUnit.SECONDS); - - connection.close(); - - assertTrue("expect all messages received but " + messageCountDown.getCount() + " are missing", receiveComplete); - } - - @After - public void tearDown() throws Exception { - broker.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java deleted file mode 100644 index 7e7c959..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3145Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ3145Test.class); - private final String MESSAGE_TEXT = new String(new byte[1024]); - BrokerService broker; - ConnectionFactory factory; - Connection connection; - Session session; - Queue queue; - MessageConsumer consumer; - - @Before - public void createBroker() throws Exception { - createBroker(true); - } - - public void createBroker(boolean deleteAll) throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(deleteAll); - broker.setDataDirectory("target/AMQ3145Test"); - broker.setUseJmx(true); - broker.getManagementContext().setCreateConnector(false); - broker.addConnector("tcp://localhost:0"); - broker.start(); - broker.waitUntilStarted(); - factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString()); - connection = factory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - } - - @After - public void tearDown() throws Exception { - if (consumer != null) { - consumer.close(); - } - session.close(); - connection.stop(); - connection.close(); - broker.stop(); - } - - @Test - public void testCacheDisableReEnable() throws Exception { - createProducerAndSendMessages(1); - QueueViewMBean proxy = getProxyToQueueViewMBean(); - assertTrue("cache is enabled", proxy.isCacheEnabled()); - tearDown(); - createBroker(false); - proxy = getProxyToQueueViewMBean(); - assertEquals("one pending message", 1, proxy.getQueueSize()); - assertTrue("cache is disabled when there is a pending message", !proxy.isCacheEnabled()); - - createConsumer(1); - createProducerAndSendMessages(1); - assertTrue("cache is enabled again on next send when there are no messages", proxy.isCacheEnabled()); - } - - private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException { - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + queue.getQueueName() + ",type=Broker,brokerName=localhost"); - QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } - - private void createProducerAndSendMessages(int numToSend) throws Exception { - queue = session.createQueue("test1"); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < numToSend; i++) { - TextMessage message = session.createTextMessage(MESSAGE_TEXT + i); - if (i != 0 && i % 50000 == 0) { - LOG.info("sent: " + i); - } - producer.send(message); - } - producer.close(); - } - - private void createConsumer(int numToConsume) throws Exception { - consumer = session.createConsumer(queue); - // wait for buffer fill out - for (int i = 0; i < numToConsume; ++i) { - Message message = consumer.receive(2000); - message.acknowledge(); - } - consumer.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java deleted file mode 100644 index 34b1909..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DestinationViewMBean; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.virtual.MirroredQueue; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ3157Test extends EmbeddedBrokerTestSupport { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3157Test.class); - private Connection connection; - - public void testInactiveMirroredQueueIsCleanedUp() throws Exception { - - if (connection == null) { - connection = createConnection(); - } - connection.start(); - - ConsumerBean messageList = new ConsumerBean(); - messageList.setVerbose(true); - - ActiveMQDestination consumeDestination = createConsumeDestination(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - LOG.info("Consuming from: " + consumeDestination); - - MessageConsumer c1 = session.createConsumer(consumeDestination); - c1.setMessageListener(messageList); - - // create topic producer - ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName()); - LOG.info("Sending to: " + sendDestination); - - MessageProducer producer = session.createProducer(sendDestination); - assertNotNull(producer); - - final int total = 10; - for (int i = 0; i < total; i++) { - producer.send(session.createTextMessage("message: " + i)); - } - - messageList.assertMessagesArrived(total); - LOG.info("Received: " + messageList); - messageList.flushMessages(); - - MessageConsumer c2 = session.createConsumer(sendDestination); - c2.setMessageListener(messageList); - messageList.assertMessagesArrived(total); - LOG.info("Q Received: " + messageList); - - connection.close(); - - List topics = Arrays.asList(broker.getAdminView().getTopics()); - assertTrue(topics.contains(createObjectName(consumeDestination))); - List queues = Arrays.asList(broker.getAdminView().getQueues()); - assertTrue(queues.contains(createObjectName(sendDestination))); - - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - - topics = Arrays.asList(broker.getAdminView().getTopics()); - if (topics != null) { - assertFalse("Virtual Topic Desination did not get cleaned up.", topics.contains(createObjectName(consumeDestination))); - } - queues = Arrays.asList(broker.getAdminView().getQueues()); - if (queues != null) { - assertFalse("Mirrored Queue Desination did not get cleaned up.", queues.contains(createObjectName(sendDestination))); - } - } - - protected ActiveMQDestination createConsumeDestination() { - return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName()); - } - - protected String getQueueName() { - return "My.Queue"; - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseMirroredQueues(true); - answer.setPersistent(isPersistent()); - answer.setSchedulePeriodForDestinationPurge(1000); - - PolicyEntry entry = new PolicyEntry(); - entry.setGcInactiveDestinations(true); - entry.setInactiveTimeoutBeforeGC(5000); - entry.setProducerFlowControl(true); - PolicyMap map = new PolicyMap(); - map.setDefaultEntry(entry); - - MirroredQueue mirrorQ = new MirroredQueue(); - mirrorQ.setCopyMessage(true); - DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ}; - answer.setDestinationInterceptors(destinationInterceptors); - - answer.setDestinationPolicy(map); - answer.addConnector(bindAddress); - - return answer; - } - - protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { - String domain = "org.apache.activemq"; - ObjectName name; - if (destination.isQueue()) { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName()); - } - else { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName()); - } - return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); - } - - protected ObjectName createObjectName(ActiveMQDestination destination) throws Exception { - String domain = "org.apache.activemq"; - ObjectName name; - if (destination.isQueue()) { - name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + - "destinationType=Queue,destinationName=" + destination.getPhysicalName()); - } - else { - name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + - "destinationType=Topic,destinationName=" + destination.getPhysicalName()); - } - - return name; - } - - @Override - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - } - super.tearDown(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72918fa9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java deleted file mode 100644 index 6fd81b2..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java +++ /dev/null @@ -1,471 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2. - *
- * Symptoms: - 1 record is lost "early" in the stream. - no more records lost. - *
- * Test Configuration: - Broker Settings: - Destination Policy - Occurs with "Destination Policy" using Store Cursor and - * a memory limit - Not reproduced without "Destination Policy" defined - Persistence Adapter - Memory: Does not occur. - * - KahaDB: Occurs. - Messages - Occurs with TextMessage and BinaryMessage - Persistent messages. - *
- * Notes: - Lower memory limits increase the rate of occurrence. - Higher memory limits may prevent the problem - * (probably because memory limits not reached). - Producers sending a number of messages before consumers come online - * increases rate of occurrence. - */ - -public class AMQ3167Test { - - protected BrokerService embeddedBroker; - - protected static final int MEMORY_LIMIT = 16 * 1024; - - protected static boolean Debug_f = false; - - protected long Producer_stop_time = 0; - protected long Consumer_stop_time = 0; - protected long Consumer_startup_delay_ms = 2000; - protected boolean Stop_after_error = true; - - protected Connection JMS_conn; - protected long Num_error = 0; - - // // //// - // // UTILITIES //// - // // //// - - /** - * Create a new, unsecured, client connection to the test broker using the given username and password. This - * connection bypasses all security. - *
- * Don't forget to start the connection or no messages will be received by consumers even though producers will work - * fine. - * - * @username name of the JMS user for the connection; may be null. - * @password Password for the JMS user; may be null. - */ - - protected Connection createUnsecuredConnection(String username, String password) throws javax.jms.JMSException { - ActiveMQConnectionFactory conn_fact; - - conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI()); - - return conn_fact.createConnection(username, password); - } - - // // //// - // // TEST FUNCTIONALITY //// - // // //// - - @Before - public void testPrep() throws Exception { - embeddedBroker = new BrokerService(); - configureBroker(embeddedBroker); - embeddedBroker.start(); - embeddedBroker.waitUntilStarted(); - - // Prepare the connection - JMS_conn = createUnsecuredConnection(null, null); - JMS_conn.start(); - } - - @After - public void testCleanup() throws java.lang.Exception { - JMS_conn.stop(); - embeddedBroker.stop(); - } - - protected void configureBroker(BrokerService broker_svc) throws Exception { - - broker_svc.setBrokerName("testbroker1"); - - broker_svc.setUseJmx(false); - broker_svc.setPersistent(true); - broker_svc.setDataDirectory("target/AMQ3167Test"); - configureDestinationPolicy(broker_svc); - } - - /** - * NOTE: overrides any prior policy map defined for the broker service. - */ - - protected void configureDestinationPolicy(BrokerService broker_svc) { - PolicyMap pol_map; - PolicyEntry pol_ent; - ArrayList ent_list; - - ent_list = new ArrayList<>(); - - // - // QUEUES - // - - pol_ent = new PolicyEntry(); - pol_ent.setQueue(">"); - pol_ent.setMemoryLimit(MEMORY_LIMIT); - pol_ent.setProducerFlowControl(false); - ent_list.add(pol_ent); - - // - // COMPLETE POLICY MAP - // - - pol_map = new PolicyMap(); - pol_map.setPolicyEntries(ent_list); - - broker_svc.setDestinationPolicy(pol_map); - } - - // // //// - // // TEST //// - // // //// - - @Test - public void testQueueLostMessage() throws Exception { - Destination dest; - - dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE); - - // 10 seconds from now - Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L); - - // 15 seconds from now - Consumer_stop_time = Producer_stop_time + (5L * 1000000000L); - - runLostMsgTest(dest, 1000000, 1, 1, false); - - // Make sure failures in the threads are thoroughly reported in the JUnit framework. - assertTrue(Num_error == 0); - } - - /** - * - */ - - protected static void log(String msg) { - if (Debug_f) - java.lang.System.err.println(msg); - } - - /** - * Main body of the lost-message test. - */ - - protected void runLostMsgTest(Destination dest, - int num_msg, - int num_send_per_sess, - int num_recv_per_sess, - boolean topic_f) throws Exception { - Thread prod_thread; - Thread cons_thread; - String tag; - Session sess; - MessageProducer prod; - MessageConsumer cons; - int ack_mode; - - // - // Start the producer - // - - tag = "prod"; - log(">> Starting producer " + tag); - - sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE); - prod = sess.createProducer(dest); - - prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess); - prod_thread.start(); - log("Started producer " + tag); - - // - // Delay before starting consumers - // - - log("Waiting before starting consumers"); - java.lang.Thread.sleep(Consumer_startup_delay_ms); - - // - // Now create and start the consumer - // - - tag = "cons"; - log(">> Starting consumer"); - - if (num_recv_per_sess > 1) - ack_mode = Session.CLIENT_ACKNOWLEDGE; - else - ack_mode = Session.AUTO_ACKNOWLEDGE; - - sess = JMS_conn.createSession(false, ack_mode); - cons = sess.createConsumer(dest); - - cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess); - cons_thread.start(); - log("Started consumer " + tag); - - // - // Wait for the producer and consumer to finish. - // - - log("< waiting for producer."); - prod_thread.join(); - - log("< waiting for consumer."); - cons_thread.join(); - - log("Shutting down"); - } - - // // //// - // // INTERNAL CLASSES //// - // // //// - - /** - * Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop time is - * reached, or a test error is detected. - */ - - protected class producerThread extends Thread { - - protected Session msgSess; - protected MessageProducer msgProd; - protected String producerTag; - protected int numMsg; - protected int numPerSess; - protected long producer_stop_time; - - producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) { - super(); - - producer_stop_time = 0; - msgSess = sess; - msgProd = prod; - producerTag = tag; - numMsg = num_msg; - numPerSess = sess_size; - } - - public void execTest() throws Exception { - Message msg; - int sess_start; - int cur; - - sess_start = 0; - cur = 0; - while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) { - msg = msgSess.createTextMessage("test message from " + producerTag); - msg.setStringProperty("testprodtag", producerTag); - msg.setIntProperty("seq", cur); - - if (msg instanceof ActiveMQMessage) { - ((ActiveMQMessage) msg).setResponseRequired(true); - } - - // - // Send the message. - // - - msgProd.send(msg); - cur++; - - // - // Commit if the number of messages per session has been reached, and - // transactions are being used (only when > 1 msg per sess). - // - - if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) { - msgSess.commit(); - sess_start = cur; - } - } - - // Make sure to send the final commit, if there were sends since the last commit. - if ((numPerSess > 1) && ((cur - sess_start) > 0)) - msgSess.commit(); - - if (cur < numMsg) - log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() + " (stop time " + producer_stop_time + ")"); - } - - /** - * Check whether it is time for the producer to terminate. - */ - - protected boolean didTimeOut() { - if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time)) - return true; - - return false; - } - - /** - * Run the producer. - */ - - @Override - public void run() { - try { - log("- running producer " + producerTag); - execTest(); - log("- finished running producer " + producerTag); - } - catch (Throwable thrown) { - Num_error++; - fail("producer " + producerTag + " failed: " + thrown.getMessage()); - throw new Error("producer " + producerTag + " failed", thrown); - } - } - - @Override - public String toString() { - return producerTag; - } - } - - /** - * Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop time - * is reached, or a test error is detected. - */ - - protected class consumerThread extends Thread { - - protected Session msgSess; - protected MessageConsumer msgCons; - protected String consumerTag; - protected int numMsg; - protected int numPerSess; - - consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) { - super(); - - msgSess = sess; - msgCons = cons; - consumerTag = tag; - numMsg = num_msg; - numPerSess = sess_size; - } - - public void execTest() throws Exception { - Message msg; - int sess_start; - int cur; - - msg = null; - sess_start = 0; - cur = 0; - - while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) { - // - // Use a timeout of 1 second to periodically check the consumer timeout. - // - msg = msgCons.receive(1000); - if (msg != null) { - checkMessage(msg, cur); - cur++; - - if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) { - msg.acknowledge(); - sess_start = cur; - } - } - } - - // Acknowledge the last messages, if they were not yet acknowledged. - if ((numPerSess > 1) && ((cur - sess_start) > 0)) - msg.acknowledge(); - - if (cur < numMsg) - log("* Consumer " + consumerTag + " timed out"); - } - - /** - * Check whether it is time for the consumer to terminate. - */ - - protected boolean didTimeOut() { - if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time)) - return true; - - return false; - } - - /** - * Verify the message received. Sequence numbers are checked and are expected to exactly match the message - * number (starting at 0). - */ - - protected void checkMessage(Message msg, int exp_seq) throws javax.jms.JMSException { - int seq; - - seq = msg.getIntProperty("seq"); - - if (exp_seq != seq) { - Num_error++; - fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq); - } - } - - /** - * Run the consumer. - */ - - @Override - public void run() { - try { - log("- running consumer " + consumerTag); - execTest(); - log("- running consumer " + consumerTag); - } - catch (Throwable thrown) { - Num_error++; - fail("consumer " + consumerTag + " failed: " + thrown.getMessage()); - throw new Error("consumer " + consumerTag + " failed", thrown); - } - } - - @Override - public String toString() { - return consumerTag; - } - } -} \ No newline at end of file