qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rupertlssm...@apache.org
Subject svn commit: r556958 [2/2] - in /incubator/qpid/branches/M2/java/integrationtests: ./ docs/ src/main/java/org/apache/qpid/interop/coordinator/ src/main/java/org/apache/qpid/interop/coordinator/testcases/ src/main/java/org/apache/qpid/interop/coordinator...
Date Tue, 17 Jul 2007 16:22:24 GMT
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java?view=auto&rev=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java Tue Jul 17 09:22:16 2007
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator.testcases;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.InteropTestCase;
+
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements test case 2, from the interop test specification. This test sets up the TC2_BasicP2P test for 50
+ * messages. It checks that the sender and receiver reports both indicate that all the test messages were transmitted
+ * successfully.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link InteropTestCase}
+ * </table>
+ */
+public class InteropTestCase2BasicP2P extends InteropTestCase
+{
+    /** Used for debugging. */
+    private static final Logger log = Logger.getLogger(InteropTestCase2BasicP2P.class);
+
+    /**
+     * Creates a new coordinating test case with the specified name.
+     *
+     * @param name The test case name.
+     */
+    public InteropTestCase2BasicP2P(String name)
+    {
+        super(name);
+    }
+
+    /**
+     * Performs the basic P2P test case, "Test Case 2" in the specification.
+     *
+     * @throws Exception Any exceptions are allowed to fall through and fail the test.
+     */
+    public void testBasicP2P() throws Exception
+    {
+        log.debug("public void testBasicP2P(): called");
+
+        Map<String, Object> testConfig = new HashMap<String, Object>();
+        testConfig.put("TEST_NAME", "TC2_BasicP2P");
+        testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue");
+        testConfig.put("P2P_NUM_MESSAGES", 50);
+
+        Message[] reports = sequenceTest(testConfig);
+
+        // Compare sender and receiver reports.
+        int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
+        int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT");
+
+        Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent);
+        Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived);
+    }
+
+    /**
+     * Should provide a translation from the junit method name of a test to its test case name as defined in the
+     * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+     * "TC2_BasicP2P".
+     *
+     * @param methodName The name of the JUnit test method.
+     * @return The name of the corresponding interop test case.
+     */
+    public String getTestCaseNameForTestMethod(String methodName)
+    {
+        return "TC2_BasicP2P";
+    }
+}

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java?view=auto&rev=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java Tue Jul 17 09:22:16 2007
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator.testcases;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.interop.coordinator.InteropTestCase;
+
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link InteropTestCase}
+ * </table>
+ */
+public class InteropTestCase3BasicPubSub extends InteropTestCase
+{
+    /** Used for debugging. */
+    private static final Logger log = Logger.getLogger(InteropTestCase3BasicPubSub.class);
+
+    /**
+     * Creates a new coordinating test case with the specified name.
+     *
+     * @param name The test case name.
+     */
+    public InteropTestCase3BasicPubSub(String name)
+    {
+        super(name);
+    }
+
+    /**
+     * Performs the basic pub/sub test case, "Test Case 3" in the specification.
+     *
+     * @throws Exception Any exceptions are allowed to fall through and fail the test.
+     */
+    public void testBasicPubSub() throws Exception
+    {
+        log.debug("public void testBasicPubSub(): called");
+
+        Map<String, Object> testConfig = new HashMap<String, Object>();
+        testConfig.put("TEST_NAME", "TC3_BasicPubSub");
+        testConfig.put("PUBSUB_KEY", "tc3route");
+        testConfig.put("PUBSUB_NUM_MESSAGES", 10);
+        testConfig.put("PUBSUB_NUM_RECEIVERS", 5);
+
+        Message[] reports = sequenceTest(testConfig);
+
+        // Compare sender and receiver reports.
+        int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT");
+        int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT");
+
+        Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent);
+        Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5,
+            messagesReceived);
+    }
+
+    /**
+     * Should provide a translation from the junit method name of a test to its test case name as defined in the
+     * interop testing specification. For example the method "testP2P" might map onto the interop test case name
+     * "TC2_BasicP2P".
+     *
+     * @param methodName The name of the JUnit test method.
+     * @return The name of the corresponding interop test case.
+     */
+    public String getTestCaseNameForTestMethod(String methodName)
+    {
+        return "TC3_BasicPubSub";
+    }
+}

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java?view=diff&rev=556958&r1=556957&r2=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java Tue Jul 17 09:22:16 2007
@@ -44,7 +44,11 @@
     /** Defines the possible test case roles that an interop test case can take on. */
     public enum Roles
     {
-        SENDER, RECEIVER;
+        /** Specifies the sender role. */
+        SENDER,
+
+        /** Specifies the receiver role. */
+        RECEIVER
     }
 
     /**
@@ -78,18 +82,11 @@
     public void assignRole(Roles role, Message assignRoleMessage) throws JMSException;
 
     /**
-     * Performs the test case actions.
-     * return from here when you have finished the test.. this will signal the controller that the test has ended. 
+     * Performs the test case actions. Returning from here indicates that the sending role has completed its test.
+     *
      * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
      */
     public void start() throws JMSException;
