activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r608902 - in /activemq/trunk/activemq-core: ./ src/test/java/org/apache/activemq/load/
Date Fri, 04 Jan 2008 16:07:19 GMT
Author: rajdavies
Date: Fri Jan  4 08:07:18 2008
New Revision: 608902

URL: http://svn.apache.org/viewvc?rev=608902&view=rev
Log:
Added basic load test for queues

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=608902&r1=608901&r2=608902&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Jan  4 08:07:18 2008
@@ -338,6 +338,9 @@
 
             <!-- These are performance tests so take too long to run -->
             <exclude>**/perf/*</exclude>
+            
+            <!-- These are load tests so take too long to run -->
+            <exclude>**/load/*</exclude>
 
             <!-- http://jira.activemq.org/jira/browse/AMQ-594 -->
             <exclude>**/SimpleNetworkTest.*</exclude>

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java?rev=608902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java Fri Jan  4 08:07:18 2008
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.load;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.perf.PerfRate;
+
+/**
+ * One stage of the load-test pipeline. A worker thread repeatedly receives a
+ * text message from {@code startDestination} and forwards its text to
+ * {@code nextDestination}, calling {@code rate.increment()} once per
+ * forwarded message.
+ * <p>
+ * When {@code connectionPerMessage} is {@code false}, a single connection,
+ * session, consumer and producer are created in {@link #start()} and reused
+ * for every message. When it is {@code true}, every receive and every send
+ * opens and closes its own connection and the shared fields stay
+ * {@code null}.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class LoadClient implements Runnable {
+    protected String name;
+    protected ConnectionFactory factory;
+    protected Connection connection;
+    protected Destination startDestination;
+    protected Destination nextDestination;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected PerfRate rate = new PerfRate();
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    private boolean connectionPerMessage = false;
+    // volatile: set by the thread calling start()/stop(), polled by the worker.
+    private volatile boolean running;
+    private int timeout = 10000;
+
+    /**
+     * @param name used as the worker thread name and in error messages
+     * @param factory source of every connection this client opens
+     */
+    public LoadClient(String name, ConnectionFactory factory) {
+        this.name = name;
+        this.factory = factory;
+    }
+
+    /**
+     * Starts the worker thread; does nothing if the client is already
+     * running. In shared-connection mode the connection, session, consumer
+     * and producer are created first.
+     *
+     * @throws JMSException if the shared JMS resources cannot be created; the
+     *         client is then left in the not-running state so that
+     *         {@code start()} may be retried
+     */
+    public synchronized void start() throws JMSException {
+        if (running) {
+            return;
+        }
+        rate.reset();
+        if (!connectionPerMessage) {
+            connection = factory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            consumer = session.createConsumer(this.startDestination);
+            producer = session.createProducer(this.nextDestination);
+            producer.setDeliveryMode(this.deliveryMode);
+        }
+        // Only flag the client as running once set-up succeeded; the worker
+        // loop reads this flag, so it must be set before the thread starts.
+        running = true;
+        Thread t = new Thread(this);
+        t.setName(name);
+        t.start();
+    }
+
+    /**
+     * Asks the worker to stop and closes the shared connection, if any.
+     * <p>
+     * In connection-per-message mode there is no shared connection, so this
+     * only clears the running flag; the worker notices it after its current
+     * receive returns.
+     *
+     * @throws JMSException if closing the shared connection fails
+     * @throws InterruptedException never thrown here; kept for signature
+     *         compatibility
+     */
+    public synchronized void stop() throws JMSException, InterruptedException {
+        running = false;
+        // connection is null in connection-per-message mode or before start().
+        if (connection != null) {
+            // close() rather than stop(): stop() merely pauses delivery and
+            // would leave the connection open.
+            connection.close();
+        }
+    }
+
+    /**
+     * Worker loop: receive, forward, count, until stopped or until a receive
+     * times out while the client is still running.
+     */
+    public void run() {
+        try {
+            while (running) {
+                String result = consume();
+                if (result == null) {
+                    if (running) {
+                        throw new Exception(name + " Failed to consume ");
+                    }
+                    // Stopped while waiting: nothing was received, so there
+                    // is nothing to forward.
+                    break;
+                }
+                send(result);
+                rate.increment();
+            }
+        } catch (JMSException e) {
+            // Closing the shared connection in stop() can make an in-flight
+            // receive or send fail; only report failures while running.
+            if (running) {
+                e.printStackTrace();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Receives one message from {@code startDestination}, waiting at most
+     * {@code timeout} milliseconds.
+     *
+     * @return the message text, or {@code null} if nothing was received
+     * @throws JMSException if any JMS call fails
+     */
+    protected String consume() throws JMSException {
+        if (!connectionPerMessage) {
+            TextMessage shared = (TextMessage) consumer.receive(timeout);
+            return shared != null ? shared.getText() : null;
+        }
+        Connection con = factory.createConnection();
+        try {
+            con.start();
+            Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer c = s.createConsumer(startDestination);
+            TextMessage result = (TextMessage) c.receive(timeout);
+            return result != null ? result.getText() : null;
+        } finally {
+            // Always release the per-message connection, even on failure.
+            con.close();
+        }
+    }
+
+    /**
+     * Sends {@code text} as a text message to {@code nextDestination} using
+     * the configured delivery mode.
+     *
+     * @param text payload of the message to send
+     * @throws JMSException if any JMS call fails
+     */
+    protected void send(String text) throws JMSException {
+        if (!connectionPerMessage) {
+            producer.send(session.createTextMessage(text));
+            return;
+        }
+        Connection con = factory.createConnection();
+        try {
+            con.start();
+            Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer p = s.createProducer(nextDestination);
+            p.setDeliveryMode(deliveryMode);
+            p.send(s.createTextMessage(text));
+        } finally {
+            // Always release the per-message connection, even on failure.
+            con.close();
+        }
+    }
+
+    /** @return the name given to the worker thread */
+    public String getName() {
+        return name;
+    }
+
+    /** @param name the name given to the worker thread on the next start */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the destination this client receives from */
+    public Destination getStartDestination() {
+        return startDestination;
+    }
+
+    /** @param startDestination the destination this client receives from */
+    public void setStartDestination(Destination startDestination) {
+        this.startDestination = startDestination;
+    }
+
+    /** @return the destination this client forwards to */
+    public Destination getNextDestination() {
+        return nextDestination;
+    }
+
+    /** @param nextDestination the destination this client forwards to */
+    public void setNextDestination(Destination nextDestination) {
+        this.nextDestination = nextDestination;
+    }
+
+    /** @return the JMS delivery mode applied to producers */
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    /** @param deliveryMode the JMS delivery mode applied to producers */
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    /** @return {@code true} if every receive/send uses its own connection */
+    public boolean isConnectionPerMessage() {
+        return connectionPerMessage;
+    }
+
+    /**
+     * @param connectionPerMessage {@code true} to open and close a connection
+     *        for every receive and every send; read by {@link #start()}
+     */
+    public void setConnectionPerMessage(boolean connectionPerMessage) {
+        this.connectionPerMessage = connectionPerMessage;
+    }
+
+    /** @return the receive timeout in milliseconds */
+    public int getTimeout() {
+        return timeout;
+    }
+
+    /** @param timeout the receive timeout in milliseconds */
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java?rev=608902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java Fri Jan  4 08:07:18 2008
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.load;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.perf.PerfRate;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class LoadController implements Runnable{
+    protected ConnectionFactory factory;
+    protected Connection connection;
+    protected Destination startDestination;
+    protected Destination controlDestination;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected PerfRate rate = new PerfRate();
+    protected int numberOfBatches = 1;
+    protected int batchSize = 1000;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    private boolean connectionPerMessage = false;
+    private int timeout = 5000;
+    private boolean running = false;
+    private final CountDownLatch stopped = new CountDownLatch(1);
+    
+
+    public LoadController(ConnectionFactory factory) {
+       this.factory = factory;
+    }
+
+   
+
+    public synchronized void start() throws JMSException {
+        if (!running) {
+            rate.reset();
+            running = true;
+            if (!connectionPerMessage) {
+                connection = factory.createConnection();
+                connection.start();
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                consumer = session.createConsumer(this.controlDestination);
+                producer = session.createProducer(this.startDestination);
+                producer.setDeliveryMode(this.deliveryMode);
+                
+            }
+            
+            Thread t = new  Thread(this);
+            t.setName("LoadController");
+            t.start();
+        }
+    }
+
+    public void stop() throws JMSException, InterruptedException {
+        running = false;
+        stopped.await();
+        //stopped.await(1,TimeUnit.SECONDS);
+        connection.stop();
+    }
+
+    
+    public void run() {
+        try {
+
+            for (int i = 0; i < numberOfBatches; i++) {
+                for (int j = 0; j < batchSize; j++) {
+                    String payLoad = "batch[" + i + "]no:" + j;
+                    send(payLoad);
+                    String result = consume();
+                    if (result == null || !result.equals(payLoad)) {
+                        throw new Exception("Failed to consume " + payLoad
+                                + " GOT " + result);
+                    }
+                    System.out.println("Control got " + result);
+                    rate.increment();
+                }
+            }
+
+        } catch (Throwable e) {
+            e.printStackTrace();
+        } finally {
+            stopped.countDown();
+        }
+    }
+    
+    protected String consume() throws JMSException {
+        Connection con  = null;
+        MessageConsumer c = consumer;
+        if (connectionPerMessage){
+            con = factory.createConnection();
+            con.start();
+            Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            c = s.createConsumer(controlDestination);
+        }
+        TextMessage result = (TextMessage) c.receive(timeout);
+        if (connectionPerMessage) {
+            con.close();
+        }
+        return result != null ? result.getText() : null;
+    }
+    
+    protected void send(String text) throws JMSException {
+        Connection con  = null;
+        MessageProducer p = producer;
+        Session s = session;
+        if (connectionPerMessage){
+            con = factory.createConnection();
+            con.start();
+            s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            p = s.createProducer(startDestination);
+            p.setDeliveryMode(deliveryMode);
+        }
+        TextMessage message = s.createTextMessage(text);
+        p.send(message);
+        if (connectionPerMessage) {
+            con.close();
+        }
+    }
+
+
+
+    public Destination getStartDestination() {
+        return startDestination;
+    }
+
+
+
+    public void setStartDestination(Destination startDestination) {
+        this.startDestination = startDestination;
+    }
+
+
+
+    public Destination getControlDestination() {
+        return controlDestination;
+    }
+
+
+
+    public void setControlDestination(Destination controlDestination) {
+        this.controlDestination = controlDestination;
+    }
+
+
+
+    public int getNumberOfBatches() {
+        return numberOfBatches;
+    }
+
+
+
+    public void setNumberOfBatches(int numberOfBatches) {
+        this.numberOfBatches = numberOfBatches;
+    }
+
+
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+
+
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+
+
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+
+
+    public boolean isConnectionPerMessage() {
+        return connectionPerMessage;
+    }
+
+
+
+    public void setConnectionPerMessage(boolean connectionPerMessage) {
+        this.connectionPerMessage = connectionPerMessage;
+    }
+
+
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java?rev=608902&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java Fri Jan  4 08:07:18 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.load;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Load test that pushes messages through a chain of queues. A
+ * {@link LoadController} sends to a start queue, {@code numberOfClients}
+ * {@link LoadClient}s each forward from one queue to the next, and the last
+ * client forwards to the end queue that the controller reads back from.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class LoadTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(LoadTest.class);
+
+    protected BrokerService broker;
+    protected String bindAddress = "tcp://localhost:61616";
+
+    protected LoadController controller;
+    protected LoadClient[] clients;
+    protected ConnectionFactory factory;
+    protected Destination destination;
+    protected int numberOfClients = 10;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected int batchSize = 1000;
+    protected int numberOfBatches = 4;
+    protected int timeout = Integer.MAX_VALUE;
+    protected boolean connectionPerMessage = true;
+    protected Connection managementConnection;
+    protected Session managementSession;
+
+    /**
+     * Starts a broker (unless one is already set), creates the queue chain
+     * and configures the controller and the clients. Nothing is started here;
+     * that happens in {@link #testLoad()}.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker(bindAddress);
+        }
+        factory = createConnectionFactory(bindAddress);
+        managementConnection = factory.createConnection();
+        managementSession = managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // getName(), not the Class object itself: Class.toString() prepends
+        // "class " and would put a space into every queue name.
+        String prefix = getClass().getName();
+        Destination startDestination = createDestination(managementSession, prefix + ".start");
+        Destination endDestination = createDestination(managementSession, prefix + ".end");
+        LOG.info("Running with " + numberOfClients + " clients");
+        controller = new LoadController(factory);
+        controller.setBatchSize(batchSize);
+        controller.setNumberOfBatches(numberOfBatches);
+        controller.setDeliveryMode(deliveryMode);
+        controller.setConnectionPerMessage(connectionPerMessage);
+        controller.setStartDestination(startDestination);
+        controller.setControlDestination(endDestination);
+        controller.setTimeout(timeout);
+
+        clients = new LoadClient[numberOfClients];
+        // Each client's output queue is the next client's input queue, so the
+        // destination is created once and carried over to the next iteration.
+        Destination inDestination = startDestination;
+        for (int i = 0; i < numberOfClients; i++) {
+            Destination outDestination;
+            if (i == (numberOfClients - 1)) {
+                outDestination = endDestination;
+            } else {
+                outDestination = createDestination(managementSession, prefix + ".client." + (i + 1));
+            }
+            LoadClient client = new LoadClient("client(" + i + ")", factory);
+            client.setTimeout(timeout);
+            client.setDeliveryMode(deliveryMode);
+            client.setConnectionPerMessage(connectionPerMessage);
+            client.setStartDestination(inDestination);
+            client.setNextDestination(outDestination);
+            clients[i] = client;
+            inDestination = outDestination;
+        }
+
+        super.setUp();
+    }
+
+    /**
+     * Stops the clients and the controller, closes the management connection
+     * and stops the broker. The broker is stopped even if an earlier step
+     * throws, so a failed run does not leave the TCP connector bound.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+            try {
+                if (clients != null) {
+                    for (int i = 0; i < clients.length; i++) {
+                        if (clients[i] != null) {
+                            clients[i].stop();
+                        }
+                    }
+                }
+                if (controller != null) {
+                    controller.stop();
+                }
+            } finally {
+                if (managementConnection != null) {
+                    managementConnection.close();
+                }
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker = null;
+            }
+        }
+    }
+
+    /**
+     * Creates a destination for the test; this implementation creates a
+     * queue.
+     *
+     * @param s session used to create the destination
+     * @param destinationName name of the queue
+     */
+    protected Destination createDestination(Session s, String destinationName) throws JMSException {
+        return s.createQueue(destinationName);
+    }
+
+    /**
+     * Factory method to create, configure and start a new broker.
+     *
+     * @param uri address the broker's connector is bound to
+     * @throws Exception if the broker cannot be configured or started
+     */
+    protected BrokerService createBroker(String uri) throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer, uri);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Configures a broker before it is started: messages are deleted on
+     * startup, a connector is added for {@code uri} and the shutdown hook is
+     * disabled.
+     */
+    protected void configureBroker(BrokerService answer, String uri) throws Exception {
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(uri);
+        answer.setUseShutdownHook(false);
+    }
+
+    /**
+     * @param uri broker address the factory connects to
+     * @return a new connection factory for {@code uri}
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
+        return new ActiveMQConnectionFactory(uri);
+    }
+
+    /**
+     * Starts every client and the controller, then calls
+     * {@code controller.stop()}.
+     * <p>
+     * NOTE(review): nothing is asserted here, so a failed run does not fail
+     * the test unless one of these calls throws.
+     */
+    public void testLoad() throws JMSException, InterruptedException {
+        for (int i = 0; i < numberOfClients; i++) {
+            clients[i].start();
+        }
+        controller.start();
+        controller.stop();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message