Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 350DB184AC for ; Mon, 22 Feb 2016 15:54:30 +0000 (UTC) Received: (qmail 61859 invoked by uid 500); 22 Feb 2016 15:38:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 61788 invoked by uid 500); 22 Feb 2016 15:38:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 61483 invoked by uid 99); 22 Feb 2016 15:38:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 15:38:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 027AAE0492; Mon, 22 Feb 2016 15:38:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 22 Feb 2016 15:38:54 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/47] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java deleted file mode 100644 index 6cf42f3..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java +++ /dev/null @@ -1,617 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; - -/** - * Non transactional concurrent producer/consumer to single dest - */ -@RunWith(Parameterized.class) -public class AMQ5266SingleDestTest { - - static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class); - String activemqURL; - BrokerService brokerService; - - public int numDests = 1; - public int messageSize = 10 * 1000; - - @Parameterized.Parameter(0) - public int publisherMessagesPerThread = 1000; - - @Parameterized.Parameter(1) - public int publisherThreadCount = 20; - - @Parameterized.Parameter(2) - public int consumerThreadsPerQueue = 5; - - @Parameterized.Parameter(3) - public int destMemoryLimit = 50 * 1024; - - @Parameterized.Parameter(4) - public boolean useCache = true; - - @Parameterized.Parameter(5) - public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; - - @Parameterized.Parameter(6) - public boolean optimizeDispatch = false; - - @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") - public static Iterable parameters() { - return Arrays.asList(new Object[][]{{1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.JDBC, false},}); - } - - public int consumerBatchSize = 25; - - @BeforeClass - public static void derbyTestMode() throws Exception { - System.setProperty("derby.system.durability", "test"); - } - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - - TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.setUseJmx(false); - brokerService.setAdvisorySupport(false); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract! - defaultEntry.setMaxProducersToAudit(publisherThreadCount); - defaultEntry.setEnableAudit(true); - defaultEntry.setUseCache(useCache); - defaultEntry.setMaxPageSize(1000); - defaultEntry.setOptimizedDispatch(optimizeDispatch); - defaultEntry.setMemoryLimit(destMemoryLimit); - defaultEntry.setExpireMessagesPeriod(0); - policyMap.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(policyMap); - - brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024); - - TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.start(); - activemqURL = transportConnector.getPublishableConnectString(); - activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all messages are queue or dlq messages - } - - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - @Test - public void test() throws Exception { - - String activemqQueues = "activemq"; - for (int i = 1; i < numDests; i++) { - activemqQueues += ",activemq" + i; - } - - int consumerWaitForConsumption = 5 * 60 * 1000; - - ExportQueuePublisher publisher = null; - ExportQueueConsumer consumer = null; - - LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified."); - LOG.info("\nBuilding Publisher..."); - - publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount); - - LOG.info("Building Consumer..."); - - consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); - - long totalStart = System.currentTimeMillis(); - - LOG.info("Starting Publisher..."); - - publisher.start(); - - LOG.info("Starting Consumer..."); - - consumer.start(); - - int distinctPublishedCount = 0; - - LOG.info("Waiting For Publisher Completion..."); - - publisher.waitForCompletion(); - - List publishedIds = publisher.getIDs(); - distinctPublishedCount = new TreeSet(publishedIds).size(); - - LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount); - LOG.info("Publisher duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart)); - - long endWait = System.currentTimeMillis() + consumerWaitForConsumption; - while (!consumer.completed() && System.currentTimeMillis() < endWait) { - try { - int secs = (int) (endWait - System.currentTimeMillis()) / 1000; - LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); - Thread.sleep(1000); - } - catch (Exception e) { - } - } - - LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down."); - - LOG.info("Total duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart)); - - consumer.shutdown(); - - TimeUnit.SECONDS.sleep(2); - - LOG.info("Consumer Stats:"); - - for (Map.Entry> entry : consumer.getIDs().entrySet()) { - - List idList = entry.getValue(); - - int distinctConsumed = new TreeSet<>(idList).size(); - - StringBuilder sb = new StringBuilder(); - sb.append(" Queue: " + entry.getKey() + - " -> Total Messages Consumed: " + idList.size() + - ", Distinct IDs Consumed: " + distinctConsumed); - - int diff = distinctPublishedCount - distinctConsumed; - sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) "); - LOG.info(sb.toString()); - - assertEquals("expect to get all messages!", 0, diff); - - } - - // verify empty dlq - assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); - } - - public class ExportQueuePublisher { - - private final String amqUser = ActiveMQConnection.DEFAULT_USER; - private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; - private ActiveMQConnectionFactory connectionFactory = null; - private String activemqURL = null; - private String activemqQueues = null; - // Collection of distinct IDs that the publisher has published. - // After a message is published, its UUID will be written to this list for tracking. - // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs. - //private Set ids = Collections.synchronizedSet(new TreeSet()); - private List ids = Collections.synchronizedList(new ArrayList()); - private List threads; - - public ExportQueuePublisher(String activemqURL, - String activemqQueues, - int messagesPerThread, - int threadCount) throws Exception { - - this.activemqURL = activemqURL; - this.activemqQueues = activemqQueues; - - threads = new ArrayList<>(); - - // Build the threads and tell them how many messages to publish - for (int i = 0; i < threadCount; i++) { - PublisherThread pt = new PublisherThread(messagesPerThread); - threads.add(pt); - } - } - - public List getIDs() { - return ids; - } - - // Kick off threads - public void start() throws Exception { - - for (PublisherThread pt : threads) { - pt.start(); - } - } - - // Wait for threads to complete. They will complete once they've published all of their messages. - public void waitForCompletion() throws Exception { - - for (PublisherThread pt : threads) { - pt.join(); - pt.close(); - } - } - - private Session newSession(QueueConnection queueConnection) throws Exception { - return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - private synchronized QueueConnection newQueueConnection() throws Exception { - - if (connectionFactory == null) { - connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); - } - - // Set the redelivery count to -1 (infinite), or else messages will start dropping - // after the queue has had a certain number of failures (default is 6) - RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); - policy.setMaximumRedeliveries(-1); - - QueueConnection amqConnection = connectionFactory.createQueueConnection(); - amqConnection.start(); - return amqConnection; - } - - private class PublisherThread extends Thread { - - private int count; - private QueueConnection qc; - private Session session; - private MessageProducer mp; - - private PublisherThread(int count) throws Exception { - - this.count = count; - - // Each Thread has its own Connection and Session, so no sync worries - qc = newQueueConnection(); - session = newSession(qc); - - // In our code, when publishing to multiple queues, - // we're using composite destinations like below - Queue q = new ActiveMQQueue(activemqQueues); - mp = session.createProducer(q); - } - - @Override - public void run() { - - try { - - // Loop until we've published enough messages - while (count-- > 0) { - - TextMessage tm = session.createTextMessage(getMessageText()); - String id = UUID.randomUUID().toString(); - tm.setStringProperty("KEY", id); - ids.add(id); // keep track of the key to compare against consumer - - mp.send(tm); - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - - // Called by waitForCompletion - public void close() { - - try { - mp.close(); - } - catch (Exception e) { - } - - try { - session.close(); - } - catch (Exception e) { - } - - try { - qc.close(); - } - catch (Exception e) { - } - } - } - - } - - String messageText; - - private String getMessageText() { - - if (messageText == null) { - - synchronized (this) { - - if (messageText == null) { - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i++) { - sb.append("X"); - } - messageText = sb.toString(); - } - } - } - - return messageText; - } - - public class ExportQueueConsumer { - - private final String amqUser = ActiveMQConnection.DEFAULT_USER; - private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; - private final int totalToExpect; - private ActiveMQConnectionFactory connectionFactory = null; - private String activemqURL = null; - private String activemqQueues = null; - private String[] queues = null; - // Map of IDs that were consumed, keyed by queue name. - // We'll compare these against what was published to know if any got stuck or dropped. - private Map> idsByQueue = new HashMap<>(); - private Map> threads; - - public ExportQueueConsumer(String activemqURL, - String activemqQueues, - int threadsPerQueue, - int batchSize, - int totalToExpect) throws Exception { - - this.activemqURL = activemqURL; - this.activemqQueues = activemqQueues; - this.totalToExpect = totalToExpect; - - queues = this.activemqQueues.split(","); - - for (int i = 0; i < queues.length; i++) { - queues[i] = queues[i].trim(); - } - - threads = new HashMap<>(); - - // For each queue, create a list of threads and set up the list of ids - for (String q : queues) { - - List list = new ArrayList<>(); - - idsByQueue.put(q, Collections.synchronizedList(new ArrayList())); - - for (int i = 0; i < threadsPerQueue; i++) { - list.add(new ConsumerThread(q, batchSize)); - } - - threads.put(q, list); - } - } - - public Map> getIDs() { - return idsByQueue; - } - - // Start the threads - public void start() throws Exception { - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.start(); - } - } - } - - // Tell the threads to stop - // Then wait for them to stop - public void shutdown() throws Exception { - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.shutdown(); - } - } - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.join(); - } - } - } - - private Session newSession(QueueConnection queueConnection) throws Exception { - return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - private synchronized QueueConnection newQueueConnection() throws Exception { - - if (connectionFactory == null) { - connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); - } - - // Set the redelivery count to -1 (infinite), or else messages will start dropping - // after the queue has had a certain number of failures (default is 6) - RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); - policy.setMaximumRedeliveries(-1); - - QueueConnection amqConnection = connectionFactory.createQueueConnection(); - amqConnection.start(); - return amqConnection; - } - - public boolean completed() { - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - if (ct.isAlive()) { - LOG.info("thread for {} is still alive.", ct.qName); - return false; - } - } - } - return true; - } - - private class ConsumerThread extends Thread { - - private int batchSize; - private QueueConnection qc; - private Session session; - private MessageConsumer mc; - private List idList; - private boolean shutdown = false; - private String qName; - - private ConsumerThread(String queueName, int batchSize) throws Exception { - - this.batchSize = batchSize; - - // Each thread has its own connection and session - qName = queueName; - qc = newQueueConnection(); - session = newSession(qc); - Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize); - mc = session.createConsumer(q); - - idList = idsByQueue.get(queueName); - } - - @Override - public void run() { - - try { - - int count = 0; - - // Keep reading as long as it hasn't been told to shutdown - while (!shutdown) { - - if (idList.size() >= totalToExpect) { - LOG.info("Got {} for q: {}", +idList.size(), qName); - break; - } - Message m = mc.receive(4000); - - if (m != null) { - - // We received a non-null message, add the ID to our list - - idList.add(m.getStringProperty("KEY")); - - count++; - - // If we've reached our batch size, commit the batch and reset the count - - if (count == batchSize) { - count = 0; - } - } - else { - - // We didn't receive anything this time, commit any current batch and reset the count - - count = 0; - - // Sleep a little before trying to read after not getting a message - - try { - if (idList.size() < totalToExpect) { - LOG.info("did not receive on {}, current count: {}", qName, idList.size()); - } - //sleep(3000); - } - catch (Exception e) { - } - } - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - - // Once we exit, close everything - close(); - } - } - - public void shutdown() { - shutdown = true; - } - - public void close() { - - try { - mc.close(); - } - catch (Exception e) { - } - - try { - session.close(); - } - catch (Exception e) { - } - - try { - qc.close(); - } - catch (Exception e) { - - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java deleted file mode 100644 index 0bef00b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java +++ /dev/null @@ -1,628 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; - -/* - * pause producers if consumers stall and verify broker drained before resume - */ -@RunWith(Parameterized.class) -public class AMQ5266StarvedConsumerTest { - - static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class); - String activemqURL; - BrokerService brokerService; - - public int messageSize = 1000; - - @Parameterized.Parameter(0) - public int publisherMessagesPerThread = 1000; - - @Parameterized.Parameter(1) - public int publisherThreadCount = 20; - - @Parameterized.Parameter(2) - public int consumerThreadsPerQueue = 5; - - @Parameterized.Parameter(3) - public int destMemoryLimit = 50 * 1024; - - @Parameterized.Parameter(4) - public boolean useCache = true; - - @Parameterized.Parameter(5) - public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; - - @Parameterized.Parameter(6) - public boolean optimizeDispatch = false; - private AtomicBoolean didNotReceive = new AtomicBoolean(false); - - @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}") - public static Iterable parameters() { - return Arrays.asList(new Object[][]{{1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, - - {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},}); - } - - public int consumerBatchSize = 5; - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.setUseJmx(false); - brokerService.setAdvisorySupport(false); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract! - defaultEntry.setMaxAuditDepth(publisherThreadCount); - defaultEntry.setEnableAudit(true); - defaultEntry.setUseCache(useCache); - defaultEntry.setMaxPageSize(1000); - defaultEntry.setOptimizedDispatch(optimizeDispatch); - defaultEntry.setMemoryLimit(destMemoryLimit); - defaultEntry.setExpireMessagesPeriod(0); - policyMap.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(policyMap); - - brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); - - TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.start(); - activemqURL = transportConnector.getPublishableConnectString(); - } - - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() { - @Override - public void run() { - // wait for queue size to go to zero - try { - while (((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) { - LOG.info("Total messageCount: " + ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); - TimeUnit.SECONDS.sleep(5); - } - } - catch (Exception ignored) { - ignored.printStackTrace(); - } - } - }); - - @Test(timeout = 30 * 60 * 1000) - public void test() throws Exception { - - String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9"; - - int consumerWaitForConsumption = 5 * 60 * 1000; - - ExportQueuePublisher publisher = null; - ExportQueueConsumer consumer = null; - - LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified."); - LOG.info("\nBuilding Publisher..."); - - publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount); - - LOG.info("Building Consumer..."); - - consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); - - LOG.info("Starting Publisher..."); - - publisher.start(); - - LOG.info("Starting Consumer..."); - - consumer.start(); - - int distinctPublishedCount = 0; - - LOG.info("Waiting For Publisher Completion..."); - - publisher.waitForCompletion(); - - List publishedIds = publisher.getIDs(); - distinctPublishedCount = new TreeSet(publishedIds).size(); - - LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount); - - long endWait = System.currentTimeMillis() + consumerWaitForConsumption; - while (!consumer.completed() && System.currentTimeMillis() < endWait) { - try { - int secs = (int) (endWait - System.currentTimeMillis()) / 1000; - LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); - Thread.sleep(10000); - } - catch (Exception e) { - } - } - - LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down."); - - consumer.shutdown(); - - LOG.info("Consumer Stats:"); - - for (Map.Entry> entry : consumer.getIDs().entrySet()) { - - List idList = entry.getValue(); - - int distinctConsumed = new TreeSet<>(idList).size(); - - StringBuilder sb = new StringBuilder(); - sb.append(" Queue: " + entry.getKey() + - " -> Total Messages Consumed: " + idList.size() + - ", Distinct IDs Consumed: " + distinctConsumed); - - int diff = distinctPublishedCount - distinctConsumed; - sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) "); - LOG.info(sb.toString()); - - assertEquals("expect to get all messages!", 0, diff); - - } - } - - public class ExportQueuePublisher { - - private final String amqUser = ActiveMQConnection.DEFAULT_USER; - private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; - private ActiveMQConnectionFactory connectionFactory = null; - private String activemqURL = null; - private String activemqQueues = null; - // Collection of distinct IDs that the publisher has published. - // After a message is published, its UUID will be written to this list for tracking. - // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs. - //private Set ids = Collections.synchronizedSet(new TreeSet()); - private List ids = Collections.synchronizedList(new ArrayList()); - private List threads; - - public ExportQueuePublisher(String activemqURL, - String activemqQueues, - int messagesPerThread, - int threadCount) throws Exception { - - this.activemqURL = activemqURL; - this.activemqQueues = activemqQueues; - - threads = new ArrayList<>(); - - // Build the threads and tell them how many messages to publish - for (int i = 0; i < threadCount; i++) { - PublisherThread pt = new PublisherThread(messagesPerThread); - threads.add(pt); - } - } - - public List getIDs() { - return ids; - } - - // Kick off threads - public void start() throws Exception { - - for (PublisherThread pt : threads) { - pt.start(); - } - } - - // Wait for threads to complete. They will complete once they've published all of their messages. - public void waitForCompletion() throws Exception { - - for (PublisherThread pt : threads) { - pt.join(); - pt.close(); - } - } - - private Session newSession(QueueConnection queueConnection) throws Exception { - return queueConnection.createSession(true, Session.SESSION_TRANSACTED); - } - - private synchronized QueueConnection newQueueConnection() throws Exception { - - if (connectionFactory == null) { - connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); - connectionFactory.setWatchTopicAdvisories(false); - } - - // Set the redelivery count to -1 (infinite), or else messages will start dropping - // after the queue has had a certain number of failures (default is 6) - RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); - policy.setMaximumRedeliveries(-1); - - QueueConnection amqConnection = connectionFactory.createQueueConnection(); - amqConnection.start(); - return amqConnection; - } - - private class PublisherThread extends Thread { - - private int count; - private QueueConnection qc; - private Session session; - private MessageProducer mp; - private Queue q; - - private PublisherThread(int count) throws Exception { - - this.count = count; - - // Each Thread has its own Connection and Session, so no sync worries - qc = newQueueConnection(); - session = newSession(qc); - - // In our code, when publishing to multiple queues, - // we're using composite destinations like below - q = new ActiveMQQueue(activemqQueues); - mp = session.createProducer(null); - } - - @Override - public void run() { - - try { - - // Loop until we've published enough messages - while (count-- > 0) { - - TextMessage tm = session.createTextMessage(getMessageText()); - String id = UUID.randomUUID().toString(); - tm.setStringProperty("KEY", id); - ids.add(id); // keep track of the key to compare against consumer - - mp.send(q, tm); - session.commit(); - - if (didNotReceive.get()) { - globalProducerHalt.await(); - } - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - - // Called by waitForCompletion - public void close() { - - try { - mp.close(); - } - catch (Exception e) { - } - - try { - session.close(); - } - catch (Exception e) { - } - - try { - qc.close(); - } - catch (Exception e) { - } - } - } - - } - - String messageText; - - private String getMessageText() { - - if (messageText == null) { - - synchronized (this) { - - if (messageText == null) { - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i++) { - sb.append("X"); - } - messageText = sb.toString(); - } - } - } - - return messageText; - } - - public class ExportQueueConsumer { - - private final String amqUser = ActiveMQConnection.DEFAULT_USER; - private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; - private final int totalToExpect; - private ActiveMQConnectionFactory connectionFactory = null; - private String activemqURL = null; - private String activemqQueues = null; - private String[] queues = null; - // Map of IDs that were consumed, keyed by queue name. - // We'll compare these against what was published to know if any got stuck or dropped. - private Map> idsByQueue = new HashMap<>(); - private Map> threads; - - public ExportQueueConsumer(String activemqURL, - String activemqQueues, - int threadsPerQueue, - int batchSize, - int totalToExpect) throws Exception { - - this.activemqURL = activemqURL; - this.activemqQueues = activemqQueues; - this.totalToExpect = totalToExpect; - - queues = this.activemqQueues.split(","); - - for (int i = 0; i < queues.length; i++) { - queues[i] = queues[i].trim(); - } - - threads = new HashMap<>(); - - // For each queue, create a list of threads and set up the list of ids - for (String q : queues) { - - List list = new ArrayList<>(); - - idsByQueue.put(q, Collections.synchronizedList(new ArrayList())); - - for (int i = 0; i < threadsPerQueue; i++) { - list.add(new ConsumerThread(q, batchSize)); - } - - threads.put(q, list); - } - } - - public Map> getIDs() { - return idsByQueue; - } - - // Start the threads - public void start() throws Exception { - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.start(); - } - } - } - - // Tell the threads to stop - // Then wait for them to stop - public void shutdown() throws Exception { - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.shutdown(); - } - } - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.join(); - } - } - } - - private Session newSession(QueueConnection queueConnection) throws Exception { - return queueConnection.createSession(true, Session.SESSION_TRANSACTED); - } - - private synchronized QueueConnection newQueueConnection() throws Exception { - - if (connectionFactory == null) { - connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); - connectionFactory.setWatchTopicAdvisories(false); - } - - // Set the redelivery count to -1 (infinite), or else messages will start dropping - // after the queue has had a certain number of failures (default is 6) - RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); - policy.setMaximumRedeliveries(-1); - - QueueConnection amqConnection = connectionFactory.createQueueConnection(); - amqConnection.start(); - return amqConnection; - } - - public boolean completed() { - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - if (ct.isAlive()) { - LOG.info("thread for {} is still alive.", ct.qName); - return false; - } - } - } - return true; - } - - private class ConsumerThread extends Thread { - - private int batchSize; - private QueueConnection qc; - private Session session; - private MessageConsumer mc; - private List idList; - private boolean shutdown = false; - private String qName; - - private ConsumerThread(String queueName, int batchSize) throws Exception { - - this.batchSize = batchSize; - - // Each thread has its own connection and session - qName = queueName; - qc = newQueueConnection(); - session = newSession(qc); - Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize); - mc = session.createConsumer(q); - - idList = idsByQueue.get(queueName); - } - - @Override - public void run() { - - try { - - int count = 0; - - // Keep reading as long as it hasn't been told to shutdown - while (!shutdown) { - - if (idList.size() >= totalToExpect) { - LOG.info("Got {} for q: {}", +idList.size(), qName); - session.commit(); - break; - } - Message m = mc.receive(4000); - - if (m != null) { - - // We received a non-null message, add the ID to our list - - idList.add(m.getStringProperty("KEY")); - - count++; - - // If we've reached our batch size, commit the batch and reset the count - - if (count == batchSize) { - session.commit(); - count = 0; - } - } - else { - - // We didn't receive anything this time, commit any current batch and reset the count - - session.commit(); - count = 0; - - // Sleep a little before trying to read after not getting a message - - try { - if (idList.size() < totalToExpect) { - LOG.info("did not receive on {}, current count: {}", qName, idList.size()); - didNotReceive.set(true); - } - //sleep(3000); - } - catch (Exception e) { - } - } - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - - // Once we exit, close everything - close(); - } - } - - public void shutdown() { - shutdown = true; - } - - public void close() { - - try { - mc.close(); - } - catch (Exception e) { - } - - try { - session.close(); - } - catch (Exception e) { - } - - try { - qc.close(); - } - catch (Exception e) { - - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java deleted file mode 100644 index bfbb49d..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java +++ /dev/null @@ -1,604 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.UUID; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; - -/** - * Stuck messages test client. - *
- * Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue. - */ -@RunWith(Parameterized.class) -public class AMQ5266Test { - - static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class); - String activemqURL = "tcp://localhost:61617"; - BrokerService brokerService; - - public int messageSize = 1000; - - @Parameterized.Parameter(0) - public int publisherMessagesPerThread = 1000; - - @Parameterized.Parameter(1) - public int publisherThreadCount = 20; - - @Parameterized.Parameter(2) - public int consumerThreadsPerQueue = 5; - - @Parameterized.Parameter(3) - public int destMemoryLimit = 50 * 1024; - - @Parameterized.Parameter(4) - public boolean useCache = true; - - @Parameterized.Parameter(5) - public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; - - @Parameterized.Parameter(6) - public boolean optimizeDispatch = false; - - @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}") - public static Iterable parameters() { - return Arrays.asList(new Object[][]{{1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, - - {1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {100, 5, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, - - {1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {100, 5, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, - - }); - } - - public int consumerBatchSize = 5; - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.setUseJmx(false); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract! - defaultEntry.setMaxAuditDepth(publisherThreadCount); - defaultEntry.setEnableAudit(true); - defaultEntry.setUseCache(useCache); - defaultEntry.setMaxPageSize(1000); - defaultEntry.setOptimizedDispatch(optimizeDispatch); - defaultEntry.setMemoryLimit(destMemoryLimit); - defaultEntry.setExpireMessagesPeriod(0); - policyMap.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(policyMap); - - brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); - - TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.start(); - activemqURL = transportConnector.getPublishableConnectString(); - } - - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - @Test - public void test() throws Exception { - - String activemqQueues = "activemq,activemq2";//,activemq3,activemq4,activemq5,activemq6,activemq7,activemq8,activemq9"; - - int consumerWaitForConsumption = 5 * 60 * 1000; - - ExportQueuePublisher publisher = null; - ExportQueueConsumer consumer = null; - - LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified."); - LOG.info("\nBuilding Publisher..."); - - publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount); - - LOG.info("Building Consumer..."); - - consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); - - LOG.info("Starting Publisher..."); - - publisher.start(); - - LOG.info("Starting Consumer..."); - - consumer.start(); - - int distinctPublishedCount = 0; - - LOG.info("Waiting For Publisher Completion..."); - - publisher.waitForCompletion(); - - List publishedIds = publisher.getIDs(); - distinctPublishedCount = new TreeSet(publishedIds).size(); - - LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount); - - long endWait = System.currentTimeMillis() + consumerWaitForConsumption; - while (!consumer.completed() && System.currentTimeMillis() < endWait) { - try { - int secs = (int) (endWait - System.currentTimeMillis()) / 1000; - LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); - Thread.sleep(10000); - } - catch (Exception e) { - } - } - - LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down."); - - consumer.shutdown(); - - LOG.info("Consumer Stats:"); - - for (Map.Entry> entry : consumer.getIDs().entrySet()) { - - List idList = entry.getValue(); - - int distinctConsumed = new TreeSet<>(idList).size(); - - StringBuilder sb = new StringBuilder(); - sb.append(" Queue: " + entry.getKey() + - " -> Total Messages Consumed: " + idList.size() + - ", Distinct IDs Consumed: " + distinctConsumed); - - int diff = distinctPublishedCount - distinctConsumed; - sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) "); - LOG.info(sb.toString()); - - assertEquals("expect to get all messages!", 0, diff); - - } - } - - public class ExportQueuePublisher { - - private final String amqUser = ActiveMQConnection.DEFAULT_USER; - private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; - private ActiveMQConnectionFactory connectionFactory = null; - private String activemqURL = null; - private String activemqQueues = null; - // Collection of distinct IDs that the publisher has published. - // After a message is published, its UUID will be written to this list for tracking. - // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs. - //private Set ids = Collections.synchronizedSet(new TreeSet()); - private List ids = Collections.synchronizedList(new ArrayList()); - private List threads; - - public ExportQueuePublisher(String activemqURL, - String activemqQueues, - int messagesPerThread, - int threadCount) throws Exception { - - this.activemqURL = activemqURL; - this.activemqQueues = activemqQueues; - - threads = new ArrayList<>(); - - // Build the threads and tell them how many messages to publish - for (int i = 0; i < threadCount; i++) { - PublisherThread pt = new PublisherThread(messagesPerThread); - threads.add(pt); - } - } - - public List getIDs() { - return ids; - } - - // Kick off threads - public void start() throws Exception { - - for (PublisherThread pt : threads) { - pt.start(); - } - } - - // Wait for threads to complete. They will complete once they've published all of their messages. - public void waitForCompletion() throws Exception { - - for (PublisherThread pt : threads) { - pt.join(); - pt.close(); - } - } - - private Session newSession(QueueConnection queueConnection) throws Exception { - return queueConnection.createSession(true, Session.SESSION_TRANSACTED); - } - - private synchronized QueueConnection newQueueConnection() throws Exception { - - if (connectionFactory == null) { - connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); - } - - // Set the redelivery count to -1 (infinite), or else messages will start dropping - // after the queue has had a certain number of failures (default is 6) - RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); - policy.setMaximumRedeliveries(-1); - - QueueConnection amqConnection = connectionFactory.createQueueConnection(); - amqConnection.start(); - return amqConnection; - } - - private class PublisherThread extends Thread { - - private int count; - private QueueConnection qc; - private Session session; - private MessageProducer mp; - - private PublisherThread(int count) throws Exception { - - this.count = count; - - // Each Thread has its own Connection and Session, so no sync worries - qc = newQueueConnection(); - session = newSession(qc); - - // In our code, when publishing to multiple queues, - // we're using composite destinations like below - Queue q = new ActiveMQQueue(activemqQueues); - mp = session.createProducer(q); - } - - @Override - public void run() { - - try { - - // Loop until we've published enough messages - while (count-- > 0) { - - TextMessage tm = session.createTextMessage(getMessageText()); - String id = UUID.randomUUID().toString(); - tm.setStringProperty("KEY", id); - ids.add(id); // keep track of the key to compare against consumer - - mp.send(tm); - session.commit(); - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - - // Called by waitForCompletion - public void close() { - - try { - mp.close(); - } - catch (Exception e) { - } - - try { - session.close(); - } - catch (Exception e) { - } - - try { - qc.close(); - } - catch (Exception e) { - } - } - } - - } - - String messageText; - - private String getMessageText() { - - if (messageText == null) { - - synchronized (this) { - - if (messageText == null) { - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i++) { - sb.append("X"); - } - messageText = sb.toString(); - } - } - } - - return messageText; - } - - public class ExportQueueConsumer { - - private final String amqUser = ActiveMQConnection.DEFAULT_USER; - private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; - private final int totalToExpect; - private ActiveMQConnectionFactory connectionFactory = null; - private String activemqURL = null; - private String activemqQueues = null; - private String[] queues = null; - // Map of IDs that were consumed, keyed by queue name. - // We'll compare these against what was published to know if any got stuck or dropped. - private Map> idsByQueue = new HashMap<>(); - private Map> threads; - - public ExportQueueConsumer(String activemqURL, - String activemqQueues, - int threadsPerQueue, - int batchSize, - int totalToExpect) throws Exception { - - this.activemqURL = activemqURL; - this.activemqQueues = activemqQueues; - this.totalToExpect = totalToExpect; - - queues = this.activemqQueues.split(","); - - for (int i = 0; i < queues.length; i++) { - queues[i] = queues[i].trim(); - } - - threads = new HashMap<>(); - - // For each queue, create a list of threads and set up the list of ids - for (String q : queues) { - - List list = new ArrayList<>(); - - idsByQueue.put(q, Collections.synchronizedList(new ArrayList())); - - for (int i = 0; i < threadsPerQueue; i++) { - list.add(new ConsumerThread(q, batchSize)); - } - - threads.put(q, list); - } - } - - public Map> getIDs() { - return idsByQueue; - } - - // Start the threads - public void start() throws Exception { - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.start(); - } - } - } - - // Tell the threads to stop - // Then wait for them to stop - public void shutdown() throws Exception { - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.shutdown(); - } - } - - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - ct.join(); - } - } - } - - private Session newSession(QueueConnection queueConnection) throws Exception { - return queueConnection.createSession(true, Session.SESSION_TRANSACTED); - } - - private synchronized QueueConnection newQueueConnection() throws Exception { - - if (connectionFactory == null) { - connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); - } - - // Set the redelivery count to -1 (infinite), or else messages will start dropping - // after the queue has had a certain number of failures (default is 6) - RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); - policy.setMaximumRedeliveries(-1); - - QueueConnection amqConnection = connectionFactory.createQueueConnection(); - amqConnection.start(); - return amqConnection; - } - - public boolean completed() { - for (List list : threads.values()) { - - for (ConsumerThread ct : list) { - - if (ct.isAlive()) { - LOG.info("thread for {} is still alive.", ct.qName); - return false; - } - } - } - return true; - } - - private class ConsumerThread extends Thread { - - private int batchSize; - private QueueConnection qc; - private Session session; - private MessageConsumer mc; - private List idList; - private boolean shutdown = false; - private String qName; - - private ConsumerThread(String queueName, int batchSize) throws Exception { - - this.batchSize = batchSize; - - // Each thread has its own connection and session - qName = queueName; - qc = newQueueConnection(); - session = newSession(qc); - Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize); - mc = session.createConsumer(q); - - idList = idsByQueue.get(queueName); - } - - @Override - public void run() { - - try { - - int count = 0; - - // Keep reading as long as it hasn't been told to shutdown - while (!shutdown) { - - if (idList.size() >= totalToExpect) { - LOG.info("Got {} for q: {}", +idList.size(), qName); - session.commit(); - break; - } - Message m = mc.receive(4000); - - if (m != null) { - - // We received a non-null message, add the ID to our list - - idList.add(m.getStringProperty("KEY")); - - count++; - - // If we've reached our batch size, commit the batch and reset the count - - if (count == batchSize) { - session.commit(); - count = 0; - } - } - else { - - // We didn't receive anything this time, commit any current batch and reset the count - - session.commit(); - count = 0; - - // Sleep a little before trying to read after not getting a message - - try { - if (idList.size() < totalToExpect) { - LOG.info("did not receive on {}, current count: {}", qName, idList.size()); - } - //sleep(3000); - } - catch (Exception e) { - } - } - } - } - catch (Exception e) { - e.printStackTrace(); - } - finally { - - // Once we exit, close everything - close(); - } - } - - public void shutdown() { - shutdown = true; - } - - public void close() { - - try { - mc.close(); - } - catch (Exception e) { - } - - try { - session.close(); - } - catch (Exception e) { - } - - try { - qc.close(); - } - catch (Exception e) { - - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java deleted file mode 100644 index d4c02fb..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.RedeliveryPolicy; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerMBeanSupport; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class AMQ5274Test { - - static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class); - String activemqURL; - BrokerService brokerService; - ActiveMQQueue dest = new ActiveMQQueue("TestQ"); - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultPolicy = new PolicyEntry(); - defaultPolicy.setExpireMessagesPeriod(1000); - policyMap.setDefaultEntry(defaultPolicy); - brokerService.setDestinationPolicy(policyMap); - activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); - brokerService.start(); - } - - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - @Test - public void test() throws Exception { - LOG.info("Starting Test"); - assertTrue(brokerService.isStarted()); - - produce(); - consumeAndRollback(); - - // check reported queue size using JMX - long queueSize = getQueueSize(); - assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize); - } - - private void consumeAndRollback() throws JMSException, InterruptedException { - ActiveMQConnection connection = createConnection(); - RedeliveryPolicy noRedelivery = new RedeliveryPolicy(); - noRedelivery.setMaximumRedeliveries(0); - connection.setRedeliveryPolicy(noRedelivery); - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(dest); - Message m; - while ((m = consumer.receive(4000)) != null) { - LOG.info("Got:" + m); - TimeUnit.SECONDS.sleep(1); - session.rollback(); - } - connection.close(); - } - - private void produce() throws Exception { - Connection connection = createConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(dest); - producer.setTimeToLive(10000); - for (int i = 0; i < 20; i++) { - producer.send(session.createTextMessage("i=" + i)); - } - connection.close(); - } - - private ActiveMQConnection createConnection() throws JMSException { - return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection(); - } - - public long getQueueSize() throws Exception { - long queueSize = 0; - try { - QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false); - queueSize = queueViewMBean.getQueueSize(); - LOG.info("QueueSize for destination {} is {}", dest, queueSize); - } - catch (Exception ex) { - LOG.error("Error retrieving QueueSize from JMX ", ex); - throw ex; - } - return queueSize; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java deleted file mode 100644 index a05d56d..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Arrays; -import java.util.Random; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQMessage; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -public class AMQ5381Test { - - public static final byte[] ORIG_MSG_CONTENT = randomByteArray(); - public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check"; - - private BrokerService brokerService; - private String brokerURI; - - @Rule - public TestName name = new TestName(); - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setUseJmx(false); - brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - brokerService.waitUntilStarted(); - - brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); - } - - @After - public void stopBroker() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - private ActiveMQConnection createConnection(boolean useCompression) throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); - factory.setUseCompression(useCompression); - Connection connection = factory.createConnection(); - connection.start(); - return (ActiveMQConnection) connection; - } - - @Test - public void amq5381Test() throws Exception { - - // Consumer Configured for (useCompression=true) - final ActiveMQConnection consumerConnection = createConnection(true); - final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue consumerQueue = consumerSession.createQueue(name.getMethodName()); - final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); - - // Producer Configured for (useCompression=false) - final ActiveMQConnection producerConnection = createConnection(false); - final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue producerQueue = producerSession.createQueue(name.getMethodName()); - - try { - - final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage(); - messageProduced.writeBytes(ORIG_MSG_CONTENT); - Assert.assertFalse(messageProduced.isReadOnlyBody()); - - Assert.assertFalse("Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)", messageProduced.isCompressed()); - - final MessageProducer producer = producerSession.createProducer(null); - producer.send(producerQueue, messageProduced); - - Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag", producerConnection.isUseCompression(), messageProduced.isCompressed()); - - final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive(); - Assert.assertNotNull(messageConsumed); - Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody()); - Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag", messageProduced.isCompressed(), messageConsumed.isCompressed()); - - // ensure consumed message content matches what was originally set - final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()]; - messageConsumed.readBytes(consumedMsgContent); - - Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent)); - - // make message writable so the consumer can modify and reuse it - makeWritable(messageConsumed); - - // modify message, attempt to trigger DataFormatException due - // to old incorrect compression logic - try { - messageConsumed.setStringProperty(this.getClass().getName(), "test"); - } - catch (JMSException jmsE) { - if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - jmsE.printStackTrace(pw); - - Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString()); - } - else { - throw jmsE; - } - } - - Assert.assertEquals("The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable", messageProduced.isCompressed(), messageConsumed.isCompressed()); - - // simulate re-publishing message - simulatePublish(messageConsumed); - - // ensure consumed message content matches what was originally set - final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()]; - messageConsumed.readBytes(modifiedMsgContent); - - Assert.assertTrue("After the message properties are modified and it is re-published, its message content should still match the original message content", Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent)); - } - finally { - producerSession.close(); - producerConnection.close(); - consumerSession.close(); - consumerConnection.close(); - } - } - - protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128; - - protected static byte[] randomByteArray() { - final Random random = new Random(); - final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)]; - random.nextBytes(byteArray); - - return byteArray; - } - - protected static void makeWritable(final ActiveMQMessage message) { - message.setReadOnlyBody(false); - message.setReadOnlyProperties(false); - } - - protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException { - message.reset(); - message.onSend(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java deleted file mode 100644 index 0e9e310..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.net.URI; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ5421Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ5421Test.class); - - private static final int DEST_COUNT = 1000; - private final Destination[] destination = new Destination[DEST_COUNT]; - private final MessageProducer[] producer = new MessageProducer[DEST_COUNT]; - private BrokerService brokerService; - private String connectionUri; - - protected ConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri); - conFactory.setWatchTopicAdvisories(false); - return conFactory; - } - - protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() { - AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); - strategy.setCheckPeriod(2000); - strategy.setMaxTimeSinceLastAck(5000); - strategy.setIgnoreIdleConsumers(false); - - return strategy; - } - - @Before - public void setUp() throws Exception { - brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true")); - PolicyEntry policy = new PolicyEntry(); - - policy.setSlowConsumerStrategy(createSlowConsumerStrategy()); - policy.setQueuePrefetch(10); - policy.setTopicPrefetch(10); - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - brokerService.setDestinationPolicy(pMap); - brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.start(); - - connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); - } - - @Test - public void testManyTempDestinations() throws Exception { - Connection connection = createConnectionFactory().createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - for (int i = 0; i < DEST_COUNT; i++) { - destination[i] = session.createTemporaryQueue(); - LOG.debug("Created temp queue: [}", i); - } - - for (int i = 0; i < DEST_COUNT; i++) { - producer[i] = session.createProducer(destination[i]); - LOG.debug("Created producer: {}", i); - TextMessage msg = session.createTextMessage(" testMessage " + i); - producer[i].send(msg); - LOG.debug("message sent: {}", i); - MessageConsumer consumer = session.createConsumer(destination[i]); - Message message = consumer.receive(1000); - Assert.assertTrue(message.equals(msg)); - } - - for (int i = 0; i < DEST_COUNT; i++) { - producer[i].close(); - } - - connection.close(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } -}