Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 43808 invoked from network); 4 Jan 2008 16:07:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Jan 2008 16:07:49 -0000 Received: (qmail 75348 invoked by uid 500); 4 Jan 2008 16:07:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 75322 invoked by uid 500); 4 Jan 2008 16:07:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 75313 invoked by uid 99); 4 Jan 2008 16:07:37 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jan 2008 08:07:37 -0800 X-ASF-Spam-Status: No, hits=-98.0 required=10.0 tests=ALL_TRUSTED,URIBL_BLACK X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jan 2008 16:07:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4CB2B1A9832; Fri, 4 Jan 2008 08:07:20 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r608902 - in /activemq/trunk/activemq-core: ./ src/test/java/org/apache/activemq/load/ Date: Fri, 04 Jan 2008 16:07:19 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080104160720.4CB2B1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Fri Jan 4 08:07:18 2008 New Revision: 608902 URL: http://svn.apache.org/viewvc?rev=608902&view=rev Log: Added basic load test for queues Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java (with props) Modified: activemq/trunk/activemq-core/pom.xml Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=608902&r1=608901&r2=608902&view=diff ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Fri Jan 4 08:07:18 2008 @@ -338,6 +338,9 @@ **/perf/* + + + **/load/* **/SimpleNetworkTest.* Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java?rev=608902&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java Fri Jan 4 08:07:18 2008 @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.load; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.perf.PerfRate; + +/** + * @version $Revision: 1.3 $ + */ +public class LoadClient implements Runnable{ + protected String name; + protected ConnectionFactory factory; + protected Connection connection; + protected Destination startDestination; + protected Destination nextDestination; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected PerfRate rate = new PerfRate(); + protected int deliveryMode = DeliveryMode.PERSISTENT; + private boolean connectionPerMessage = false; + private boolean running; + private int timeout = 10000; + + + public LoadClient(String name,ConnectionFactory factory) { + this.name=name; + this.factory = factory; + } + + + + public synchronized void start() throws JMSException { + if (!running) { + rate.reset(); + running = true; + if (!connectionPerMessage) { + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(this.startDestination); + producer = session.createProducer(this.nextDestination); + producer.setDeliveryMode(this.deliveryMode); + + } + + Thread t = new Thread(this); + t.setName(name); + t.start(); + } + } + + public void stop() throws JMSException, InterruptedException { + running = false; + connection.stop(); + } + + + public void run() { + try { + while (running) { + String result = consume(); + if (result == null && running) { + throw new Exception(name + "Failed to consume "); + } + send(result); + rate.increment(); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + + protected String consume() throws JMSException { + Connection con = null; + MessageConsumer c = consumer; + if (connectionPerMessage){ + con = factory.createConnection(); + con.start(); + Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + c = s.createConsumer(startDestination); + } + TextMessage result = (TextMessage) c.receive(timeout); + if (connectionPerMessage) { + con.close(); + } + return result != null ? result.getText() : null; + } + + protected void send(String text) throws JMSException { + Connection con = connection; + MessageProducer p = producer; + Session s = session; + if (connectionPerMessage){ + con = factory.createConnection(); + con.start(); + s = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + p = s.createProducer(nextDestination); + p.setDeliveryMode(deliveryMode); + } + TextMessage message = s.createTextMessage(text); + p.send(message); + //System.out.println(name + " SENT " + text + " TO " + nextDestination); + if (connectionPerMessage) { + con.close(); + } + } + + + + public String getName() { + return name; + } + + + + public void setName(String name) { + this.name = name; + } + + + + public Destination getStartDestination() { + return startDestination; + } + + + + public void setStartDestination(Destination startDestination) { + this.startDestination = startDestination; + } + + + + public Destination getNextDestination() { + return nextDestination; + } + + + + public void setNextDestination(Destination nextDestination) { + this.nextDestination = nextDestination; + } + + + + public int getDeliveryMode() { + return deliveryMode; + } + + + + public void setDeliveryMode(int deliveryMode) { + this.deliveryMode = deliveryMode; + } + + + + public boolean isConnectionPerMessage() { + return connectionPerMessage; + } + + + + public void setConnectionPerMessage(boolean connectionPerMessage) { + this.connectionPerMessage = connectionPerMessage; + } + + + + public int getTimeout() { + return timeout; + } + + + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java?rev=608902&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java Fri Jan 4 08:07:18 2008 @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.load; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.perf.PerfRate; + +/** + * @version $Revision: 1.3 $ + */ +public class LoadController implements Runnable{ + protected ConnectionFactory factory; + protected Connection connection; + protected Destination startDestination; + protected Destination controlDestination; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected PerfRate rate = new PerfRate(); + protected int numberOfBatches = 1; + protected int batchSize = 1000; + protected int deliveryMode = DeliveryMode.PERSISTENT; + private boolean connectionPerMessage = false; + private int timeout = 5000; + private boolean running = false; + private final CountDownLatch stopped = new CountDownLatch(1); + + + public LoadController(ConnectionFactory factory) { + this.factory = factory; + } + + + + public synchronized void start() throws JMSException { + if (!running) { + rate.reset(); + running = true; + if (!connectionPerMessage) { + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(this.controlDestination); + producer = session.createProducer(this.startDestination); + producer.setDeliveryMode(this.deliveryMode); + + } + + Thread t = new Thread(this); + t.setName("LoadController"); + t.start(); + } + } + + public void stop() throws JMSException, InterruptedException { + running = false; + stopped.await(); + //stopped.await(1,TimeUnit.SECONDS); + connection.stop(); + } + + + public void run() { + try { + + for (int i = 0; i < numberOfBatches; i++) { + for (int j = 0; j < batchSize; j++) { + String payLoad = "batch[" + i + "]no:" + j; + send(payLoad); + String result = consume(); + if (result == null || !result.equals(payLoad)) { + throw new Exception("Failed to consume " + payLoad + + " GOT " + result); + } + System.out.println("Control got " + result); + rate.increment(); + } + } + + } catch (Throwable e) { + e.printStackTrace(); + } finally { + stopped.countDown(); + } + } + + protected String consume() throws JMSException { + Connection con = null; + MessageConsumer c = consumer; + if (connectionPerMessage){ + con = factory.createConnection(); + con.start(); + Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + c = s.createConsumer(controlDestination); + } + TextMessage result = (TextMessage) c.receive(timeout); + if (connectionPerMessage) { + con.close(); + } + return result != null ? result.getText() : null; + } + + protected void send(String text) throws JMSException { + Connection con = null; + MessageProducer p = producer; + Session s = session; + if (connectionPerMessage){ + con = factory.createConnection(); + con.start(); + s = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + p = s.createProducer(startDestination); + p.setDeliveryMode(deliveryMode); + } + TextMessage message = s.createTextMessage(text); + p.send(message); + if (connectionPerMessage) { + con.close(); + } + } + + + + public Destination getStartDestination() { + return startDestination; + } + + + + public void setStartDestination(Destination startDestination) { + this.startDestination = startDestination; + } + + + + public Destination getControlDestination() { + return controlDestination; + } + + + + public void setControlDestination(Destination controlDestination) { + this.controlDestination = controlDestination; + } + + + + public int getNumberOfBatches() { + return numberOfBatches; + } + + + + public void setNumberOfBatches(int numberOfBatches) { + this.numberOfBatches = numberOfBatches; + } + + + + public int getBatchSize() { + return batchSize; + } + + + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + + + public int getDeliveryMode() { + return deliveryMode; + } + + + + public void setDeliveryMode(int deliveryMode) { + this.deliveryMode = deliveryMode; + } + + + + public boolean isConnectionPerMessage() { + return connectionPerMessage; + } + + + + public void setConnectionPerMessage(boolean connectionPerMessage) { + this.connectionPerMessage = connectionPerMessage; + } + + + + public int getTimeout() { + return timeout; + } + + + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java?rev=608902&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java Fri Jan 4 08:07:18 2008 @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.load; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 1.3 $ + */ +public class LoadTest extends TestCase { + + private static final Log LOG = LogFactory.getLog(LoadTest.class); + + protected BrokerService broker; + protected String bindAddress="tcp://localhost:61616"; + + protected LoadController controller; + protected LoadClient[] clients; + protected ConnectionFactory factory; + protected Destination destination; + protected int numberOfClients = 10; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected int batchSize = 1000; + protected int numberOfBatches = 4; + protected int timeout = Integer.MAX_VALUE; + protected boolean connectionPerMessage = true; + protected Connection managementConnection; + protected Session managementSession; + + /** + * Sets up a test where the producer and consumer have their own connection. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(bindAddress); + } + factory = createConnectionFactory(bindAddress); + managementConnection = factory.createConnection(); + managementSession = managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination startDestination = createDestination(managementSession, getClass()+".start"); + Destination endDestination = createDestination(managementSession, getClass()+".end"); + LOG.info("Running with " + numberOfClients + " clients"); + controller = new LoadController(factory); + controller.setBatchSize(batchSize); + controller.setNumberOfBatches(numberOfBatches); + controller.setDeliveryMode(deliveryMode); + controller.setConnectionPerMessage(connectionPerMessage); + controller.setStartDestination(startDestination); + controller.setControlDestination(endDestination); + controller.setTimeout(timeout); + clients = new LoadClient[numberOfClients]; + for (int i = 0; i < numberOfClients; i++) { + Destination inDestination = null; + if (i==0) { + inDestination = startDestination; + }else { + inDestination = createDestination(managementSession, getClass() + ".client."+(i)); + } + Destination outDestination = null; + if (i==(numberOfClients-1)) { + outDestination = endDestination; + }else { + outDestination = createDestination(managementSession, getClass() + ".client."+(i+1)); + } + LoadClient client = new LoadClient("client("+i+")",factory); + client.setTimeout(timeout); + client.setDeliveryMode(deliveryMode); + client.setConnectionPerMessage(connectionPerMessage); + client.setStartDestination(inDestination); + client.setNextDestination(outDestination); + clients[i] = client; + } + + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + managementConnection.close(); + for (int i = 0; i < numberOfClients; i++) { + clients[i].stop(); + } + controller.stop(); + if (broker != null) { + broker.stop(); + broker = null; + } + } + + protected Destination createDestination(Session s, String destinationName) throws JMSException { + return s.createQueue(destinationName); + } + + /** + * Factory method to create a new broker + * + * @throws Exception + */ + protected BrokerService createBroker(String uri) throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer,uri); + answer.start(); + return answer; + } + + + + protected void configureBroker(BrokerService answer,String uri) throws Exception { + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector(uri); + answer.setUseShutdownHook(false); + } + + protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception { + return new ActiveMQConnectionFactory(uri); + } + + public void testLoad() throws JMSException, InterruptedException { + for (int i = 0; i < numberOfClients; i++) { + clients[i].start(); + } + controller.start(); + controller.stop(); + + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java ------------------------------------------------------------------------------ svn:eol-style = native