activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/5] activemq-6 git commit: ACTIVEMQ6-78 Adding tests to evaluate this task
Date Thu, 12 Feb 2015 15:14:47 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 1d5a7a10d -> b8db8b051


ACTIVEMQ6-78 Adding tests to evaluate this task

https://issues.apache.org/jira/browse/ACTIVEMQ6-78

This commit is just adding tests I used to debug the blocked calls issue
There are some profiling parameters you can use that I added as a comment to the pom

The reason this is a separate commit is that it would be easier to validate the results of
optimizations while
checking after and before any changes


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/41b28f4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/41b28f4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/41b28f4b

Branch: refs/heads/master
Commit: 41b28f4b23cb42bb168a7b1dfb0fb6e5d6faa053
Parents: 1d5a7a1
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Feb 10 10:53:02 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Feb 11 12:47:01 2015 -0500

----------------------------------------------------------------------
 pom.xml                                         |   7 +-
 .../sends/AbstractSendReceivePerfTest.java      | 247 +++++++++++++++++++
 .../tests/performance/sends/ClientACKPerf.java  | 130 ++++++++++
 .../sends/MeasureCommitPerfTest.java            |  81 ++++++
 .../tests/performance/sends/PreACKPerf.java     |  93 +++++++
 5 files changed, 557 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3ae18a..4172145 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,12 @@
        see https://intellij-support.jetbrains.com/entries/23395793
 
        Also see: http://youtrack.jetbrains.com/issue/IDEA-125696
-     -->
+
+
+       For profiling add this line and use jmc (Java Mission Control) to evaluate the results:
+           -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr
+
+      -->
  
       <activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
          -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
