activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r411631 - in /incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool: ./ spi/
Date Mon, 05 Jun 2006 01:05:42 GMT
Author: aco
Date: Sun Jun  4 18:05:41 2006
New Revision: 411631

URL: http://svn.apache.org/viewvc?rev=411631&view=rev
Log:
Updates to performance module.

Added:
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsClientSupport.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsFactorySupport.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsPerformanceSupport.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQClassLoaderSPI.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQPojoSPI.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/SPIConnectionFactory.java
Removed:
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ConsumerTool.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsBasicClientSupport.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConfigurableClientSupport.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsPerfClientSupport.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ProducerTool.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ToolSupport.java
Modified:
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurable.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurementTool.java
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ReflectionUtil.java

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsClientSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsClientSupport.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsClientSupport.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsClientSupport.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,179 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.Properties;
+
+public class JmsClientSupport extends JmsFactorySupport {
+    private static final Log log = LogFactory.getLog(JmsClientSupport.class);
+
+    private static final String PREFIX_CONFIG_CLIENT = "client.";
+    public  static final String SESSION_AUTO_ACKNOWLEDGE    = "autoAck";
+    public  static final String SESSION_CLIENT_ACKNOWLEDGE  = "clientAck";
+    public  static final String SESSION_DUPS_OK_ACKNOWLEDGE = "dupsAck";
+    public  static final String SESSION_TRANSACTED          = "transacted";
+
+    protected Properties    clientSettings = new Properties();
+    protected Connection    jmsConnection;
+    protected Session       jmsSession;
+
+    // Client settings
+    protected String  spiClass;
+    protected boolean sessTransacted = false;
+    protected String  sessAckMode    = SESSION_AUTO_ACKNOWLEDGE;
+    protected String  destName       = "TEST.FOO";
+    protected int     destCount      = 1;
+    protected boolean destComposite  = false;
+
+    public ConnectionFactory createConnectionFactory() throws JMSException {
+         return super.createConnectionFactory(getSpiClass());
+    }
+
+    public Connection getConnection() throws JMSException {
+        if (jmsConnection == null) {
+            jmsConnection = createConnectionFactory().createConnection();
+        }
+        return jmsConnection;
+    }
+
+    public Session getSession() throws JMSException {
+        if (jmsSession == null) {
+            int ackMode;
+            if (getSessAckMode().equalsIgnoreCase(SESSION_AUTO_ACKNOWLEDGE)) {
+                ackMode = Session.AUTO_ACKNOWLEDGE;
+            } else if (getSessAckMode().equalsIgnoreCase(SESSION_CLIENT_ACKNOWLEDGE)) {
+                ackMode = Session.CLIENT_ACKNOWLEDGE;
+            } else if (getSessAckMode().equalsIgnoreCase(SESSION_DUPS_OK_ACKNOWLEDGE)) {
+                ackMode = Session.DUPS_OK_ACKNOWLEDGE;
+            } else if (getSessAckMode().equalsIgnoreCase(SESSION_TRANSACTED)) {
+                ackMode = Session.SESSION_TRANSACTED;
+            } else {
+                ackMode = Session.AUTO_ACKNOWLEDGE;
+            }
+            jmsSession = getConnection().createSession(isSessTransacted(), ackMode);
+        }
+        return jmsSession;
+    }
+
+    public Destination[] createDestination() throws JMSException {
+        Destination[] dest = new Destination[getDestCount()];
+        for (int i=0; i<getDestCount(); i++) {
+            dest[i] = createDestination(getDestName() + "." + i);
+        }
+
+        if (isDestComposite()) {
+            return new Destination[] {createDestination(getDestName() + ".>")};
+        } else {
+            return dest;
+        }
+    }
+
+    public Destination createDestination(String name) throws JMSException {
+        if (name.startsWith("queue://")) {
+            return getSession().createQueue(name.substring("queue://".length()));
+        } else if (name.startsWith("topic://")) {
+            return getSession().createTopic(name.substring("topic://".length()));
+        } else {
+            return getSession().createTopic(name);
+        }
+    }
+
+    public String getSpiClass() {
+        return spiClass;
+    }
+
+    public void setSpiClass(String spiClass) {
+        this.spiClass = spiClass;
+    }
+
+    public boolean isSessTransacted() {
+        return sessTransacted;
+    }
+
+    public void setSessTransacted(boolean sessTransacted) {
+        this.sessTransacted = sessTransacted;
+    }
+
+    public String getSessAckMode() {
+        return sessAckMode;
+    }
+
+    public void setSessAckMode(String sessAckMode) {
+        this.sessAckMode = sessAckMode;
+    }
+
+    public String getDestName() {
+        return destName;
+    }
+
+    public void setDestName(String destName) {
+        this.destName = destName;
+    }
+
+    public int getDestCount() {
+        return destCount;
+    }
+
+    public void setDestCount(int destCount) {
+        this.destCount = destCount;
+    }
+
+    public boolean isDestComposite() {
+        return destComposite;
+    }
+
+    public void setDestComposite(boolean destComposite) {
+        this.destComposite = destComposite;
+    }
+
+    public Properties getClientSettings() {
+        return clientSettings;
+    }
+
+    public void setClientSettings(Properties clientSettings) {
+        this.clientSettings = clientSettings;
+        ReflectionUtil.configureClass(this, clientSettings);
+    }
+
+    public Properties getSettings() {
+        Properties allSettings = new Properties(clientSettings);
+        allSettings.putAll(super.getSettings());
+        return allSettings;
+    }
+
+    public void setSettings(Properties settings) {
+        super.setSettings(settings);
+        ReflectionUtil.configureClass(this, clientSettings);
+    }
+
+    public void setProperty(String key, String value) {
+        if (key.startsWith(PREFIX_CONFIG_CLIENT)) {
+            clientSettings.setProperty(key, value);
+        } else {
+            super.setProperty(key, value);
+        }
+    }
+}

Modified: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=411631&r1=411630&r2=411631&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java Sun Jun  4 18:05:41 2006
@@ -17,153 +17,298 @@
 
 package org.apache.activemq.tool;
 
-import javax.jms.*;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 
-public class JmsConsumerClient extends JmsPerfClientSupport implements MessageListener {
-
-    private ConnectionFactory factory = null;
-
-    private String factoryClass = null;
-    private String brokerUrl = null;
-    private String destinationName = null;
-    private Destination destination = null;
-
-    private boolean isDurable = false;
-    private boolean isAsync = true;
-
-    public JmsConsumerClient(ConnectionFactory factory) {
-        this.factory = factory;
+import javax.jms.MessageListener;
+import javax.jms.MessageConsumer;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import javax.jms.Topic;
+import javax.jms.Message;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class JmsConsumerClient extends JmsPerformanceSupport {
+    private static final Log log = LogFactory.getLog(JmsConsumerClient.class);
+
+    private static final String PREFIX_CONFIG_CONSUMER = "consumer.";
+    public  static final String TIME_BASED_RECEIVING  = "time";
+    public  static final String COUNT_BASED_RECEIVING = "count";
+
+    protected Properties      jmsConsumerSettings = new Properties();
+    protected MessageConsumer jmsConsumer;
+
+    protected boolean durable   = false;
+    protected boolean asyncRecv = true;
+    protected String  consumerName = "TestConsumerClient";
+
+    protected long   recvCount    = 1000000;       // Receive a million messages by default
+    protected long   recvDuration = 5 * 60 * 1000; // Receive for 5 mins by default
+    protected String recvType     = TIME_BASED_RECEIVING;
+
+    public void receiveMessages() throws JMSException {
+        if (listener != null) {
+            listener.onConfigEnd(this);
+        }
+        if (isAsyncRecv()) {
+            receiveAsyncMessages();
+        } else {
+            receiveSyncMessages();
+        }
     }
 
-    public JmsConsumerClient(ConnectionFactory factory, String destinationName) {
-        this.factory = factory;
-        this.setDestinationName(destinationName);
-    }
+    public void receiveSyncMessages() throws JMSException {
+        if (getJmsConsumer() == null) {
+            createJmsConsumer();
+        }
 
-    public JmsConsumerClient(String factoryClass, String brokerUrl, String destinationName) {
-        this.factoryClass = factoryClass;
-        this.brokerUrl = brokerUrl;
-        this.destinationName = destinationName;
+        try {
+            getConnection().start();
+            if (listener != null) {
+                listener.onConsumeStart(this);
+            }
+            if (getRecvType().equalsIgnoreCase(TIME_BASED_RECEIVING)) {
+                long endTime = System.currentTimeMillis() + getRecvDuration();
+                while (System.currentTimeMillis() < endTime) {
+                    getJmsConsumer().receive();
+                    incThroughput();
+                }
+            } else {
+                int count = 0;
+                while (count < getRecvCount()) {
+                    getJmsConsumer().receive();
+                    incThroughput();
+                    count++;
+                }
+            }
+        } finally {
+            if (listener != null) {
+                listener.onConsumeEnd(this);
+            }
+            getConnection().close();
+        }
     }
 
-    public JmsConsumerClient(String brokerUrl, String destinationName) {
-        this.brokerUrl = brokerUrl;
-        this.destinationName = destinationName;
-    }
+    public void receiveAsyncMessages() throws JMSException {
+        if (getJmsConsumer() == null) {
+            createJmsConsumer();
+        }
 
-    public void createConsumer(long duration) throws JMSException {
-        listener.onConfigEnd(this);
+        if (getRecvType().equalsIgnoreCase(TIME_BASED_RECEIVING)) {
+            getJmsConsumer().setMessageListener(new MessageListener() {
+                public void onMessage(Message msg) {
+                    incThroughput();
+                }
+            });
 
-        // Create connection factory
-        if (factory != null) {
-            createConnectionFactory(factory);
-        } else if (factoryClass != null) {
-            createConnectionFactory(factoryClass, brokerUrl);
+            try {
+                getConnection().start();
+                if (listener != null) {
+                    listener.onConsumeStart(this);
+                }
+                try {
+                    Thread.sleep(getRecvDuration());
+                } catch (InterruptedException e) {
+                    throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
+                }
+            } finally {
+                if (listener != null) {
+                    listener.onConsumeEnd(this);
+                }
+                getConnection().close();
+            }
         } else {
-            createConnectionFactory(brokerUrl);
-        }
+            final AtomicInteger count = new AtomicInteger(0);
+            getJmsConsumer().setMessageListener(new MessageListener() {
+                public void onMessage(Message msg) {
+                    incThroughput();
+                    count.incrementAndGet();
+                    count.notify();
+                }
+            });
 
-        if (getDestination() == null) {
-            setDestination(getDestinationName());
+            try {
+                getConnection().start();
+                if (listener != null) {
+                    listener.onConsumeStart(this);
+                }
+                try {
+                    while (count.get() < getRecvCount()) {
+                        count.wait();
+                    }
+                } catch (InterruptedException e) {
+                    throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
+                }
+            } finally {
+                if (listener != null) {
+                    listener.onConsumeEnd(this);
+                }
+                getConnection().close();
+            }
         }
+    }
+
+    public MessageConsumer createJmsConsumer() throws JMSException {
+        Destination[] dest = createDestination();
+        return createJmsConsumer(dest[0]);
+    }
 
-        if (isDurable) {
-            createDurableSubscriber((Topic) getDestination(), getClass().getName());
+    public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
+        if (isDurable()) {
+            jmsConsumer = getSession().createDurableSubscriber((Topic)dest, getConsumerName());
         } else {
-            createMessageConsumer(getDestination());
+            jmsConsumer = getSession().createConsumer(dest);
         }
+        return jmsConsumer;
+    }
 
-        if (isAsync) {
-            getMessageConsumer().setMessageListener(this);
-            getConnection().start();
-
-            try {
-                Thread.sleep(duration);
-            } catch (InterruptedException e) {
-                throw new JMSException("Error while consumer is sleeping " + e.getMessage());
-            }
+    public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
+        if (isDurable()) {
+            jmsConsumer = getSession().createDurableSubscriber((Topic)dest, getConsumerName(), selector, noLocal);
         } else {
-            getConnection().start();
-            consumeMessages(getMessageConsumer(), duration);
+            jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
         }
+        return jmsConsumer;
+    }
 
-        close(); //close consumer, session, and connection.
-        listener.onConfigEnd(this);
+    public MessageConsumer getJmsConsumer() {
+        return jmsConsumer;
     }
 
-    //Increments throughput
-    public void onMessage(Message message) {
-        System.out.println(message.toString());
-        this.incThroughput();
+    public Properties getJmsConsumerSettings() {
+        return jmsConsumerSettings;
     }
 
-    protected void consumeMessages(MessageConsumer consumer, long duration) throws JMSException {
+    public void setJmsConsumerSettings(Properties jmsConsumerSettings) {
+        this.jmsConsumerSettings = jmsConsumerSettings;
+        ReflectionUtil.configureClass(this, jmsConsumerSettings);
+    }
 
-        long currentTime = System.currentTimeMillis();
-        long endTime = currentTime + duration;
+    public boolean isDurable() {
+        return durable;
+    }
 
-        while (System.currentTimeMillis() <= endTime) {
-            Message message = consumer.receive();
-            onMessage(message);
-        }
+    public void setDurable(boolean durable) {
+        this.durable = durable;
     }
 
-    protected void close() throws JMSException {
-        getMessageConsumer().close();
-        getSession().close();
-        getConnection().close();
+    public boolean isAsyncRecv() {
+        return asyncRecv;
     }
 
-    public static void main(String[] args) throws Exception {
-        JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO");
-        cons.setPerfEventListener(new PerfEventAdapter());
-        cons.createConsumer(20000);
+    public void setAsyncRecv(boolean asyncRecv) {
+        this.asyncRecv = asyncRecv;
     }
 
-    // Helper Methods
+    public String getConsumerName() {
+        return consumerName;
+    }
 
-    public boolean isDurable() {
-        return isDurable;
+    public void setConsumerName(String consumerName) {
+        this.consumerName = consumerName;
     }
 
-    public void setDurable(boolean durable) {
-        isDurable = durable;
+    public long getRecvCount() {
+        return recvCount;
     }
 
-    public boolean isAsync() {
-        return isAsync;
+    public void setRecvCount(long recvCount) {
+        this.recvCount = recvCount;
     }
 
-    public void setAsync(boolean async) {
-        isAsync = async;
+    public long getRecvDuration() {
+        return recvDuration;
     }
 
-    public String getDestinationName() {
-        return this.destinationName;
+    public void setRecvDuration(long recvDuration) {
+        this.recvDuration = recvDuration;
     }
 
-    public void setDestinationName(String destinationName) {
-        this.destinationName = destinationName;
+    public String getRecvType() {
+        return recvType;
     }
 
-    public Destination getDestination() {
-        return this.destination;
+    public void setRecvType(String recvType) {
+        this.recvType = recvType;
     }
 
-    public void setDestination(Destination dest) {
-        this.destination = dest;
+    public Properties getSettings() {
+        Properties allSettings = new Properties(jmsConsumerSettings);
+        allSettings.putAll(super.getSettings());
+        return allSettings;
     }
 
-    public void setDestination(String destinationName) throws JMSException {
+    public void setSettings(Properties settings) {
+        super.setSettings(settings);
+        ReflectionUtil.configureClass(this, jmsConsumerSettings);
+    }
 
-        this.setDestinationName(destinationName);
-        // Create destinations
-        if (this.getDestinationName().startsWith("topic://")) {
-            setDestination(createTopic(getDestinationName().substring("topic://".length())));
-        } else if (getDestinationName().startsWith("queue://")) {
-            setDestination(createQueue(getDestinationName().substring("queue://".length())));
+    public void setProperty(String key, String value) {
+        if (key.startsWith(PREFIX_CONFIG_CONSUMER)) {
+            jmsConsumerSettings.setProperty(key, value);
         } else {
-            setDestination(createQueue(getDestinationName()));
+            super.setProperty(key, value);
         }
+    }
+
+    public static void main(String[] args) throws JMSException {
+        String[] options = new String[22];
+        options[0] = "-Dsampler.duration=60000";     // 1 min
+        options[1] = "-Dsampler.interval=5000";      // 5 secs
+        options[2] = "-Dsampler.rampUpTime=10000";   // 10 secs
+        options[3] = "-Dsampler.rampDownTime=10000"; // 10 secs
+
+        options[4] = "-Dclient.spiClass=org.apache.activemq.tool.spi.ActiveMQPojoSPI";
+        options[5] = "-Dclient.sessTransacted=false";
+        options[6] = "-Dclient.sessAckMode=autoAck";
+        options[7] = "-Dclient.destName=topic://FOO.BAR.TEST";
+        options[8] = "-Dclient.destCount=1";
+        options[9] = "-Dclient.destComposite=false";
+
+        options[10] = "-Dconsumer.durable=false";
+        options[11] = "-Dconsumer.asyncRecv=true";
+        options[12] = "-Dconsumer.recvCount=1000";     // 1000 messages
+        options[13] = "-Dconsumer.recvDuration=60000"; // 1 min
+        options[14] = "-Dconsumer.recvType=time";
+
+        options[15] = "-Dfactory.brokerUrl=tcp://localhost:61616";
+        options[16] = "-Dfactory.clientID=consumerSampleClient";
+        options[17] = "-Dfactory.optimAck=true";
+        options[18] = "-Dfactory.optimDispatch=true";
+        options[19] = "-Dfactory.prefetchQueue=100";
+        options[20] = "-Dfactory.prefetchTopic=32767";
+        options[21] = "-Dfactory.useRetroactive=false";
+
+        args = options;
+
+        Properties samplerSettings  = new Properties();
+        Properties consumerSettings = new Properties();
+
+        for (int i=0; i<args.length; i++) {
+            // Get property define options only
+            if (args[i].startsWith("-D")) {
+                String propDefine = args[i].substring("-D".length());
+                int  index = propDefine.indexOf("=");
+                String key = propDefine.substring(0, index);
+                String val = propDefine.substring(index+1);
+                if (key.startsWith("sampler.")) {
+                    samplerSettings.setProperty(key, val);
+                } else {
+                    consumerSettings.setProperty(key, val);
+                }
+            }
+        }
+
+        JmsConsumerClient client = new JmsConsumerClient();
+        client.setSettings(consumerSettings);
+
+        PerfMeasurementTool sampler = new PerfMeasurementTool();
+        sampler.setSamplerSettings(samplerSettings);
+        sampler.registerClient(client);
+        sampler.startSampler();
+
+        client.setPerfEventListener(sampler);
+        client.receiveMessages();
     }
 }

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsFactorySupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsFactorySupport.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsFactorySupport.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsFactorySupport.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,95 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.activemq.tool.spi.SPIConnectionFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import java.util.Properties;
+import java.util.Iterator;
+
+public class JmsFactorySupport {
+    private static final Log log = LogFactory.getLog(JmsFactorySupport.class);
+
+    private static final String PREFIX_CONFIG_FACTORY = "factory.";
+
+    private   SPIConnectionFactory spiFactory;
+    protected ConnectionFactory jmsFactory;
+    protected Properties jmsFactorySettings = new Properties();
+
+    public ConnectionFactory createConnectionFactory(String spiClass) throws JMSException {
+        jmsFactory = loadJmsFactory(spiClass, jmsFactorySettings);
+        return jmsFactory;
+    }
+
+    protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
+        try {
+            Class spi = Class.forName(spiClass);
+            spiFactory = (SPIConnectionFactory)spi.newInstance();
+            ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
+            log.debug("Created: " + jmsFactory.getClass().getName() + " using SPIConnectionFactory: " + spiFactory.getClass().getName());
+            return jmsFactory;
+        } catch (Exception e) {
+            throw new JMSException(e.getMessage());
+        }
+    }
+
+    public ConnectionFactory getJmsFactory() {
+        return jmsFactory;
+    }
+
+    public Properties getJmsFactorySettings() {
+        return jmsFactorySettings;
+    }
+
+    public void setJmsFactorySettings(Properties jmsFactorySettings) {
+        this.jmsFactorySettings = jmsFactorySettings;
+        try {
+            spiFactory.configureConnectionFactory(jmsFactory, jmsFactorySettings);
+        } catch (Exception e) {
+            log.warn(e);
+        }
+    }
+
+    public Properties getSettings() {
+        return jmsFactorySettings;
+    }
+
+    public void setSettings(Properties settings) {
+        for (Iterator i=settings.keySet().iterator(); i.hasNext();) {
+            String key = (String)i.next();
+            String val = settings.getProperty(key);
+            setProperty(key, val);
+        }
+        try {
+            spiFactory.configureConnectionFactory(jmsFactory, jmsFactorySettings);
+        } catch (Exception e) {
+            log.warn(e);
+        }
+    }
+
+    public void setProperty(String key, String value) {
+        if (key.startsWith(PREFIX_CONFIG_FACTORY)) {
+            jmsFactorySettings.setProperty(key, value);
+        } else {
+            log.warn("Unknown setting: " + key + "=" + value);
+        }
+    }
+}

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsPerformanceSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsPerformanceSupport.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsPerformanceSupport.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsPerformanceSupport.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,64 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+
+public class JmsPerformanceSupport extends JmsClientSupport implements PerfMeasurable {
+
+    protected AtomicLong throughput = new AtomicLong(0);
+
+    protected PerfEventListener listener = null;
+
+    public void reset() {
+        setThroughput(0);
+    }
+
+    public String getClientName() {
+        try {
+            return getConnection().getClientID();
+        } catch (JMSException e) {
+            return "";
+        }
+    }
+
+    public long getThroughput() {
+        return throughput.get();
+    }
+
+    public void setThroughput(long val) {
+        throughput.set(val);
+    }
+
+    public void incThroughput() {
+        throughput.incrementAndGet();
+    }
+
+    public void incThroughput(long val) {
+        throughput.addAndGet(val);
+    }
+
+    public void setPerfEventListener(PerfEventListener listener) {
+        this.listener = listener;
+    }
+
+    public PerfEventListener getPerfEventListener() {
+        return listener;
+    }
+}

Modified: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java?rev=411631&r1=411630&r2=411631&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java Sun Jun  4 18:05:41 2006
@@ -16,138 +16,80 @@
  */
 package org.apache.activemq.tool;
 
-import javax.jms.ConnectionFactory;
-import javax.jms.TextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.MessageProducer;
 import javax.jms.Destination;
+import javax.jms.TextMessage;
 import javax.jms.JMSException;
-import java.util.Map;
+import java.util.Properties;
 import java.util.Arrays;
 
-public class JmsProducerClient extends JmsPerfClientSupport {
-
-    private ConnectionFactory factory = null;
-    private String factoryClass = "";
-    private String brokerUrl = "";
-    private String[] destName  = null;
-
-    private Destination[] dest  = null;
-    private TextMessage message = null;
-
-    public JmsProducerClient(ConnectionFactory factory, String destName) {
-        this.factory  = factory;
-        this.destName = new String[] {destName};
-    }
-
-    public JmsProducerClient(String factoryClass, String brokerUrl, String destName) {
-        this.factoryClass = factoryClass;
-        this.brokerUrl    = brokerUrl;
-        this.destName     = new String[] {destName};
-    }
-
-    public JmsProducerClient(String brokerUrl, String destName) {
-        this.brokerUrl = brokerUrl;
-        this.destName  = new String[] {destName};
-    }
-
-    public JmsProducerClient(ConnectionFactory factory, String[] destName) {
-        this.factory  = factory;
-        this.destName = destName;
-    }
-
-    public JmsProducerClient(String factoryClass, String brokerUrl, String[] destName) {
-        this.factoryClass = factoryClass;
-        this.brokerUrl    = brokerUrl;
-        this.destName     = destName;
-    }
-
-    public JmsProducerClient(String brokerUrl, String[] destName) {
-        this.brokerUrl = brokerUrl;
-        this.destName  = destName;
-    }
-
-    public void createProducer() throws JMSException {
-        createProducer(0);
-    }
-
-    public void createProducer(Map settings) throws JMSException {
-        createProducer(0, settings);
-    }
-
-    public void createProducer(int messageSize, Map settings) throws JMSException {
-        addConfigParam(settings);
-        createProducer(messageSize);
-    }
-
-    public void createProducer(int messageSize) throws JMSException {
-
-        listener.onConfigStart(this);
+public class JmsProducerClient extends JmsPerformanceSupport {
+    private static final Log log = LogFactory.getLog(JmsProducerClient.class);
 
-        // Create connection factory
-        if (factory != null) {
-            createConnectionFactory(factory);
-        } else if (factoryClass != null) {
-            createConnectionFactory(factoryClass, brokerUrl);
-        } else {
-            createConnectionFactory(brokerUrl);
-        }
-        createConnectionFactory(brokerUrl);
-
-
-        // Create destinations
-        dest = new Destination[destName.length];
-        for (int i=0; i<destName.length; i++) {
-            if (destName[i].startsWith("topic://")) {
-                dest[i] = createTopic(destName[i].substring("topic://".length()));
-            } else if (destName[i].startsWith("queue://")) {
-                dest[i] = createQueue(destName[i].substring("queue://".length()));
-            } else {
-                dest[i] = createQueue(destName[i]);
-            }
+    private static final String PREFIX_CONFIG_PRODUCER = "producer.";
+    public  static final String TIME_BASED_SENDING  = "time";
+    public  static final String COUNT_BASED_SENDING = "count";
+
+    protected Properties      jmsProducerSettings = new Properties();
+    protected MessageProducer jmsProducer;
+    protected TextMessage     jmsTextMessage;
+
+    protected int    messageSize  = 1024;          // Send 1kb messages by default
+    protected long   sendCount    = 1000000;       // Send a million messages by default
+    protected long   sendDuration = 5 * 60 * 1000; // Send for 5 mins by default
+    protected String sendType     = TIME_BASED_SENDING;
+
+    public void sendMessages() throws JMSException {
+        if (listener != null) {
+            listener.onConfigEnd(this);
         }
+        // Send a specific number of messages
+        if (sendType.equalsIgnoreCase(COUNT_BASED_SENDING)) {
+            sendCountBasedMessages(getSendCount());
 
-        // Create actual message producer
-        if (dest.length > 1) {
-            createMessageProducer(null);
+        // Send messages for a specific duration
         } else {
-            createMessageProducer(dest[0]);
+            sendTimeBasedMessages(getSendDuration());
         }
-
-        // Create message to sent
-        if (messageSize > 0) {
-            byte[] val = new byte[messageSize];
-            Arrays.fill(val, (byte)0);
-            String buff = new String(val);
-            message = createTextMessage(buff);
-        }
-
-        listener.onConfigEnd(this);
     }
 
     public void sendCountBasedMessages(long messageCount) throws JMSException {
         // Parse through different ways to send messages
         // Avoided putting the condition inside the loop to prevent effect on performance
+        Destination[] dest = createDestination();
+
+        // Create a producer, if none is created.
+        if (getJmsProducer() == null) {
+            if (dest.length == 1) {
+                createJmsProducer(dest[0]);
+            } else {
+                createJmsProducer();
+            }
+        }
         try {
             getConnection().start();
+            if (listener != null) {
+                listener.onPublishStart(this);
+            }
             // Send one type of message only, avoiding the creation of different messages on sending
-            if (message != null) {
+            if (getJmsTextMessage() != null) {
                 // Send to more than one actual destination
                 if (dest.length > 1) {
-                    listener.onPublishStart(this);
                     for (int i=0; i<messageCount; i++) {
                         for (int j=0; j<dest.length; j++) {
-                            getMessageProducer().send(dest[j], message);
+                            getJmsProducer().send(dest[j], getJmsTextMessage());
                             incThroughput();
                         }
                     }
-                    listener.onPublishEnd(this);
                 // Send to only one actual destination
                 } else {
-                    listener.onPublishStart(this);
                     for (int i=0; i<messageCount; i++) {
-                        getMessageProducer().send(message);
+                        getJmsProducer().send(getJmsTextMessage());
                         incThroughput();
                     }
-                    listener.onPublishEnd(this);
                 }
 
             // Send different type of messages using indexing to identify each one.
@@ -156,26 +98,25 @@
             } else {
                 // Send to more than one actual destination
                 if (dest.length > 1) {
-                    listener.onPublishStart(this);
                     for (int i=0; i<messageCount; i++) {
                         for (int j=0; j<dest.length; j++) {
-                            getMessageProducer().send(dest[j], createTextMessage("Text Message [" + i + "]"));
+                            getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
                             incThroughput();
                         }
                     }
-                    listener.onPublishEnd(this);
 
                 // Send to only one actual destination
                 } else {
-                    listener.onPublishStart(this);
                     for (int i=0; i<messageCount; i++) {
-                        getMessageProducer().send(createTextMessage("Text Message [" + i + "]"));
+                        getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
                         incThroughput();
                     }
-                    listener.onPublishEnd(this);
                 }
             }
         } finally {
+            if (listener != null) {
+                listener.onPublishEnd(this);
+            }
             getConnection().close();
         }
     }
@@ -185,29 +126,39 @@
         // Parse through different ways to send messages
         // Avoided putting the condition inside the loop to prevent effect on performance
 
-        // Send one type of message only, avoiding the creation of different messages on sending
+        Destination[] dest = createDestination();
+
+        // Create a producer, if none is created.
+        if (getJmsProducer() == null) {
+            if (dest.length == 1) {
+                createJmsProducer(dest[0]);
+            } else {
+                createJmsProducer();
+            }
+        }
+
         try {
             getConnection().start();
+            if (listener != null) {
+                listener.onPublishStart(this);
+            }
 
-            if (message != null) {
+            // Send one type of message only, avoiding the creation of different messages on sending
+            if (getJmsTextMessage() != null) {
                 // Send to more than one actual destination
                 if (dest.length > 1) {
-                    listener.onPublishStart(this);
                     while (System.currentTimeMillis() < endTime) {
                         for (int j=0; j<dest.length; j++) {
-                            getMessageProducer().send(dest[j], message);
+                            getJmsProducer().send(dest[j], getJmsTextMessage());
                             incThroughput();
                         }
                     }
-                    listener.onPublishEnd(this);
                 // Send to only one actual destination
                 } else {
-                    listener.onPublishStart(this);
                     while (System.currentTimeMillis() < endTime) {
-                        getMessageProducer().send(message);
+                        getJmsProducer().send(getJmsTextMessage());
                         incThroughput();
                     }
-                    listener.onPublishEnd(this);
                 }
 
             // Send different type of messages using indexing to identify each one.
@@ -217,66 +168,183 @@
                 // Send to more than one actual destination
                 long count = 1;
                 if (dest.length > 1) {
-                    listener.onPublishStart(this);
                     while (System.currentTimeMillis() < endTime) {
                         for (int j=0; j<dest.length; j++) {
-                            getMessageProducer().send(dest[j], createTextMessage("Text Message [" + count++ + "]"));
+                            getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
                             incThroughput();
                         }
                     }
-                    listener.onPublishEnd(this);
 
                 // Send to only one actual destination
                 } else {
-                    listener.onPublishStart(this);
                     while (System.currentTimeMillis() < endTime) {
-                        getMessageProducer().send(createTextMessage("Text Message [" + count++ + "]"));
+
+                        getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
                         incThroughput();
                     }
-                    listener.onPublishEnd(this);
                 }
             }
         } finally {
+            if (listener != null) {
+                listener.onPublishEnd(this);
+            }
             getConnection().close();
         }
     }
 
-    public static void main(String[] args) throws Exception {
-        final long duration     = 1 * 60 * 1000;
-        long rampUpTime   = 5 * 1000;
-        long rampDownTime = 5 * 1000;
-        long interval     = 1000;
-
-        PerfMeasurementTool tool = new PerfMeasurementTool();
-        tool.setDuration(duration);
-        tool.setInterval(interval);
-        tool.setRampUpTime(rampUpTime);
-        tool.setRampDownTime(rampDownTime);
-
-        JmsProducerClient[] client = new JmsProducerClient[10];
-        for (int i=0; i<10; i++) {
-            client[i] = new JmsProducerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO");
-            client[i].addConfigParam("factory.asyncSend", "true");
-            client[i].setPerfEventListener(new PerfEventAdapter());
-            client[i].createProducer();
-            tool.registerClient(client[i]);
-        }
+    public Properties getJmsProducerSettings() {
+        return jmsProducerSettings;
+    }
 
-        tool.startSampler();
+    public void setJmsProducerSettings(Properties jmsProducerSettings) {
+        this.jmsProducerSettings = jmsProducerSettings;
+        ReflectionUtil.configureClass(this, jmsProducerSettings);
+    }
 
-        for (int i=0; i<10; i++) {
-            final JmsProducerClient p = client[i];
-            Thread t = new Thread(new Runnable() {
-                public void run() {
-                    try {
-                        p.sendTimeBasedMessages(duration);
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-            t.start();
+    public MessageProducer createJmsProducer() throws JMSException {
+        jmsProducer = getSession().createProducer(null);
+        return jmsProducer;
+    }
 
+    public MessageProducer createJmsProducer(Destination dest) throws JMSException {
+        jmsProducer = getSession().createProducer(dest);
+        return jmsProducer;
+    }
+
+    public MessageProducer getJmsProducer() {
+        return jmsProducer;
+    }
+
+    public TextMessage createJmsTextMessage() throws JMSException {
+        return createJmsTextMessage(getMessageSize());
+    }
+
+    public TextMessage createJmsTextMessage(int size) throws JMSException {
+        jmsTextMessage = getSession().createTextMessage(buildText("", size));
+        return jmsTextMessage;
+    }
+
+    public TextMessage createJmsTextMessage(String text) throws JMSException {
+        jmsTextMessage = getSession().createTextMessage(buildText(text, getMessageSize()));
+        return jmsTextMessage;
+    }
+
+    public TextMessage getJmsTextMessage() {
+        return jmsTextMessage;
+    }
+
+    protected String buildText(String text, int size) {
+        byte[] data = new byte[size - text.length()];
+        Arrays.fill(data, (byte)0);
+        return text + new String(data);
+    }
+
+    public int getMessageSize() {
+        return messageSize;
+    }
+
+    public void setMessageSize(int messageSize) {
+        this.messageSize = messageSize;
+    }
+
+    public long getSendCount() {
+        return sendCount;
+    }
+
+    public void setSendCount(long sendCount) {
+        this.sendCount = sendCount;
+    }
+
+    public long getSendDuration() {
+        return sendDuration;
+    }
+
+    public void setSendDuration(long sendDuration) {
+        this.sendDuration = sendDuration;
+    }
+
+    public String getSendType() {
+        return sendType;
+    }
+
+    public void setSendType(String sendType) {
+        this.sendType = sendType;
+    }
+
+    public Properties getSettings() {
+        Properties allSettings = new Properties(jmsProducerSettings);
+        allSettings.putAll(super.getSettings());
+        return allSettings;
+    }
+
+    public void setSettings(Properties settings) {
+        super.setSettings(settings);
+        ReflectionUtil.configureClass(this, jmsProducerSettings);
+    }
+
+    public void setProperty(String key, String value) {
+        if (key.startsWith(PREFIX_CONFIG_PRODUCER)) {
+            jmsProducerSettings.setProperty(key, value);
+        } else {
+            super.setProperty(key, value);
+        }
+    }
+
+    public static void main(String[] args) throws JMSException {
+        String[] options = new String[17];
+        options[0] = "-Dsampler.duration=60000";     // 1 min
+        options[1] = "-Dsampler.interval=5000";      // 5 secs
+        options[2] = "-Dsampler.rampUpTime=10000";   // 10 secs
+        options[3] = "-Dsampler.rampDownTime=10000"; // 10 secs
+
+        options[4] = "-Dclient.spiClass=org.apache.activemq.tool.spi.ActiveMQPojoSPI";
+        options[5] = "-Dclient.sessTransacted=false";
+        options[6] = "-Dclient.sessAckMode=autoAck";
+        options[7] = "-Dclient.destName=topic://FOO.BAR.TEST";
+        options[8] = "-Dclient.destCount=1";
+        options[9] = "-Dclient.destComposite=false";
+
+        options[10] = "-Dproducer.messageSize=1024";
+        options[11] = "-Dproducer.sendCount=1000";     // 1000 messages
+        options[12] = "-Dproducer.sendDuration=60000"; // 1 min
+        options[13] = "-Dproducer.sendType=time";
+
+        options[14] = "-Dfactory.brokerUrl=tcp://localhost:61616";
+        options[15] = "-Dfactory.clientID=producerSampleClient";
+        options[16] = "-Dfactory.asyncSend=true";
+
+        args = options;
+
+        Properties samplerSettings  = new Properties();
+        Properties producerSettings = new Properties();
+
+        for (int i=0; i<args.length; i++) {
+            // Process only property-definition options (those starting with -D)
+            if (args[i].startsWith("-D")) {
+                String propDefine = args[i].substring("-D".length());
+                int  index = propDefine.indexOf("=");
+                String key = propDefine.substring(0, index);
+                String val = propDefine.substring(index+1);
+                if (key.startsWith("sampler.")) {
+                    samplerSettings.setProperty(key, val);
+                } else {
+                    producerSettings.setProperty(key, val);
+                }
+            }
         }
+
+        JmsProducerClient client = new JmsProducerClient();
+        client.setSettings(producerSettings);
+
+        PerfMeasurementTool sampler = new PerfMeasurementTool();
+        sampler.setSamplerSettings(samplerSettings);
+        sampler.registerClient(client);
+        sampler.startSampler();
+
+        client.setPerfEventListener(sampler);
+
+        // Create the message once so the same message is reused for every send, which improves performance
+        client.createJmsTextMessage();
+        client.sendMessages();
     }
 }

Modified: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurable.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurable.java?rev=411631&r1=411630&r2=411631&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurable.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurable.java Sun Jun  4 18:05:41 2006
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.tool;
 
-import java.util.Map;
+import java.util.Properties;
 
 public interface PerfMeasurable {
     public void reset();
     public String getClientName();
     public long getThroughput();
-    public Map  getClientSettings();
+    public Properties getSettings();
     public void setPerfEventListener(PerfEventListener listener);
     public PerfEventListener getPerfEventListener();
 }

Modified: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurementTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurementTool.java?rev=411631&r1=411630&r2=411631&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurementTool.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/PerfMeasurementTool.java Sun Jun  4 18:05:41 2006
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.tool;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Properties;
 
 public class PerfMeasurementTool implements PerfEventListener, Runnable {
     private long duration     = 5 * 60 * 1000; // 5 mins by default test duration
@@ -33,14 +33,13 @@
 
     private AtomicBoolean start = new AtomicBoolean(false);
     private AtomicBoolean stop  = new AtomicBoolean(false);
+    private Properties samplerSettings = new Properties();
 
     private List perfClients = new ArrayList();
-    private AtomicInteger unstartedClients = new AtomicInteger(0);
 
     public void registerClient(PerfMeasurable client) {
         client.setPerfEventListener(this);
         perfClients.add(client);
-        unstartedClients.incrementAndGet();
     }
 
     public void registerClient(PerfMeasurable[] clients) {
@@ -49,10 +48,13 @@
         }
     }
 
-    public void startSampler() {
-        Thread t = new Thread(this);
-        t.setName("Performance Sampler");
-        t.start();
+    public Properties getSamplerSettings() {
+        return samplerSettings;
+    }
+
+    public void setSamplerSettings(Properties samplerSettings) {
+        this.samplerSettings = samplerSettings;
+        ReflectionUtil.configureClass(this, samplerSettings);
     }
 
     public long getDuration() {
@@ -110,6 +112,12 @@
 
     public void onException(PerfMeasurable client, Exception e) {
         stop.set(true);
+    }
+
+    public void startSampler() {
+        Thread t = new Thread(this);
+        t.setName("Performance Sampler");
+        t.start();
     }
 
     public void run() {

Modified: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ReflectionUtil.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ReflectionUtil.java?rev=411631&r1=411630&r2=411631&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ReflectionUtil.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/ReflectionUtil.java Sun Jun  4 18:05:41 2006
@@ -19,10 +19,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.Map;
 import java.util.Iterator;
 import java.util.StringTokenizer;
+import java.util.Properties;
 import java.lang.reflect.Method;
+import java.lang.reflect.Field;
 
 public final class ReflectionUtil {
     private static final Log log = LogFactory.getLog(ReflectionUtil.class);
@@ -31,116 +32,98 @@
 
     }
 
-    public static void configureClass(Object obj, String key, Object val) {
+    public static void configureClass(Object obj, String key, String val) {
         try {
+            String debugInfo;
+
             Object target = obj;
             Class  targetClass = obj.getClass();
 
-            //System.out.print("Invoking: " + targetClass);
+            // DEBUG: Debugging Info
+            debugInfo = "Invoking: " + targetClass.getName();
 
             StringTokenizer tokenizer = new StringTokenizer(key, ".");
+
+            // NOTE: Skip the first token; it is assumed to be an indicator for the object itself
+            tokenizer.nextToken();
+
+            // For nested settings, get the object first
             for (int j=0; j<tokenizer.countTokens()-1; j++) {
                 // Find getter method first
                 String name = tokenizer.nextToken();
                 String getMethod = "get" + name.substring(0,1).toUpperCase() + name.substring(1);
                 Method method = targetClass.getMethod(getMethod, new Class[] {});
-                target = method.invoke(target, null);
+                target = method.invoke(target, (Object[])null);
                 targetClass = target.getClass();
 
-                //System.out.print("." + getMethod + "()");
+                debugInfo += ("." + getMethod + "()");
             }
 
-            // Invoke setter method of last class
-            String name = tokenizer.nextToken();
-            String methodName = "set" + name.substring(0,1).toUpperCase() + name.substring(1);
-
-            Method method = getPrimitiveMethod(targetClass, methodName, val);
-            Object[] objVal = {val};
-            method.invoke(target, objVal);
-            //method.invoke(target, val);
-            //System.out.println("." + methodName + "(" + val + ")");
+            // Property name
+            String property = tokenizer.nextToken();
+
+            // Determine data type of property
+            Class propertyType = getField(targetClass, property).getType();
+
+            // Get setter method
+            String setterMethod = "set" + property.substring(0,1).toUpperCase() + property.substring(1);
+
+            // Set primitive type
+            debugInfo += ("." + setterMethod + "(" + propertyType.getName() + ": " + val + ")");
+            if (propertyType.isPrimitive()) {
+                if (propertyType == Boolean.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {boolean.class}).invoke(target, new Object[] {Boolean.valueOf(val)});
+                } else if (propertyType == Integer.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {int.class}).invoke(target, new Object[] {Integer.valueOf(val)});
+                } else if (propertyType == Long.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {long.class}).invoke(target, new Object[] {Long.valueOf(val)});
+                } else if (propertyType == Double.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {double.class}).invoke(target, new Object[] {Double.valueOf(val)});
+                } else if (propertyType == Float.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {float.class}).invoke(target, new Object[] {Float.valueOf(val)});
+                } else if (propertyType == Short.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {short.class}).invoke(target, new Object[] {Short.valueOf(val)});
+                } else if (propertyType == Byte.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {byte.class}).invoke(target, new Object[] {Byte.valueOf(val)});
+                } else if (propertyType == Character.TYPE) {
+                    targetClass.getMethod(setterMethod, new Class[] {char.class}).invoke(target, new Object[] {val.charAt(0)});
+                }
+            } else {
+                // Set String type
+                if (propertyType == String.class) {
+                    targetClass.getMethod(setterMethod, new Class[] {String.class}).invoke(target, new Object[] {val});
+
+                // For unknown object type, try to call the valueOf method of the object
+                // to convert the string to the target object type
+                } else {
+                    Object param = propertyType.getMethod("valueOf", new Class[] {String.class}).invoke(null, val);
+                    targetClass.getMethod(setterMethod, new Class[] {propertyType}).invoke(target, new Object[] {param});
+                }
+            }
+            log.debug(debugInfo);
+
         } catch (Exception e) {
-            log.warn("", e);
+            log.warn(e);
         }
     }
 
-    public static void configureClass(Object obj, Map props) {
+    public static void configureClass(Object obj, Properties props) {
         for (Iterator i = props.keySet().iterator(); i.hasNext();) {
             String key = (String)i.next();
-            Object val = props.get(key);
+            String val = props.getProperty(key);
 
             configureClass(obj, key, val);
         }
     }
 
-    private static Method getPrimitiveMethod(Class objClass, String methodName, Object param) throws NoSuchMethodException {
-        if (param instanceof Boolean) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {boolean.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Boolean.class});
-            }
-        } else if (param instanceof Integer) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {int.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Integer.class});
-            }
-        } else if (param instanceof Long) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {long.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Long.class});
-            }
-        } else if (param instanceof Short) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {short.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Short.class});
-            }
-        } else if (param instanceof Byte) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {byte.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Byte.class});
-            }
-        } else if (param instanceof Character) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {char.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Character.class});
-            }
-        } else if (param instanceof Double) {
-            try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {double.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Double.class});
-            }
-        } else if (param instanceof Float) {
+    public static Field getField(Class targetClass, String fieldName) throws NoSuchFieldException {
+        while (targetClass != null) {
             try {
-                // Try using the primitive type first
-                return objClass.getMethod(methodName, new Class[] {float.class});
-            } catch (NoSuchMethodException e) {
-                // Try using the wrapper class next
-                return objClass.getMethod(methodName, new Class[] {Float.class});
+                return targetClass.getDeclaredField(fieldName);
+            } catch (NoSuchFieldException e) {
+                targetClass = targetClass.getSuperclass();
             }
-        } else {
-            // parameter is not a primitive
-            return objClass.getMethod(methodName, new Class[] {param.getClass()});
         }
+        throw new NoSuchFieldException(fieldName);
     }
 }

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQClassLoaderSPI.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQClassLoaderSPI.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQClassLoaderSPI.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQClassLoaderSPI.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,23 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool.spi;
+
+public class ActiveMQClassLoaderSPI extends ClassLoaderSPIConnectionFactory { // SPI that names the ActiveMQ connection factory class for the class-loader based base class.
+    public String getClassName() { // Implements the abstract hook declared by ClassLoaderSPIConnectionFactory.
+        return "org.apache.activemq.ActiveMQConnectionFactory"; // fully qualified name, kept as a string so this class has no compile-time reference to it
+    }
+}

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQPojoSPI.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQPojoSPI.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQPojoSPI.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ActiveMQPojoSPI.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,145 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool.spi;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.ConnectionFactory;
+import java.util.Properties;
+
+public class ActiveMQPojoSPI implements SPIConnectionFactory { // SPI that builds and configures an ActiveMQConnectionFactory directly through its setters.
+    public static final String KEY_BROKER_URL        = "factory.brokerUrl"; // Property keys read from the settings passed to configureConnectionFactory().
+    public static final String KEY_USERNAME          = "factory.username";
+    public static final String KEY_PASSWORD          = "factory.password";
+    public static final String KEY_CLIENT_ID         = "factory.clientID";
+
+    public static final String KEY_ASYNC_SEND        = "factory.asyncSend"; // The keys below are parsed as boolean, except closeTimeout and the two prefetch keys (int).
+    public static final String KEY_ASYNC_DISPATCH    = "factory.asyncDispatch";
+    public static final String KEY_ASYNC_SESSION     = "factory.asyncSession";
+    public static final String KEY_CLOSE_TIMEOUT     = "factory.closeTimeout";
+    public static final String KEY_COPY_MSG_ON_SEND  = "factory.copyMsgOnSend";
+    public static final String KEY_DISABLE_TIMESTAMP = "factory.disableTimestamp";
+    public static final String KEY_DEFER_OBJ_SERIAL  = "factory.deferObjSerial";
+    public static final String KEY_ON_SEND_PREP_MSG  = "factory.onSendPrepMsg";
+    public static final String KEY_OPTIM_ACK         = "factory.optimAck";
+    public static final String KEY_OPTIM_DISPATCH    = "factory.optimDispatch";
+    public static final String KEY_PREFETCH_QUEUE    = "factory.prefetchQueue";
+    public static final String KEY_PREFETCH_TOPIC    = "factory.prefetchTopic";
+    public static final String KEY_USE_COMPRESSION   = "factory.useCompression";
+    public static final String KEY_USE_RETROACTIVE   = "factory.useRetroactive";
+
+    public ConnectionFactory createConnectionFactory(Properties settings) throws Exception { // Creates a default ActiveMQConnectionFactory, applies settings to it and returns it.
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+        configureConnectionFactory(factory, settings);
+        return factory;
+    }
+
+    public void configureConnectionFactory(ConnectionFactory jmsFactory, Properties settings) throws Exception { // Applies every recognised, non-empty setting to jmsFactory; absent or empty keys leave the factory value untouched.
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)jmsFactory; // ClassCastException if the caller passes any other ConnectionFactory implementation
+        String setting; // reused for each key lookup below
+
+        setting = settings.getProperty(KEY_BROKER_URL);
+        if (setting != null && setting.length() > 0) { // same guard for every key: skip when missing or empty
+            factory.setBrokerURL(setting);
+        }
+
+        setting = settings.getProperty(KEY_USERNAME);
+        if (setting != null && setting.length() > 0) {
+            factory.setUserName(setting);
+        }
+
+        setting = settings.getProperty(KEY_PASSWORD);
+        if (setting != null && setting.length() > 0) {
+            factory.setPassword(setting);
+        }
+
+        setting = settings.getProperty(KEY_CLIENT_ID);
+        if (setting != null && setting.length() > 0) {
+            factory.setClientID(setting);
+        }
+
+        setting = settings.getProperty(KEY_ASYNC_SEND);
+        if (setting != null && setting.length() > 0) {
+            factory.setUseAsyncSend(Boolean.parseBoolean(setting)); // parseBoolean: any text other than case-insensitive "true" yields false
+        }
+
+        setting = settings.getProperty(KEY_ASYNC_DISPATCH);
+        if (setting != null && setting.length() > 0) {
+            factory.setAsyncDispatch(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_ASYNC_SESSION);
+        if (setting != null && setting.length() > 0) {
+            factory.setAlwaysSessionAsync(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_CLOSE_TIMEOUT);
+        if (setting != null && setting.length() > 0) {
+            factory.setCloseTimeout(Integer.parseInt(setting)); // NumberFormatException propagates if the value is not an int
+        }
+
+        setting = settings.getProperty(KEY_COPY_MSG_ON_SEND);
+        if (setting != null && setting.length() > 0) {
+            factory.setCopyMessageOnSend(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_DISABLE_TIMESTAMP);
+        if (setting != null && setting.length() > 0) {
+            factory.setDisableTimeStampsByDefault(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_DEFER_OBJ_SERIAL);
+        if (setting != null && setting.length() > 0) {
+            factory.setObjectMessageSerializationDefered(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_ON_SEND_PREP_MSG);
+        if (setting != null && setting.length() > 0) {
+            factory.setOnSendPrepareMessageBody(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_OPTIM_ACK);
+        if (setting != null && setting.length() > 0) {
+            factory.setOptimizeAcknowledge(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_OPTIM_DISPATCH);
+        if (setting != null && setting.length() > 0) {
+            factory.setOptimizedMessageDispatch(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_PREFETCH_QUEUE);
+        if (setting != null && setting.length() > 0) {
+            factory.getPrefetchPolicy().setQueuePrefetch(Integer.parseInt(setting)); // prefetch values are set on the factory's prefetch policy object, not on the factory itself
+        }
+
+        setting = settings.getProperty(KEY_PREFETCH_TOPIC);
+        if (setting != null && setting.length() > 0) {
+            factory.getPrefetchPolicy().setTopicPrefetch(Integer.parseInt(setting));
+        }
+
+        setting = settings.getProperty(KEY_USE_COMPRESSION);
+        if (setting != null && setting.length() > 0) {
+            factory.setUseCompression(Boolean.parseBoolean(setting));
+        }
+
+        setting = settings.getProperty(KEY_USE_RETROACTIVE);
+        if (setting != null && setting.length() > 0) {
+            factory.setUseRetroactiveConsumer(Boolean.parseBoolean(setting));
+        }
+    }
+}

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool.spi;
+
+import org.apache.activemq.tool.ReflectionUtil;
+
+import javax.jms.ConnectionFactory;
+import java.util.Properties;
+
+public abstract class ClassLoaderSPIConnectionFactory implements SPIConnectionFactory { // Base SPI that instantiates the factory class named by getClassName() via reflection.
+    public ConnectionFactory createConnectionFactory(Properties settings) throws Exception { // Loads, instantiates and configures the factory; any reflection failure propagates to the caller.
+        Class factoryClass = Class.forName(getClassName()); // resolved by name at runtime
+        ConnectionFactory factory = (ConnectionFactory)factoryClass.newInstance(); // newInstance() needs an accessible no-arg constructor; the cast fails if the class is not a ConnectionFactory
+        configureConnectionFactory(factory, settings);
+        return factory;
+    }
+
+    public void configureConnectionFactory(ConnectionFactory jmsFactory, Properties settings) throws Exception { // Delegates all configuration to ReflectionUtil.
+        ReflectionUtil.configureClass(jmsFactory, settings); // NOTE(review): presumably maps property keys onto setters of jmsFactory - confirm against ReflectionUtil
+    }
+
+    public abstract String getClassName(); // Subclasses return the fully qualified class name passed to Class.forName() above.
+}

Added: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/SPIConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/SPIConnectionFactory.java?rev=411631&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/SPIConnectionFactory.java (added)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/spi/SPIConnectionFactory.java Sun Jun  4 18:05:41 2006
@@ -0,0 +1,25 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool.spi;
+
+import javax.jms.ConnectionFactory;
+import java.util.Properties;
+
+public interface SPIConnectionFactory { // Contract for provider-specific creation and configuration of JMS connection factories.
+    public ConnectionFactory createConnectionFactory(Properties settings) throws Exception; // Returns a ConnectionFactory built from the given settings.
+    public void configureConnectionFactory(ConnectionFactory jmsFactory, Properties settings) throws Exception; // Applies the given settings to an existing factory instance.
+}



Mime
View raw message