qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r502624 [5/5] - in /incubator/qpid/branches/perftesting/qpid/java: ./ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/o...
Date Fri, 02 Feb 2007 15:28:13 GMT
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Fri Feb  2 07:28:08 2007
@@ -1,54 +1,92 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.ping;
 
-import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.TimingController;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
 
-import junit.framework.Assert;
 import junit.framework.Test;
 import junit.framework.TestSuite;
+
 import org.apache.log4j.Logger;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.CountDownLatch;
+import org.apache.qpid.requestreply.PingPongProducer;
 
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+/**
+ * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller
+ * interface supplied by the test runner from a separate listener thread. It differs from the {@link PingTestPerf} test
+ * that it extends because it can output timings as replies are received, rather than waiting until all expected replies
+ * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending
+ * and receiving clients working asynchronously.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings asynchronously on batches received.
+ * </table>
+ */
 public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
 {
     private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
 
+    /** Holds the name of the property to get the test results logging batch size. */
+    public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize";
+
+    /** Holds the default test results logging batch size. */
+    public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+    /** Used to hold the timing controller passed from the test runner. */
     private TimingController _timingController;
 
-    private AsyncMessageListener _listener;
+    /** Used to generate unique correlation ids for each test run. */
+    private AtomicLong corellationIdGenerator = new AtomicLong();
+
+    /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controller. */
+    private Map<String, PerCorrelationId> perCorrelationIds =
+        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
 
+    /** Holds the batched results listener, that does logging on batch boundaries. */
+    private BatchedResultsListener batchedResultsListener = null;
+
+    /**
+     * Creates a new asynchronous ping performance test with the specified name.
+     *
+     * @param name The test name.
+     */
     public PingAsyncTestPerf(String name)
     {
         super(name);
+
+        // Sets up the test parameters with defaults.
+        ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+                                              Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
     }
 
     /**
@@ -65,248 +103,200 @@
         return suite;
     }
 
-    protected void setUp() throws Exception
+    /**
+     * Accepts a timing controller from the test runner.
+     *
+     * @param timingController The timing controller to register multiple timings with.
+     */
+    public void setTimingController(TimingController timingController)
     {
-        // Create the test setups on a per thread basis, only if they have not already been created.
-
-        if (threadSetup.get() == null)
-        {
-            PerThreadSetup perThreadSetup = new PerThreadSetup();
-
-            // Extract the test set up paramaeters.
-            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
-            String username = "guest";
-            String password = "guest";
-            String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-            int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
-            String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
-            boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
-            boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
-            String selector = null;
-            boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
-            int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
-            int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-            boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
-
-            boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
-            boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
-            boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
-            boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
-            boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
-
-            int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
-            int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
-
-            // This is synchronized because there is a race condition, which causes one connection to sleep if
-            // all threads try to create connection concurrently
-            synchronized (this)
-            {
-                // Establish a client to ping a Queue and listen the reply back from same Queue
-                perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
-                                                                      destinationname, selector, transacted, persistent,
-                                                                      messageSize, verbose,
-                                                                      afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                                      commitbatchSize, destinationscount, rate, pubsub);
-            }
-
-            // Attach the per-thread set to the thread.
-            threadSetup.set(perThreadSetup);
-
-            _listener = new AsyncMessageListener(batchSize);
-
-            perThreadSetup._pingItselfClient.setMessageListener(_listener);
-            // Start the client connection
-            perThreadSetup._pingItselfClient.getConnection().start();
-
-        }
+        _timingController = timingController;
     }
 
-
-    public void testAsyncPingOk(int numPings)
+    /**
+     * Gets the timing controller passed in by the test runner.
+     *
+     * @return The timing controller passed in by the test runner.
+     */
+    public TimingController getTimingController()
     {
-        _timingController = this.getTimingController();
+        return _timingController;
+    }
 
-        _listener.setTotalMessages(numPings);
+    /**
+     * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
+     * all replies have been received or a time out occurs before exiting this method.
+     *
+     * @param numPings The number of pings to send.
+     */
+    public void testAsyncPingOk(int numPings) throws Exception
+    {
+        _logger.debug("public void testAsyncPingOk(int numPings): called");
 
-        PerThreadSetup perThreadSetup = threadSetup.get();
+        // Ensure that at least one ping was requested.
         if (numPings == 0)
         {
             _logger.error("Number of pings requested was zero.");
-            fail("Number of pings requested was zero.");
         }
 
-        // Generate a sample message. This message is already time stamped and has its reply-to destination set.
-        ObjectMessage msg = null;
-
-        try
-        {
-            msg = perThreadSetup._pingItselfClient.getTestMessage(null,
-                                                                  Integer.parseInt(testParameters.getProperty(
-                                                                          MESSAGE_SIZE_PROPNAME)),
-                                                                  Boolean.parseBoolean(testParameters.getProperty(
-                                                                          PERSISTENT_MODE_PROPNAME)));
-        }
-        catch (JMSException e)
-        {
+        // Get the per thread test setup to run the test through.
+        PerThreadSetup perThreadSetup = threadSetup.get();
+        PingClient pingClient = perThreadSetup._pingClient;
 
-        }
+        // Advance the correlation id of messages to send, to make it unique for this run.
+        String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+        _logger.debug("messageCorrelationId = " + messageCorrelationId);
 
-        // start the test
-        long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+        // Initialize the count and timing controller for the new correlation id.
+        PerCorrelationId perCorrelationId = new PerCorrelationId();
+        TimingController tc = getTimingController().getControllerForCurrentThread();
+        perCorrelationId._tc = tc;
+        perCorrelationId._expectedCount = numPings;
+        perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
-        String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID());
+        // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
+        // messages.
+        pingClient.setChainedMessageListener(batchedResultsListener);
 