new file mode 100644
index 0000000..c8b2d7f
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Client-ack time
+ *
+ * @author Clebert Suconic
+ */
+public abstract class AbstractSendReceivePerfTest extends JMSTestBase
+{
+   protected static final String Q_NAME = "test-queue-01";
+   private Queue queue;
+
+   protected AtomicBoolean running = new AtomicBoolean(true);
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME);
+      queue = ActiveMQJMSClient.createQueue(Q_NAME);
+
+      AddressSettings settings = new AddressSettings();
+      settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      settings.setMaxSizeBytes(Long.MAX_VALUE);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+   }
+
+
+   @Override
+   protected void registerConnectionFactory() throws Exception
+   {
+      List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
+      connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+
+      createCF(connectorConfigs, "/cf");
+
+      cf = (ConnectionFactory) namingContext.lookup("/cf");
+   }
+
+
+   private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName());
+
+
+   @Test
+   public void testSendReceive() throws Exception
+   {
+      long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000);
+
+
+      MessageReceiver receiver = new MessageReceiver(Q_NAME, numberOfSamples);
+      receiver.start();
+      MessageSender sender = new MessageSender(Q_NAME);
+      sender.start();
+
+      receiver.join();
+      sender.join();
+
+      assertFalse(receiver.failed);
+      assertFalse(sender.failed);
+
+   }
+
+   final Semaphore pendingCredit = new Semaphore(5000);
+
+   /**
+    * to be called after a message is consumed
+    * so the flow control of the test kicks in.
+    */
+   protected final void afterConsume(Message message)
+   {
+      if (message != null)
+      {
+         pendingCredit.release();
+      }
+   }
+
+
+   protected final void beforeSend()
+   {
+      while (running.get())
+      {
+         try
+         {
+            if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS))
+            {
+               return;
+            }
+            else
+            {
+               System.out.println("Couldn't get credits!");
+            }
+         }
+         catch (Throwable e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+   }
+
+
+
+
+   private class MessageReceiver extends Thread
+   {
+      private final String qName;
+      private final long numberOfSamples;
+
+      public boolean failed = false;
+
+      public MessageReceiver(String qname, long numberOfSamples) throws Exception
+      {
+         super("Receiver " + qname);
+         this.qName = qname;
+         this.numberOfSamples = numberOfSamples;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            LOGGER.info("Receiver: Connecting");
+            Connection c = cf.createConnection();
+
+            consumeMessages(c, qName);
+
+            c.close();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            failed = true;
+         }
+         finally
+         {
+            running.set(false);
+         }
+      }
+   }
+
+   protected abstract void consumeMessages(Connection c, String qName) throws Exception;
+
+   private class MessageSender extends Thread
+   {
+      protected String qName;
+
+      public boolean failed = false;
+
+      public MessageSender(String qname) throws Exception
+      {
+         super("Sender " + qname);
+
+         this.qName = qname;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            LOGGER.info("Sender: Connecting");
+            Connection c = cf.createConnection();
+
+            sendMessages(c, qName);
+
+            c.close();
+
+         }
+         catch (Exception e)
+         {
+            failed = true;
+            if (e instanceof InterruptedException)
+            {
+               LOGGER.info("Sender done.");
+            }
+            else
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   /* This will by default send non persistent messages */
+   protected void sendMessages(Connection c, String qName) throws JMSException
+   {
+      Session s = null;
+      s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      LOGGER.info("Sender: Using AUTO-ACK session");
+
+
+      Queue q = s.createQueue(qName);
+      MessageProducer producer = s.createProducer(null);
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+
+      long sent = 0;
+      while (running.get())
+      {
+         beforeSend();
+         producer.send(q, s.createTextMessage("Message_" + (sent++)));
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
new file mode 100644
index 0000000..6dba4f4
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * @author clebertsuconic
+ */
+@RunWith(Parameterized.class)
+public class ClientACKPerf extends AbstractSendReceivePerfTest
+{
+
+   @Parameterized.Parameters(name = "batchSize={0}")
+   public static Collection<Object[]> data()
+   {
+      List<Object[]> list = Arrays.asList(new Object[][]{
+         {1},
+         {2000}});
+
+      System.out.println("Size = " + list.size());
+      return list;
+   }
+
+   public ClientACKPerf(int batchSize)
+   {
+      super();
+      this.batchSize = batchSize;
+   }
+
+
+   public final int batchSize;
+
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+      int mode = 0;
+      mode = Session.CLIENT_ACKNOWLEDGE;
+
+      System.out.println("Receiver: Using PRE-ACK mode");
+
+      Session s = c.createSession(false, mode);
+      Queue q = s.createQueue(qName);
+      MessageConsumer consumer = s.createConsumer(q, null, false);
+
+      c.start();
+
+      Message m = null;
+
+      long totalTimeACKTime = 0;
+
+
+      long start = System.currentTimeMillis();
+
+      long nmessages = 0;
+      long timeout = System.currentTimeMillis() + 60 * 1000;
+      while (timeout > System.currentTimeMillis())
+      {
+         m = consumer.receive(5000);
+         afterConsume(m);
+
+
+         if (m == null)
+         {
+            throw new Exception("Failed with m = null");
+         }
+
+         if (nmessages++ % batchSize == 0)
+         {
+            long startACK = System.nanoTime();
+            m.acknowledge();
+            long endACK = System.nanoTime();
+            totalTimeACKTime += (endACK - startACK);
+         }
+
+
+         if (nmessages % 10000 == 0)
+         {
+            printMsgsSec(start, nmessages, totalTimeACKTime);
+         }
+      }
+
+
+      printMsgsSec(start, nmessages, totalTimeACKTime);
+   }
+
+
+
+   protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime)
+   {
+
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double messagesPerSecond = nmessages / elapsed;
+      double nAcks = nmessages / batchSize;
+
+      System.out.println("batchSize=" + batchSize + ", numberOfMessages="
+                            + nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond
+ ",totalTimeAcking=" +  String.format("%10.4f", totalTimeACKTime) +
+                            ", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime /
nAcks)));
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
new file mode 100644
index 0000000..0528d0e
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+
+/**
+ * @author clebertsuconic
+ */
+
+public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest
+{
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+   }
+
+
+   /* This will by default send non persistent messages */
+   protected void sendMessages(Connection c, String qName) throws JMSException
+   {
+      Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+
+
+      long timeout = System.currentTimeMillis() + 30 * 1000;
+
+      long startMeasure = System.currentTimeMillis() + 5000;
+      long start = 0;
+      long committs = 0;
+      while (timeout > System.currentTimeMillis())
+      {
+
+         if (start == 0 && System.currentTimeMillis() > startMeasure)
+         {
+            System.out.println("heat up");
+            start = System.currentTimeMillis();
+            committs = 0;
+         }
+
+         s.commit();
+         committs++;
+         if (start > 0 && committs % 1000 == 0) printCommitsSecond(start, committs);
+      }
+      printCommitsSecond(start, committs);
+
+      s.close();
+   }
+
+
+   protected void printCommitsSecond(final long start, final double committs)
+   {
+
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double commitsPerSecond = committs / elapsed;
+
+      System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
+                            + committs + ", elapsed=" + elapsed + " msgs/sec= " + commitsPerSecond);
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
new file mode 100644
index 0000000..a6d2906
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+
+/**
+ * @author clebertsuconic
+ */
+
+public class PreACKPerf extends AbstractSendReceivePerfTest
+{
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+      int mode = 0;
+      mode = ActiveMQJMSConstants.PRE_ACKNOWLEDGE;
+
+      System.out.println("Receiver: Using PRE-ACK mode");
+
+      Session s = c.createSession(false, mode);
+      Queue q = s.createQueue(qName);
+      MessageConsumer consumer = s.createConsumer(q, null, false);
+
+      c.start();
+
+      Message m = null;
+
+
+      long start = System.currentTimeMillis();
+
+      long nmessages = 0;
+      long timeout = System.currentTimeMillis() + 30 * 1000;
+      while (timeout > System.currentTimeMillis())
+      {
+         m = consumer.receive(5000);
+
+         nmessages++;
+
+         if (m == null)
+         {
+            throw new Exception("Failed with m = null");
+         }
+
+         if (nmessages % 10000 == 0)
+         {
+            printMsgsSec(start, nmessages);
+         }
+
+      }
+
+      long end = System.currentTimeMillis();
+
+      printMsgsSec(start, nmessages);
+   }
+
+
+
+   protected void printMsgsSec(final long start, final double nmessages)
+   {
+
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double messagesPerSecond = nmessages / elapsed;
+
+      System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
+                            + nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond);
+
+   }
+
+
+}


Mime
View raw message