-
-    /**
-     * Gives notice of termination of the test case actions.
-     *
-     * @throws JMSException Any JMSException resulting from allowed to fall through.
-     */
-    public void terminate() throws JMSException, InterruptedException;
 
     /**
      * Gets a report on the actions performed by the test case in its assigned role.

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=556958&r1=556957&r2=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java Tue Jul 17 09:22:16 2007
@@ -21,42 +21,37 @@
 package org.apache.qpid.interop.testclient;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
 import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.PropertiesUtils;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
+import org.apache.qpid.test.framework.MessagingTestConfigProperties;
+import org.apache.qpid.test.framework.TestUtils;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * Implements a test client as described in the interop testing spec
  * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
  * reacts to control message sequences send by the test {@link org.apache.qpid.interop.coordinator.Coordinator}.
  *
- * <p/><table><caption>Messages Handled by TestClient</caption>
+ * <p/><table><caption>Messages Handled by SustainedTestClient</caption>
  * <tr><th> Message               <th> Action
  * <tr><td> Invite(compulsory)    <td> Reply with Enlist.
  * <tr><td> Invite(test case)     <td> Reply with Enlist if test case available.
  * <tr><td> AssignRole(test case) <td> Reply with Accept Role if matches an enlisted test. Keep test parameters.
  * <tr><td> Start                 <td> Send test messages defined by test parameters. Send report on messages sent.
  * <tr><td> Status Request        <td> Send report on messages received.
+ * <tr><td> Terminate             <td> Terminate the test client.
  * </table>
  *
  * <p><table id="crc"><caption>CRC Card</caption>
@@ -67,12 +62,11 @@
  */
 public class TestClient implements MessageListener
 {
+    /** Used for debugging. */
     private static Logger log = Logger.getLogger(TestClient.class);
 
-    public static final String CONNECTION_PROPERTY = "connectionfactory.broker";
-    public static final String CONNECTION_NAME = "broker";
+    /** Holds the default identifying name of the test client. */
     public static final String CLIENT_NAME = "java";
-    public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
 
     /** Holds the URL of the broker to run the tests on. */
     public static String brokerUrl;
@@ -80,17 +74,34 @@
     /** Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. */
     public static String virtualHost;
 
+    /**
+     * Holds the test context properties that provides the default test parameters, plus command line overrides.
+     * This is initialized with the default test parameters, to which command line overrides may be applied.
+     */
+    public static ParsedProperties testContextProperties =
+        TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
     /** Holds all the test cases loaded from the classpath. */
     Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
 
+    /** Holds the test case currently being run by this client. */
     protected InteropClientTestCase currentTestCase;
 
-    protected Connection _connection;
+    /** Holds the connection to the broker that the test is being coordinated on. */
+    protected Connection connection;
+
+    /** Holds the message producer used to send test coordination messages. */
     protected MessageProducer producer;
+
+    /** Holds the JMS session for the test coordination. */
     protected Session session;
 
+    /** Holds the name of this client, with a default value. */
     protected String clientName = CLIENT_NAME;
 
+    /** This flag indicates that the test client should attempt to join the currently running test case on start up. */
+    protected boolean join;
+
     /**
      * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
      * identifying name.
@@ -99,15 +110,16 @@
      * @param virtualHost The virtual host to conect to.
      * @param clientName  The client name to use.
      */
-    public TestClient(String brokerUrl, String virtualHost, String clientName)
+    public TestClient(String brokerUrl, String virtualHost, String clientName, boolean join)
     {
-        log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
-                  + ", String clientName = " + clientName + "): called");
+        log.debug("public SustainedTestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
+            + ", String clientName = " + clientName + "): called");
 
         // Retain the connection parameters.
         this.brokerUrl = brokerUrl;
         this.virtualHost = virtualHost;
         this.clientName = clientName;
+        this.join = join;
     }
 
     /**
@@ -124,49 +136,40 @@
      */
     public static void main(String[] args)
     {
-        // Use the command line parser to evaluate the command line.
-        CommandLineParser commandLine =
-                new CommandLineParser(
-                        new String[][]
-                                {
-                                        {"b", "The broker URL.", "broker", "false"},
-                                        {"h", "The virtual host to use.", "virtual host", "false"},
-                                        {"n", "The test client name.", "name", "false"}
-                                });
+        // Override the default broker url to be localhost:5672.
+        testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672");
 
-        // Capture the command line arguments or display errors and correct usage and then exit.
-        Properties options = null;
-
-        try
-        {
-            options = commandLine.parseCommandLine(args);
-        }
-        catch (IllegalArgumentException e)
-        {
-            System.out.println(commandLine.getErrors());
-            System.out.println(commandLine.getUsage());
-            System.exit(1);
-        }
+        // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+        // and usage then exit if there are errors).
+        // Any options and trailing name=value pairs are also injected into the test context properties object,
+        // to override any defaults that may have been set up.
+        ParsedProperties options =
+            new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args,
+                    new uk.co.thebadgerset.junit.extensions.util.CommandLineParser(
+                        new String[][]
+                        {
+                            { "b", "The broker URL.", "broker", "false" },
+                            { "h", "The virtual host to use.", "virtual host", "false" },
+                            { "o", "The name of the directory to output test timings to.", "dir", "false" },
+                            { "n", "The name of the test client.", "name", "false" },
+                            { "j", "Join this test client to running test.", "false" }
+                        }), testContextProperties));
 
         // Extract the command line options.
         String brokerUrl = options.getProperty("b");
         String virtualHost = options.getProperty("h");
         String clientName = options.getProperty("n");
-
-        // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
-        // overridden values from there.
-        commandLine.addCommandLineToSysProperties();
+        boolean join = options.getPropertyAsBoolean("j");
 
         // Create a test client and start it running.
-        TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
+        TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName, join);
 
         // Use a class path scanner to find all the interop test case implementations.
+        // Hard code the test classes till the classpath scanner is fixed.
         Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
-                new ArrayList<Class<? extends InteropClientTestCase>>();
+            new ArrayList<Class<? extends InteropClientTestCase>>();
         // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
-        // Hard code the test classes till the classpath scanner is fixed.
-        Collections.addAll(testCaseClasses,
-                           new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class});
+        Collections.addAll(testCaseClasses, TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class);
 
         try
         {
@@ -182,7 +185,10 @@
     /**
      * Starts the interop test client running. This causes it to start listening for incoming test invites.
      *
-     * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses
+     * @param testCaseClasses The classes of the available test cases. The test case names from these are used to
+     *                        match incoming test invites against.
+     *
+     * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
      */
     protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses) throws JMSException
     {
@@ -209,84 +215,36 @@
         }
 
         // Open a connection to communicate with the coordinator on.
-        _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost);
-
-        session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection = TestUtils.createConnection(testContextProperties);
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         // Set this up to listen for control messages.
-        MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
+        Topic privateControlTopic = session.createTopic("iop.control." + clientName);
+        MessageConsumer consumer = session.createConsumer(privateControlTopic);
         consumer.setMessageListener(this);
 
-        MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control"));
+        Topic controlTopic = session.createTopic("iop.control");
+        MessageConsumer consumer2 = session.createConsumer(controlTopic);
         consumer2.setMessageListener(this);
 
         // Create a producer to send replies with.
         producer = session.createProducer(null);
 
-        // Start listening for incoming control messages.
-        _connection.start();
-    }
-
-
-    public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
-    {
-        return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost);
-    }
-
-    /**
-     * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
-     * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that
-     * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler.
-     *
-     * @param connectionPropsResource The name of the connection properties file.
-     * @param clientID
-     * @param brokerUrl               The broker url to connect to, <tt>null</tt> to use the default from the
-     *                                properties.
-     * @param virtualHost             The virtual host to connectio to, <tt>null</tt> to use the default.
-     *
-     * @return A JMS conneciton.
-     *
-     * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
-     * Utils library class.
-     */
-    public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
-    {
-        log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
-                  + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
-                  + ", String virtualHost = " + virtualHost + " ): called");
-
-        try
+        // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client
+        // is available to join the current test case, if it supports it. This message may be ignored, or it may result
+        // in this test client receiving a test invite.
+        if (join)
         {
-            Properties connectionProps =
-                    PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
-                            connectionPropsResource));
+            Message joinMessage = session.createMessage();
 
-            if (brokerUrl != null)
-            {
-                String connectionString =
-                        "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
-                connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
-            }
-
-            Context ctx = new InitialContext(connectionProps);
-
-            ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);
-            Connection connection = cf.createConnection();
-
-            return connection;
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (NamingException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (JMSException e)
-        {
-            throw new RuntimeException(e);
+            joinMessage.setStringProperty("CONTROL_TYPE", "JOIN");
+            joinMessage.setStringProperty("CLIENT_NAME", clientName);
+            joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+            producer.send(controlTopic, joinMessage);
         }
+
+        // Start listening for incoming control messages.
+        connection.start();
     }
 
     /**
@@ -394,16 +352,8 @@
             {
                 log.info("Received termination instruction from coordinator.");
 
-//                try
-//                {
-//                    currentTestCase.terminate();
-//                }
-//                catch (InterruptedException e)
-//                {
-//                    //
-//                }
                 // Is a cleaner shutdown needed?
-                _connection.close();
+                connection.close();
                 System.exit(0);
             }
             else

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java?view=diff&rev=556958&r1=556957&r2=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java Tue Jul 17 09:22:16 2007
@@ -43,8 +43,15 @@
  */
 public class TestCase1DummyRun implements InteropClientTestCase
 {
+    /** Used for debugging. */
     private static final Logger log = Logger.getLogger(TestCase1DummyRun.class);
 
+    /**
+     * Should provide the name of the test case that this class implements. The exact names are defined in the
+     * interop testing spec.
+     *
+     * @return The name of the test case that this implements.
+     */
     public String getName()
     {
         log.debug("public String getName(): called");
@@ -52,6 +59,15 @@
         return "TC1_DummyRun";
     }
 
+    /**
+     * Determines whether the test invite that matched this test case is acceptable.
+     *
+     * @param inviteMessage The invitation to accept or reject.
+     *
+     * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+     */
     public boolean acceptInvite(Message inviteMessage) throws JMSException
     {
         log.debug("public boolean acceptInvite(Message inviteMessage): called");
@@ -60,6 +76,15 @@
         return true;
     }
 
+    /**
+     * Assigns the role to be played by this test case. The test parameters are fully specified in the
+     * assignment message. When this method returns, the test case will be ready to execute.
+     *
+     * @param role              The role to be played; sender or receiver.
+     * @param assignRoleMessage The role assignment message, contains the full test parameters.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+     */
     public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
     {
         log.debug("public void assignRole(Roles role, Message assignRoleMessage): called");
@@ -67,6 +92,9 @@
         // Do nothing, both roles are the same.
     }
 
+    /**
+     * Performs the test case actions. Returning from here indicates that the sending role has completed its test.
+     */
     public void start()
     {
         log.debug("public void start(): called");
@@ -74,11 +102,15 @@
         // Do nothing.
     }
 
-    public void terminate() throws JMSException
-    {
-        //todo
-    }
-
+    /**
+     * Gets a report on the actions performed by the test case in its assigned role.
+     *
+     * @param session The session to create the report message in.
+     *
+     * @return The report message.
+     *
+     * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+     */
     public Message getReport(Session session) throws JMSException
     {
         log.debug("public Message getReport(Session session): called");
@@ -87,6 +119,11 @@
         return session.createTextMessage("Dummy Run, Ok.");
     }
 
+    /**
+     * Handles incoming test messages. Does nothing.
+     *
+     * @param message The incoming test message.
+     */
     public void onMessage(Message message)
     {
         log.debug("public void onMessage(Message message = " + message + "): called");

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java?view=diff&rev=556958&r1=556957&r2=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java Tue Jul 17 09:22:16 2007
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.interop.testclient.testcases;
 
-import javax.jms.*;
-
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.interop.testclient.InteropClientTestCase;
 import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.test.framework.TestUtils;
+
+import javax.jms.*;
 
 /**
  * Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the
@@ -54,9 +55,6 @@
     /** The number of test messages to send. */
     private int numMessages;
 
-    /** The routing key to send them to on the default direct exchange. */
-    private Destination sendDestination;
-
     /** The connection to send the test messages on. */
     private Connection connection;
 
@@ -118,14 +116,12 @@
         this.role = role;
 
         // Create a new connection to pass the test messages on.
-        connection =
-            TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
-                TestClient.virtualHost);
+        connection = TestUtils.createConnection(TestClient.testContextProperties);
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         // Extract and retain the test parameters.
         numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES");
-        sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
+        Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME"));
 
         log.debug("numMessages = " + numMessages);
         log.debug("sendDestination = " + sendDestination);
@@ -149,7 +145,9 @@
     }
 
     /**
-     * Performs the test case actions.
+     * Performs the test case actions. Returning from here indicates that the sending role has completed its test.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
      */
     public void start() throws JMSException
     {
@@ -168,11 +166,6 @@
                 messageCount++;
             }
         }