-        try
-        {
-            _logger.debug("Sending messages");
+        // Generate a sample message of the specified size.
+        ObjectMessage msg =
+            pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                      testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                      testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
-            perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
+        // Send the requested number of messages, and wait until they have all been received.
+        long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+        int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
 
-            _logger.debug("All sent");
-        }
-        catch (JMSException e)
+        // Check that all the replies were received and log a fail if they were not.
+        if (numReplies < numPings)
         {
-            e.printStackTrace();
-            Assert.fail("JMS Exception Received" + e);
-        }
-        catch (InterruptedException e)
-        {
-            e.printStackTrace();
+            tc.completeTest(false, numPings - numReplies);
         }
 
-        try
-        {
-            _logger.debug("Awating test finish");
+        // Remove the chained message listener from the ping producer.
+        pingClient.removeChainedMessageListener();
 
-            perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS);
-
-            if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
-            {
-                _logger.error("Timeout occured");
-            }
-            //Allow the time out to exit the loop.
-        }
-        catch (InterruptedException e)
-        {
-            //ignore
-            _logger.error("Awaiting test end was interrupted.");
-
-        }
-
-        // Fail the test if the timeout was exceeded.
-        int numReplies = numPings - (int) perThreadSetup._pingItselfClient.removeLock(correlationID).getCount();
+        // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
+        perCorrelationIds.remove(messageCorrelationId);
+    }
 
-        _logger.info("Test Finished");
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     */
+    public void threadSetUp()
+    {
+        _logger.debug("public void threadSetUp(): called");
 
-        if (numReplies != numPings)
+        try
         {
-            try
-            {
-                perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
-            }
-            catch (JMSException e)
-            {
-                _logger.error("Error commiting received messages", e);
-            }
-            try
+            // Call the set up method in the super class. This creates a PingClient pinger.
+            super.threadSetUp();
+
+            // Create the chained message listener, only if it has not already been created.  This is set up with the
+            // batch size property, to tell it what batch size to output results on. A synchronized block is used to
+            // ensure that only one thread creates this.
+            synchronized (this)
             {
-                if (_timingController != null)
+                if (batchedResultsListener == null)
                 {
-                    _logger.trace("Logging missing message count");
-                    _timingController.completeTest(false, numPings - numReplies);
+                    int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+                    batchedResultsListener = new BatchedResultsListener(batchSize);
                 }
             }
-            catch (InterruptedException e)
-            {
-                //ignore
-            }
-            Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
-        }
-    }
 
-    public void setTimingController(TimingController timingController)
-    {
-        _timingController = timingController;
-    }
+            // Get the set up that the super class created.
+            PerThreadSetup perThreadSetup = threadSetup.get();
 
-    public TimingController getTimingController()
-    {
-        return _timingController;
+            // Register the chained message listener on the pinger to do its asynchronous test timings from.
+            perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+        }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
     }
 
-
-    private class AsyncMessageListener implements MessageListener
+    /**
+     * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the
+     * pinger, in order to receive notifications about every message received and the number remaining to be
+     * received. Whenever the number remaining crosses a batch size boundary this results listener outputs
+     * a test timing for the actual number of messages received in the current batch.
+     */
+    private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
     {
-        private volatile int _totalMessages;
-        private int _batchSize;
-        PerThreadSetup _perThreadSetup;
-
-        public AsyncMessageListener(int batchSize)
-        {
-            this(batchSize, -1);
-        }
+        /** The test results logging batch size. */
+        int _batchSize;
 
-        public AsyncMessageListener(int batchSize, int totalMessages)
+        /**
+         * Creates a results listener on the specified batch size.
+         *
+         * @param batchSize The batch size to use.
+         */
+        public BatchedResultsListener(int batchSize)
         {
             _batchSize = batchSize;
-            _totalMessages = totalMessages;
-            _perThreadSetup = threadSetup.get();
         }
 
-        public void setTotalMessages(int newTotal)
+        /**
+         * This callback method is called from all of the pingers that this test creates. It uses the correlation id
+         * from the message to identify the timing controller for the test thread that was responsible for sending those
+         * messages.
+         *
+         * @param message        The message.
+         * @param remainingCount The count of messages remaining to be received with a particular correlation id.
+         *
+         * @throws JMSException Any underlying JMSException is allowed to fall through.
+         */
+        public void onMessage(Message message, int remainingCount) throws JMSException
         {
-            _totalMessages = newTotal;
-        }
+            _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
 
-        public void onMessage(Message message)
-        {
-            try
+            // Check if a batch boundary has been crossed.
+            if ((remainingCount % _batchSize) == 0)
             {
-                _logger.trace("Message Received");
+                // Extract the correlation id from the message.
+                String correlationId = message.getJMSCorrelationID();
 
-                CountDownLatch count = _perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID());
-
-                if (count != null)
+                // Get the details for the correlation id and check that they are not null. They can become null
+                // if a test times out.
+                PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
+                if (perCorrelationId != null)
                 {
-                    int messagesLeft = (int) count.getCount() - 1;// minus one as we haven't yet counted the current message
+                    // Get the timing controller and expected count for this correlation id.
+                    TimingController tc = perCorrelationId._tc;
+                    int expected = perCorrelationId._expectedCount;
+
+                    // Calculate how many messages were actually received in the last batch. This will be the batch size
+                    // except where the number expected is not a multiple of the batch size and this is the first remaining
+                    // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
+                    // size.
+                    int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
 
-                    if ((messagesLeft % _batchSize) == 0)
+                    // Register a test result for the correlation id.
+                    try
                     {
-                        doDone(_batchSize);
+
+                        tc.completeTest(true, receivedInBatch);
                     }
-                    else if (messagesLeft == 0)
+                    catch (InterruptedException e)
                     {
-                        doDone(_totalMessages % _batchSize);
+                        // Ignore this. It means the test runner wants to stop as soon as possible.
+                        _logger.warn("Got InterruptedException.", e);
                     }
                 }
-
+                // Else ignore, test timed out. Should log a fail here?
             }
-            catch (JMSException e)
-            {
-                _logger.warn("There was a JMSException", e);
-            }
-
         }
-
-        private void doDone(int messageCount)
-        {
-            _logger.trace("Messages received:" + messageCount);
-            _logger.trace("Total Messages :" + _totalMessages);
-
-            try
-            {
-                if (_timingController != null)
-                {
-                    _timingController.completeTest(true, messageCount);
-                }
-            }
-            catch (InterruptedException e)
-            {
-                //ignore
-            }
-        }
-
     }
 
+    /**
+     * Holds state specific to each correlation id, needed to output test results. This consists of the count of
+     * the total expected number of messages, and the timing controller for the thread sending those message ids.
+     */
+    private static class PerCorrelationId
+    {
+        public int _expectedCount;
+        public TimingController _tc;
+    }
 }

Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java?view=auto&rev=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java Fri Feb  2 07:28:08 2007
@@ -0,0 +1,317 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
+ * controller interface supplied by the test runner from a separate listener thread. It outputs round trip timings for
+ * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
+ * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
+ * waiting until all expected replies are received.
+ *
+ * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
+ * log for a vast number of messages would slow the testing down. Instead it samples ping latency occasionally. The frequency
+ * of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the default of every
+ * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ *
+ * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the individual
+ * timed ping was taken from, rather than 1 for a single message. This is so that the total throughput (messages / time)
+ * can be calculated in order to examine the relationship between throughput and latency.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings for sampled individual pings.
+ * </table>
+ */
+public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
+{
+    private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
+
+    /** Holds the name of the property to get the test results logging batch size. */
+    public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize";
+
+    /** Holds the default test results logging batch size. */
+    public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+    /** Used to hold the timing controller passed from the test runner. */
+    private TimingController _timingController;
+
+    /** Used to generate unique correlation ids for each test run. */
+    private AtomicLong corellationIdGenerator = new AtomicLong();
+
+    /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controller. */
+    private Map<String, PerCorrelationId> perCorrelationIds =
+        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+    /** Holds the batched results listener, that does logging on batch boundaries. */
+    private BatchedResultsListener batchedResultsListener = null;
+
+    /**
+     * Creates a new ping latency performance test with the specified name.
+     *
+     * @param name The test name.
+     */
+    public PingLatencyTestPerf(String name)
+    {
+        super(name);
+
+        // Sets up the test parameters with defaults.
+        ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+                                              Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
+    }
+
+    /**
+     * Compile all the tests into a test suite.
+     */
+    public static Test suite()
+    {
+        // Build a new test suite
+        TestSuite suite = new TestSuite("Ping Latency Tests");
+
+        // Run performance tests in read committed mode.
+        suite.addTest(new PingLatencyTestPerf("testPingLatency"));
+
+        return suite;
+    }
+
+    /**
+     * Accepts a timing controller from the test runner.
+     *
+     * @param timingController The timing controller to register multiple timings with.
+     */
+    public void setTimingController(TimingController timingController)
+    {
+        _timingController = timingController;
+    }
+
+    /**
+     * Gets the timing controller passed in by the test runner.
+     *
+     * @return The timing controller passed in by the test runner.
+     */
+    public TimingController getTimingController()
+    {
+        return _timingController;
+    }
+
+    /**
+     * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
+     * all replies have been received or a time out occurs before exiting this method.
+     *
+     * @param numPings The number of pings to send.
+     */
+    public void testPingLatency(int numPings) throws Exception
+    {
+        _logger.debug("public void testPingLatency(int numPings): called");
+
+        // Ensure that at least one ping was requested.
+        if (numPings == 0)
+        {
+            _logger.error("Number of pings requested was zero.");
+        }
+
+        // Get the per thread test setup to run the test through.
+        PerThreadSetup perThreadSetup = threadSetup.get();
+        PingClient pingClient = perThreadSetup._pingClient;
+
+        // Advance the correlation id of messages to send, to make it unique for this run.
+        String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+        _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+        // Initialize the count and timing controller for the new correlation id.
+        PerCorrelationId perCorrelationId = new PerCorrelationId();
+        TimingController tc = getTimingController().getControllerForCurrentThread();
+        perCorrelationId._tc = tc;
+        perCorrelationId._expectedCount = numPings;
+        perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+        // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
+        // messages.
+        pingClient.setChainedMessageListener(batchedResultsListener);
+
+        // Generate a sample message of the specified size.
+        ObjectMessage msg =
+            pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                      testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                      testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+        // Send the requested number of messages, and wait until they have all been received.
+        long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+        int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+
+        // Check that all the replies were received and log a fail if they were not.
+        if (numReplies < numPings)
+        {
+            tc.completeTest(false, 0);
+        }
+
+        // Remove the chained message listener from the ping producer.
+        pingClient.removeChainedMessageListener();
+
+        // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
+        perCorrelationIds.remove(messageCorrelationId);
+    }
+
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     */
+    public void threadSetUp()
+    {
+        _logger.debug("public void threadSetUp(): called");
+
+        try
+        {
+            // Call the set up method in the super class. This creates a PingClient pinger.
+            super.threadSetUp();
+
+            // Create the chained message listener, only if it has not already been created.  This is set up with the
+            // batch size property, to tell it what batch size to output results on. A synchronized block is used to
+            // ensure that only one thread creates this.
+            synchronized (this)
+            {
+                if (batchedResultsListener == null)
+                {
+                    int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+                    batchedResultsListener = new BatchedResultsListener(batchSize);
+                }
+            }
+
+            // Get the set up that the super class created.
+            PerThreadSetup perThreadSetup = threadSetup.get();
+
+            // Register the chained message listener on the pinger to do its asynchronous test timings from.
+            perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+        }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
+    }
+
+    /**
+     * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
+     * be attached to the pinger, in order to receive notifications about every message received and the number remaining
+     * to be received. Whenever the number remaining crosses a batch size boundary this results listener outputs a test
+     * timing for the actual number of messages received in the current batch.
+     */
+    private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
+    {
+        /** The test results logging batch size. */
+        int _batchSize;
+
+        /**
+         * Creates a results listener on the specified batch size.
+         *
+         * @param batchSize The batch size to use.
+         */
+        public BatchedResultsListener(int batchSize)
+        {
+            _batchSize = batchSize;
+        }
+
+        /**
+         * This callback method is called from all of the pingers that this test creates. It uses the correlation id
+         * from the message to identify the timing controller for the test thread that was responsible for sending those
+         * messages.
+         *
+         * @param message        The message.
+         * @param remainingCount The count of messages remaining to be received with a particular correlation id.
+         *
+         * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
+         */
+        public void onMessage(Message message, int remainingCount) throws JMSException
+        {
+            _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
+
+            // Check if a batch boundary has been crossed.
+            if ((remainingCount % _batchSize) == 0)
+            {
+                // Extract the correlation id from the message.
+                String correlationId = message.getJMSCorrelationID();
+
+                // Get the details for the correlation id and check that they are not null. They can become null
+                // if a test times out.
+                PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
+                if (perCorrelationId != null)
+                {
+                    // Get the timing controller and expected count for this correlation id.
+                    TimingController tc = perCorrelationId._tc;
+                    int expected = perCorrelationId._expectedCount;
+
+                    // Extract the send time from the message and work out from the current time, what the ping latency was.
+                    // The ping producer time stamps messages in nanoseconds.
+                    long startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+                    long now = System.nanoTime();
+                    long pingTime = now - startTime;
+
+                    // Calculate how many messages were actually received in the last batch. This will be the batch size
+                    // except where the number expected is not a multiple of the batch size and this is the first remaining
+                    // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
+                    // size.
+                    int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
+
+                    // Register a test result for the correlation id.
+                    try
+                    {
+
+                        tc.completeTest(true, receivedInBatch, pingTime);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Ignore this. It means the test runner wants to stop as soon as possible.
+                        _logger.warn("Got InterruptedException.", e);
+                    }
+                }
+                // Else ignore, test timed out. Should log a fail here?
+            }
+        }
+    }
+
+    /**
+     * Holds state specific to each correlation id, needed to output test results. This consists of the count of
+     * the total expected number of messages, and the timing controller for the thread sending those message ids.
+     */
+    private static class PerCorrelationId
+    {
+        public int _expectedCount;
+        public TimingController _tc;
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Fri Feb  2 07:28:08 2007
@@ -1,7 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.ping;
 
-import java.util.Properties;
-
 import javax.jms.*;
 
 import junit.framework.Assert;
@@ -10,169 +28,85 @@
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.requestreply.PingPongProducer;
+
 import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
 /**
  * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
  * simultaneously to simluate many clients/producers/connections.
- * <p/>
+ *
  * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
  * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
- * <p/>
+ *
  * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
  * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
  * except if the connection is lost in which case an attempt to re-establish the setup is made.
- * <p/>
+ *
  * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
  * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
  * temporary queue.
- * <p/>
+ *
  * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- * <p/>
+ *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * </table>
  *
  * @author Rupert Smith
  */
-public class PingTestPerf extends AsymptoticTestCase //implements TimingControllerAware
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
 {
     private static Logger _logger = Logger.getLogger(PingTestPerf.class);
 
-    /**
-     * Holds the name of the property to get the test message size from.
-     */
-    protected static final String MESSAGE_SIZE_PROPNAME = "messagesize";
-
-    /**
-     * Holds the name of the property to get the ping queue name from.
-     */
-    protected static final String PING_DESTINATION_NAME_PROPNAME = "destinationname";
-
-    /**
-     * holds the queue count, if the test is being performed with multiple queues
-     */
-    protected static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
-
-    /**
-     * Holds the name of the property to get the test delivery mode from.
-     */
-    protected static final String PERSISTENT_MODE_PROPNAME = "persistent";
-
-    /**
-     * Holds the name of the property to get the test transactional mode from.
-     */
-    protected static final String TRANSACTED_PROPNAME = "transacted";
-
-    /**
-     * Holds the name of the property to get the test broker url from.
-     */
-    protected static final String BROKER_PROPNAME = "broker";
-
-    /**
-     * Holds the name of the property to get the test broker virtual path.
-     */
-    protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
-
-    /**
-     * Holds the name of the property to get the waiting timeout for response messages.
-     */
-    protected static final String TIMEOUT_PROPNAME = "timeout";
-
-    /** Holds the name of the property to get the message rate from. */
-    protected static final String RATE_PROPNAME = "rate";
-
-    protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
-
-    /** Holds the true or false depending on wether it is P2P test or PubSub */
-    protected static final String IS_PUBSUB_PROPNAME = "pubsub";
-    /**
-     * Holds the size of message body to attach to the ping messages.
-     */
-    protected static final int MESSAGE_SIZE_DEFAULT = 1024;
-
-    protected static final int BATCH_SIZE_DEFAULT = 1000;
-
-    protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT;
-
-    /**
-     * Holds the name of the queue to which pings are sent.
-     */
-    private static final String PING_DESTINATION_NAME_DEFAULT = "ping";
-
-    /**
-     * Holds the message delivery mode to use for the test.
-     */
-    protected static final boolean PERSISTENT_MODE_DEFAULT = false;
-
-    /**
-     * Holds the transactional mode to use for the test.
-     */
-    protected static final boolean TRANSACTED_DEFAULT = false;
-
-    /**
-     * Holds the default broker url for the test.
-     */
-    protected static final String BROKER_DEFAULT = "tcp://localhost:5672";
-
-    /**
-     * Holds the default virtual path for the test.
-     */
-    protected static final String VIRTUAL_PATH_DEFAULT = "/test";
-
-    /**
-     * Sets a default ping timeout.
-     */
-    protected static final long TIMEOUT_DEFAULT = 3000;
-
-    /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
-    private static final int RATE_DEFAULT = 0;
-
-    protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
-    protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
-    protected static final String FAIL_AFTER_SEND = "FailAfterSend";
-    protected static final String FAIL_BEFORE_SEND = "FailBeforeSend";
-    protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize";
-    protected static final String BATCH_SIZE = "BatchSize";
-    protected static final String FAIL_ONCE = "FailOnce";
-
-    /**
-     * Thread local to hold the per-thread test setup fields.
-     */
+    /** Thread local to hold the per-thread test setup fields. */
     ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
 
-    Object _lock = new Object();
-
-    // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
-    // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
-    // of the test parameters to log with the results.
-    protected Properties testParameters = System.getProperties();
-    //private Properties testParameters = new ContextualProperties(System.getProperties());
+    /** Holds a property reader to extract the test parameters from. */
+    protected ParsedProperties testParameters = new ParsedProperties(System.getProperties());
 
     public PingTestPerf(String name)
     {
         super(name);
-        // Sets up the test parameters with defaults.
 
-        setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false");
-        setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false");
-        setSystemPropertyIfNull(FAIL_AFTER_SEND, "false");
-        setSystemPropertyIfNull(FAIL_BEFORE_SEND, "false");
-        setSystemPropertyIfNull(FAIL_ONCE, "true");
-
-        setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
-        setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
-        setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
-        setSystemPropertyIfNull(PING_DESTINATION_NAME_PROPNAME, PING_DESTINATION_NAME_DEFAULT);
-        setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
-        setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
-        setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
-        setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
-        setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
-        setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(0));
-        setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
-        setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
-        setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
+        // Sets up the test parameters with defaults.
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+                                              PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_RATE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+                                              Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
     }
 
     /**
@@ -187,20 +121,6 @@
         suite.addTest(new PingTestPerf("testPingOk"));
 
         return suite;
-               //return new junit.framework.TestSuite(PingTestPerf.class);
-    }
-
-    protected static void setSystemPropertyIfNull(String propName, String propValue)
-    {
-        if (System.getProperty(propName) == null)
-        {
-            System.setProperty(propName, propValue);
-        }
-    }
-
-    public void testPing(int jim) throws Exception
-    {
-        testPingOk(1);
     }
 
     public void testPingOk(int numPings) throws Exception
@@ -214,15 +134,15 @@
 
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-            perThreadSetup._pingItselfClient.getTestMessage(null,
-                                                            Integer.parseInt(testParameters.getProperty(
-                                                                                 MESSAGE_SIZE_PROPNAME)),
-                                                            Boolean.parseBoolean(testParameters.getProperty(
-                                                                                     PERSISTENT_MODE_PROPNAME)));
+            perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                                      testParameters.getPropertyAsInteger(
+                                                          PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                                      testParameters.getPropertyAsBoolean(
+                                                          PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // start the test
-        long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
-        int numReplies = perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout);
+        long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+        int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
 
         // Fail the test if the timeout was exceeded.
         if (numReplies != numPings)
@@ -232,75 +152,87 @@
         }
     }
 
-
-    protected void setUp() throws Exception
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     */
+    public void threadSetUp()
     {
-        // Log4j will propagate the test name as a thread local in all log output.
-        // Carefull when using this, it can cause memory leaks when not cleaned up properly.
-        //NDC.push(getName());
-
-        // Create the test setups on a per thread basis, only if they have not already been created.
+        _logger.debug("public void threadSetUp(): called");
 
-        if (threadSetup.get() == null)
+        try
         {
             PerThreadSetup perThreadSetup = new PerThreadSetup();
 
             // Extract the test set up paramaeters.
-            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
-            String username = "guest";
-            String password = "guest";
-            String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-            int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
-            String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
-            boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
-            boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
-            String selector = null;
-            boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
-            int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
-            int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-            boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
-            boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
-            boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
-            boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
-            boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
-            boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+            String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+            String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+            String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+            String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+            String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+            boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+            boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+            String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+            boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+            int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+            int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+            boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+            boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+            boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+            boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+            boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+            int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+            Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
 
-            int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+            // Extract the test set up parameters.
+            int destinationscount =
+                Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
 
             // This is synchronized because there is a race condition, which causes one connection to sleep if
-            // all threads try to create connection concurrently
-            synchronized (_lock)
+            // all threads try to create connection concurrently.
+            synchronized (this)
             {
                 // Establish a client to ping a Destination and listen the reply back from same Destination
-                perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
-                                                                      destinationname, selector, transacted, persistent,
-                                                                      messageSize, verbose, afterCommit, beforeCommit,
-                                                                      afterSend, beforeSend, failOnce, batchSize, destinationscount,
-                                                                      rate, pubsub);
+                perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
+                                                            selector, transacted, persistent, messageSize, verbose,
+                                                            failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
+                                                            failOnce, batchSize, destinationscount, rate, pubsub);
             }
             // Start the client connection
-            perThreadSetup._pingItselfClient.getConnection().start();
+            perThreadSetup._pingClient.getConnection().start();
 
             // Attach the per-thread set to the thread.
             threadSetup.set(perThreadSetup);
         }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
     }
 
-    protected void tearDown() throws Exception
+    /**
+     * Performs test fixture clean up on a per thread basis.
+     */
+    public void threadTearDown()
     {
+        _logger.debug("public void threadTearDown(): called");
+
         try
         {
-            /*
-            if ((_pingItselfClient != null) && (_pingItselfClient.getConnection() != null))
+            // Get the per thread test fixture.
+            PerThreadSetup perThreadSetup = threadSetup.get();
+
+            // Close the pingers so that it cleans up its connection cleanly.
+            synchronized (this)
             {
-                _pingItselfClient.getConnection().close();
+                perThreadSetup._pingClient.close();
             }
-            */
+
+            // Ensure the per thread fixture is reclaimed.
+            threadSetup.remove();
         }
-        finally
+        catch (JMSException e)
         {
-            //NDC.pop();
+            _logger.warn("There was an exception during per thread tear down.");
         }
     }
 
@@ -309,6 +241,6 @@
         /**
          * Holds the test ping client.
          */
-        protected TestPingItself _pingItselfClient;
+        protected PingClient _pingClient;
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Fri Feb  2 07:28:08 2007
@@ -1,7 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.requestreply;
 
-import java.util.Properties;
-
 import javax.jms.*;
 
 import junit.framework.Assert;
@@ -11,149 +29,87 @@
 import org.apache.log4j.Logger;
 
 import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
 /**
  * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
  * many times simultaneously to simulate many clients/producer/connections. A full round trip ping sends a message from
  * a producer to a consumer, then the consumer replies to the message on a temporary queue.
- * <p/>
+ *
  * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
  * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
- * up using a suitable JUnit test runner. See {@link TKTestRunner} or {@link PPTestRunner} for more information on how
- * to do this.
- * <p/>
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
  * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
  * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
  * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
- * <p/>
+ *
  * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
  * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
  * back on the temporary queue.
- * <p/>
+ *
  * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- * <p/>
+ *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * </table>
  *
  * @author Rupert Smith
  */
