activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5294
Date Tue, 29 Jul 2014 22:05:53 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk f55edcfa2 -> e47e0e046


https://issues.apache.org/jira/browse/AMQ-5294


This closes #36

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e47e0e04
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e47e0e04
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e47e0e04

Branch: refs/heads/trunk
Commit: e47e0e046350a8325ef5948c2c12b0dc4f053916
Parents: f55edcf
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jul 29 18:05:32 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jul 29 18:05:32 2014 -0400

----------------------------------------------------------------------
 .../activemq-perf-maven-plugin/pom.xml          |   5 +
 .../activemq/tool/AbstractJmsClientSystem.java  | 100 ++++++++-----
 .../apache/activemq/tool/ClientRunBasis.java    |  21 +++
 .../apache/activemq/tool/JmsConsumerClient.java |  44 +++---
 .../apache/activemq/tool/JmsConsumerSystem.java |  11 ++
 .../apache/activemq/tool/JmsProducerClient.java | 140 +++++++++---------
 .../apache/activemq/tool/JmsProducerSystem.java |  16 ++
 .../properties/JmsClientSystemProperties.java   |  11 ++
 .../tool/properties/JmsProducerProperties.java  | 110 +++++++-------
 .../tool/properties/ReflectionUtil.java         |   6 +-
 .../sampler/AbstractPerformanceSampler.java     | 147 ++++++++++++++-----
 .../tool/sampler/PerformanceSampler.java        |  24 ++-
 .../activemq/tool/sampler/RampDownNotifier.java |  41 ++++++
 .../tool/sampler/ThroughputSamplerTask.java     |  11 +-
 .../src/main/resources/log4j.properties         |  29 ++++
 .../sampler/AbstractPerformanceSamplerTest.java | 116 +++++++++++++++
 16 files changed, 599 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/pom.xml b/activemq-tooling/activemq-perf-maven-plugin/pom.xml
index d6a8bcf..bdb7bbb 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-perf-maven-plugin/pom.xml
@@ -76,5 +76,10 @@
       <artifactId>slf4j-simple</artifactId>
       <version>${slf4j-version}</version>
     </dependency>
+    <!-- dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency -->
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClientSystem.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClientSystem.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClientSystem.java
index 9d0c7fe..41cc864 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClientSystem.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClientSystem.java
@@ -19,8 +19,12 @@ package org.apache.activemq.tool;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.ConnectionMetaData;
@@ -35,6 +39,7 @@ import org.apache.activemq.tool.reports.PerformanceReportWriter;
 import org.apache.activemq.tool.reports.VerbosePerfReportWriter;
 import org.apache.activemq.tool.reports.XmlFilePerfReportWriter;
 import org.apache.activemq.tool.sampler.CpuSamplerTask;
+import org.apache.activemq.tool.sampler.PerformanceSampler;
 import org.apache.activemq.tool.sampler.ThroughputSamplerTask;
 import org.apache.activemq.tool.spi.SPIConnectionFactory;
 import org.slf4j.Logger;
@@ -49,7 +54,6 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
     // Properties
     protected JmsFactoryProperties factory = new JmsFactoryProperties();
     protected ThroughputSamplerTask tpSampler = new ThroughputSamplerTask();
-    protected CpuSamplerTask cpuSampler = new CpuSamplerTask();
 
     private int clientDestIndex;
     private int clientDestCount;
@@ -62,69 +66,97 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
 
         // Create performance sampler
         PerformanceReportWriter writer = createPerfWriter();
-        tpSampler.setPerfReportWriter(writer);
-        cpuSampler.setPerfReportWriter(writer);
-
         writer.openReportWriter();
         writer.writeProperties("jvmSettings", System.getProperties());
         writer.writeProperties("testSystemSettings", ReflectionUtil.retrieveObjectProperties(getSysTest()));
         writer.writeProperties("jmsFactorySettings", ReflectionUtil.retrieveObjectProperties(jmsConnFactory));
         writer.writeProperties("jmsClientSettings", ReflectionUtil.retrieveObjectProperties(getJmsClientProperties()));
-        writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
-        writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));
 
+        // set up performance samplers indicated by the user
+        List<PerformanceSampler> samplers = new ArrayList<>();
+
+        Set<String> requestedSamplers = getSysTest().getSamplersSet();
+        if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_TP)) {
+            writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
+            samplers.add(tpSampler);
+        }
+
+        if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_CPU)) {
+            CpuSamplerTask cpuSampler = new CpuSamplerTask();
+            writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));
+
+            try {
+                cpuSampler.createPlugin();
+                samplers.add(cpuSampler);
+            } catch (IOException e) {
+                LOG.warn("Unable to start CPU sampler plugin. Reason: " + e.getMessage());
+            }
+        }
+
+        // spawn client threads
         clientThreadGroup = new ThreadGroup(getSysTest().getClientPrefix() + " Thread Group");
-        for (int i = 0; i < getSysTest().getNumClients(); i++) {
-            distributeDestinations(getSysTest().getDestDistro(), i, getSysTest().getNumClients(), getSysTest().getTotalDests());
+
+        int numClients = getSysTest().getNumClients();
+        final CountDownLatch clientCompletionLatch = new CountDownLatch(numClients);
+        for (int i = 0; i < numClients; i++) {
+            distributeDestinations(getSysTest().getDestDistro(), i, numClients, getSysTest().getTotalDests());
 
             final String clientName = getSysTest().getClientPrefix() + i;
             final int clientDestIndex = this.clientDestIndex;
             final int clientDestCount = this.clientDestCount;
             Thread t = new Thread(clientThreadGroup, new Runnable() {
+                @Override
                 public void run() {
                     runJmsClient(clientName, clientDestIndex, clientDestCount);
+                    LOG.info("Client completed");
+                    clientCompletionLatch.countDown();
                 }
             });
             t.setName(getSysTest().getClientPrefix() + i + " Thread");
             t.start();
         }
 
-        // Run samplers
-        if (getSysTest().getSamplers().indexOf(JmsClientSystemProperties.SAMPLER_TP) > -1) {
-            tpSampler.startSampler();
+        // start the samplers
+        final CountDownLatch samplerCompletionLatch = new CountDownLatch(requestedSamplers.size());
+        for (PerformanceSampler sampler : samplers) {
+            sampler.setPerfReportWriter(writer);
+            sampler.startSampler(samplerCompletionLatch, getClientRunBasis(), getClientRunDuration());
         }
 
-        if (getSysTest().getSamplers().indexOf(JmsClientSystemProperties.SAMPLER_CPU) > -1) {
+        try {
+            // wait for the clients to finish
+            clientCompletionLatch.await();
+            LOG.debug("All clients completed");
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } finally {
+            // if count-based, ramp-down time is not relevant, shut the samplers down
+            if (getClientRunBasis() == ClientRunBasis.count) {
+                for (PerformanceSampler sampler : samplers) {
+                    sampler.finishSampling();
+                }
+            }
+
             try {
-                cpuSampler.createPlugin();
-                cpuSampler.startSampler();
-            } catch (IOException e) {
-                LOG.warn("Unable to start CPU sampler plugin. Reason: " + e.getMessage());
+                LOG.debug("Waiting for samplers to shut down");
+                samplerCompletionLatch.await();
+                LOG.debug("All samplers completed");
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } finally {
+                writer.closeReportWriter();
             }
         }
+    }
 
-        tpSampler.waitUntilDone();
-        cpuSampler.waitUntilDone();
+    protected abstract ClientRunBasis getClientRunBasis();
 
-        writer.closeReportWriter();
-    }
+    protected abstract long getClientRunDuration();
 
     public ThroughputSamplerTask getTpSampler() {
         return tpSampler;
     }
 
-    public void setTpSampler(ThroughputSamplerTask tpSampler) {
-        this.tpSampler = tpSampler;
-    }
-
-    public CpuSamplerTask getCpuSampler() {
-        return cpuSampler;
-    }
-
-    public void setCpuSampler(CpuSamplerTask cpuSampler) {
-        this.cpuSampler = cpuSampler;
-    }
-
     public JmsFactoryProperties getFactory() {
         return factory;
     }
@@ -204,7 +236,7 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
 
     protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
         try {
-            Class spi = Class.forName(spiClass);
+            Class<?> spi = Class.forName(spiClass);
             SPIConnectionFactory spiFactory = (SPIConnectionFactory)spi.newInstance();
             ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
             LOG.info("Created: " + jmsFactory.getClass().getName() + " using SPIConnectionFactory: " + spiFactory.getClass().getName());
@@ -220,7 +252,7 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
         props.setJmsVersion(metaData.getJMSVersion());
 
         String jmsProperties = "";
-        Enumeration jmsProps = metaData.getJMSXPropertyNames();
+        Enumeration<?> jmsProps = metaData.getJMSXPropertyNames();
         while (jmsProps.hasMoreElements()) {
             jmsProperties += jmsProps.nextElement().toString() + ",";
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/ClientRunBasis.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/ClientRunBasis.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/ClientRunBasis.java
new file mode 100644
index 0000000..b3d6373
--- /dev/null
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/ClientRunBasis.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+public enum ClientRunBasis {
+    count, time;
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
index a1d9f78..7351d02 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
@@ -83,11 +83,9 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
             LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
             long endTime = System.currentTimeMillis() + duration;
 
-            int counter = 0;
             while (System.currentTimeMillis() < endTime) {
                 getJmsConsumer().receive();
                 incThroughput();
-                counter++;
                 sleep();
                 commitTxIfNecessary();
             }
@@ -134,13 +132,14 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
         }
 
         getJmsConsumer().setMessageListener(new MessageListener() {
+            @Override
             public void onMessage(Message msg) {
                 incThroughput();
                 sleep();
                 try {
-                	commitTxIfNecessary();
+                    commitTxIfNecessary();
                 } catch (JMSException ex) {
-                	LOG.error("Error committing transaction: " + ex.getMessage());
+                    LOG.error("Error committing transaction: " + ex.getMessage());
                 }
             }
         });
@@ -170,17 +169,18 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
 
         final AtomicInteger recvCount = new AtomicInteger(0);
         getJmsConsumer().setMessageListener(new MessageListener() {
+            @Override
             public void onMessage(Message msg) {
                 incThroughput();
                 recvCount.incrementAndGet();
                 synchronized (recvCount) {
                     recvCount.notify();
-                } 
-                
+                }
+
                 try {
-                	commitTxIfNecessary();
+                    commitTxIfNecessary();
                 } catch (JMSException ex) {
-                	LOG.error("Error committing transaction: " + ex.getMessage());
+                    LOG.error("Error committing transaction: " + ex.getMessage());
                 }
             }
         });
@@ -209,11 +209,11 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
 
     public MessageConsumer createJmsConsumer() throws JMSException {
         Destination[] dest = createDestination(destIndex, destCount);
-        
+
         if (this.client.getMessageSelector() == null)
-        	return createJmsConsumer(dest[0]);
-        else 
-        	return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
+            return createJmsConsumer(dest[0]);
+        else
+            return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
     }
 
     public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
@@ -252,26 +252,28 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
         return jmsConsumer;
     }
 
+    @Override
     public JmsClientProperties getClient() {
         return client;
     }
 
+    @Override
     public void setClient(JmsClientProperties clientProps) {
         client = (JmsConsumerProperties)clientProps;
     }
-    
+
     /**
-     * A way to throttle the consumer. Time to sleep is 
-     * configured via recvDelay property. 
+     * A way to throttle the consumer. Time to sleep is
+     * configured via recvDelay property.
      */
     protected void sleep() {
         if (client.getRecvDelay() > 0) {
-        	try {
-        		LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
-        		Thread.sleep(client.getRecvDelay());
-        	} catch (java.lang.InterruptedException ex) {
-        		LOG.warn(ex.getMessage());
-        	}
+            try {
+                LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
+                Thread.sleep(client.getRecvDelay());
+            } catch (java.lang.InterruptedException ex) {
+                LOG.warn(ex.getMessage());
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerSystem.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerSystem.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerSystem.java
index 640d52f..34feeaa 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerSystem.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerSystem.java
@@ -48,6 +48,17 @@ public class JmsConsumerSystem extends AbstractJmsClientSystem {
         this.consumer = consumer;
     }
 
+    @Override
+    protected ClientRunBasis getClientRunBasis() {
+        assert (consumer != null);
+        return ClientRunBasis.valueOf(consumer.getRecvType().toLowerCase());
+    }
+
+    @Override
+    protected long getClientRunDuration() {
+        return consumer.getRecvDuration();
+    }
+
     protected void runJmsClient(String clientName, int clientDestIndex, int clientDestCount) {
         ThroughputSamplerTask sampler = getTpSampler();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
index d4a5c8c..e59a857 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
-
 import java.util.Arrays;
 import java.util.Set;
 
@@ -56,11 +55,12 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
     public void sendMessages() throws JMSException {
         // Send a specific number of messages
         if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) {
-            sendCountBasedMessages(client.getSendCount());
-
+            long sendCount = client.getSendCount();
+            sendCountBasedMessages(sendCount);
         // Send messages for a specific duration
         } else {
-            sendTimeBasedMessages(client.getSendDuration());
+            long sendDuration = client.getSendDuration();
+            sendTimeBasedMessages(sendDuration);
         }
     }
 
@@ -90,18 +90,18 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
         try {
             getConnection().start();
             if (client.getMsgFileName() != null) {
-            	LOG.info("Starting to publish " +
-            		messageCount + 
-            		" messages from file " + 
-            		client.getMsgFileName()
-            	);
+                LOG.info("Starting to publish " +
+                    messageCount +
+                    " messages from file " +
+                    client.getMsgFileName()
+                );
             } else {
-            	LOG.info("Starting to publish " +
-            		messageCount +
-            		" messages of size " +
-            		client.getMessageSize() + 
-            		" byte(s)." 
-            	);
+                LOG.info("Starting to publish " +
+                    messageCount +
+                    " messages of size " +
+                    client.getMessageSize() +
+                    " byte(s)."
+                );
             }
 
             // Send one type of message only, avoiding the creation of different messages on sending
@@ -155,6 +155,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
                 }
             }
         } finally {
+            LOG.info("Finished sending");
             getConnection().close();
         }
     }
@@ -178,17 +179,17 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
         try {
             getConnection().start();
             if (client.getMsgFileName() != null) {
-            	LOG.info("Starting to publish messages from file " + 
-            			client.getMsgFileName() + 
-            			" for " +
-            			duration + 
-            			" ms");
+                LOG.info("Starting to publish messages from file " +
+                        client.getMsgFileName() +
+                        " for " +
+                        duration +
+                        " ms");
             } else {
-            	LOG.info("Starting to publish " + 
-            			client.getMessageSize() + 
-            			" byte(s) messages for " + 
-            			duration + 
-            			" ms");
+                LOG.info("Starting to publish " +
+                        client.getMessageSize() +
+                        " byte(s) messages for " +
+                        duration +
+                        " ms");
             }
             // Send one type of message only, avoiding the creation of different messages on sending
             if (!client.isCreateNewMsg()) {
@@ -243,6 +244,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
                 }
             }
         } finally {
+            LOG.info("Finished sending");
             getConnection().close();
         }
     }
@@ -282,22 +284,22 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
     }
 
     public TextMessage createJmsTextMessage() throws JMSException {
-    	if (client.getMsgFileName() != null) {
-    		return loadJmsMessage();
-    	} else {
+        if (client.getMsgFileName() != null) {
+            return loadJmsMessage();
+        } else {
           return createJmsTextMessage(client.getMessageSize());
-    	}
+        }
     }
 
     public TextMessage createJmsTextMessage(int size) throws JMSException {
         jmsTextMessage = getSession().createTextMessage(buildText("", size));
-        
+
         // support for adding message headers
         Set<String> headerKeys = this.client.getHeaderKeys();
         for (String key : headerKeys) {
-        	jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
+            jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
         }
-        
+
         return jmsTextMessage;
     }
 
@@ -310,10 +312,12 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
         return jmsTextMessage;
     }
 
+    @Override
     public JmsClientProperties getClient() {
         return client;
     }
 
+    @Override
     public void setClient(JmsClientProperties clientProps) {
         client = (JmsProducerProperties)clientProps;
     }
@@ -323,49 +327,49 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
         Arrays.fill(data, (byte) 0);
         return text + new String(data);
     }
-    
+
     protected void sleep() {
         if (client.getSendDelay() > 0) {
-        	try {
-        		LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
-        		Thread.sleep(client.getSendDelay());
-        	} catch (java.lang.InterruptedException ex) {
-        		LOG.warn(ex.getMessage());
-        	}
+            try {
+                LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
+                Thread.sleep(client.getSendDelay());
+            } catch (java.lang.InterruptedException ex) {
+                LOG.warn(ex.getMessage());
+            }
         }
     }
-    
+
     /**
      * loads the message to be sent from the specified TextFile
      */
     protected TextMessage loadJmsMessage() throws JMSException {
-    	try {
-    		// couple of sanity checks upfront 
-    		if (client.getMsgFileName() == null) {
-    			throw new JMSException("Invalid filename specified.");
-    		}
-    		
-    		File f = new File(client.getMsgFileName());
-    		if (f.isDirectory()) {
-    			throw new JMSException("Cannot load from " + 
-    					client.getMsgFileName() + 
-    					" as it is a directory not a text file.");
-    		} 
-    		
-    		// try to load file
-    		BufferedReader br = new BufferedReader(new FileReader(f));
-    		StringBuffer payload = new StringBuffer();
-    		String tmp = null;
-    		while ((tmp = br.readLine()) != null) {
-    			payload.append(tmp);
-    		}
-    		jmsTextMessage = getSession().createTextMessage(payload.toString());
-    		return jmsTextMessage;
-    		
-    	} catch (FileNotFoundException ex) {
-    		throw new JMSException(ex.getMessage());
-    	} catch (IOException iox) {
-    		throw new JMSException(iox.getMessage());
-    	}
+        try {
+            // couple of sanity checks upfront
+            if (client.getMsgFileName() == null) {
+                throw new JMSException("Invalid filename specified.");
+            }
+
+            File f = new File(client.getMsgFileName());
+            if (f.isDirectory()) {
+                throw new JMSException("Cannot load from " +
+                        client.getMsgFileName() +
+                        " as it is a directory not a text file.");
+            }
+
+            // try to load file
+            BufferedReader br = new BufferedReader(new FileReader(f));
+            StringBuffer payload = new StringBuffer();
+            String tmp = null;
+            while ((tmp = br.readLine()) != null) {
+                payload.append(tmp);
+            }
+            br.close();
+            jmsTextMessage = getSession().createTextMessage(payload.toString());
+            return jmsTextMessage;
+        } catch (FileNotFoundException ex) {
+            throw new JMSException(ex.getMessage());
+        } catch (IOException iox) {
+            throw new JMSException(iox.getMessage());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerSystem.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerSystem.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerSystem.java
index f501270..dc6e024 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerSystem.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerSystem.java
@@ -28,14 +28,17 @@ public class JmsProducerSystem extends AbstractJmsClientSystem {
     protected JmsProducerSystemProperties sysTest = new JmsProducerSystemProperties();
     protected JmsProducerProperties producer = new JmsProducerProperties();
 
+    @Override
     public JmsClientSystemProperties getSysTest() {
         return sysTest;
     }
 
+    @Override
     public void setSysTest(JmsClientSystemProperties sysTestProps) {
         sysTest = (JmsProducerSystemProperties)sysTestProps;
     }
 
+    @Override
     public JmsClientProperties getJmsClientProperties() {
         return getProducer();
     }
@@ -48,6 +51,19 @@ public class JmsProducerSystem extends AbstractJmsClientSystem {
         this.producer = producer;
     }
 
+    @Override
+    protected ClientRunBasis getClientRunBasis() {
+        assert (producer != null);
+        return ClientRunBasis.valueOf(producer.getSendType().toLowerCase());
+    }
+
+    @Override
+    protected long getClientRunDuration() {
+        return producer.getSendDuration();
+    }
+
+
+    @Override
     protected void runJmsClient(String clientName, int clientDestIndex, int clientDestCount) {
         ThroughputSamplerTask sampler = getTpSampler();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientSystemProperties.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientSystemProperties.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientSystemProperties.java
index 091b283..0c26a7d 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientSystemProperties.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientSystemProperties.java
@@ -17,8 +17,11 @@
 package org.apache.activemq.tool.properties;
 
 import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
 
 public class JmsClientSystemProperties extends AbstractObjectProperties {
+    
     public static final String DEST_DISTRO_ALL    = "all";    // Each client will send/receive to all destination;
     public static final String DEST_DISTRO_EQUAL  = "equal";  // Equally divide the number of destinations to the number of clients
     public static final String DEST_DISTRO_DIVIDE = "divide"; // Divide the destination among the clients, even if some have more destination than others
@@ -71,6 +74,14 @@ public class JmsClientSystemProperties extends AbstractObjectProperties {
         return samplers;
     }
 
+    public Set<String> getSamplersSet() {
+        Set<String> samplersSet = new HashSet<>();
+        for (String sampler : samplers.split(",")) {
+            samplersSet.add(sampler.trim());
+        }
+        return samplersSet;
+    }
+
     public void setSamplers(String samplers) {
         this.samplers = samplers;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java
index 573ba1c..3e85df6 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java
@@ -24,14 +24,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JmsProducerProperties extends JmsClientProperties {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtil.class);
-	
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtil.class);
+
     public static final String TIME_BASED_SENDING  = "time"; // Produce messages base on a time interval
     public static final String COUNT_BASED_SENDING = "count"; // Produce a specific count of messages
     public static final String DELIVERY_MODE_PERSISTENT     = "persistent"; // Persistent message delivery
     public static final String DELIVERY_MODE_NON_PERSISTENT = "nonpersistent"; // Non-persistent message delivery
-    
+
     protected String deliveryMode = DELIVERY_MODE_NON_PERSISTENT; // Message delivery mode
     protected int messageSize = 1024; // Send 1kb messages by default
     protected long sendCount  = 1000000; // Send a million messages by default
@@ -39,15 +39,15 @@ public class JmsProducerProperties extends JmsClientProperties {
     protected String sendType = TIME_BASED_SENDING;
     protected long sendDelay = 0;  // delay in milliseconds between each producer send
     protected String msgFileName = null; // for sending a particular msg from a file
-    
+
     protected Map<String,Object> headerMap = null;
- 
-    
+
+
     // If true, create a different message on each send, otherwise reuse.
-    protected boolean createNewMsg; 
-    
+    protected boolean createNewMsg;
+
     public JmsProducerProperties() {
-    	this.headerMap = new HashMap();
+        this.headerMap = new HashMap<String, Object>();
     }
 
     public String getDeliveryMode() {
@@ -97,72 +97,72 @@ public class JmsProducerProperties extends JmsClientProperties {
     public void setCreateNewMsg(boolean createNewMsg) {
         this.createNewMsg = createNewMsg;
     }
-    
+
     public void setSendDelay(long delay) {
-    	this.sendDelay = delay;
+        this.sendDelay = delay;
     }
-    
+
     public long getSendDelay() {
-    	return this.sendDelay;
+        return this.sendDelay;
     }
-    
-    
+
+
     /* Operations for supporting message headers */
-    
+
     /**
-     * Method for setting a message header. 
+     * Method for setting a message header.
      * @param encodedHeader - the header is encoded as a string using this syntax:
      * encodedHeader = [headerkey '=' headervalue ':' ]*
      * E.g. an encodedHeader could read "JMSType=car", or
      * "JMSType=car:MyHeader=MyValue"
      *
-     * That implies neither the header key nor the value 
+     * That implies neither the header key nor the value
      * can contain any of the characters ':' and '='.
      */
     public void setHeader(String encodedHeader) {
-    	
-    	// remove any trailing ':' characters
-    	if (encodedHeader.endsWith(":")) {
-    		encodedHeader = encodedHeader.substring(0, encodedHeader.length()-1);
-    	}
-
-    	// split headers 
-    	String headers[] = encodedHeader.split(":");
-    	for (String h : headers) {
-    		
-    		// split into header name and value
-    		String tokens[] = h.split("=");
-    		
-    		// sanity check, don't allow empty string for header names
-    		if (tokens.length != 2 || tokens[0].equals("") || tokens[1].equals("") ) {
-    			LOG.error("Error parsing message headers. Header: \"" + h +
-    					"\". This header will be ignored.");
-    		} else {
-    			this.headerMap.put(tokens[0], tokens[1]);
-    		}
-    	}
-    }
-    
+
+        // remove any trailing ':' characters
+        if (encodedHeader.endsWith(":")) {
+            encodedHeader = encodedHeader.substring(0, encodedHeader.length()-1);
+        }
+
+        // split headers
+        String headers[] = encodedHeader.split(":");
+        for (String h : headers) {
+
+            // split into header name and value
+            String tokens[] = h.split("=");
+
+            // sanity check, don't allow empty string for header names
+            if (tokens.length != 2 || tokens[0].equals("") || tokens[1].equals("") ) {
+                LOG.error("Error parsing message headers. Header: \"" + h +
+                        "\". This header will be ignored.");
+            } else {
+                this.headerMap.put(tokens[0], tokens[1]);
+            }
+        }
+    }
+
     public Set<String> getHeaderKeys() {
-    	return this.headerMap.keySet();
+        return this.headerMap.keySet();
     }
-    
+
     public Object getHeaderValue(String key) {
-    	return this.headerMap.get(key);
-    }  
-    
+        return this.headerMap.get(key);
+    }
+
     public void clearHeaders() {
-    	this.headerMap.clear();
+        this.headerMap.clear();
     }
-    
+
     public void setMsgFileName(String file) {
-    	LOG.info("\"producer.msgFileName\" specified. " +
-    			"Will ignore setting \"producer.messageSize\".");
-    	this.msgFileName = file;
+        LOG.info("\"producer.msgFileName\" specified. " +
+                "Will ignore setting \"producer.messageSize\".");
+        this.msgFileName = file;
     }
-    
+
     public String getMsgFileName() {
-    	return this.msgFileName;
+        return this.msgFileName;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/ReflectionUtil.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/ReflectionUtil.java
index 8e8413c..fc6d7bb 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/ReflectionUtil.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/properties/ReflectionUtil.java
@@ -39,7 +39,7 @@ public final class ReflectionUtil {
             String debugInfo;
 
             Object target = obj;
-            Class targetClass = obj.getClass();
+            Class<?> targetClass = obj.getClass();
 
             // DEBUG: Debugging Info
             debugInfo = "Invoking: " + targetClass.getName();
@@ -92,7 +92,7 @@ public final class ReflectionUtil {
             if (setterMethod == null) {
                 throw new IllegalAccessException("Unable to find appropriate setter method signature for property: " + property);
             }
-            Class paramType = setterMethod.getParameterTypes()[0];
+            Class<?> paramType = setterMethod.getParameterTypes()[0];
 
             // Set primitive type
             debugInfo += "." + setterMethod + "(" + paramType.getName() + ": " + val + ")";
@@ -160,7 +160,7 @@ public final class ReflectionUtil {
     }
 
     public static void configureClass(Object obj, Properties props) {
-        for (Iterator i = props.keySet().iterator(); i.hasNext();) {
+        for (Iterator<Object> i = props.keySet().iterator(); i.hasNext();) {
             try {
                 String key = (String)i.next();
                 String val = props.getProperty(key);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/AbstractPerformanceSampler.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/AbstractPerformanceSampler.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/AbstractPerformanceSampler.java
index 47d5cb6..9f3fa31 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/AbstractPerformanceSampler.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/AbstractPerformanceSampler.java
@@ -16,89 +16,173 @@
  */
 package org.apache.activemq.tool.sampler;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.tool.ClientRunBasis;
 import org.apache.activemq.tool.properties.AbstractObjectProperties;
 import org.apache.activemq.tool.reports.PerformanceReportWriter;
+import org.apache.commons.lang.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractPerformanceSampler extends AbstractObjectProperties implements PerformanceSampler {
     
-    protected long rampUpTime = 30 * 1000; // 30 secs
-    protected long rampDownTime = 30 * 1000; // 30 secs
-    protected long duration = 5 * 60 * 1000; // 5 mins
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    protected long rampUpPercent = 0;
+    protected long rampDownPercent = 0;
+
+    // the following are all optionally set; they are otherwise worked out at run time
+    protected Long rampUpTime;
+    protected Long rampDownTime;
+    protected Long duration;
+
     protected long interval = 1000; // 1 sec
     protected PerformanceReportWriter perfReportWriter;
     protected PerformanceEventListener perfEventListener;
     protected final AtomicBoolean isRunning = new AtomicBoolean(false);
+    protected CountDownLatch completionLatch;
     protected long sampleIndex;
 
-    public long getRampUpTime() {
+    @Override
+    public Long getRampUpTime() {
         return rampUpTime;
     }
 
+    @Override
     public void setRampUpTime(long rampUpTime) {
         this.rampUpTime = rampUpTime;
     }
 
-    public long getRampDownTime() {
+    @Override
+    public Long getRampDownTime() {
         return rampDownTime;
     }
 
+    @Override
     public void setRampDownTime(long rampDownTime) {
         this.rampDownTime = rampDownTime;
     }
 
-    public long getDuration() {
+    @Override
+    public Long getDuration() {
         return duration;
     }
 
+    @Override
     public void setDuration(long duration) {
         this.duration = duration;
     }
 
+    @Override
     public long getInterval() {
         return interval;
     }
 
+    @Override
     public void setInterval(long interval) {
         this.interval = interval;
     }
 
+    @Override
+    public long getRampUpPercent() {
+        return rampUpPercent;
+    }
+
+    @Override
+    public void setRampUpPercent(long rampUpPercent) {
+        Validate.isTrue((rampUpPercent >= 0) && (rampUpPercent <= 100), "rampUpPercent must be a value between 0 and 100");
+        this.rampUpPercent = rampUpPercent;
+    }
+
+    @Override
+    public long getRampDownPercent() {
+        return rampDownPercent;
+    }
+
+    @Override
+    public void setRampDownPercent(long rampDownPercent) {
+        Validate.isTrue((rampDownPercent >= 0) && (rampDownPercent < 100), "rampDownPercent must be a value between 0 and 99");
+        this.rampDownPercent = rampDownPercent;
+    }
+
+    @Override
     public PerformanceReportWriter getPerfReportWriter() {
         return perfReportWriter;
     }
 
+    @Override
     public void setPerfReportWriter(PerformanceReportWriter perfReportWriter) {
         this.perfReportWriter = perfReportWriter;
     }
 
+    @Override
     public PerformanceEventListener getPerfEventListener() {
         return perfEventListener;
     }
 
+    @Override
     public void setPerfEventListener(PerformanceEventListener perfEventListener) {
         this.perfEventListener = perfEventListener;
     }
 
-    public void startSampler() {
-        isRunning.set(true);
+    @Override
+    public void startSampler(CountDownLatch completionLatch, ClientRunBasis clientRunBasis, long clientRunDuration) {
+        Validate.notNull(clientRunBasis);
+        Validate.notNull(completionLatch);
+
+        if (clientRunBasis == ClientRunBasis.time) {
+            // override the default durations
+            // if the user has overridden a duration, then use that
+            duration = (duration == null) ? clientRunDuration : this.duration;
+            rampUpTime = (rampUpTime == null) ? (duration / 100 * rampUpPercent) : this.rampUpTime;
+            rampDownTime = (rampDownTime == null) ? (duration / 100 * rampDownPercent) : this.rampDownTime;
+
+            Validate.isTrue(duration >= (rampUpTime + rampDownTime),
+                    "Ramp times (up: " + rampDownTime + ", down: " + rampDownTime + ") exceed the sampler duration (" + duration + ")");
+            log.info("Sampling duration: {} ms, ramp up: {} ms, ramp down: {} ms", duration, rampUpTime, rampDownTime);
+
+            // spawn notifier thread to stop the sampler, taking ramp-down time into account
+            Thread notifier = new Thread(new RampDownNotifier(this));
+            notifier.setName("RampDownNotifier[" + this.getClass().getSimpleName() + "]");
+            notifier.start();
+        } else {
+            log.info("Performance test running on count basis; ignoring duration and ramp times");
+            setRampUpTime(0);
+            setRampDownTime(0);
+        }
+
+        this.completionLatch = completionLatch;
         Thread t = new Thread(this);
+        t.setName(this.getClass().getSimpleName());
         t.start();
+        isRunning.set(true);
     }
 
+    @Override
+    public void finishSampling() {
+        isRunning.set(false);
+    }
+
+    @Override
     public void run() {
         try {
+            log.debug("Ramp up start");
             onRampUpStart();
             if (perfEventListener != null) {
                 perfEventListener.onRampUpStart(this);
             }
 
-            try {
-                Thread.sleep(rampUpTime);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
+            if (rampUpTime > 0) {
+                try {
+                    Thread.sleep(rampUpTime);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
             }
 
+            log.debug("Sampler start");
             onSamplerStart();
             if (perfEventListener != null) {
                 perfEventListener.onSamplerStart(this);
@@ -106,34 +190,32 @@ public abstract class AbstractPerformanceSampler extends AbstractObjectPropertie
 
             sample();
 
+            log.debug("Sampler end");
             onSamplerEnd();
             if (perfEventListener != null) {
                 perfEventListener.onSamplerEnd(this);
             }
 
-            try {
-                Thread.sleep(rampDownTime);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
+            if (rampDownTime > 0) {
+                try {
+                    Thread.sleep(rampDownTime);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
             }
 
+            log.debug("Ramp down end");
             onRampDownEnd();
             if (perfEventListener != null) {
                 perfEventListener.onRampDownEnd(this);
             }
         } finally {
-            isRunning.set(false);
-            synchronized (isRunning) {
-                isRunning.notifyAll();
-            }
+            completionLatch.countDown();
         }
     }
 
     protected void sample() {
-        // Compute for the actual duration window of the sampler
-        long endTime = System.currentTimeMillis() + duration - rampDownTime - rampUpTime;
-
-        while (System.currentTimeMillis() < endTime) {
+        while (isRunning.get()) {
             try {
                 Thread.sleep(interval);
             } catch (InterruptedException e) {
@@ -144,24 +226,9 @@ public abstract class AbstractPerformanceSampler extends AbstractObjectPropertie
         }
     }
 
+    @Override
     public abstract void sampleData();
 
-    public boolean isRunning() {
-        return isRunning.get();
-    }
-
-    public void waitUntilDone() {
-        while (isRunning()) {
-            try {
-                synchronized (isRunning) {
-                    isRunning.wait(0);
-                }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
     // Call back functions to customize behavior of thread.
     protected void onRampUpStart() {
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/PerformanceSampler.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/PerformanceSampler.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/PerformanceSampler.java
index 9efe5a3..0e8088b 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/PerformanceSampler.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/PerformanceSampler.java
@@ -16,19 +16,22 @@
  */
 package org.apache.activemq.tool.sampler;
 
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.tool.ClientRunBasis;
 import org.apache.activemq.tool.reports.PerformanceReportWriter;
 
 public interface PerformanceSampler extends Runnable {
-    
-    long getRampUpTime();
+
+    Long getRampUpTime();
 
     void setRampUpTime(long rampUpTime);
 
-    long getRampDownTime();
+    Long getRampDownTime();
 
     void setRampDownTime(long rampDownTime);
 
-    long getDuration();
+    Long getDuration();
 
     void setDuration(long duration);
 
@@ -36,6 +39,14 @@ public interface PerformanceSampler extends Runnable {
 
     void setInterval(long interval);
 
+    long getRampUpPercent();
+
+    void setRampUpPercent(long rampUpPercent);
+
+    long getRampDownPercent();
+
+    void setRampDownPercent(long rampDownPercent);
+
     PerformanceReportWriter getPerfReportWriter();
 
     void setPerfReportWriter(PerformanceReportWriter writer);
@@ -44,9 +55,10 @@ public interface PerformanceSampler extends Runnable {
 
     void setPerfEventListener(PerformanceEventListener listener);
 
+    void finishSampling();
+
     void sampleData();
 
-    boolean isRunning();
+    void startSampler(CountDownLatch completionLatch, ClientRunBasis clientRunBasis, long clientRunDuration);
 
-    void waitUntilDone();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/RampDownNotifier.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/RampDownNotifier.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/RampDownNotifier.java
new file mode 100644
index 0000000..65d7c0d
--- /dev/null
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/RampDownNotifier.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool.sampler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RampDownNotifier implements Runnable {
+    private final static Logger LOG = LoggerFactory.getLogger(RampDownNotifier.class);
+    private final PerformanceSampler sampler;
+
+    public RampDownNotifier(PerformanceSampler sampler) {
+        this.sampler = sampler;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Thread.sleep(sampler.getDuration() - sampler.getRampDownTime());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } finally {
+            LOG.debug("Ramping down sampler");
+            sampler.finishSampling();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/ThroughputSamplerTask.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/ThroughputSamplerTask.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/ThroughputSamplerTask.java
index d717912..c35d251 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/ThroughputSamplerTask.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/sampler/ThroughputSamplerTask.java
@@ -16,23 +16,21 @@
  */
 package org.apache.activemq.tool.sampler;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.tool.reports.AbstractPerfReportWriter;
 
 public class ThroughputSamplerTask extends AbstractPerformanceSampler {
 
-    private final Object mutex = new Object();
-    private List<MeasurableClient> clients = new ArrayList<MeasurableClient>();
+    private final List<MeasurableClient> clients = new CopyOnWriteArrayList<>();
 
     public void registerClient(MeasurableClient client) {
-        synchronized (mutex) {
-            clients.add(client);
-        }
+        clients.add(client);
     }
 
+    @Override
     public void sampleData() {
         for (Iterator<MeasurableClient> i = clients.iterator(); i.hasNext();) {
             MeasurableClient client = i.next();
@@ -44,6 +42,7 @@ public class ThroughputSamplerTask extends AbstractPerformanceSampler {
         }
     }
 
+    @Override
     protected void onSamplerStart() {
         // Reset the throughput of the clients
         for (Iterator<MeasurableClient> i = clients.iterator(); i.hasNext();) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/resources/log4j.properties b/activemq-tooling/activemq-perf-maven-plugin/src/main/resources/log4j.properties
new file mode 100644
index 0000000..1840b59
--- /dev/null
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=INFO, stdout
+
+log4j.logger.org.apache.activemq.tool=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
+#log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n

http://git-wip-us.apache.org/repos/asf/activemq/blob/e47e0e04/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/sampler/AbstractPerformanceSamplerTest.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/sampler/AbstractPerformanceSamplerTest.java b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/sampler/AbstractPerformanceSamplerTest.java
new file mode 100644
index 0000000..e22fea2
--- /dev/null
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/sampler/AbstractPerformanceSamplerTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.tool.sampler;
+
+import org.apache.activemq.tool.ClientRunBasis;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.*;
+
+public class AbstractPerformanceSamplerTest {
+
+    private class EmptySampler extends AbstractPerformanceSampler {
+        @Override
+        public void sampleData() {}
+    }
+
+    private AbstractPerformanceSampler sampler;
+    private CountDownLatch samplerLatch;
+
+    @Before
+    public void setUpSampler() {
+        sampler = new EmptySampler();
+        samplerLatch = new CountDownLatch(1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetRampUpPercent_exceeds100() {
+        sampler.setRampUpPercent(101);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetRampUpPercent_lessThan0() {
+        sampler.setRampUpPercent(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetRampDownPercent_exceeds99() {
+        sampler.setRampDownPercent(100);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetRampDownPercent_lessThan0() {
+        sampler.setRampDownPercent(-1);
+    }
+
+    @Test
+    public void testSamplerOnCountBasis() throws InterruptedException {
+        final CountDownLatch latch = samplerLatch;
+        sampler.startSampler(latch, ClientRunBasis.count, 0);
+        sampler.finishSampling();
+        samplerLatch.await();
+        assertNull(sampler.getDuration());
+        assertEquals(0, (long) sampler.getRampUpTime());
+        assertEquals(0, (long) sampler.getRampDownTime());
+    }
+
+    @Test
+    public void testSamplerOnTimeBasis_matchesClientSettings() throws InterruptedException {
+        final CountDownLatch latch = samplerLatch;
+        sampler.startSampler(latch, ClientRunBasis.time, 1000);
+        samplerLatch.await();
+        assertEquals(1000, (long) sampler.getDuration());
+        assertEquals(0, (long) sampler.getRampUpTime());
+        assertEquals(0, (long) sampler.getRampDownTime());
+    }
+
+    @Test
+    public void testSamplerOnTimeBasis_percentageOverrides() throws InterruptedException {
+        final CountDownLatch latch = samplerLatch;
+        sampler.setRampUpPercent(10);
+        sampler.setRampDownPercent(20);
+        sampler.startSampler(latch, ClientRunBasis.time, 1000);
+        samplerLatch.await();
+        assertEquals(1000, (long) sampler.getDuration());
+        assertEquals(100, (long) sampler.getRampUpTime());
+        assertEquals(200, (long) sampler.getRampDownTime());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSamplerOnTimeBasis_percentageOverridesExceedSamplerDuration() throws InterruptedException {
+        final CountDownLatch latch = samplerLatch;
+        sampler.setRampUpPercent(60);
+        sampler.setRampDownPercent(41);
+        sampler.startSampler(latch, ClientRunBasis.time, 1000);
+    }
+
+    @Test
+    public void testSamplerOnTimeBasis_timeOverrides() throws InterruptedException {
+        final CountDownLatch latch = samplerLatch;
+        sampler.setRampUpTime(10);
+        sampler.setRampDownTime(20);
+        sampler.startSampler(latch, ClientRunBasis.time, 1000);
+        samplerLatch.await();
+        assertEquals(1000, (long) sampler.getDuration());
+        assertEquals(10, (long) sampler.getRampUpTime());
+        assertEquals(20, (long) sampler.getRampDownTime());
+    }
+}
\ No newline at end of file


Mime
View raw message