-    }
-
-    public void terminate() throws JMSException
-    {
-        //todo
     }
 
     /**

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java?view=diff&rev=556958&r1=556957&r2=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java Tue Jul 17 09:22:16 2007
@@ -18,14 +18,15 @@
  * under the License.
  *
  */
-
 package org.apache.qpid.interop.testclient.testcases;
 
-import javax.jms.*;
-
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.test.framework.TestUtils;
+
+import javax.jms.*;
 
 /**
  * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -55,12 +56,6 @@
     /** The number of test messages to send. */
     private int numMessages;
 
-    /** The number of receiver connection to use. */
-    private int numReceivers;
-
-    /** The routing key to send them to on the default direct exchange. */
-    private Destination sendDestination;
-
     /** The connections to send/receive the test messages on. */
     private Connection[] connection;
 
@@ -123,7 +118,7 @@
 
         // Extract and retain the test parameters.
         numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES");
-        numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
+        int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS");
         String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY");
 
         log.debug("numMessages = " + numMessages);
@@ -139,13 +134,11 @@
             connection = new Connection[1];
             session = new Session[1];
 
-            connection[0] =
-                org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
-                    org.apache.qpid.interop.testclient.TestClient.virtualHost);
+            connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
             session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             // Extract and retain the test parameters.
-            sendDestination = session[0].createTopic(sendKey);
+            Destination sendDestination = session[0].createTopic(sendKey);
 
             producer = session[0].createProducer(sendDestination);
             break;
@@ -159,9 +152,7 @@
 
             for (int i = 0; i < numReceivers; i++)
             {
-                connection[i] =
-                    org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
-                        org.apache.qpid.interop.testclient.TestClient.virtualHost);
+                connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
                 session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                 sendDestination = session[i].createTopic(sendKey);
@@ -174,14 +165,16 @@
         }
 
         // Start all the connection dispatcher threads running.
-        for (int i = 0; i < connection.length; i++)
+        for (Connection conn : connection)
         {
-            connection[i].start();
+            conn.start();
         }
     }
 
     /**
-     * Performs the test case actions.
+     * Performs the test case actions. Returning from here indicates that the sending role has completed its test.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
      */
     public void start() throws JMSException
     {
@@ -202,11 +195,6 @@
         }
     }
 