-public class PingPongTestPerf extends AsymptoticTestCase //implements TimingControllerAware
+public class PingPongTestPerf extends AsymptoticTestCase
 {
     private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
 
-    /**
-     * Holds the name of the property to get the test message size from.
-     */
-    private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
-
-    /**
-     * Holds the name of the property to get the ping queue name from.
-     */
-    private static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
-
-    /**
-     * Holds the name of the property to get the test delivery mode from.
-     */
-    private static final String PERSISTENT_MODE_PROPNAME = "persistent";
-
-    /**
-     * Holds the name of the property to get the test transactional mode from.
-     */
-    private static final String TRANSACTED_PROPNAME = "transacted";
-
-    /**
-     * Holds the name of the property to get the test broker url from.
-     */
-    private static final String BROKER_PROPNAME = "broker";
-
-    /**
-     * Holds the name of the property to get the test broker virtual path.
-     */
-    private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
-
-    /**
-     * Holds the size of message body to attach to the ping messages.
-     */
-    private static final int MESSAGE_SIZE_DEFAULT = 0;
-
-    private static final int BATCH_SIZE_DEFAULT = 2;
-
-    /**
-     * Holds the name of the queue to which pings are sent.
-     */
-    private static final String PING_QUEUE_NAME_DEFAULT = "ping";
-
-    /**
-     * Holds the message delivery mode to use for the test.
-     */
-    private static final boolean PERSISTENT_MODE_DEFAULT = false;
-
-    /**
-     * Holds the transactional mode to use for the test.
-     */
-    private static final boolean TRANSACTED_DEFAULT = false;
-
-    /**
-     * Holds the default broker url for the test.
-     */
-    private static final String BROKER_DEFAULT = "tcp://localhost:5672";
-
-    /**
-     * Holds the default virtual path for the test.
-     */
-    private static final String VIRTUAL_PATH_DEFAULT = "/test";
-
-    /**
-     * Sets a default ping timeout.
-     */
-    private static final long TIMEOUT = 15000;
-
-    /** Holds the name of the property to get the message rate from. */
-    private static final String RATE_PROPNAME = "rate";
-
-    private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
-    
-    /** Holds the true or false depending on wether it is P2P test or PubSub */
-    private static final String IS_PUBSUB_PROPNAME = "pubsub";
-
-    /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
-    private static final int RATE_DEFAULT = 0;
-
-    private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
-    private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
-    private static final String FAIL_AFTER_SEND = "FailAfterSend";
-    private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
-    private static final String BATCH_SIZE = "BatchSize";
-    private static final String FAIL_ONCE = "FailOnce";
-
-    /**
-     * Thread local to hold the per-thread test setup fields.
-     */
+    /** Thread local to hold the per-thread test setup fields. */
     ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
-    Object _lock = new Object();
 
     // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
     // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
     // of the test parameters to log with the results. It also provides some basic type parsing convenience methods.
-    private Properties testParameters = System.getProperties();
-    //private Properties testParameters = new ContextualProperties(System.getProperties());
+    //private Properties testParameters = System.getProperties();
+    private ParsedProperties testParameters = new ParsedProperties(System.getProperties());
 
     public PingPongTestPerf(String name)
     {
         super(name);
 
         // Sets up the test parameters with defaults.
-        setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
-        setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
-        setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
-        setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
-        setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
-        setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
-        setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
-        setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
-        setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
-        setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+                                              PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_RATE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+                                              Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+                                              Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+                                              Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+                                              PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
     }
 
     /**
@@ -185,19 +141,15 @@
 
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-            perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestination(),
-                                                            Integer.parseInt(testParameters.getProperty(
-                                                                                 MESSAGE_SIZE_PROPNAME)),
-                                                            Boolean.parseBoolean(testParameters.getProperty(
-                                                                                     PERSISTENT_MODE_PROPNAME)));
-
-        // Use the test timing controller to reset the test timer now and obtain the current time.
-        // This can be used to remove the message creation time from the test.
-        //TestTimingController timingUtils = getTimingController();
-        //long startTime = timingUtils.restart();
+            perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+                                                            testParameters.getPropertyAsInteger(
+                                                                PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                                            testParameters.getPropertyAsBoolean(
+                                                                PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // Send the message and wait for a reply.
-        int numReplies = perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, TIMEOUT);
+        int numReplies =
+            perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.DEFAULT_TIMEOUT);
 
         // Fail the test if the timeout was exceeded.
         if (numReplies != numPings)
@@ -206,82 +158,93 @@
         }
     }
 
-    protected void setUp() throws Exception
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     */
+    public void threadSetUp()
     {
-        // Log4j will propagate the test name as a thread local in all log output.
-        // Carefull when using this, it can cause memory leaks when not cleaned up properly.
-        //NDC.push(getName());
-
-        // Create the test setups on a per thread basis, only if they have not already been created.
-        if (threadSetup.get() == null)
+        try
         {
             PerThreadSetup perThreadSetup = new PerThreadSetup();
 
             // Extract the test set up parameters.
-            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
-            String username = "guest";
-            String password = "guest";
-            String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-            String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
-            boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
-            boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
-            String selector = null;
-            boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
-            int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
-            int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-            boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
-            boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
-            boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
-            boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
-            boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
-            int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
-            Boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+            String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+            String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+            String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+            String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+            String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+            boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+            boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+            String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+            boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+            int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+            int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+            boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+            boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+            boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+            boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+            boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+            int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+            Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
 
-            synchronized(_lock)
+            synchronized (this)
             {
                 // Establish a bounce back client on the ping queue to bounce back the pings.
-                perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath,
-                                                          queueName, persistent, transacted, selector, verbose, pubsub);
+                perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath,
+                                                                      destinationName, persistent, transacted, selector,
+                                                                      verbose, pubsub);
 
                 // Start the connections for client and producer running.
                 perThreadSetup._testPingBouncer.getConnection().start();
 
                 // Establish a ping-pong client on the ping queue to send the pings with.
 
-                perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
-                                                                    queueName, selector, transacted, persistent, messageSize,
-                                                                    verbose, afterCommit, beforeCommit, afterSend,
-                                                                    beforeSend, failOnce, batchSize, 0, rate, pubsub);
+                perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath,
+                                                                        destinationName, selector, transacted, persistent,
+                                                                        messageSize, verbose, failAfterCommit,
+                                                                        failBeforeCommit, failAfterSend, failBeforeSend,
+                                                                        failOnce, batchSize, 0, rate, pubsub);
                 perThreadSetup._testPingProducer.getConnection().start();
             }
 
             // Attach the per-thread set to the thread.
             threadSetup.set(perThreadSetup);
         }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
     }
 
