Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 18C1610CF1 for ; Fri, 6 Mar 2015 22:30:51 +0000 (UTC) Received: (qmail 13573 invoked by uid 500); 6 Mar 2015 22:30:35 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13423 invoked by uid 500); 6 Mar 2015 22:30:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13399 invoked by uid 99); 6 Mar 2015 22:30:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Mar 2015 22:30:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D108EE10CB; Fri, 6 Mar 2015 22:30:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbertram@apache.org To: commits@activemq.apache.org Date: Fri, 06 Mar 2015 22:30:36 -0000 Message-Id: <5a1b7418b3ee48be8e69f54bd428cf0d@git.apache.org> In-Reply-To: <9f7eed35e8f341e9aacf76cc3c5f5797@git.apache.org> References: <9f7eed35e8f341e9aacf76cc3c5f5797@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/15] activemq-6 git commit: Refactored the testsuite a bit http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ManyConnectionsStressTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ManyConnectionsStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ManyConnectionsStressTest.java deleted file mode 100644 index a53f174..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/ManyConnectionsStressTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.naming.InitialContext; -import java.util.HashSet; -import java.util.Set; - -import org.apache.activemq.jms.tests.ActiveMQServerTestCase; -import org.apache.activemq.jms.tests.util.ProxyAssertSupport; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Create 500 connections each with a consumer, consuming from a topic - * - * @author Tim Fox - */ -public class ManyConnectionsStressTest extends ActiveMQServerTestCase -{ - @BeforeClass - public static void stressTestsEnabled() - { - org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED); - } - - private static final int NUM_CONNECTIONS = 500; - - private static final int NUM_MESSAGES = 100; - - // Static -------------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private InitialContext ic; - - private volatile boolean failed; - - private final Set listeners = new HashSet(); - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - - // ServerManagement.start("all"); - - ic = getInitialContext(); - - createTopic("StressTestTopic"); - } - - @Override - @After - public void tearDown() throws Exception - { - destroyTopic("StressTestTopic"); - ic.close(); - super.tearDown(); - } - - @Test - public void testManyConnections() throws Exception - { - ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); - - Topic topic = (Topic) ic.lookup("/topic/StressTestTopic"); - - Connection[] conns = new Connection[ManyConnectionsStressTest.NUM_CONNECTIONS]; - - for (int i = 0; i < ManyConnectionsStressTest.NUM_CONNECTIONS; i++) - { - conns[i] = addConnection(cf.createConnection()); - - Session sess = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer cons = sess.createConsumer(topic); - - MyListener listener = new MyListener(); - - synchronized (listeners) - { - listeners.add(listener); - } - - cons.setMessageListener(listener); - - conns[i].start(); - - log.info("Created " + i); - } - - // Thread.sleep(100 * 60 * 1000); - - Connection connSend = addConnection(cf.createConnection()); - - Session sessSend = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer prod = sessSend.createProducer(topic); - - for (int i = 0; i < ManyConnectionsStressTest.NUM_MESSAGES; i++) - { - TextMessage tm = sessSend.createTextMessage("message" + i); - - tm.setIntProperty("count", i); - - prod.send(tm); - } - - long wait = 30000; - - synchronized (listeners) - { - while (!listeners.isEmpty() && wait > 0) - { - long start = System.currentTimeMillis(); - try - { - listeners.wait(wait); - } - catch (InterruptedException e) - { - // Ignore - } - wait -= System.currentTimeMillis() - start; - } - } - - if (wait <= 0) - { - ProxyAssertSupport.fail("Timed out"); - } - - ProxyAssertSupport.assertFalse(failed); - } - - private void finished(final MyListener listener) - { - synchronized (listeners) - { - log.info("consumer " + listener + " has finished"); - - listeners.remove(listener); - - listeners.notify(); - } - } - - private void failed(final MyListener listener) - { - synchronized (listeners) - { - log.error("consumer " + listener + " has failed"); - - listeners.remove(listener); - - failed = true; - - listeners.notify(); - } - } - - private final class MyListener implements MessageListener - { - public void onMessage(final Message msg) - { - try - { - int count = msg.getIntProperty("count"); - - // log.info(this + " got message " + msg); - - if (count == ManyConnectionsStressTest.NUM_MESSAGES - 1) - { - finished(this); - } - } - catch (JMSException e) - { - log.error("Failed to get int property", e); - - failed(this); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java deleted file mode 100644 index 29d5f8f..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/OpenCloseStressTest.java +++ /dev/null @@ -1,445 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.naming.InitialContext; - -import org.apache.activemq.jms.tests.ActiveMQServerTestCase; -import org.apache.activemq.jms.tests.util.ProxyAssertSupport; -import org.apache.activemq.utils.UUIDGenerator; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * A OpenCloseStressTest. - *

- * This stress test starts several publisher connections and several subscriber connections, then - * sends and consumes messages while concurrently closing the sessions. - *

- * This test will help catch race conditions that occurred with rapid open/closing of sessions when - * messages are being sent/received - *

- * E.g. http://jira.jboss.com/jira/browse/JBMESSAGING-982 - * @author Tim Fox - */ -public class OpenCloseStressTest extends ActiveMQServerTestCase -{ - @BeforeClass - public static void stressTestsEnabled() - { - org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED); - } - - InitialContext ic; - - ConnectionFactory cf; - - Topic topic; - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - - // ServerManagement.start("all"); - - ic = getInitialContext(); - cf = (ConnectionFactory)ic.lookup("/ConnectionFactory"); - - destroyTopic("TestTopic"); - createTopic("TestTopic"); - - topic = (Topic)ic.lookup("topic/TestTopic"); - - log.debug("setup done"); - } - - @Override - @After - public void tearDown() throws Exception - { - destroyQueue("TestQueue"); - log.debug("tear down done"); - } - - @Test - public void testOpenClose() throws Exception - { - Connection conn1 = null; - Connection conn2 = null; - Connection conn3 = null; - - Connection conn4 = null; - Connection conn5 = null; - Connection conn6 = null; - Connection conn7 = null; - Connection conn8 = null; - - try - { - Publisher[] publishers = new Publisher[3]; - - final int MSGS_PER_PUBLISHER = 10000; - - conn1 = cf.createConnection(); - Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod1 = sess1.createProducer(topic); - prod1.setDeliveryMode(DeliveryMode.PERSISTENT); - publishers[0] = new Publisher(sess1, prod1, MSGS_PER_PUBLISHER, 2); - - conn2 = cf.createConnection(); - Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod2 = sess2.createProducer(topic); - prod2.setDeliveryMode(DeliveryMode.PERSISTENT); - publishers[1] = new Publisher(sess2, prod2, MSGS_PER_PUBLISHER, 5); - - conn3 = cf.createConnection(); - Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod3 = sess3.createProducer(topic); - prod3.setDeliveryMode(DeliveryMode.PERSISTENT); - publishers[2] = new Publisher(sess3, prod3, MSGS_PER_PUBLISHER, 1); - - Subscriber[] subscribers = new Subscriber[5]; - - conn4 = cf.createConnection(); - subscribers[0] = new Subscriber(conn4, 3 * MSGS_PER_PUBLISHER, 500, 1000 * 60 * 15, topic, false); - - conn5 = cf.createConnection(); - subscribers[1] = new Subscriber(conn5, 3 * MSGS_PER_PUBLISHER, 2000, 1000 * 60 * 15, topic, false); - - conn6 = cf.createConnection(); - subscribers[2] = new Subscriber(conn6, 3 * MSGS_PER_PUBLISHER, 700, 1000 * 60 * 15, topic, false); - - conn7 = cf.createConnection(); - subscribers[3] = new Subscriber(conn7, 3 * MSGS_PER_PUBLISHER, 1500, 1000 * 60 * 15, topic, true); - - conn8 = cf.createConnection(); - subscribers[4] = new Subscriber(conn8, 3 * MSGS_PER_PUBLISHER, 1200, 1000 * 60 * 15, topic, true); - - Thread[] threads = new Thread[8]; - - // subscribers - threads[0] = new Thread(subscribers[0]); - - threads[1] = new Thread(subscribers[1]); - - threads[2] = new Thread(subscribers[2]); - - threads[3] = new Thread(subscribers[3]); - - threads[4] = new Thread(subscribers[4]); - - // publishers - - threads[5] = new Thread(publishers[0]); - - threads[6] = new Thread(publishers[1]); - - threads[7] = new Thread(publishers[2]); - - for (int i = 0; i < subscribers.length; i++) - { - threads[i].start(); - } - - // Pause before creating producers otherwise subscribers to make sure they're all created - - Thread.sleep(5000); - - for (int i = subscribers.length; i < threads.length; i++) - { - threads[i].start(); - } - - for (Thread thread : threads) - { - thread.join(); - } - - for (Subscriber subscriber : subscribers) - { - if (subscriber.isDurable()) - { - ProxyAssertSupport.assertEquals(3 * MSGS_PER_PUBLISHER, subscriber.getMessagesReceived()); - } - else - { - // Note that for a non durable subscriber the number of messages received in total - // will be somewhat less than the total number received since when recycling the session - // there is a period of time after closing the previous session and starting the next one - // when messages are being sent and won't be received (since there is no consumer) - } - - ProxyAssertSupport.assertFalse(subscriber.isFailed()); - } - - for (Publisher publisher : publishers) - { - ProxyAssertSupport.assertFalse(publisher.isFailed()); - } - } - finally - { - if (conn1 != null) - { - conn1.close(); - } - if (conn2 != null) - { - conn2.close(); - } - if (conn3 != null) - { - conn3.close(); - } - if (conn4 != null) - { - conn4.close(); - } - if (conn5 != null) - { - conn5.close(); - } - if (conn6 != null) - { - conn6.close(); - } - if (conn7 != null) - { - conn7.close(); - } - if (conn8 != null) - { - conn8.close(); - } - } - - } - - class Publisher implements Runnable - { - private final Session sess; - - private final int numMessages; - - private final int delay; - - private final MessageProducer prod; - - private boolean failed; - - boolean isFailed() - { - return failed; - } - - Publisher(final Session sess, final MessageProducer prod, final int numMessages, final int delay) - { - this.sess = sess; - - this.prod = prod; - - this.numMessages = numMessages; - - this.delay = delay; - } - - public void run() - { - try - { - for (int i = 0; i < numMessages; i++) - { - TextMessage tm = sess.createTextMessage("message" + i); - - prod.send(tm); - - try - { - Thread.sleep(delay); - } - catch (Exception ignore) - { - } - } - } - catch (JMSException e) - { - log.error("Failed to send message", e); - failed = true; - } - } - - } - - class Subscriber implements Runnable - { - private Session sess; - - private MessageConsumer cons; - - private int msgsReceived; - - private final int numMessages; - - private final int delay; - - private final Connection conn; - - private boolean failed; - - private final long timeout; - - private final Destination dest; - - private final boolean durable; - - private String subname; - - boolean isFailed() - { - return failed; - } - - boolean isDurable() - { - return durable; - } - - synchronized void msgReceived() - { - msgsReceived++; - } - - synchronized int getMessagesReceived() - { - return msgsReceived; - } - - class Listener implements MessageListener - { - - public void onMessage(final Message msg) - { - msgReceived(); - } - - } - - Subscriber(final Connection conn, - final int numMessages, - final int delay, - final long timeout, - final Destination dest, - final boolean durable) throws Exception - { - this.conn = conn; - - this.numMessages = numMessages; - - this.delay = delay; - - this.timeout = timeout; - - this.dest = dest; - - this.durable = durable; - - if (durable) - { - conn.setClientID(UUIDGenerator.getInstance().generateStringUUID()); - - subname = UUIDGenerator.getInstance().generateStringUUID(); - } - } - - public void run() - { - try - { - long start = System.currentTimeMillis(); - - while (System.currentTimeMillis() - start < timeout && msgsReceived < numMessages) - { - // recycle the session - - recycleSession(); - - Thread.sleep(delay); - } - - // Delete the durable sub - - if (durable) - { - recycleSession(); - - cons.close(); - - sess.unsubscribe(subname); - } - } - catch (Exception e) - { - log.error("Failed in subscriber", e); - failed = true; - } - - } - - void recycleSession() throws Exception - { - conn.stop(); - - if (sess != null) - { - sess.close(); - } - - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (durable) - { - cons = sess.createDurableSubscriber((Topic)dest, subname); - } - else - { - cons = sess.createConsumer(dest); - } - - cons.setMessageListener(new Listener()); - - conn.start(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java deleted file mode 100644 index 7878859..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/QueueStressTest.java +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.XAConnection; -import javax.jms.XASession; - -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * A QueueStressTest. - * - * @author Tim Fox - * @version $Revision: 2349 $ - */ - -public class QueueStressTest extends JMSStressTestBase -{ - @BeforeClass - public static void stressTestsEnabled() - { - org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED); - } - - /* - * Stress a queue with transational, non transactional and 2pc senders sending both persistent - * and non persistent messages - * Transactional senders go through a cycle of sending and rolling back - * - */ - @Test - public void testQueueMultipleSenders() throws Exception - { - Connection conn1 = cf.createConnection(); - - Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess4 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess5 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess6 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess7 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session sess8 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Session sess9 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess10 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess11 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess12 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess13 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess14 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess15 = conn1.createSession(true, Session.SESSION_TRANSACTED); - Session sess16 = conn1.createSession(true, Session.SESSION_TRANSACTED); - - XASession xaSess1 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess1); - XASession xaSess2 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess2); - XASession xaSess3 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess3); - XASession xaSess4 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess4); - XASession xaSess5 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess5); - XASession xaSess6 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess6); - XASession xaSess7 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess7); - XASession xaSess8 = ((XAConnection) conn1).createXASession(); - tweakXASession(xaSess8); - - Session sess17 = xaSess1.getSession(); - Session sess18 = xaSess2.getSession(); - Session sess19 = xaSess3.getSession(); - Session sess20 = xaSess4.getSession(); - Session sess21 = xaSess5.getSession(); - Session sess22 = xaSess6.getSession(); - Session sess23 = xaSess7.getSession(); - Session sess24 = xaSess8.getSession(); - - MessageProducer prod1 = sess1.createProducer(destinationQueue1); - prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod2 = sess2.createProducer(destinationQueue1); - prod2.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod3 = sess3.createProducer(destinationQueue1); - prod3.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod4 = sess4.createProducer(destinationQueue1); - prod4.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod5 = sess5.createProducer(destinationQueue1); - prod5.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod6 = sess6.createProducer(destinationQueue1); - prod6.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod7 = sess7.createProducer(destinationQueue1); - prod7.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod8 = sess8.createProducer(destinationQueue1); - prod8.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod9 = sess9.createProducer(destinationQueue1); - prod9.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod10 = sess10.createProducer(destinationQueue1); - prod10.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod11 = sess11.createProducer(destinationQueue1); - prod11.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod12 = sess12.createProducer(destinationQueue1); - prod12.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod13 = sess13.createProducer(destinationQueue1); - prod13.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod14 = sess14.createProducer(destinationQueue1); - prod14.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod15 = sess15.createProducer(destinationQueue1); - prod15.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod16 = sess16.createProducer(destinationQueue1); - prod16.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod17 = sess17.createProducer(destinationQueue1); - prod17.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod18 = sess18.createProducer(destinationQueue1); - prod18.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod19 = sess19.createProducer(destinationQueue1); - prod19.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod20 = sess20.createProducer(destinationQueue1); - prod20.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod21 = sess21.createProducer(destinationQueue1); - prod21.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod22 = sess22.createProducer(destinationQueue1); - prod22.setDeliveryMode(DeliveryMode.PERSISTENT); - MessageProducer prod23 = sess23.createProducer(destinationQueue1); - prod23.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - MessageProducer prod24 = sess24.createProducer(destinationQueue1); - prod24.setDeliveryMode(DeliveryMode.PERSISTENT); - - Connection conn2 = cf.createConnection(); - conn2.start(); - Session sessReceive = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer cons = sessReceive.createConsumer(destinationQueue1); - - Runner[] runners = new Runner[]{ - new Sender("prod1", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES), - new Sender("prod2", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_MESSAGES), - new Sender("prod3", sess3, prod3, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES), - new Sender("prod4", sess4, prod4, JMSStressTestBase.NUM_PERSISTENT_MESSAGES), - new Sender("prod5", sess5, prod5, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES), - new Sender("prod6", sess6, prod6, JMSStressTestBase.NUM_PERSISTENT_MESSAGES), - new Sender("prod7", sess7, prod7, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES), - new Sender("prod8", sess8, prod8, JMSStressTestBase.NUM_PERSISTENT_MESSAGES), - new TransactionalSender("prod9", - sess9, - prod9, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 1, - 1), - new TransactionalSender("prod10", - sess10, - prod10, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 1, - 1), - new TransactionalSender("prod11", - sess11, - prod11, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 10, - 7), - new TransactionalSender("prod12", - sess12, - prod12, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 10, - 7), - new TransactionalSender("prod13", - sess13, - prod13, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 50, - 21), - new TransactionalSender("prod14", - sess14, - prod14, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 50, - 21), - new TransactionalSender("prod15", - sess15, - prod15, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 100, - 67), - new TransactionalSender("prod16", - sess16, - prod16, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 100, - 67), - new Transactional2PCSender("prod17", - xaSess1, - prod17, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 1, - 1), - new Transactional2PCSender("prod18", - xaSess2, - prod18, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 1, - 1), - new Transactional2PCSender("prod19", - xaSess3, - prod19, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 10, - 7), - new Transactional2PCSender("prod20", - xaSess4, - prod20, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 10, - 7), - new Transactional2PCSender("prod21", - xaSess5, - prod21, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 50, - 21), - new Transactional2PCSender("prod22", - xaSess6, - prod22, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 50, - 21), - new Transactional2PCSender("prod23", - xaSess7, - prod23, - JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES, - 100, - 67), - new Transactional2PCSender("prod24", - xaSess8, - prod24, - JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - 100, - 67), - new Receiver(sessReceive, - cons, - 12 * JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + 12 * JMSStressTestBase.NUM_PERSISTENT_MESSAGES, - false)}; - - runRunners(runners); - - conn1.close(); - - conn2.close(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java deleted file mode 100644 index 1a4c6b4..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Receiver.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ServerSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; - -import org.apache.activemq.jms.tests.JmsTestLogger; - -/** - * Receives messages from a destination for stress testing - * @author Tim Fox - */ -public class Receiver extends Runner implements MessageListener -{ - private static final JmsTestLogger log = JmsTestLogger.LOGGER; - - private static final long RECEIVE_TIMEOUT = 120000; - - protected MessageConsumer cons; - - protected int count; - - protected boolean isListener; - - protected Map counts = new HashMap(); - - protected boolean isCC; - - protected Connection conn; - - protected ConnectionConsumer cc; - - private final Object lock1 = new Object(); - - private final Object lock2 = new Object(); - - private Message theMessage; - - private boolean finished; - - public Receiver(final Connection conn, final Session sess, final int numMessages, final Destination dest) throws Exception - { - super(sess, numMessages); - - isListener = true; - - isCC = true; - - sess.setMessageListener(this); - - cc = conn.createConnectionConsumer(dest, null, new MockServerSessionPool(sess), 10); - - } - - public Receiver(final Session sess, final MessageConsumer cons, final int numMessages, final boolean isListener) throws Exception - { - super(sess, numMessages); - this.cons = cons; - this.isListener = isListener; - if (this.isListener) - { - cons.setMessageListener(this); - } - } - - private boolean done; - - public void onMessage(final Message m) - { - try - { - synchronized (lock1) - { - theMessage = m; - - lock1.notify(); - } - - // Wait for message to be processed - synchronized (lock2) - { - while (!done && !finished) - { - lock2.wait(); - } - done = false; - } - - } - catch (Exception e) - { - Receiver.log.error("Failed to put in channel", e); - setFailed(true); - } - } - - protected void finished() - { - synchronized (lock2) - { - finished = true; - lock2.notify(); - } - } - - protected Message getMessage() throws Exception - { - Message m; - - if (isListener) - { - synchronized (lock1) - { - long start = System.currentTimeMillis(); - long waitTime = Receiver.RECEIVE_TIMEOUT; - while (theMessage == null && waitTime >= 0) - { - lock1.wait(waitTime); - - waitTime = Receiver.RECEIVE_TIMEOUT - (System.currentTimeMillis() - start); - } - m = theMessage; - theMessage = null; - } - } - else - { - m = cons.receive(Receiver.RECEIVE_TIMEOUT); - } - - return m; - } - - protected void processingDone() - { - if (isListener) - { - synchronized (lock2) - { - done = true; - lock2.notify(); - } - } - } - - @Override - public void run() - { - - // Small pause so as not to miss any messages in a topic - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { - } - - try - { - String prodName = null; - Integer msgCount = null; - - while (count < numMessages) - { - Message m = getMessage(); - - if (m == null) - { - Receiver.log.error("Message is null"); - setFailed(true); - processingDone(); - return; - } - - prodName = m.getStringProperty("PROD_NAME"); - msgCount = new Integer(m.getIntProperty("MSG_NUMBER")); - - // log.info(this + " Got message " + prodName + ":" + msgCount + "M: " + m.getJMSMessageID()); - - Integer prevCount = (Integer)counts.get(prodName); - if (prevCount == null) - { - if (msgCount.intValue() != 0) - { - Receiver.log.error("First message received not zero"); - setFailed(true); - processingDone(); - return; - } - } - else - { - if (prevCount.intValue() != msgCount.intValue() - 1) - { - Receiver.log.error("Message out of sequence for " + prodName + - ", expected:" + - (prevCount.intValue() + 1) + - " got " + - msgCount); - setFailed(true); - processingDone(); - return; - } - } - counts.put(prodName, msgCount); - - count++; - - processingDone(); - } - - } - catch (Exception e) - { - Receiver.log.error("Failed to receive message", e); - setFailed(true); - } - finally - { - if (cc != null) - { - try - { - cc.close(); - } - catch (JMSException e) - { - Receiver.log.error("Failed to close connection consumer", e); - } - } - } - } - - static final class MockServerSessionPool implements ServerSessionPool - { - private final ServerSession serverSession; - - MockServerSessionPool(final Session sess) - { - serverSession = new MockServerSession(sess); - } - - public ServerSession getServerSession() throws JMSException - { - return serverSession; - } - } - - static final class MockServerSession implements ServerSession - { - Session session; - - MockServerSession(final Session sess) - { - session = sess; - } - - public Session getSession() throws JMSException - { - return session; - } - - public void start() throws JMSException - { - session.run(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java deleted file mode 100644 index eb2e4ea..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RecoveringReceiver.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; - -import org.apache.activemq.jms.tests.JmsTestLogger; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -/** - * - * A RecoveringReceiver. - * - * A Receiver that receives messages from a destination and alternately - * acknowledges and recovers the session. - * Must be used with ack mode CLIENT_ACKNOWLEDGE - * - * - * @author Tim Fox - * - */ -public class RecoveringReceiver extends Receiver -{ - private static final JmsTestLogger log = JmsTestLogger.LOGGER; - - protected int ackSize; - - protected int recoverSize; - - class Count - { - int lastAcked; - - int lastReceived; - } - - public RecoveringReceiver(final Session sess, - final MessageConsumer cons, - final int numMessages, - final int ackSize, - final int recoverSize, - final boolean isListener) throws Exception - { - super(sess, cons, numMessages, isListener); - this.ackSize = ackSize; - this.recoverSize = recoverSize; - } - - @Override - public void run() - { - // Small pause so as not to miss any messages in a topic - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { - } - - try - { - int iterations = numMessages / ackSize; - - for (int outerCount = 0; outerCount < iterations; outerCount++) - { - Message m = null; - for (int innerCount = 0; innerCount < ackSize; innerCount++) - { - m = getMessage(); - - if (m == null) - { - RecoveringReceiver.log.error("Message is null"); - setFailed(true); - return; - } - String prodName = m.getStringProperty("PROD_NAME"); - Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER")); - - Count count = (Count)counts.get(prodName); - if (count == null) - { - // First time - if (msgCount.intValue() != 0) - { - RecoveringReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount); - setFailed(true); - return; - } - else - { - count = new Count(); - counts.put(prodName, count); - } - } - else - { - if (count.lastAcked != msgCount.intValue() - 1) - { - RecoveringReceiver.log.error("Message out of sequence for " + prodName + - ", expected " + - (count.lastAcked + 1)); - setFailed(true); - return; - } - } - count.lastAcked = msgCount.intValue(); - - count.lastReceived = msgCount.intValue(); - - if (innerCount == ackSize - 1) - { - m.acknowledge(); - } - processingDone(); - - } - - if (outerCount == iterations - 1) - { - break; - } - - for (int innerCount = 0; innerCount < recoverSize; innerCount++) - { - m = getMessage(); - - if (m == null) - { - RecoveringReceiver.log.error("Message is null"); - return; - } - String prodName = m.getStringProperty("PROD_NAME"); - Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER")); - - Count count = (Count)counts.get(prodName); - if (count == null) - { - // First time - if (msgCount.intValue() != 0) - { - RecoveringReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount); - setFailed(true); - return; - } - else - { - count = new Count(); - count.lastAcked = -1; - counts.put(prodName, count); - } - } - else - { - if (count.lastReceived != msgCount.intValue() - 1) - { - RecoveringReceiver.log.error("Message out of sequence"); - setFailed(true); - return; - } - } - count.lastReceived = msgCount.intValue(); - - if (innerCount == recoverSize - 1) - { - sess.recover(); - } - processingDone(); - } - } - } - catch (Exception e) - { - RecoveringReceiver.log.error("Failed to receive message", e); - setFailed(true); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java deleted file mode 100644 index 04cd830..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/RelayStressTest.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.InitialContext; - -import org.apache.activemq.jms.tests.ActiveMQServerTestCase; -import org.apache.activemq.jms.tests.JmsTestLogger; -import org.apache.activemq.jms.tests.util.ProxyAssertSupport; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Send messages to a topic with selector1, consumer them with multiple consumers and relay them - * back to the topic with a different selector, then consume that with more consumers. - * - * @author Tim Fox - * - * - */ -public class RelayStressTest extends ActiveMQServerTestCase -{ - @BeforeClass - public static void stressTestsEnabled() - { - org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED); - } - - // Constants ----------------------------------------------------- - - private static JmsTestLogger log = JmsTestLogger.LOGGER; - - // Static -------------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private InitialContext ic; - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - - // ServerManagement.start("all"); - ic = getInitialContext(); - createTopic("StressTestTopic"); - - RelayStressTest.log.debug("setup done"); - } - - @Override - @After - public void tearDown() throws Exception - { - destroyTopic("StressTestTopic"); - ic.close(); - } - - @Test - public void testRelay() throws Exception - { - ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory"); - - Topic topic = (Topic)ic.lookup("/topic/StressTestTopic"); - - final int numMessages = 20000; - - final int numRelayers = 5; - - final int numConsumers = 20; - - Connection conn = cf.createConnection(); - - class Relayer implements MessageListener - { - boolean done; - - boolean failed; - - int count; - - MessageProducer prod; - - Relayer(final MessageProducer prod) - { - this.prod = prod; - } - - public void onMessage(final Message m) - { - try - { - m.clearProperties(); - m.setStringProperty("name", "Tim"); - - prod.send(m); - - count++; - - if (count == numMessages) - { - synchronized (this) - { - done = true; - notify(); - } - } - } - catch (JMSException e) - { - e.printStackTrace(); - synchronized (this) - { - done = true; - failed = true; - notify(); - } - } - } - } - - class Consumer implements MessageListener - { - boolean failed; - - boolean done; - - int count; - - public void onMessage(final Message m) - { - count++; - - if (count == numMessages * numRelayers) - { - synchronized (this) - { - done = true; - notify(); - } - } - } - } - - Relayer[] relayers = new Relayer[numRelayers]; - - Consumer[] consumers = new Consumer[numConsumers]; - - for (int i = 0; i < numRelayers; i++) - { - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer cons = sess.createConsumer(topic, "name = 'Watt'"); - // MessageConsumer cons = sess.createConsumer(topic); - - MessageProducer prod = sess.createProducer(topic); - - relayers[i] = new Relayer(prod); - - cons.setMessageListener(relayers[i]); - } - - for (int i = 0; i < numConsumers; i++) - { - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer cons = sess.createConsumer(topic, "name = 'Tim'"); - - consumers[i] = new Consumer(); - - cons.setMessageListener(consumers[i]); - } - - conn.start(); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer prod = sess.createProducer(topic); - - prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - for (int i = 0; i < numMessages; i++) - { - Message m = sess.createMessage(); - - m.setStringProperty("name", "Watt"); - - prod.send(m); - } - - for (int i = 0; i < numRelayers; i++) - { - synchronized (relayers[i]) - { - if (!relayers[i].done) - { - relayers[i].wait(); - } - } - } - - for (int i = 0; i < numConsumers; i++) - { - synchronized (consumers[i]) - { - if (!consumers[i].done) - { - consumers[i].wait(); - } - } - } - - conn.close(); - - for (int i = 0; i < numRelayers; i++) - { - ProxyAssertSupport.assertFalse(relayers[i].failed); - } - - for (int i = 0; i < numConsumers; i++) - { - ProxyAssertSupport.assertFalse(consumers[i].failed); - } - - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java deleted file mode 100644 index d5aab5e..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Runner.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; - -import org.apache.activemq.jms.tests.JmsTestLogger; - -import javax.jms.Session; - -/** - * - * A Runner. - * - * Base class for running components of a stress test - * - * @author Tim Fox - * - */ -public abstract class Runner implements Runnable -{ - protected JmsTestLogger log = JmsTestLogger.LOGGER; - - protected Session sess; - - protected int numMessages; - - private boolean failed; - - public Runner(final Session sess, final int numMessages) - { - this.sess = sess; - this.numMessages = numMessages; - } - - public abstract void run(); - - public boolean isFailed() - { - return failed; - } - - public void setFailed(final boolean failed) - { - this.failed = failed; - if (failed) - { - log.info("Marking Runner " + this + " as failed", new Exception("trace")); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java deleted file mode 100644 index f57a375..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/Sender.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; - -import org.apache.activemq.jms.tests.JmsTestLogger; - -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - -/** - * - * A Sender. - * - * Sends messages to a destination, used in stress testing - * - * @author Tim Fox - * - */ -public class Sender extends Runner -{ - private static final JmsTestLogger log = JmsTestLogger.LOGGER; - - protected MessageProducer prod; - - protected String prodName; - - protected int count; - - public Sender(final String prodName, final Session sess, final MessageProducer prod, final int numMessages) - { - super(sess, numMessages); - this.prod = prod; - this.prodName = prodName; - } - - @Override - public void run() - { - try - { - while (count < numMessages) - { - Message m = sess.createMessage(); - m.setStringProperty("PROD_NAME", prodName); - m.setIntProperty("MSG_NUMBER", count); - prod.send(m); - count++; - } - } - catch (Exception e) - { - Sender.log.error("Failed to send message", e); - setFailed(true); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java deleted file mode 100644 index 72b33b3..0000000 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/stress/SeveralClientsStressTest.java +++ /dev/null @@ -1,547 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.jms.tests.stress; -import java.util.HashSet; -import java.util.Random; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; - -import org.apache.activemq.jms.tests.ActiveMQServerTestCase; -import org.apache.activemq.jms.tests.JmsTestLogger; -import org.apache.activemq.jms.tests.util.ProxyAssertSupport; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * In order for this test to run, you will need to edit /etc/security/limits.conf and change your max sockets to something bigger than 1024 - * - * It's required to re-login after this change. - * - * For Windows you need also to increase this limit (max opened files) somehow. - * - * -Example of /etc/security/limits.confg: -# -clebert hard nofile 10240 - - - * @author Clebert Suconic - */ -public class SeveralClientsStressTest extends ActiveMQServerTestCase -{ - @BeforeClass - public static void stressTestsEnabled() - { - org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED); - } - - protected boolean info = false; - - protected boolean startServer = true; - - // Static --------------------------------------------------------------------------------------- - - protected static long PRODUCER_ALIVE_FOR = 60000; // one minute - - protected static long CONSUMER_ALIVE_FOR = 60000; // one minutes - - protected static long TEST_ALIVE_FOR = 5 * 60 * 1000; // 5 minutes - - protected static int NUMBER_OF_PRODUCERS = 100; // this should be set to 300 later - - protected static int NUMBER_OF_CONSUMERS = 100; // this should be set to 300 later - - // a producer should have a long wait between each message sent? - protected static boolean LONG_WAIT_ON_PRODUCERS = false; - - protected static AtomicInteger producedMessages = new AtomicInteger(0); - - protected static AtomicInteger readMessages = new AtomicInteger(0); - - protected Context createContext() throws Exception - { - return getInitialContext(); - } - - // Constructors --------------------------------------------------------------------------------- - - // Public --------------------------------------------------------------------------------------- - - @Test - public void testQueue() throws Exception - { - Context ctx = createContext(); - - HashSet threads = new HashSet(); - - // A chhanel of communication between workers and the test method - LinkedBlockingQueue testChannel = new LinkedBlockingQueue(); - - for (int i = 0; i < SeveralClientsStressTest.NUMBER_OF_PRODUCERS; i++) - { - threads.add(new SeveralClientsStressTest.Producer(i, testChannel)); - } - - for (int i = 0; i < SeveralClientsStressTest.NUMBER_OF_CONSUMERS; i++) - { - threads.add(new SeveralClientsStressTest.Consumer(i, testChannel)); - } - - for (Worker worker : threads) - { - worker.start(); - } - - long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.TEST_ALIVE_FOR; - - int numberOfProducers = SeveralClientsStressTest.NUMBER_OF_PRODUCERS; - int numberOfConsumers = SeveralClientsStressTest.NUMBER_OF_CONSUMERS; - - while (threads.size() > 0) - { - SeveralClientsStressTest.InternalMessage msg = testChannel.poll(2000, - TimeUnit.MILLISECONDS); - - log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() + - " and Consumed:" + - SeveralClientsStressTest.readMessages.get() + - " messages"); - - if (msg != null) - { - if (info) - { - log.info("Received message " + msg); - } - if (msg instanceof SeveralClientsStressTest.WorkerFailed) - { - ProxyAssertSupport.fail("Worker " + msg.getWorker() + " has failed"); - } - else if (msg instanceof SeveralClientsStressTest.WorkedFinishedMessages) - { - SeveralClientsStressTest.WorkedFinishedMessages finished = (SeveralClientsStressTest.WorkedFinishedMessages)msg; - if (threads.remove(finished.getWorker())) - { - if (System.currentTimeMillis() < timeToFinish) - { - if (finished.getWorker() instanceof SeveralClientsStressTest.Producer) - { - if (info) - { - log.info("Scheduling new Producer " + numberOfProducers); - } - SeveralClientsStressTest.Producer producer = new SeveralClientsStressTest.Producer(numberOfProducers++, - testChannel); - threads.add(producer); - producer.start(); - } - else if (finished.getWorker() instanceof SeveralClientsStressTest.Consumer) - { - if (info) - { - log.info("Scheduling new ClientConsumer " + numberOfConsumers); - } - SeveralClientsStressTest.Consumer consumer = new SeveralClientsStressTest.Consumer(numberOfConsumers++, - testChannel); - threads.add(consumer); - consumer.start(); - } - } - } - else - { - log.warn(finished.getWorker() + " was not available on threads HashSet"); - } - } - } - } - - log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() + - " and Consumed:" + - SeveralClientsStressTest.readMessages.get() + - " messages"); - - clearMessages(); - - log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() + - " and Consumed:" + - SeveralClientsStressTest.readMessages.get() + - " messages"); - - ProxyAssertSupport.assertEquals(SeveralClientsStressTest.producedMessages.get(), - SeveralClientsStressTest.readMessages.get()); - } - - // Package protected ---------------------------------------------------------------------------- - - // Protected ------------------------------------------------------------------------------------ - - protected void clearMessages() throws Exception - { - Context ctx = createContext(); - ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory"); - Connection conn = cf.createConnection(); - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = (Queue)ctx.lookup("queue/testQueue"); - MessageConsumer consumer = sess.createConsumer(queue); - - conn.start(); - - while (consumer.receive(1000) != null) - { - SeveralClientsStressTest.readMessages.incrementAndGet(); - log.info("Received JMS message on clearMessages"); - } - - conn.close(); - } - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - - if (startServer) - { - // ServerManagement.start("all", true); - createQueue("testQueue"); - } - - clearMessages(); - SeveralClientsStressTest.producedMessages = new AtomicInteger(0); - SeveralClientsStressTest.readMessages = new AtomicInteger(0); - } - - // Private -------------------------------------------------------------------------------------- - - // Inner classes -------------------------------------------------------------------------------- - - private class Worker extends Thread - { - - protected JmsTestLogger log = JmsTestLogger.LOGGER; - - private boolean failed = false; - - private final int workerId; - - private Exception ex; - - LinkedBlockingQueue messageQueue; - - public int getWorkerId() - { - return workerId; - } - - public Exception getException() - { - return ex; - } - - public boolean isFailed() - { - return failed; - } - - protected synchronized void setFailed(final boolean failed, final Exception ex) - { - this.failed = failed; - this.ex = ex; - - log.info("Sending Exception", ex); - - sendInternalMessage(new SeveralClientsStressTest.WorkerFailed(this)); - - } - - protected void sendInternalMessage(final SeveralClientsStressTest.InternalMessage msg) - { - if (info) - { - log.info("Sending message " + msg); - } - try - { - messageQueue.put(msg); - } - catch (Exception e) - { - log.error(e, e); - setFailed(true, e); - } - } - - public Worker(final String name, final int workerId, - final LinkedBlockingQueue messageQueue) - { - super(name); - this.workerId = workerId; - this.messageQueue = messageQueue; - setDaemon(true); - } - - @Override - public String toString() - { - return this.getClass().getName() + ":" + getWorkerId(); - } - } - - final class Producer extends SeveralClientsStressTest.Worker - { - public Producer(final int producerId, - final LinkedBlockingQueue messageQueue) - { - super("Producer:" + producerId, producerId, messageQueue); - } - - Random random = new Random(); - - @Override - public void run() - { - try - { - Context ctx = createContext(); - - ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory"); - - Queue queue = (Queue)ctx.lookup("queue/testQueue"); - - if (info) - { - log.info("Creating connection and producer"); - } - Connection conn = cf.createConnection(); - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod = sess.createProducer(queue); - - if (getWorkerId() % 2 == 0) - { - if (info) - { - log.info("Non Persistent Producer was created"); - } - prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - } - else - { - if (info) - { - log.info("Persistent Producer was created"); - } - prod.setDeliveryMode(DeliveryMode.PERSISTENT); - } - - long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.PRODUCER_ALIVE_FOR; - - try - { - int messageSent = 0; - while (System.currentTimeMillis() < timeToFinish) - { - prod.send(sess.createTextMessage("Message sent at " + System.currentTimeMillis())); - SeveralClientsStressTest.producedMessages.incrementAndGet(); - messageSent++; - if (messageSent % 50 == 0) - { - if (info) - { - log.info("Sent " + messageSent + " Messages"); - } - } - - if (SeveralClientsStressTest.LONG_WAIT_ON_PRODUCERS) - { - int waitTime = random.nextInt() % 2 + 1; - if (waitTime < 0) - { - waitTime *= -1; - } - Thread.sleep(waitTime * 1000); // wait 1 or 2 seconds - } - else - { - Thread.sleep(100); - } - } - sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this)); - } - finally - { - conn.close(); - } - - } - catch (Exception e) - { - log.error(e, e); - setFailed(true, e); - } - } - } - - final class Consumer extends SeveralClientsStressTest.Worker - { - public Consumer(final int consumerId, - final LinkedBlockingQueue messageQueue) - { - super("ClientConsumer:" + consumerId, consumerId, messageQueue); - } - - @Override - public void run() - { - try - { - Context ctx = createContext(); - - ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory"); - - Queue queue = (Queue)ctx.lookup("queue/testQueue"); - - if (info) - { - log.info("Creating connection and consumer"); - } - Connection conn = cf.createConnection(); - Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = sess.createConsumer(queue); - if (info) - { - log.info("ClientConsumer was created"); - } - - conn.start(); - - int msgs = 0; - - int transactions = 0; - - long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.CONSUMER_ALIVE_FOR; - - try - { - while (System.currentTimeMillis() < timeToFinish) - { - Message msg = consumer.receive(1000); - if (msg != null) - { - msgs++; - if (msgs >= 50) - { - transactions++; - if (transactions % 2 == 0) - { - if (info) - { - log.info("Commit transaction"); - } - sess.commit(); - SeveralClientsStressTest.readMessages.addAndGet(msgs); - } - else - { - if (info) - { - log.info("Rollback transaction"); - } - sess.rollback(); - } - msgs = 0; - } - } - else - { - break; - } - } - - SeveralClientsStressTest.readMessages.addAndGet(msgs); - sess.commit(); - - sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this)); - } - finally - { - conn.close(); - } - - } - catch (Exception e) - { - log.error(e); - setFailed(true, e); - } - } - } - - // Objects used on the communication between Workers and the test - static class InternalMessage - { - SeveralClientsStressTest.Worker worker; - - public InternalMessage(final SeveralClientsStressTest.Worker worker) - { - this.worker = worker; - } - - public SeveralClientsStressTest.Worker getWorker() - { - return worker; - } - - @Override - public String toString() - { - return this.getClass().getName() + " worker-> " + worker.toString(); - } - } - - static class WorkedFinishedMessages extends SeveralClientsStressTest.InternalMessage - { - - public WorkedFinishedMessages(final SeveralClientsStressTest.Worker worker) - { - super(worker); - } - - } - - static class WorkerFailed extends SeveralClientsStressTest.InternalMessage - { - public WorkerFailed(final SeveralClientsStressTest.Worker worker) - { - super(worker); - } - } - -}