Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0156018641 for ; Tue, 23 Feb 2016 23:27:09 +0000 (UTC) Received: (qmail 28704 invoked by uid 500); 23 Feb 2016 23:27:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 28598 invoked by uid 500); 23 Feb 2016 23:27:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 27955 invoked by uid 99); 23 Feb 2016 23:27:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Feb 2016 23:27:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3AD0EE8E8E; Tue, 23 Feb 2016 23:27:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 23 Feb 2016 23:27:36 -0000 Message-Id: <682968193faf4ef9a214dd790e1b14f1@git.apache.org> In-Reply-To: <85bcf1a3599d4007b0799d846136b1f6@git.apache.org> References: <85bcf1a3599d4007b0799d846136b1f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [30/42] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java deleted file mode 100644 index ad12f71..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.leveldb.LevelDBStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a test case for the issue reported at: - * https://issues.apache.org/activemq/browse/AMQ-1866 - * - * If you have a JMS producer sending messages to multiple fast consumers and - * one slow consumer, eventually all consumers will run as slow as - * the slowest consumer. - */ -public class AMQ1866 extends TestCase { - - private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class); - private BrokerService brokerService; - private ArrayList threads = new ArrayList<>(); - - private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0"; - private String ACTIVEMQ_BROKER_URI; - - AtomicBoolean shutdown = new AtomicBoolean(); - private ActiveMQQueue destination; - - @Override - protected void setUp() throws Exception { - // Start an embedded broker up. - brokerService = new BrokerService(); - LevelDBStore adaptor = new LevelDBStore(); - brokerService.setPersistenceAdapter(adaptor); - brokerService.deleteAllMessages(); - - // A small max page size makes this issue occur faster. - PolicyMap policyMap = new PolicyMap(); - PolicyEntry pe = new PolicyEntry(); - pe.setMaxPageSize(1); - policyMap.put(new ActiveMQQueue(">"), pe); - brokerService.setDestinationPolicy(policyMap); - - brokerService.addConnector(ACTIVEMQ_BROKER_BIND); - brokerService.start(); - - ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); - destination = new ActiveMQQueue(getName()); - } - - @Override - protected void tearDown() throws Exception { - // Stop any running threads. - shutdown.set(true); - for (Thread t : threads) { - t.interrupt(); - t.join(); - } - brokerService.stop(); - } - - public void testConsumerSlowDownPrefetch0() throws Exception { - ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0"; - doTestConsumerSlowDown(); - } - - public void testConsumerSlowDownPrefetch10() throws Exception { - ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10"; - doTestConsumerSlowDown(); - } - - public void testConsumerSlowDownDefaultPrefetch() throws Exception { - doTestConsumerSlowDown(); - } - - public void doTestConsumerSlowDown() throws Exception { - - // Preload the queue. - produce(20000); - - Thread producer = new Thread() { - @Override - public void run() { - try { - while (!shutdown.get()) { - produce(1000); - } - } - catch (Exception e) { - } - } - }; - threads.add(producer); - producer.start(); - - // This is the slow consumer. - ConsumerThread c1 = new ConsumerThread("Consumer-1"); - threads.add(c1); - c1.start(); - - // Wait a bit so that the slow consumer gets assigned most of the messages. - Thread.sleep(500); - ConsumerThread c2 = new ConsumerThread("Consumer-2"); - threads.add(c2); - c2.start(); - - int totalReceived = 0; - for (int i = 0; i < 30; i++) { - Thread.sleep(1000); - long c1Counter = c1.counter.getAndSet(0); - long c2Counter = c2.counter.getAndSet(0); - log.debug("c1: " + c1Counter + ", c2: " + c2Counter); - totalReceived += c1Counter; - totalReceived += c2Counter; - - // Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec - if (i > 10) { - assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0); - } - } - } - - public void produce(int count) throws Exception { - Connection connection = null; - try { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI); - factory.setDispatchAsync(true); - - connection = factory.createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - connection.start(); - - for (int i = 0; i < count; i++) { - producer.send(session.createTextMessage(getName() + " Message " + (++i))); - } - - } - finally { - try { - connection.close(); - } - catch (Throwable e) { - } - } - } - - public class ConsumerThread extends Thread { - - final AtomicLong counter = new AtomicLong(); - - public ConsumerThread(String threadId) { - super(threadId); - } - - @Override - public void run() { - Connection connection = null; - try { - log.debug(getName() + ": is running"); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI); - factory.setDispatchAsync(true); - - connection = factory.createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - connection.start(); - - while (!shutdown.get()) { - TextMessage msg = (TextMessage) consumer.receive(1000); - if (msg != null) { - int sleepingTime; - if (getName().equals("Consumer-1")) { - sleepingTime = 1000 * 1000; - } - else { - sleepingTime = 1; - } - counter.incrementAndGet(); - Thread.sleep(sleepingTime); - } - } - - } - catch (Exception e) { - } - finally { - log.debug(getName() + ": is stopping"); - try { - connection.close(); - } - catch (Throwable e) { - } - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java deleted file mode 100644 index b9cb919..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -public class AMQ1893Test extends TestCase { - - private static final Logger log = LoggerFactory.getLogger(AMQ1893Test.class); - - static final String QUEUE_NAME = "TEST"; - - static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000; - - static final int[] PRIORITIES = new int[]{0, 5, 10}; - - static final boolean debug = false; - - private BrokerService brokerService; - - private ActiveMQQueue destination; - - @Override - protected void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - destination = new ActiveMQQueue(QUEUE_NAME); - } - - @Override - protected void tearDown() throws Exception { - // Stop any running threads. - brokerService.stop(); - } - - public void testProduceConsumeWithSelector() throws Exception { - new TestProducer().produceMessages(); - new TestConsumer().consume(); - } - - class TestProducer { - - public void produceMessages() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString()); - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(QUEUE_NAME); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - long start = System.currentTimeMillis(); - - for (int priority : PRIORITIES) { - - String name = null; - if (priority == 10) { - name = "high"; - } - else if (priority == 5) { - name = "mid"; - } - else { - name = "low"; - } - - for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) { - - TextMessage message = session.createTextMessage(name + "_" + i); - message.setIntProperty("priority", priority); - - producer.send(message); - } - } - - long end = System.currentTimeMillis(); - - log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * 3) + " messages in " + (end - start) + " ms"); - - producer.close(); - session.close(); - connection.close(); - } - } - - class TestConsumer { - - private CountDownLatch finishLatch = new CountDownLatch(1); - - public void consume() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString()); - - final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length; - final AtomicInteger counter = new AtomicInteger(); - final MessageListener listener = new MessageListener() { - @Override - public void onMessage(Message message) { - - if (debug) { - try { - log.info(((TextMessage) message).getText()); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - - if (counter.incrementAndGet() == totalMessageCount) { - - finishLatch.countDown(); - - } - } - }; - - int consumerCount = PRIORITIES.length; - Connection[] connections = new Connection[consumerCount]; - Session[] sessions = new Session[consumerCount]; - MessageConsumer[] consumers = new MessageConsumer[consumerCount]; - - for (int i = 0; i < consumerCount; i++) { - String selector = "priority = " + PRIORITIES[i]; - - connections[i] = connectionFactory.createConnection(); - sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); - - consumers[i] = sessions[i].createConsumer(destination, selector); - consumers[i].setMessageListener(listener); - } - - for (Connection connection : connections) { - connection.start(); - } - - log.info("received " + counter.get() + " messages"); - - assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS)); - - log.info("received " + counter.get() + " messages"); - - for (MessageConsumer consumer : consumers) { - consumer.close(); - } - - for (Session session : sessions) { - session.close(); - } - - for (Connection connection : connections) { - connection.close(); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java deleted file mode 100644 index a7eb699..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import junit.framework.TestCase; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; - -public class AMQ1917Test extends TestCase { - - private static final int NUM_MESSAGES = 4000; - private static final int NUM_THREADS = 10; - private static final String REQUEST_QUEUE = "mock.in.queue"; - private static final String REPLY_QUEUE = "mock.out.queue"; - - private Destination requestDestination = ActiveMQDestination.createDestination(REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE); - private Destination replyDestination = ActiveMQDestination.createDestination(REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE); - - private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES); - private CountDownLatch errorLatch = new CountDownLatch(1); - private ThreadPoolExecutor tpe; - private final String BROKER_URL = "tcp://localhost:0"; - private String connectionUri; - private BrokerService broker = null; - private boolean working = true; - - // trival session/producer pool - final Session[] sessions = new Session[NUM_THREADS]; - final MessageProducer[] producers = new MessageProducer[NUM_THREADS]; - - @Override - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setPersistent(false); - broker.addConnector(BROKER_URL); - broker.start(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - - BlockingQueue queue = new ArrayBlockingQueue<>(10000); - tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000, TimeUnit.MILLISECONDS, queue); - ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory()); - tpe.setThreadFactory(limitedthreadFactory); - } - - @Override - public void tearDown() throws Exception { - broker.stop(); - tpe.shutdown(); - } - - public void testLoadedSendReceiveWithCorrelationId() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(); - connectionFactory.setBrokerURL(connectionUri); - Connection connection = connectionFactory.createConnection(); - setupReceiver(connection); - - connection = connectionFactory.createConnection(); - connection.start(); - - // trival session/producer pool - for (int i = 0; i < NUM_THREADS; i++) { - sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producers[i] = sessions[i].createProducer(requestDestination); - } - - for (int i = 0; i < NUM_MESSAGES; i++) { - MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination, replyDestination, "Test Message : " + i); - tpe.execute(msr); - } - - while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) { - if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) { - fail("there was an error, check the console for thread or thread allocation failure"); - break; - } - } - working = false; - } - - private void setupReceiver(final Connection connection) throws Exception { - - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = session.createConsumer(requestDestination); - final MessageProducer sender = session.createProducer(replyDestination); - connection.start(); - - new Thread() { - @Override - public void run() { - while (working) { - // wait for messages in infinitive loop - // time out is set to show the client is awaiting - try { - TextMessage msg = (TextMessage) consumer.receive(20000); - if (msg == null) { - errorLatch.countDown(); - fail("Response timed out." + " latchCount=" + roundTripLatch.getCount()); - } - else { - String result = msg.getText(); - //System.out.println("Request:" + (i++) - // + ", msg=" + result + ", ID" + msg.getJMSMessageID()); - TextMessage response = session.createTextMessage(); - response.setJMSCorrelationID(msg.getJMSMessageID()); - response.setText(result); - sender.send(response); - } - } - catch (JMSException e) { - if (working) { - errorLatch.countDown(); - fail("Unexpected exception:" + e); - } - } - } - } - }.start(); - } - - class MessageSenderReceiver implements Runnable { - - Destination reqDest; - Destination replyDest; - String origMsg; - - public MessageSenderReceiver(Destination reqDest, Destination replyDest, String msg) throws Exception { - this.replyDest = replyDest; - this.reqDest = reqDest; - this.origMsg = msg; - } - - private int getIndexFromCurrentThread() { - String name = Thread.currentThread().getName(); - String num = name.substring(name.lastIndexOf('-') + 1); - int idx = Integer.parseInt(num) - 1; - assertTrue("idx is in range: idx=" + idx, idx < NUM_THREADS); - return idx; - } - - @Override - public void run() { - try { - // get thread session and producer from pool - int threadIndex = getIndexFromCurrentThread(); - Session session = sessions[threadIndex]; - MessageProducer producer = producers[threadIndex]; - - final Message sendJmsMsg = session.createTextMessage(origMsg); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(sendJmsMsg); - - String jmsId = sendJmsMsg.getJMSMessageID(); - String selector = "JMSCorrelationID='" + jmsId + "'"; - - MessageConsumer consumer = session.createConsumer(replyDest, selector); - Message receiveJmsMsg = consumer.receive(2000); - consumer.close(); - if (receiveJmsMsg == null) { - errorLatch.countDown(); - fail("Unable to receive response for:" + origMsg + ", with selector=" + selector); - } - else { - //System.out.println("received response message :" - // + ((TextMessage) receiveJmsMsg).getText() - // + " with selector : " + selector); - roundTripLatch.countDown(); - } - } - catch (JMSException e) { - fail("unexpected exception:" + e); - } - } - } - - public class LimitedThreadFactory implements ThreadFactory { - - int threadCount; - private ThreadFactory factory; - - public LimitedThreadFactory(ThreadFactory threadFactory) { - this.factory = threadFactory; - } - - @Override - public Thread newThread(Runnable arg0) { - if (++threadCount > NUM_THREADS) { - errorLatch.countDown(); - fail("too many threads requested"); - } - return factory.newThread(arg0); - } - } -} - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java deleted file mode 100644 index 6e49550..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.NamingException; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.Wait; -import org.apache.log4j.Logger; - -/** - * AMQ1936Test - */ -public class AMQ1936Test extends TestCase { - - private final static Logger logger = Logger.getLogger(AMQ1936Test.class); - private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue"; - // //-- - // - private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use - // - // //-- - private final static int CONSUMER_COUNT = 2; // The number of message receiver instances - private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be - // processed within a JMS transaction - - private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CONSUMER_COUNT, CONSUMER_COUNT, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue()); - private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[CONSUMER_COUNT]; - private BrokerService broker = null; - static QueueConnectionFactory connectionFactory = null; - - @Override - protected void setUp() throws Exception { - super.setUp(); - - broker = new BrokerService(); - broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024); - broker.setBrokerName("test"); - broker.setDeleteAllMessagesOnStartup(true); - broker.start(); - connectionFactory = new ActiveMQConnectionFactory("vm://test"); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - - if (threadPool != null) { - // signal receivers to stop - for (ThreadedMessageReceiver receiver : receivers) { - receiver.setShouldStop(true); - } - - logger.info("Waiting for receivers to shutdown.."); - if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { - logger.warn("Not all receivers completed shutdown."); - } - else { - logger.info("All receivers shutdown successfully.."); - } - } - - logger.debug("Stoping the broker."); - - if (broker != null) { - broker.stop(); - } - } - - private void sendTextMessage(String queueName, int i) throws JMSException, NamingException { - QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test"); - QueueConnection queueConnection = null; - QueueSession session = null; - QueueSender sender = null; - Queue queue = null; - TextMessage message = null; - - try { - - // Create the queue connection - queueConnection = connectionFactory.createQueueConnection(); - - session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - queue = session.createQueue(TEST_QUEUE_NAME); - sender = session.createSender(queue); - sender.setDeliveryMode(DeliveryMode.PERSISTENT); - - message = session.createTextMessage(String.valueOf(i)); - - // send the message - sender.send(message); - - if (session.getTransacted()) { - session.commit(); - } - if (i % 1000 == 0) { - logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:" + message.getText()); - } - } - finally { - if (sender != null) { - sender.close(); - } - if (session != null) { - session.close(); - } - if (queueConnection != null) { - queueConnection.close(); - } - } - } - - public void testForDuplicateMessages() throws Exception { - final ConcurrentHashMap messages = new ConcurrentHashMap<>(); - final Object lock = new Object(); - final CountDownLatch duplicateSignal = new CountDownLatch(1); - final AtomicInteger messageCount = new AtomicInteger(0); - - // add 1/2 the number of our total messages - for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) { - if (duplicateSignal.getCount() == 0) { - fail("Duplicate message id detected"); - } - sendTextMessage(TEST_QUEUE_NAME, i); - } - - // create a number of consumers to read of the messages and start them with a handler which simply stores the - // message ids - // in a Map and checks for a duplicate - for (int i = 0; i < CONSUMER_COUNT; i++) { - receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() { - - @Override - public void onMessage(Message message) throws Exception { - synchronized (lock) { - int current = messageCount.incrementAndGet(); - if (current % 1000 == 0) { - logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText()); - } - if (messages.containsKey(message.getJMSMessageID())) { - duplicateSignal.countDown(); - logger.fatal("duplicate message id detected:" + message.getJMSMessageID()); - fail("Duplicate message id detected:" + message.getJMSMessageID()); - } - else { - messages.put(message.getJMSMessageID(), message.getJMSMessageID()); - } - } - } - }); - threadPool.submit(receivers[i]); - } - - // starting adding the remaining messages - for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) { - if (duplicateSignal.getCount() == 0) { - fail("Duplicate message id detected"); - } - sendTextMessage(TEST_QUEUE_NAME, i); - } - - logger.info("sent all " + TEST_MESSAGE_COUNT + " messages"); - - // allow some time for messages to be delivered to receivers. - boolean ok = Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return TEST_MESSAGE_COUNT == messages.size(); - } - }, TimeUnit.MINUTES.toMillis(7)); - if (!ok) { - AutoFailTestSupport.dumpAllThreads("--STUCK?--"); - } - assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size()); - assertEquals(TEST_MESSAGE_COUNT, messageCount.get()); - } - - private final static class ThreadedMessageReceiver implements Runnable { - - private IMessageHandler handler = null; - private final AtomicBoolean shouldStop = new AtomicBoolean(false); - - public ThreadedMessageReceiver(String queueName, IMessageHandler handler) { - this.handler = handler; - } - - @Override - public void run() { - - QueueConnection queueConnection = null; - QueueSession session = null; - QueueReceiver receiver = null; - Queue queue = null; - Message message = null; - try { - try { - - queueConnection = connectionFactory.createQueueConnection(); - // create a transacted session - session = queueConnection.createQueueSession(TRANSACTED_RECEIVE, Session.AUTO_ACKNOWLEDGE); - queue = session.createQueue(TEST_QUEUE_NAME); - receiver = session.createReceiver(queue); - - // start the connection - queueConnection.start(); - - logger.info("Receiver " + Thread.currentThread().getName() + " connected."); - - // start receive loop - while (!(shouldStop.get() || Thread.currentThread().isInterrupted())) { - try { - message = receiver.receive(200); - } - catch (Exception e) { - // - // ignore interrupted exceptions - // - if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) { - /* ignore */ - } - else { - throw e; - } - } - - if (message != null && this.handler != null) { - this.handler.onMessage(message); - } - - // commit session on successful handling of message - if (session.getTransacted()) { - session.commit(); - } - } - - logger.info("Receiver " + Thread.currentThread().getName() + " shutting down."); - - } - finally { - if (receiver != null) { - try { - receiver.close(); - } - catch (JMSException e) { - logger.warn(e); - } - } - if (session != null) { - try { - session.close(); - } - catch (JMSException e) { - logger.warn(e); - } - } - if (queueConnection != null) { - queueConnection.close(); - } - } - } - catch (JMSException e) { - logger.error(e); - e.printStackTrace(); - } - catch (NamingException e) { - logger.error(e); - } - catch (Exception e) { - logger.error(e); - e.printStackTrace(); - } - } - - public void setShouldStop(Boolean shouldStop) { - this.shouldStop.set(shouldStop); - } - } - - public interface IMessageHandler { - - void onMessage(Message message) throws Exception; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java deleted file mode 100644 index 7236581..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayList; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a test case for the issue reported at: https://issues.apache.org/activemq/browse/AMQ-2021 Bug is modification - * of inflight message properties so the failure can manifest itself in a bunch or ways, from message receipt with null - * properties to marshall errors - */ -public class AMQ2021Test implements ExceptionListener, UncaughtExceptionHandler { - - private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class); - BrokerService brokerService; - ArrayList threads = new ArrayList<>(); - Vector exceptions; - - @Rule - public TestName name = new TestName(); - - AMQ2021Test testCase; - - private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0"; - private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0"; - private String PRODUCER_BROKER_URL; - - private final int numMessages = 1000; - private final int numConsumers = 2; - private final int dlqMessages = numMessages / 2; - - private CountDownLatch receivedLatch; - private ActiveMQTopic destination; - private CountDownLatch started; - - @Before - public void setUp() throws Exception { - Thread.setDefaultUncaughtExceptionHandler(this); - testCase = this; - - // Start an embedded broker up. - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.addConnector(ACTIVEMQ_BROKER_BIND); - brokerService.start(); - destination = new ActiveMQTopic(name.getMethodName()); - exceptions = new Vector<>(); - - CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL; - PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); - - receivedLatch = new CountDownLatch(numConsumers * (numMessages + dlqMessages)); - started = new CountDownLatch(1); - } - - @After - public void tearDown() throws Exception { - for (Thread t : threads) { - t.interrupt(); - t.join(); - } - brokerService.stop(); - } - - @Test(timeout = 240000) - public void testConcurrentTopicResendToDLQ() throws Exception { - - for (int i = 0; i < numConsumers; i++) { - ConsumerThread c1 = new ConsumerThread("Consumer-" + i); - threads.add(c1); - c1.start(); - } - - assertTrue(started.await(10, TimeUnit.SECONDS)); - - Thread producer = new Thread() { - @Override - public void run() { - try { - produce(numMessages); - } - catch (Exception e) { - } - } - }; - threads.add(producer); - producer.start(); - - boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS); - for (Throwable t : exceptions) { - log.error("failing test with first exception", t); - fail("exception during test : " + t); - } - assertTrue("excepted messages received within time limit", allGood); - - assertEquals(0, exceptions.size()); - - for (int i = 0; i < numConsumers; i++) { - // last recovery sends message to deq so is not received again - assertEquals(dlqMessages * 2, ((ConsumerThread) threads.get(i)).recoveries); - assertEquals(numMessages + dlqMessages, ((ConsumerThread) threads.get(i)).counter); - } - - // half of the messages for each consumer should go to the dlq but duplicates will - // be suppressed - consumeFromDLQ(dlqMessages); - - } - - private void consumeFromDLQ(int messageCount) throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL); - Connection connection = connectionFactory.createConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); - int count = 0; - for (int i = 0; i < messageCount; i++) { - if (dlqConsumer.receive(1000) == null) { - break; - } - count++; - } - assertEquals(messageCount, count); - } - - public void produce(int count) throws Exception { - Connection connection = null; - try { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL); - connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setTimeToLive(0); - connection.start(); - - for (int i = 0; i < count; i++) { - int id = i + 1; - TextMessage message = session.createTextMessage(name.getMethodName() + " Message " + id); - message.setIntProperty("MsgNumber", id); - producer.send(message); - - if (id % 500 == 0) { - log.info("sent " + id + ", ith " + message); - } - } - } - catch (JMSException e) { - log.error("unexpected ex on produce", e); - exceptions.add(e); - } - finally { - try { - if (connection != null) { - connection.close(); - } - } - catch (Throwable e) { - } - } - } - - public class ConsumerThread extends Thread implements MessageListener { - - public long counter = 0; - public long recoveries = 0; - private Session session; - - public ConsumerThread(String threadId) { - super(threadId); - } - - @Override - public void run() { - try { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL); - Connection connection = connectionFactory.createConnection(); - connection.setExceptionListener(testCase); - connection.setClientID(getName()); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(destination, getName()); - consumer.setMessageListener(this); - connection.start(); - - started.countDown(); - - } - catch (JMSException exception) { - log.error("unexpected ex in consumer run", exception); - exceptions.add(exception); - } - } - - @Override - public void onMessage(Message message) { - try { - counter++; - int messageNumber = message.getIntProperty("MsgNumber"); - if (messageNumber % 2 == 0) { - session.recover(); - recoveries++; - } - else { - message.acknowledge(); - } - - if (counter % 200 == 0) { - log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message); - } - receivedLatch.countDown(); - } - catch (Exception e) { - log.error("unexpected ex on onMessage", e); - exceptions.add(e); - } - } - - } - - @Override - public void onException(JMSException exception) { - log.info("Unexpected JMSException", exception); - exceptions.add(exception); - } - - @Override - public void uncaughtException(Thread thread, Throwable exception) { - log.info("Unexpected exception from thread " + thread + ", ex: " + exception); - exceptions.add(exception); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java deleted file mode 100644 index de9f2b5..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.QueueReceiver; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.naming.InitialContext; - -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ2084Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2084Test.class); - BrokerService broker; - CountDownLatch qreceived; - String connectionUri; - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setPersistent(false); - connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); - broker.start(); - - qreceived = new CountDownLatch(1); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void listenQueue(final String queueName, final String selectors) { - try { - Properties props = new Properties(); - props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - props.put("java.naming.provider.url", connectionUri); - props.put("queue.queueName", queueName); - - javax.naming.Context ctx = new InitialContext(props); - QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory"); - QueueConnection conn = factory.createQueueConnection(); - final Queue queue = (Queue) ctx.lookup("queueName"); - QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - QueueReceiver receiver = session.createReceiver(queue, selectors); - System.out.println("Message Selector: " + receiver.getMessageSelector()); - receiver.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - if (message instanceof TextMessage) { - TextMessage txtMsg = (TextMessage) message; - String msg = txtMsg.getText(); - LOG.info("Queue Message Received: " + queueName + " - " + msg); - qreceived.countDown(); - - } - message.acknowledge(); - } - catch (Throwable e) { - e.printStackTrace(); - } - } - }); - conn.start(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - - public void listenTopic(final String topicName, final String selectors) { - try { - Properties props = new Properties(); - props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - props.put("java.naming.provider.url", connectionUri); - props.put("topic.topicName", topicName); - - javax.naming.Context ctx = new InitialContext(props); - TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory"); - TopicConnection conn = factory.createTopicConnection(); - final Topic topic = (Topic) ctx.lookup("topicName"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber receiver = session.createSubscriber(topic, selectors, false); - - receiver.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - if (message instanceof TextMessage) { - TextMessage txtMsg = (TextMessage) message; - String msg = txtMsg.getText(); - LOG.info("Topic Message Received: " + topicName + " - " + msg); - } - message.acknowledge(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - conn.start(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - - public void publish(String topicName, String message) { - try { - Properties props = new Properties(); - props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - props.put("java.naming.provider.url", connectionUri); - props.put("topic.topicName", topicName); - javax.naming.Context ctx = new InitialContext(props); - TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory"); - TopicConnection conn = factory.createTopicConnection(); - Topic topic = (Topic) ctx.lookup("topicName"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicPublisher publisher = session.createPublisher(topic); - if (message != null) { - Message msg = session.createTextMessage(message); - publisher.send(msg); - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void tryXpathSelectorMatch() throws Exception { - String xPath = "XPATH '//books//book[@lang=''en'']'"; - listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath); - publish("VirtualTopic.TestXpath", "ABC"); - assertTrue("topic received: ", qreceived.await(20, TimeUnit.SECONDS)); - } - - @Test - public void tryXpathSelectorNoMatch() throws Exception { - String xPath = "XPATH '//books//book[@lang=''es'']'"; - listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath); - publish("VirtualTopic.TestXpath", "ABC"); - assertFalse("topic did not receive unmatched", qreceived.await(5, TimeUnit.SECONDS)); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java deleted file mode 100644 index 8067305..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerTestSupport; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMapMessage; -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.usecases.MyObject; - -public class AMQ2103Test extends BrokerTestSupport { - - static PolicyEntry reduceMemoryFootprint = new PolicyEntry(); - - static { - reduceMemoryFootprint.setReduceMemoryFootprint(true); - } - - public PolicyEntry defaultPolicy = reduceMemoryFootprint; - - @Override - protected PolicyEntry getDefaultPolicy() { - return defaultPolicy; - } - - public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception { - addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null}); - } - - public static Test suite() { - return suite(AMQ2103Test.class); - } - - /** - * use mem persistence so no marshaling, - * reduceMemoryFootprint on/off that will reduce memory by whacking the marshaled state - * With vm transport and deferred serialisation and no persistence (mem persistence), - * we see the message as sent by the client so we can validate the contents against - * the policy - * - * @throws Exception - */ - public void testVerifyMarshalledStateIsCleared() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - factory.setOptimizedMessageDispatch(true); - factory.setObjectMessageSerializationDefered(true); - factory.setCopyMessageOnSend(false); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - ActiveMQDestination destination = new ActiveMQQueue("testQ"); - MessageConsumer consumer = session.createConsumer(destination); - connection.start(); - - MessageProducer producer = session.createProducer(destination); - final MyObject obj = new MyObject("A message"); - ActiveMQObjectMessage m1 = (ActiveMQObjectMessage) session.createObjectMessage(); - m1.setObject(obj); - producer.send(m1); - - ActiveMQTextMessage m2 = new ActiveMQTextMessage(); - m2.setText("Test Message Payload."); - producer.send(m2); - - ActiveMQMapMessage m3 = new ActiveMQMapMessage(); - m3.setString("text", "my message"); - producer.send(m3); - - Message m = consumer.receive(maxWait); - assertNotNull(m); - assertEquals(m1.getMessageId().toString(), m.getJMSMessageID()); - assertTrue(m instanceof ActiveMQObjectMessage); - - if (getDefaultPolicy() != null) { - assertNull("object data cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQObjectMessage) m).getObject()); - } - - // verify no serialisation via vm transport - assertEquals("writeObject called", 0, obj.getWriteObjectCalled()); - assertEquals("readObject called", 0, obj.getReadObjectCalled()); - assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled()); - - m = consumer.receive(maxWait); - assertNotNull(m); - assertEquals(m2.getMessageId().toString(), m.getJMSMessageID()); - assertTrue(m instanceof ActiveMQTextMessage); - - if (getDefaultPolicy() != null) { - assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQTextMessage) m).getText()); - } - - m = consumer.receive(maxWait); - assertNotNull(m); - assertEquals(m3.getMessageId().toString(), m.getJMSMessageID()); - assertTrue(m instanceof ActiveMQMapMessage); - - if (getDefaultPolicy() != null) { - assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQMapMessage) m).getStringProperty("text")); - } - - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java deleted file mode 100644 index 8cda3ef..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.leveldb.LevelDBStore; - -public class AMQ2149LevelDBTest extends AMQ2149Test { - - @Override - protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { - LevelDBStore persistenceFactory = new LevelDBStore(); - persistenceFactory.setDirectory(dataDirFile); - brokerService.setPersistenceAdapter(persistenceFactory); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab5b3b0c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java deleted file mode 100644 index 19dbf0e..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ /dev/null @@ -1,614 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.io.File; -import java.lang.IllegalStateException; -import java.util.HashSet; -import java.util.Timer; -import java.util.TimerTask; -import java.util.Vector; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.*; - -import org.apache.activemq.AutoFailTestSupport; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationStatistics; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.util.LoggingBrokerPlugin; -import org.apache.activemq.command.ActiveMQDestination; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -interface Configurer { - - public void configure(BrokerService broker) throws Exception; -} - -public class AMQ2149Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class); - @Rule - public TestName testName = new TestName(); - - private static final String BROKER_CONNECTOR = "tcp://localhost:61617"; - private static final String DEFAULT_BROKER_URL = "failover:(" + BROKER_CONNECTOR + ")?maxReconnectDelay=1000&useExponentialBackOff=false"; - - private final String SEQ_NUM_PROPERTY = "seqNum"; - - final int MESSAGE_LENGTH_BYTES = 75 * 1024; - final long SLEEP_BETWEEN_SEND_MS = 25; - final int NUM_SENDERS_AND_RECEIVERS = 10; - final Object brokerLock = new Object(); - - private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000; - private static final long DEFAULT_NUM_TO_SEND = 1400; - - long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; - long numtoSend = DEFAULT_NUM_TO_SEND; - long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; - String brokerURL = DEFAULT_BROKER_URL; - - int numBrokerRestarts = 0; - final static int MAX_BROKER_RESTARTS = 4; - BrokerService broker; - Vector exceptions = new Vector<>(); - - protected File dataDirFile; - final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()}; - - public void createBroker(Configurer configurer) throws Exception { - broker = new BrokerService(); - configurePersistenceAdapter(broker); - - broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS); - - broker.addConnector(BROKER_CONNECTOR); - broker.setBrokerName(testName.getMethodName()); - broker.setDataDirectoryFile(dataDirFile); - if (configurer != null) { - configurer.configure(broker); - } - broker.start(); - } - - protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { - } - - @Before - public void setUp() throws Exception { - LOG.debug("Starting test {}", testName.getMethodName()); - dataDirFile = new File("target/" + testName.getMethodName()); - numtoSend = DEFAULT_NUM_TO_SEND; - brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; - sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; - brokerURL = DEFAULT_BROKER_URL; - } - - @After - public void tearDown() throws Exception { - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(new TeardownTask(brokerLock, broker)); - try { - LOG.debug("Teardown started."); - long start = System.currentTimeMillis(); - Boolean result = future.get(30, TimeUnit.SECONDS); - long finish = System.currentTimeMillis(); - LOG.debug("Result of teardown: {} after {} ms ", result, (finish - start)); - } - catch (TimeoutException e) { - fail("Teardown timed out"); - AutoFailTestSupport.dumpAllThreads(testName.getMethodName()); - } - executor.shutdownNow(); - exceptions.clear(); - } - - private String buildLongString() { - final StringBuilder stringBuilder = new StringBuilder(MESSAGE_LENGTH_BYTES); - for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) { - stringBuilder.append((int) (Math.random() * 10)); - } - return stringBuilder.toString(); - } - - HashSet connections = new HashSet<>(); - - private class Receiver implements MessageListener { - - private final javax.jms.Destination dest; - - private final Connection connection; - - private final Session session; - - private final MessageConsumer messageConsumer; - - private AtomicLong nextExpectedSeqNum = new AtomicLong(); - - private final boolean transactional; - - private String lastId = null; - - public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException { - this.dest = dest; - this.transactional = transactional; - connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); - connection.setClientID(dest.toString()); - session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); - if (ActiveMQDestination.transform(dest).isTopic()) { - messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString()); - } - else { - messageConsumer = session.createConsumer(dest); - } - messageConsumer.setMessageListener(this); - connection.start(); - connections.add(connection); - } - - public void close() throws JMSException { - connection.close(); - } - - public long getNextExpectedSeqNo() { - return nextExpectedSeqNum.get(); - } - - final int TRANSACITON_BATCH = 500; - boolean resumeOnNextOrPreviousIsOk = false; - - @Override - public void onMessage(Message message) { - try { - final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); - if ((seqNum % TRANSACITON_BATCH) == 0) { - LOG.info(dest + " received " + seqNum); - - if (transactional) { - LOG.info("committing.."); - session.commit(); - } - } - if (resumeOnNextOrPreviousIsOk) { - // after an indoubt commit we need to accept what we get (within reason) - if (seqNum != nextExpectedSeqNum.get()) { - final long l = nextExpectedSeqNum.get(); - if (seqNum == l - (TRANSACITON_BATCH - 1)) { - nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH - 1)); - LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); - } - } - resumeOnNextOrPreviousIsOk = false; - } - if (seqNum != nextExpectedSeqNum.get()) { - LOG.warn(dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() + " expected " + nextExpectedSeqNum + ", lastId: " + lastId + ", message:" + message); - fail(dest + " received " + seqNum + " expected " + nextExpectedSeqNum); - } - nextExpectedSeqNum.incrementAndGet(); - lastId = message.getJMSMessageID(); - } - catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { - LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); - if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) { - // in doubt - either commit command or reply missing - // don't know if we will get a replay - resumeOnNextOrPreviousIsOk = true; - nextExpectedSeqNum.incrementAndGet(); - LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); - } - else { - resumeOnNextOrPreviousIsOk = false; - // batch will be replayed - nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1)); - } - - } - catch (Throwable e) { - LOG.error(dest + " onMessage error", e); - exceptions.add(e); - } - } - - } - - private class Sender implements Runnable { - - private final javax.jms.Destination dest; - - private final Connection connection; - - private final Session session; - - private final MessageProducer messageProducer; - - private volatile long nextSequenceNumber = 0; - private final Object guard = new Object(); - - public Sender(javax.jms.Destination dest) throws JMSException { - this.dest = dest; - connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - messageProducer = session.createProducer(dest); - messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - connection.start(); - connections.add(connection); - } - - @Override - public void run() { - final String longString = buildLongString(); - long nextSequenceNumber = this.nextSequenceNumber; - while (nextSequenceNumber < numtoSend) { - try { - final Message message = session.createTextMessage(longString); - message.setLongProperty(SEQ_NUM_PROPERTY, nextSequenceNumber); - synchronized (guard) { - if (nextSequenceNumber == this.nextSequenceNumber) { - this.nextSequenceNumber = nextSequenceNumber + 1; - messageProducer.send(message); - } - else { - continue; - } - } - - if ((nextSequenceNumber % 500) == 0) { - LOG.info(dest + " sent " + nextSequenceNumber); - } - - } - catch (javax.jms.IllegalStateException e) { - LOG.error(dest + " bailing on send error", e); - exceptions.add(e); - break; - } - catch (Exception e) { - LOG.error(dest + " send error", e); - exceptions.add(e); - } - if (sleepBetweenSend > 0) { - try { - Thread.sleep(sleepBetweenSend); - } - catch (InterruptedException e) { - LOG.warn(dest + " sleep interrupted", e); - } - } - } - try { - connection.close(); - } - catch (JMSException ignored) { - } - } - } - - // attempt to simply replicate leveldb failure. no joy yet - public void x_testRestartReReceive() throws Exception { - createBroker(new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); - } - }); - - final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE); - Thread thread = new Thread(new Sender(destination)); - thread.start(); - thread.join(); - - Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); - connection.setClientID(destination.toString()); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer messageConsumer = session.createConsumer(destination); - connection.start(); - - int batch = 200; - long expectedSeq; - - final TimerTask restartTask = scheduleRestartTask(null, new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - } - }); - - expectedSeq = 0; - for (int s = 0; s < 4; s++) { - for (int i = 0; i < batch; i++) { - Message message = messageConsumer.receive(20000); - assertNotNull("s:" + s + ", i:" + i, message); - final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); - assertEquals("expected order s:" + s, expectedSeq++, seqNum); - - if (i > 0 && i % 600 == 0) { - LOG.info("Commit on %5"); - // session.commit(); - } - } - restartTask.run(); - } - - } - - // no need to run this unless there are some issues with the others - public void vanilaVerify_testOrder() throws Exception { - - createBroker(new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); - } - }); - - verifyOrderedMessageReceipt(); - verifyStats(false); - } - - @Test(timeout = 5 * 60 * 1000) - public void testOrderWithRestart() throws Exception { - createBroker(new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); - } - }); - - final Timer timer = new Timer(); - scheduleRestartTask(timer, new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - } - }); - - try { - verifyOrderedMessageReceipt(); - } - finally { - timer.cancel(); - } - - verifyStats(true); - } - - @Test(timeout = 5 * 60 * 1000) - public void testTopicOrderWithRestart() throws Exception { - createBroker(new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); - } - }); - - final Timer timer = new Timer(); - scheduleRestartTask(timer, null); - - try { - verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE); - } - finally { - timer.cancel(); - } - - verifyStats(true); - } - - @Test(timeout = 5 * 60 * 1000) - public void testQueueTransactionalOrderWithRestart() throws Exception { - doTestTransactionalOrderWithRestart(ActiveMQDestination.QUEUE_TYPE); - } - - @Test(timeout = 5 * 60 * 1000) - public void testTopicTransactionalOrderWithRestart() throws Exception { - doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE); - } - - public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { - numtoSend = 10000; - sleepBetweenSend = 3; - brokerStopPeriod = 10 * 1000; - - createBroker(new Configurer() { - @Override - public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); - } - }); - - final Timer timer = new Timer(); - scheduleRestartTask(timer, null); - - try { - verifyOrderedMessageReceipt(destinationType, 1, true); - } - finally { - timer.cancel(); - } - - verifyStats(true); - } - - private void verifyStats(boolean brokerRestarts) throws Exception { - RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - - for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { - DestinationStatistics stats = dest.getDestinationStatistics(); - if (brokerRestarts) { - // all bets are off w.r.t stats as there may be duplicate sends and duplicate - // dispatches, all of which will be suppressed - either by the reference store - // not allowing duplicate references or consumers acking duplicates - LOG.info("with restart: not asserting qneue/dequeue stat match for: " + dest.getName() + " " + stats.getEnqueues().getCount() + " <= " + stats.getDequeues().getCount()); - } - else { - assertEquals("qneue/dequeue match for: " + dest.getName(), stats.getEnqueues().getCount(), stats.getDequeues().getCount()); - } - } - } - - private TimerTask scheduleRestartTask(final Timer timer, final Configurer configurer) { - class RestartTask extends TimerTask { - - @Override - public void run() { - synchronized (brokerLock) { - LOG.info("stopping broker.."); - try { - broker.stop(); - broker.waitUntilStopped(); - } - catch (Exception e) { - LOG.error("ex on broker stop", e); - exceptions.add(e); - } - LOG.info("restarting broker"); - try { - createBroker(configurer); - broker.waitUntilStarted(); - } - catch (Exception e) { - LOG.error("ex on broker restart", e); - exceptions.add(e); - } - } - if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) { - // do it again - try { - timer.schedule(new RestartTask(), brokerStopPeriod); - } - catch (IllegalStateException ignore_alreadyCancelled) { - } - } - else { - LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS); - } - } - } - RestartTask task = new RestartTask(); - if (timer != null) { - timer.schedule(task, brokerStopPeriod); - } - return task; - } - - private void verifyOrderedMessageReceipt(byte destinationType) throws Exception { - verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false); - } - - private void verifyOrderedMessageReceipt() throws Exception { - verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false); - } - - private void verifyOrderedMessageReceipt(byte destinationType, - int concurrentPairs, - boolean transactional) throws Exception { - - Vector threads = new Vector<>(); - Vector receivers = new Vector<>(); - - for (int i = 0; i < concurrentPairs; ++i) { - final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest." + i, destinationType); - receivers.add(new Receiver(destination, transactional)); - Thread thread = new Thread(new Sender(destination)); - thread.start(); - threads.add(thread); - } - - final long expiry = System.currentTimeMillis() + 1000 * 60 * 4; - while (!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { - Thread sendThread = threads.firstElement(); - sendThread.join(1000 * 30); - if (!sendThread.isAlive()) { - threads.remove(sendThread); - } - else { - AutoFailTestSupport.dumpAllThreads("Send blocked"); - } - } - LOG.info("senders done..." + threads); - - while (!receivers.isEmpty() && System.currentTimeMillis() < expiry) { - Receiver receiver = receivers.firstElement(); - if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) { - receiver.close(); - receivers.remove(receiver); - } - } - - for (Connection connection : connections) { - try { - connection.close(); - } - catch (Exception ignored) { - } - } - connections.clear(); - - assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry); - if (!exceptions.isEmpty()) { - exceptions.get(0).printStackTrace(); - } - - LOG.info("Dangling threads: " + threads); - for (Thread dangling : threads) { - dangling.interrupt(); - dangling.join(10 * 1000); - } - - assertTrue("No exceptions", exceptions.isEmpty()); - } - -} - -class TeardownTask implements Callable { - - private final Object brokerLock; - private BrokerService broker; - - public TeardownTask(Object brokerLock, BrokerService broker) { - this.brokerLock = brokerLock; - this.broker = broker; - } - - @Override - public Boolean call() throws Exception { - synchronized (brokerLock) { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - return Boolean.TRUE; - } -}