-    protected void tearDown() throws Exception
+    /**
+     * Performs test fixture clean up on a per thread basis: closes the ping producer and removes the per-thread fixture.
+     */
+    public void threadTearDown()
     {
+        _logger.debug("public void threadTearDown(): called");
+
         try
         {
-            /**if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null))
-             {
-             _testPingBouncer.getConnection().close();
-             }
-            
-             if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
-             {
-             _testPingProducer.getConnection().close();
-             }*/
+            // Get the per thread test fixture.
+            PerThreadSetup perThreadSetup = threadSetup.get();
+
+            // Close the pingers so that it cleans up its connection cleanly.
+            synchronized (this)
+            {
+                perThreadSetup._testPingProducer.close();
+                //perThreadSetup._testPingBouncer.close();
+            }
+
+            // Ensure the per thread fixture is reclaimed.
+            threadSetup.remove();
         }
-        finally
+        catch (JMSException e)
         {
-            //NDC.pop();
+            _logger.warn("There was an exception during per thread tear down.");
         }
     }
 
-    private static class PerThreadSetup
+    protected static class PerThreadSetup
     {
         /**
          * Holds the test ping-pong producer.

Modified: incubator/qpid/branches/perftesting/qpid/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/pom.xml?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/pom.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/pom.xml Fri Feb  2 07:28:08 2007
@@ -458,7 +458,7 @@
                 <groupId>uk.co.thebadgerset</groupId>
                 <artifactId>junit-toolkit</artifactId>
                 <version>0.5-SNAPSHOT</version>
-                <scope>test</scope>
+                <scope>compile</scope>
             </dependency>
 
             <!-- Qpid Version Dependencies -->



Mime
View raw message