-    public void terminate() throws JMSException, InterruptedException
-    {
-        //todo
-    }
-
     /**
      * Gets a report on the actions performed by the test case in its assigned role.
      *
@@ -221,9 +209,9 @@
         log.debug("public Message getReport(Session session): called");
 
         // Close the test connections.
-        for (int i = 0; i < connection.length; i++)
+        for (Connection conn : connection)
         {
-            connection[i].close();
+            conn.close();
         }
 
         // Generate a report message containing the count of the number of messages passed.

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java?view=auto&rev=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java Tue Jul 17 09:22:16 2007
@@ -0,0 +1,905 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.interop.testclient.TestClient;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
+import org.apache.qpid.test.framework.TestUtils;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Implements test case 3, basic pub/sub. Sends/receives a specified number of messages to a specified route on the
+ * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
+ * messages sent/received.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Supply the name of the test case that this implements.
+ * <tr><td> Accept/Reject invites based on test parameters.
+ * <tr><td> Adapt to assigned roles.
+ * <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
+ * </table>
+ */
+public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener
+{
+    /** Used for debugging. */
+    private static final Logger log = Logger.getLogger(SustainedClientTestCase.class);
+
+    /** Used to log to the console. */
+    private static final Logger console = Logger.getLogger("SustainedTest");
+
+    /** The role to be played by the test. */
+    private Roles role;
+
+    /** The number of receiver connection to use. */
+    private int numReceivers;
+
+    /** The routing key to send them to on the default direct exchange. */
+    private Destination sendDestination;
+
+    /** The routing key to send updates to on the default direct exchange. */
+    private Destination sendUpdateDestination;
+
+    /** The connections to send/receive the test messages on. */
+    private Connection[] connection;
+
+    /** The sessions to send/receive the test messages on. */
+    private Session[] session;
+
+    /** The producer to send the test messages with. */
+    MessageProducer producer;
+
+    /** Adapter that adjusts the send rate based on the updates from clients. */
+    SustainedRateAdapter _rateAdapter;
+
+    /** The number of messages in each batch; read from the SUSTAINED_UPDATE_INTERVAL test parameter. */
+    int _batchSize;
+
+    private static final long TEN_MILLI_SEC = 10000000;
+    private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+    private static final int LOG_UPATE_INTERVAL = 10;
+    private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
+
+    /**
+     * Should provide the name of the test case that this class implements. The exact names are defined in the interop
+     * testing spec.
+     *
+     * @return The name of the test case that this implements.
+     */
+    public String getName()
+    {
+        log.debug("public String getName(): called");
+
+        return "Perf_SustainedPubSub";
+    }
+
+    /**
+     * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment
+     * message. When this method returns, the test case will be ready to execute.
+     *
+     * @param role              The role to be played; sender or receiver.
+     * @param assignRoleMessage The role assignment message, contains the full test parameters.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+     */
+    public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+    {
+        log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+            + "): called");
+
+        // Take note of the role to be played.
+        this.role = role;
+
+        // Extract and retain the test parameters.
+        numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
+        _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+        String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
+        String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
+        int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+        String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
+
+        if (log.isDebugEnabled())
+        {
+            log.debug("numReceivers = " + numReceivers);
+            log.debug("_batchSize = " + _batchSize);
+            log.debug("ackMode = " + ackMode);
+            log.debug("sendKey = " + sendKey);
+            log.debug("sendUpdateKey = " + sendUpdateKey);
+            log.debug("role = " + role);
+        }
+
+        switch (role)
+        {
+        // Check if the sender role is being assigned, and set up a single message producer if so.
+        case SENDER:
+            console.info("Creating Sender");
+            // Create a new connection to pass the test messages on.
+            connection = new Connection[1];
+            session = new Session[1];
+
+            connection[0] = TestUtils.createConnection(TestClient.testContextProperties);
+            session[0] = connection[0].createSession(false, ackMode);
+
+            // Extract and retain the test parameters.
+            sendDestination = session[0].createTopic(sendKey);
+
+            connection[0].setExceptionListener(this);
+
+            producer = session[0].createProducer(sendDestination);
+
+            sendUpdateDestination = session[0].createTopic(sendUpdateKey);
+            MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
+
+            _rateAdapter = new SustainedRateAdapter(this);
+            updateConsumer.setMessageListener(_rateAdapter);
+
+            break;
+
+        // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
+        // of receiver connections.
+        case RECEIVER:
+            console.info("Creating Receiver");
+            // Create the required number of receiver connections.
+            connection = new Connection[numReceivers];
+            session = new Session[numReceivers];
+
+            for (int i = 0; i < numReceivers; i++)
+            {
+                connection[i] = TestUtils.createConnection(TestClient.testContextProperties);
+                session[i] = connection[i].createSession(false, ackMode);
+
+                sendDestination = session[i].createTopic(sendKey);
+
+                sendUpdateDestination = session[i].createTopic(sendUpdateKey);
+
+                MessageConsumer consumer = session[i].createConsumer(sendDestination);
+
+                consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i],
+                        sendUpdateDestination));
+            }
+
+            break;
+        }
+
+        // Start all the connection dispatcher threads running.
+        for (int i = 0; i < connection.length; i++)
+        {
+            connection[i].start();
+        }
+    }
+
+    /** Performs the test case actions. */
+    public void start() throws JMSException
+    {
+        log.debug("public void start(): called");
+
+        // Check that the sender role is being performed.
+        switch (role)
+        {
+        // If the sender role was assigned, run the rate adapter on the calling thread to send the test batches.
+        case SENDER:
+            _rateAdapter.run();
+            break;
+        case RECEIVER:
+
+        }
+
+        // Return from here when the test has finished; this will signal the controller.
+    }
+
+    public void terminate() throws JMSException, InterruptedException
+    {
+        if (_rateAdapter != null)
+        {
+            _rateAdapter.stop();
+        }
+    }
+
+    /**
+     * Gets a report on the actions performed by the test case in its assigned role.
+     *
+     * @param session The session to create the report message in.
+     *
+     * @return The report message.
+     *
+     * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+     */
+    public Message getReport(Session session) throws JMSException
+    {
+        log.debug("public Message getReport(Session session): called");
+
+        // Close the test connections.
+        for (int i = 0; i < connection.length; i++)
+        {
+            connection[i].close();
+        }
+
+        Message report = session.createMessage();
+        report.setStringProperty("CONTROL_TYPE", "REPORT");
+
+        return report;
+    }
+
+    public void onException(JMSException jmsException)
+    {
+        Exception linked = jmsException.getLinkedException();
+
+        if (linked != null)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("Linked Exception:" + linked);
+            }
+
+            if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException))
+            {
+                if (log.isDebugEnabled())
+                {
+                    if (linked instanceof AMQNoConsumersException)
+                    {
+                        log.warn("No clients currently available for message:"
+                            + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+                    }
+                    else
+                    {
+                        log.warn("No route for message");
+                    }
+                }
+
+                // Tell the rate adapter that there are no clients ready yet
+                _rateAdapter.NO_CLIENTS = true;
+            }
+        }
+        else
+        {
+            log.warn("Exception:" + linked);
+        }
+    }
+
+    /**
+     * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
+     * 'end' messages.
+     */
+    class SustainedListener implements MessageListener
+    {
+        /** Number of messages received */
+        private long _received = 0;
+        /** The number of messages in the batch */
+        private int _batchSize = 0;
+        /** Record of when the 'start' message was received */
+        private Long _startTime;
+        /** Message producer to use to send reports */
+        MessageProducer _updater;
+        /** Session to create the report message on */
+        Session _session;
+        /** Record of the client ID used for this SustainedListener */
+        String _client;
+
+        /**
+         * Main Constructor
+         *
+         * @param clientname      The _client id used to identify this connection.
+         * @param batchSize       The number of messages that are to be sent per batch. Note: This is not used to
+         *                        control the interval between sending reports.
+         * @param session         The session used for communication.
+         * @param sendDestination The destination that update reports should be sent to.
+         *
+         * @throws JMSException May occur if creating the Producer fails
+         */
+        public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination)
+            throws JMSException
+        {
+            _batchSize = batchSize;
+            _client = clientname;
+            _session = session;
+            _updater = session.createProducer(sendDestination);
+        }
+
+        public void onMessage(Message message)
+        {
+            if (log.isTraceEnabled())
+            {
+                log.trace("Message " + _received + "received in listener");
+            }
+
+            if (message instanceof TextMessage)
+            {
+                try
+                {
+                    _received++;
+                    if (((TextMessage) message).getText().equals("start"))
+                    {
+                        log.debug("Starting Batch");
+                        _startTime = System.nanoTime();
+                    }
+                    else if (((TextMessage) message).getText().equals("end"))
+                    {
+                        if (_startTime != null)
+                        {
+                            long currentTime = System.nanoTime();
+                            sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
+                            log.debug("End Batch");
+                        }
+                    }
+                }
+                catch (JMSException e)
+                {
+                    // ignore error
+                }
+            }
+
+        }
+
+        /**
+         * sendStatus creates and sends the report back to the publisher
+         *
+         * @param time     taken for the last batch
+         * @param received Total number of messages received.
+         * @param batchNumber the batch number
+         * @throws JMSException if an error occurs during the send
+         */
+        private void sendStatus(long time, long received, int batchNumber) throws JMSException
+        {
+            Message updateMessage = _session.createTextMessage("update");
+            updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
+            updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
+            updateMessage.setLongProperty("RECEIVED", received);
+            updateMessage.setIntProperty("BATCH", batchNumber);
+            updateMessage.setLongProperty("DURATION", time);
+
+            if (log.isInfoEnabled())
+            {
+                log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received
+                    + " BATCH:" + batchNumber + " DURATION:" + time);
+            }
+
+            // Output on the main console.info the details of this batch
+            if ((batchNumber % 10) == 0)
+            {
+                console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received
+                    + " BATCH:" + batchNumber + " DURATION:" + time);
+            }
+
+            _updater.send(updateMessage);
+        }
+    }
+
+    /**
+     * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
+     * that are sent through the test system.
+     *
+     * By keeping a record of the messages received, the average time taken to process a batch can be calculated
+     * and so the delay can be adjusted to maintain that rate.
+     *
+     * Given that delays of < 10ms can be rounded up, the delay is only used between messages if the _delay > 10ms *
+     * the number of messages in the batch. Otherwise the delay is used at the end of the batch.
+     */
+    class SustainedRateAdapter implements MessageListener, Runnable
+    {
+        private SustainedClientTestCase _client;
+        private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting
+        private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (50ms)
+        private volatile long _delay; // in nanos
+        private long _sent;
+        private Map<String, Long> _slowClients = new HashMap<String, Long>();
+        private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
+        private static final long NO_CLIENT_SLEEP = 1000; // 1s
+        private volatile boolean NO_CLIENTS = true;
+        private int _delayShifting;
+        private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5);
+        private boolean _warmedup = false;
+        private static final long EXPECTED_TIME_PER_BATCH = 100000L;
+        private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
+
+        SustainedRateAdapter(SustainedClientTestCase client)
+        {
+            _client = client;
+        }
+
+        public void onMessage(Message message)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
+            }
+
+            try
+            {
+                String controlType = message.getStringProperty("CONTROL_TYPE");
+
+                // Check if the message is an update report.
+                if ("UPDATE".equals(controlType))
+                {
+                    NO_CLIENTS = false;
+                    long duration = message.getLongProperty("DURATION");
+                    long totalReceived = message.getLongProperty("RECEIVED");
+                    String client = message.getStringProperty("CLIENT_ID");
+                    int batchNumber = message.getIntProperty("BATCH");
+
+                    if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
+                    {
+                        log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:"
+                            + batchNumber + " DURATION:" + duration);
+                    }
+
+                    recordSlow(client, totalReceived, batchNumber);
+
+                    adjustDelay(client, batchNumber, duration);
+
+                    // Warm up completes when:
+                    // we haven't warmed up
+                    // and the reported batch number has reached the required number of warmup batches
+                    if (!_warmedup && (batchNumber >= _warmUpBatches))
+                    {
+                        _warmedup = true;
+                        _warmup.countDown();
+
+                    }
+                }
+            }
+            catch (JMSException e)
+            {
+                //
+            }
+        }
+
+        CountDownLatch _warmup = new CountDownLatch(1);
+
+        int _numBatches = 10000;
+
+        // long[] _timings = new long[_numBatches];
+        private boolean _running = true;
+
+        public void run()
+        {
+            console.info("Warming up");
+
+            doBatch(_warmUpBatches);
+
+            try
+            {
+                // wait for warmup to complete.
+                _warmup.await();
+
+                // set delay to the average length of the batches
+                _delay = _totalDuration / _warmUpBatches / delays.size();
+
+                console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration
+                    + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size());
+
+                _totalDuration = 0L;
+                _totalReceived = 0L;
+                _sent = 0L;
+            }
+            catch (InterruptedException e)
+            {
+                //
+            }
+
+            doBatch(_numBatches);
+
+        }
+
+        private void doBatch(int batchSize) // long[] timings,
+        {
+            TextMessage testMessage = null;
+            try
+            {
+                testMessage = _client.session[0].createTextMessage("start");
+
+                for (int batch = 0; batch <= batchSize; batch++)
+                // while (_running)
+                {
+                    long start = System.nanoTime();
+
+                    testMessage.setText("start");
+                    testMessage.setIntProperty("BATCH", batch);
+
+                    _client.producer.send(testMessage);
+                    _rateAdapter.sentMessage();
+
+                    testMessage.setText("test");
+                    // start at 2 so start and end count as part of batch
+                    for (int m = 2; m < _batchSize; m++)
+                    {
+                        _client.producer.send(testMessage);
+                        _rateAdapter.sentMessage();
+                    }
+
+                    testMessage.setText("end");
+                    _client.producer.send(testMessage);
+                    _rateAdapter.sentMessage();
+
+                    long end = System.nanoTime();
+
+                    long sendtime = end - start;
+
+                    if (log.isDebugEnabled())
+                    {
+                        log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]);
+                    }
+
+                    if ((batch % LOG_UPATE_INTERVAL) == 0)
+                    {
+                        console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
+                    }
+
+                    _rateAdapter.sleepBatch();
+
+                }
+            }
+            catch (JMSException e)
+            {
+                console.error("Runner ended");
+            }
+        }
+
+        private String status()
+        {
+            return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay
+                + " resulting in "
+                + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"));
+        }
+
+        private void sleepBatch()
+        {
+            if (checkForSlowClients())
+            { // if there were slow clients we have already slept, so don't sleep again.
+                return;
+            }
+
+            if (!SLEEP_PER_MESSAGE)
+            {
+                // per batch sleep.. if sleep is too small to spread over the batch.
+                if (_delay <= (TEN_MILLI_SEC * _batchSize))
+                {
+                    sleepLong(_delay);
+                }
+                else
+                {
+                    log.info("Not sleeping _delay > ten*batch is:" + _delay);
+                }
+            }
+        }
+
+        public void stop()
+        {
+            _running = false;
+        }
+
+        Map<String, Long> delays = new HashMap<String, Long>();
+        Long _totalReceived = 0L;
+        Long _totalDuration = 0L;
+        int _skipUpdate = 0;
+
+        /**
+         * Adjust the delay for sending messages based on this update from the client
+         *
+         * @param client        The client that sent this update
+         * @param duration      The time taken for the last batch of messages
+         * @param batchNumber   The reported batchnumber from the client
+         */
+        private void adjustDelay(String client, int batchNumber, long duration)
+        {
+            // Retrieve the current total time taken for this client.
+            Long currentTime = delays.get(client);
+
+            // Add the new duration time to this client
+            if (currentTime == null)
+            {
+                currentTime = duration;
+            }
+            else
+            {
+                currentTime += duration;
+            }
+
+            delays.put(client, currentTime);
+
+            long batchesSent = _sent / _batchSize;
+
+            // ensure we don't divide by zero
+            if (batchesSent == 0)
+            {
+                batchesSent = 1L;
+            }
+
+            _totalReceived += _batchSize;
+            _totalDuration += duration;
+
+            // calculate average duration across clients per batch
+            long averageDuration = _totalDuration / delays.size() / batchesSent;
+
+            // calculate the difference between current send delay and average report delay
+            long diff = (duration) - averageDuration;
+
+            if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0))
+            {
+                log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: "
+                    + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: "
+                    + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in "
+                    + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")));
+            }
+
+            // if the averageDuration differs from the current by more than the specified variane then adjust delay.
+            if (Math.abs(diff) > _timeVariance)
+            {
+
+                // if the the _delay is larger than the required duration to send report
+                // speed up
+                if (diff > TEN_MILLI_SEC)
+                {
+                    _delay -= TEN_MILLI_SEC;
+
+                    if (_delay < 0)
+                    {
+                        _delay = 0;
+                        log.info("Reset _delay to 0");
+                        delayStable();
+                    }
+                    else
+                    {
+                        delayChanged();
+                    }
+
+                }
+                else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance
+                {
+                    // the report took longer
+                    _delay += TEN_MILLI_SEC;
+                    delayChanged();
+                }
+            }
+            else
+            {
+                delayStable();
+            }
+
+            // If we have a consumer that is behind with the batches.
+            if ((batchesSent - batchNumber) > _batchVariance)
+            {
+                log.debug("Increasing _delay as sending more than receiving");
+
+                _delay += 2 * TEN_MILLI_SEC;
+                delayChanged();
+            }
+
+        }
+
+        /**
+         * Resets the number of iterations that must pass before we say the delay has stabilised, by putting
+         * _delayShifting back to REPORTS_WITHOUT_CHANGE. delayStable() counts this value down again.
+         */
+        private void delayChanged()
+        {
+            _delayShifting = REPORTS_WITHOUT_CHANGE;
+        }
+
+        /**
+         * Counts one report towards the delay being considered stable. Each call takes one off _delayShifting; once
+         * the counter would drop below zero it is pinned at zero and "Delay stabilised" is written to the console
+         * logger at debug level.
+         */
+        private void delayStable()
+        {
+            if (--_delayShifting < 0)
+            {
+                // Never let the counter go negative; zero is the value that is reported as stable.
+                _delayShifting = 0;
+                console.debug("Delay stabilised:" + _delay);
+            }
+        }
+
+        /**
+         * Tracks whether a client is out of step with the sender. When the batch number reported by the client
+         * differs from the number of batches sent (_sent / _batchSize) by more than _batchVariance, in either
+         * direction, the client is recorded in _slowClients together with its received count. Otherwise any entry
+         * held for the client is removed.
+         *
+         * @param client      The client identifier to check
+         * @param received    The number of messages received by that client
+         * @param batchNumber The batch number reported by that client
+         */
+        private void recordSlow(String client, long received, int batchNumber)
+        {
+            boolean outOfStep = Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance;
+
+            if (!outOfStep)
+            {
+                // Client is within tolerance, so it no longer holds up the sender.
+                _slowClients.remove(client);
+
+                return;
+            }
+
+            _slowClients.put(client, received);
+        }
+
+        /** Incrment the number of sent messages and then sleep, if required. */
+        public void sentMessage()
+        {
+
+            _sent++;
+
+            if (_delay > (TEN_MILLI_SEC * _batchSize))
+            {
+                long batchDelay = _delay / _batchSize;
+                // less than 10ms sleep doesn't always work.
+                // _delay is in nano seconds
+                // if (batchDelay < (TEN_MILLI_SEC))
+                // {
+                // sleep(0, (int) batchDelay);
+                // }
+                // else
+                {
+                    // if (batchDelay < 30000000000L)
+                    {
+                        sleepLong(batchDelay);
+                    }
+                }
+            }
+            else
+            {
+                if (SLEEP_PER_MESSAGE && (_delay > 0))
+                {
+                    sleepLong(_delay / _batchSize);
+                }
+            }
+        }
+
+        /**
+         * Run at the end of each batch: pauses sending for as long as any client is listed in _slowClients, so that
+         * slow clients can catch up. When NO_CLIENTS is set an additional NO_CLIENT_SLEEP pause is taken.
+         *
+         * @return true if sending was paused (slow clients were present, or NO_CLIENTS is set); false otherwise.
+         */
+        private boolean checkForSlowClients()
+        {
+            // Nothing to wait for: periodically report the current delay and carry straight on.
+            if (_slowClients.isEmpty() && !NO_CLIENTS)
+            {
+                if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)
+                {
+                    String state = (_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")");
+                    console.info("Total Delay :" + _delay + " " + state);
+                }
+
+                return false;
+            }
+
+            // Hold the test here until no client remains in the slow list.
+            while (!_slowClients.isEmpty())
+            {
+                if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0))
+                {
+                    // Build a comma separated list of the clients being waited for.
+                    StringBuilder names = new StringBuilder();
+
+                    for (Iterator<?> slow = _slowClients.keySet().iterator(); slow.hasNext();)
+                    {
+                        names.append(slow.next());
+
+                        if (slow.hasNext())
+                        {
+                            names.append(", ");
+                        }
+                    }
+
+                    log.info("Pausing for slow clients:" + names);
+                }
+
+                if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0))
+                {
+                    console.debug(_slowClients.size() + " slow clients.");
+                }
+
+                sleep(PAUSE_SLEEP);
+            }
+
+            if (NO_CLIENTS)
+            {
+                sleep(NO_CLIENT_SLEEP);
+            }
+
+            log.debug("Continuing");
+
+            return true;
+        }
+
+        /**
+         * Sleep normally takes milliseconds; this allows the use of a nanosecond value. The value is split into
+         * whole milliseconds (delay / 1000000) and the remaining nanoseconds (delay % 1000000), which are handed to
+         * the two-argument sleep.
+         *
+         * @param delay nanoseconds to sleep for.
+         */
+        private void sleepLong(long delay)
+        {
+            sleep(delay / 1000000, (int) (delay % 1000000));
+        }
+
+        /**
+         * Sleep for the specified number of milliseconds, with no additional nanoseconds.
+         * @param sleep milliseconds to sleep for.
+         */
+        private void sleep(long sleep)
+        {
+            sleep(sleep, 0);
+        }
+
+        /**
+         * Performs the sleep. An InterruptedException ends the sleep early; it is logged at debug level and not
+         * propagated to the caller.
+         *
+         * NOTE: If a sleep request is > 10s then it is reported as an error and only 5s is slept instead.
+         *
+         * @param milli milliseconds to sleep for
+         * @param nano  additional nanoseconds (the sub-millisecond part) to sleep for
+         */
+        private void sleep(long milli, int nano)
+        {
+            // This is reached once per sent message, so only build the log text when it will actually be written.
+            if (log.isDebugEnabled())
+            {
+                log.debug("Sleep:" + milli + ":" + nano);
+            }
+
+            if (milli > 10000)
+            {
+                // NOTE(review): sleepLong treats _delay as nanoseconds while milli is in milliseconds, so this
+                // equality looks unlikely to hold for a delay-driven sleep -- confirm the intended units.
+                if (_delay == milli)
+                {
+                    _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
+                    log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000)
+                        + "s. Reset _totalDuration:" + _totalDuration);
+                }
+                else
+                {
+                    log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s");
+                }
+
+                milli = 5000;
+            }
+
+            try
+            {
+                Thread.sleep(milli, nano);
+            }
+            catch (InterruptedException e)
+            {
+                // The interrupt flag is intentionally not re-asserted: this method is called from polling loops
+                // (see checkForSlowClients), and a set flag would make every later Thread.sleep return at once,
+                // turning those waits into busy loops. Record the event instead of dropping it silently.
+                log.debug("Sleep interrupted, continuing without completing the requested sleep", e);
+            }
+        }
+
+        public void setClient(SustainedClientTestCase client)
+        {
+            _client = client; // keeps a reference to the given test case client; where _client is read is outside this excerpt
+        }
+    }
+
+}

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java?view=auto&rev=556958
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java Tue Jul 17 09:22:16 2007
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.interop.coordinator.DropInTest;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.interop.coordinator.FanOutTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * SustainedTestCase is a {@link FanOutTestCase} that runs the "Perf_SustainedPubSub" test case. This consists of one
+ * test client sending, and several receiving, and attempts to find the highest rate at which messages can be broadcast
+ * to the receivers. It is also a {@link DropInTest} to which more test clients may be added during a test run.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Build the test configuration and run the sustained test. <td> {@link FanOutTestCase}
+ * <tr><td> Accept late joining clients as receivers. <td> {@link DropInTest}, {@link TestClientDetails}
+ * </table>
+ */
+public class SustainedTestCase extends FanOutTestCase implements DropInTest
+{
+    /** Used for debugging. */
+    Logger log = Logger.getLogger(SustainedTestCase.class);
+
+    /** Holds the name of this test case, as known to the test clients that will run the test. */
+    private static final String TEST_CASE_NAME = "Perf_SustainedPubSub";
+
+    /** Holds the root name of the topic on which to send the test messages. */
+    private static final String SUSTAINED_KEY = "Perf_SustainedPubSub";
+
+    /**
+     * Creates a new coordinating test case with the specified name.
+     *
+     * @param name The test case name.
+     */
+    public SustainedTestCase(String name)
+    {
+        super(name);
+    }
+
+    /**
+     * Performs a single test run of the sustained test.
+     *
+     * <p/>The number of receivers, the update interval and the acknowledge mode are read from the system properties
+     * "numReceives" (default 2), "batchSize" (default 1000) and "ackMode" (default
+     * {@link AMQSession#AUTO_ACKNOWLEDGE}) respectively.
+     *
+     * @throws Exception Any exceptions are allowed to fall through and fail the test.
+     */
+    public void testBasicPubSub() throws Exception
+    {
+        log.debug("public void testBasicPubSub(): called");
+
+        Map<String, Object> testConfig = new HashMap<String, Object>();
+        testConfig.put("TEST_NAME", TEST_CASE_NAME);
+        testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
+        testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2));
+        testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000));
+        testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
+        testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE));
+
+        // Log the map itself: concatenating entrySet().toArray() would only print the array's identity string.
+        log.info("Created Config: " + testConfig);
+
+        sequenceTest(testConfig);
+    }
+
+    /**
+     * Accepts a late joining client into this test case. The client will be enlisted with a control message
+     * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields:
+     *
+     * <p/><table>
+     * <tr><td> CLIENT_NAME <td> A unique name for the new client.
+     * <tr><td> CLIENT_PRIVATE_CONTROL_KEY <td> The key for the route on which the client receives its control messages.
+     * </table>
+     *
+     * @param message The late joiners join message.
+     *
+     * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed.
+     */
+    public void lateJoin(Message message) throws JMSException
+    {
+        // Extract the joining clients details from its join request message.
+        TestClientDetails clientDetails = new TestClientDetails();
+        clientDetails.clientName = message.getStringProperty("CLIENT_NAME");
+        clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
+
+        // Register the joining client, but do not block for confirmation as cannot do a synchronous receive during
+        // this method call, as it may have been called from an 'onMessage' method.
+        assignReceiverRole(clientDetails, new HashMap<String, Object>(), false);
+    }
+
+    /**
+     * Should provide a translation from the junit method name of a test to its test case name as known to the test
+     * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
+     * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
+     * name "TC2_BasicP2P".
+     *
+     * <p/>This implementation ignores the method name and always answers the sustained test case name.
+     *
+     * @param methodName The name of the JUnit test method.
+     *
+     * @return The name of the corresponding interop test case.
+     */
+    public String getTestCaseNameForTestMethod(String methodName)
+    {
+        return TEST_CASE_NAME;
+    }
+}



Mime
View raw message