qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rupertlssm...@apache.org
Subject svn commit: r612874 [4/5] - in /incubator/qpid/branches/M2.1/dotnet: Qpid.Buffer.Tests/Properties/ Qpid.Buffer/Properties/ Qpid.Client.Tests/Properties/ Qpid.Client.Transport.Socket.Blocking/Properties/ Qpid.Client/Properties/ Qpid.Codec/Properties/ Qp...
Date Thu, 17 Jan 2008 17:13:23 GMT
Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Integration.Tests.framework.clocksynch
+{
+    /// <summary>
+    /// ClockSynchFailureException represents failure of a <see cref="ClockSynchronizer"/> to achieve synchronization. For example,
+    /// this could be because a reference signal is not available, or because a desired accuracy cannot be attained.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Represent failure to achieve synchronization.
+    /// </table>
+    /// </summary>
+    public class ClockSynchFailureException extends Exception
+    {
+        /// <summary>
+        /// Creates a clock synch failure exception, forwarding the message and cause unchanged to the base exception constructor.
+        /// </summary>
+        /// <param name="message"> The detail message (which is saved for later retrieval by the <see cref="#getMessage()"/> method). </param>
+        /// <param name="cause">   The cause (which is saved for later retrieval by the <see cref="#getCause()"/> method).  (A <tt>null</tt>
+        ///                        value is permitted, and indicates that the cause is nonexistent or unknown.)</param>
+        public ClockSynchFailureException(string message, Throwable cause)
+        {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+
+using uk.co.thebadgerset.junit.extensions.ShutdownHookable;
+using uk.co.thebadgerset.junit.extensions.Throttle;
+
+namespace Apache.Qpid.Integration.Tests.framework.clocksynch
+{
+    /// <summary>
+    /// ClockSynchThread is a convenient utility for running a thread that periodically synchronizes the clock against
+    /// a reference. Supply it with a <see cref="ClockSynchronizer"/> and a <see cref="Throttle"/> and it will continually keep the
+    /// clock up-to-date at a rate determined by the throttle.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Continually synchronize the clock at a throttled rate.
+    /// </table>
+    /// </summary>
+    public class ClockSynchThread extends Thread : ShutdownHookable
+    {
+        /// <summary> Used for debugging. </summary>
+        private static ILog log = LogManager.GetLogger(typeof(ClockSynchThread));
+
+        /// <summary> Holds the clock syncher for the synch thread. </summary>
+        private ClockSynchronizer clockSyncher;
+
+        /// <summary> Holds the throttle to limit the synch rate. </summary>
+        private Throttle throttle;
+
+        /// <summary>
+        /// Flag to indicate that the periodic clock syncher should keep running. It is cleared by <see cref="#terminate()"/>
+        /// and by the shutdown hook, which run on threads other than the one looping in <see cref="#run()"/>, so it is
+        /// declared volatile to guarantee that the loop observes the change.
+        /// </summary>
+        volatile bool doSynch = true;
+
+        /// <summary>
+        /// Creates a clock synchronizer thread from a clock synchronizer and a throttle.
+        /// </summary>
+        /// <param name="syncher">  The clock synchronizer. </param>
+        /// <param name="throttle"> The throttle. </param>
+        public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle)
+        {
+            this.clockSyncher = syncher;
+            this.throttle = throttle;
+        }
+
+        /// <summary>
+        /// Terminates the synchronization thread. The synch loop stops the next time it checks the flag, that is, once
+        /// the throttle wait and synch that are currently in progress have completed.
+        /// </summary>
+        public void terminate()
+        {
+            doSynch = false;
+        }
+
+        /// <summary>
+        /// Continually updates the clock, until <see cref="#terminate()"/> is called or a synchronization attempt fails.
+        /// </summary>
+        public void run()
+        {
+            while (doSynch)
+            {
+                // Perform a clock clockSynch.
+                try
+                {
+                    // Wait controlled by the throttle before doing the next synch.
+                    throttle.throttle();
+
+                    clockSyncher.synch();
+                    log.debug("Clock synched, delta = " + clockSyncher.getDelta() + ", epsilon = " + clockSyncher.getEpsilon()
+                              + ".");
+                }
+                // Terminate the synch thread if the synchronization cannot be achieved.
+                catch (ClockSynchFailureException e)
+                {
+                    // Pass the exception to the logger so that the reason for the failure is not lost.
+                    log.debug("Cannot synchronize the clock (reference service may be down). Terminating the synch thread.", e);
+                    doSynch = false;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the clock synchronizer that is kept continually up to date.
+        /// </summary>
+        /// <returns> The clock synchronizer that is kept continually up to date. </returns>
+        public ClockSynchronizer getClockSyncher()
+        {
+            return clockSyncher;
+        }
+
+        /// <summary>
+        /// Supplies a shutdown hook, that terminates the synching thread.
+        /// </summary>
+        /// <returns> The shut down hook. </returns>
+        public Thread getShutdownHook()
+        {
+            return new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        // Re-use the one termination path rather than duplicating the flag update.
+                        terminate();
+                    }
+                });
+        }
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Integration.Tests.framework.clocksynch
+{
+    /// <summary>
+    /// ClockSynchronizer provides an interface through which two nodes may synchronize their clocks. It is expected that one
+    /// node will act as the reference clock, to which no delta need be applied, and the other node will act as the slave,
+    /// and which must apply a delta to its local clock to get a clock synchronized with the reference.
+    ///
+    /// <p/>The slave side will initiate the computation of a clock delta by calling the <see cref="#synch"/> method. This method
+    /// will not return until the delta has been computed, at which point there is a method to return its value, as well as
+    /// an estimate of the likely error (usually one standard deviation), in the synchronization. For convenience there is a
+    /// <see cref="#nanoTime"/> method to return the value of System.nanoTime() with the delta added in.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Trigger a clock synchronization.
+    /// <tr><td> Compute a clock delta to apply to the local clock.
+    /// <tr><td> Estimate the error in the synchronization.
+    /// </table>
+    /// </summary>
+    public interface ClockSynchronizer
+    {
+        /// <summary>
+        /// The slave side should call this to compute a clock delta with the reference.
+        /// </summary>
+        /// <exception cref="ClockSynchFailureException"> If synchronization cannot be achieved. </exception>
+        public void synch() throws ClockSynchFailureException;
+
+        /// <summary>
+        /// Gets the clock delta in nanoseconds.
+        /// </summary>
+        /// <returns> The clock delta in nanoseconds. </returns>
+        public long getDelta();
+
+        /// <summary>
+        /// Gets an estimate of the clock error in nanoseconds.
+        /// </summary>
+        /// <returns> An estimate of the clock error in nanoseconds. </returns>
+        public long getEpsilon();
+
+        /// <summary>
+        /// Gets the local clock time with any computed delta added in.
+        /// </summary>
+        /// <returns> The local clock time with any computed delta added in. </returns>
+        public long nanoTime();
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Integration.Tests.framework.clocksynch
+{
+
+    /// <summary>
+    /// LocalClockSynchronizer is a fake <see cref="ClockSynchronizer"/> that simply calls System.nanoTime(). It exists so that
+    /// the same tests can be run distributed or locally, taking timings against the ClockSynchronizer interface without
+    /// being aware of how they are being run.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Supply the local clock with no delta.
+    /// </table>
+    /// </summary>
+    public class LocalClockSynchronizer : ClockSynchronizer
+    {
+        /// <summary>
+        /// Does nothing: this implementation has an empty body, as no delta is ever applied to the local clock.
+        /// </summary>
+        /// <exception cref="Apache.Qpid.Integration.Tests.framework.clocksynch.ClockSynchFailureException"> Declared to match the interface; the empty body never throws it. </exception>
+        public void synch() throws ClockSynchFailureException
+        { }
+
+        /// <summary>
+        /// Gets the clock delta in nanoseconds, which is always zero for this implementation.
+        /// </summary>
+        /// <returns> Always zero. </returns>
+        public long getDelta()
+        {
+            return 0L;
+        }
+
+        /// <summary>
+        /// Gets an estimate of the clock error in nanoseconds, which is always zero for this implementation.
+        /// </summary>
+        /// <returns> Always zero. </returns>
+        public long getEpsilon()
+        {
+            return 0L;
+        }
+
+        /// <summary>
+        /// Gets the local clock time; no delta is added in by this implementation.
+        /// </summary>
+        /// <returns> The unmodified value of System.nanoTime(). </returns>
+        public long nanoTime()
+        {
+            return System.nanoTime();
+        }
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,453 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+
+using uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
+using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+using java.io.IOException;
+using java.net.*;
+using java.nio.ByteBuffer;
+using java.util.Arrays;
+
+namespace Apache.Qpid.Integration.Tests.framework.clocksynch
+{
+    /// <summary>
+    /// UDPClockSynchronizer is a <see cref="ClockSynchronizer"/> that sends pings as UDP datagrams, and uses the following simple
+    /// algorithm to perform clock synchronization:
+    ///
+    /// <ol>
+    /// <li>Slave initiates synchronization with a Reference clock.</li>
+    /// <li>Slave stamps current local time on a "time request" message and sends to the Reference.</li>
+    /// <li>Upon receipt by Reference, Reference stamps Reference-time and returns.</li>
+    /// <li>Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It
+    ///     subtracts current time from Reference time to determine Slave-Reference time delta and adds in the
+    ///     half-latency to get the correct clock delta.</li>
+    /// <li>The first result is immediately used to update the clock since it will get the local clock into at least
+    ///     the right ballpark.</li>
+    /// <li>The Slave repeats steps 2 through 4, 15 more times.</li>
+    /// <li>The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The
+    ///     median latency is determined by picking the mid-point sample from this ordered list.</li>
+    /// <li>All samples outside 1 standard-deviation from the median are discarded and the remaining samples
+    ///     are averaged using an arithmetic mean.</li>
+    /// </ol>
+    ///
+    /// <p/>The use of UDP datagrams, instead of TCP based communication eliminates the hidden delays that TCP can introduce,
+    /// as it can transparently re-order or re-send packets, or introduce delays as packets are naggled.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Trigger a clock synchronization.
+    /// <tr><td> Compute a clock delta to apply to the local clock.
+    /// <tr><td> Estimate the error in the synchronization.
+    /// </table>
+    /// </summary>
+    public class UDPClockSynchronizer : ClockSynchronizer
+    {
+        // Used for debugging (currently disabled). Kept as a plain comment so that no doc comment is left dangling
+        // over the next declaration.
+        // private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer));
+
+        /// <summary> Defines the timeout to use when waiting for responses to time requests; set as the socket receive timeout. </summary>
+        private static final int TIMEOUT = 50;
+
+        /// <summary> The clock delta. </summary>
+        private long delta = 0L;
+
+        /// <summary> Holds an estimate of the clock error relative to the reference clock. </summary>
+        private long epsilon = 0L;
+
+        /// <summary> Holds the address of the reference clock. </summary>
+        private InetAddress referenceAddress;
+
+        /// <summary> Holds the socket to communicate with the reference service over. </summary>
+        private DatagramSocket socket;
+
+        /// <summary>
+        /// Used to control the shutdown in the main test loop. The flag is cleared from the shutdown hook thread while the
+        /// main loop reads it, so it is volatile to guarantee that the loop observes the change.
+        /// </summary>
+        private static volatile bool doSynch = true;
+
+        /// <summary>
+        /// Builds a clock synchronizer that will query the reference service at the given address. The address is
+        /// resolved straight away, so an unresolvable host is reported at construction time.
+        /// </summary>
+        /// <param name="address"> The address of the reference service. </param>
+        public UDPClockSynchronizer(string address)
+        {
+            InetAddress resolved;
+
+            try
+            {
+                resolved = InetAddress.getByName(address);
+            }
+            catch (UnknownHostException unknownHost)
+            {
+                // Surface resolution failures as an unchecked exception, keeping the original cause.
+                throw new RuntimeException(unknownHost);
+            }
+
+            referenceAddress = resolved;
+        }
+
+        /// <summary>
+        /// The slave side should call this to compute a clock delta with the reference. A fresh socket is opened for each
+        /// call, three rounds of 1, 15 and 31 pings are performed, and the socket is always closed again, whether or not
+        /// the rounds complete successfully.
+        /// </summary>
+        /// <exception cref="ClockSynchFailureException"> If synchronization cannot be achieved, due to unavailability of the reference
+        /// time service. </exception>
+        public void synch() throws ClockSynchFailureException
+        {
+            try
+            {
+                socket = new DatagramSocket();
+
+                try
+                {
+                    socket.setSoTimeout(TIMEOUT);
+
+                    // Synchronize on a single ping, to get the clock into the right ball-park.
+                    synch(1);
+
+                    // Synchronize on 15 pings.
+                    synch(15);
+
+                    // And again, for greater accuracy, on 31.
+                    synch(31);
+                }
+                finally
+                {
+                    // Release the socket even when a round fails, so that repeated failed attempts do not leak sockets.
+                    socket.close();
+                }
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /// <summary>
+        /// Updates the synchronization delta by performing the specified number of reference clock requests. Samples that
+        /// lie more than one standard deviation away from the median (on either side) are discarded. The mean of the
+        /// retained samples is added to the current delta, and their standard deviation becomes the new error estimate.
+        /// </summary>
+        /// <param name="n"> The number of reference clock request cycles to perform. Must be at least one. </param>
+        ///
+        /// <exception cref="ClockSynchFailureException"> If synchronization cannot be achieved, due to unavailability of the reference
+        ///                                               time service. </exception>
+        protected void synch(int n) throws ClockSynchFailureException
+        {
+            // log.debug("protected void synch(int n = " + n + "): called");
+
+            // The statistics below are undefined for an empty sample set.
+            if (n < 1)
+            {
+                throw new IllegalArgumentException("The number of reference clock requests must be at least one.");
+            }
+
+            // Create an array of delta samples by performing n reference pings.
+            long[] samples = new long[n];
+
+            for (int i = 0; i < n; i++)
+            {
+                samples[i] = ping();
+            }
+
+            // Reject any samples that are further than 1 s.d. from the median, in either direction.
+            long medianDelta = median(samples);
+            long deviation = standardDeviation(samples);
+
+            // log.debug("median = " + medianDelta);
+            // log.debug("sd = " + deviation);
+
+            long[] accepted = new long[n];
+            int count = 0;
+
+            for (int i = 0; i < n; i++)
+            {
+                if ((samples[i] <= (medianDelta + deviation)) && (samples[i] >= (medianDelta - deviation)))
+                {
+                    accepted[count] = samples[i];
+                    count++;
+                }
+                else
+                {
+                    // log.debug("Rejected: " + samples[i]);
+                }
+            }
+
+            // Keep exactly the accepted samples. Copying them back over the front of the full-length array would leave
+            // stale (including rejected) values in its tail, which would then skew the mean and the error estimate.
+            long[] retained;
+
+            if (count > 0)
+            {
+                retained = new long[count];
+                System.arraycopy(accepted, 0, retained, 0, count);
+            }
+            else
+            {
+                // Integer truncation can, for an even number of samples, leave nothing in range; use them all then.
+                retained = samples;
+            }
+
+            // Estimate the delta as the mean of the retained samples. The pings are timed with nanoTime(), which already
+            // includes the current delta, so each sample is a correction on top of it and is therefore accumulated.
+            this.delta += mean(retained);
+
+            // Estimate the error as the standard deviation of the retained samples.
+            this.epsilon = standardDeviation(retained);
+
+            // log.debug("this.delta = " + this.delta);
+            // log.debug("this.epsilon = " + this.epsilon);
+        }
+
+        /// <summary>
+        /// Performs a single reference clock request cycle and returns the estimated delta relative to the local clock.
+        /// This is computed as the half-latency of the request cycle, plus the reference clock, minus the local clock.
+        /// A request whose reply times out is retried, up to a total of 10 attempts.
+        /// </summary>
+        /// <returns> The estimated clock delta. </returns>
+        ///
+        /// <exception cref="ClockSynchFailureException"> If the reference service is not responding. </exception>
+        protected long ping() throws ClockSynchFailureException
+        {
+            // log.debug("protected long ping(): called");
+
+            try
+            {
+                byte[] buf = new byte[256];
+
+                // Every receive time-out uses up one of the 10 attempts; the first reply received ends the loop.
+                for (int timeouts = 0; timeouts < 10; timeouts++)
+                {
+                    // Start timing the request latency.
+                    long sentAt = nanoTime();
+
+                    // Ask the reference service for its time.
+                    DatagramPacket request =
+                        new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT);
+                    socket.send(request);
+
+                    DatagramPacket reply = new DatagramPacket(buf, buf.length);
+
+                    try
+                    {
+                        socket.receive(reply);
+                    }
+                    catch (SocketTimeoutException e)
+                    {
+                        // No reply within the socket timeout; try again.
+                        continue;
+                    }
+
+                    // The reply starts with the reference time as a long.
+                    long refTime = ByteBuffer.wrap(reply.getData()).getLong();
+
+                    // Stop timing the request latency.
+                    long receivedAt = nanoTime();
+                    long latency = receivedAt - sentAt;
+
+                    // Estimate delta as (ref clock + half-latency) - local clock.
+                    return (latency / 2) + (refTime - receivedAt);
+                }
+
+                // Every attempt timed out, so fail completely.
+                throw new ClockSynchFailureException("Clock reference not responding.", null);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /// <summary>
+        /// Gets the clock delta in nanoseconds.
+        /// </summary>
+        /// <returns> The current value of the clock delta field, in nanoseconds. </returns>
+        public long getDelta()
+        {
+            return delta;
+        }
+
+        /// <summary>
+        /// Gets an estimate of the clock error in nanoseconds.
+        /// </summary>
+        /// <returns> The current value of the error estimate field, in nanoseconds. </returns>
+        public long getEpsilon()
+        {
+            return epsilon;
+        }
+
+        /// <summary>
+        /// Gets the local clock time with any computed delta added in.
+        /// </summary>
+        /// <returns> The value of System.nanoTime() plus the current clock delta. </returns>
+        public long nanoTime()
+        {
+            return System.nanoTime() + delta;
+        }
+
+        /// <summary>
+        /// Computes the median of a series of values. The input array is left unmodified; a sorted copy is used.
+        /// </summary>
+        /// <param name="values"> The values. </param>
+        ///
+        /// <returns> The middle value, or for an even number of values the (truncated) average of the two middle values. </returns>
+        public static long median(long[] values)
+        {
+            // log.debug("public static long median(long[] values = " + Arrays.ToString(values) + "): called");
+
+            // Sort a private copy so that the caller's array keeps its order.
+            long[] sorted = new long[values.length];
+            System.arraycopy(values, 0, sorted, 0, values.length);
+            Arrays.sort(sorted);
+
+            int mid = sorted.length / 2;
+
+            // An odd number of values has a single middle value.
+            if ((sorted.length % 2) != 0)
+            {
+                return sorted[mid];
+            }
+
+            // An even number of values has a pair of middle values, which are averaged.
+            return (sorted[mid] + sorted[mid - 1]) / 2;
+        }
+
+        /// <summary>
+        /// Computes the arithmetic mean of a series of values, using integer arithmetic.
+        /// </summary>
+        /// <param name="values"> The values. </param>
+        ///
+        /// <returns> The sum of the values divided by their count, truncated towards zero. </returns>
+        public static long mean(long[] values)
+        {
+            // log.debug("public static long mean(long[] values = " + Arrays.ToString(values) + "): called");
+
+            long sum = 0L;
+
+            for (int i = 0; i < values.length; i++)
+            {
+                sum += values[i];
+            }
+
+            return sum / values.length;
+        }
+
+        /// <summary>
+        /// Computes the variance of a series of values: the mean of the squared differences between each value and the
+        /// mean of the values, using integer arithmetic.
+        /// </summary>
+        /// <param name="values"> The values. </param>
+        ///
+        /// <returns> The variance of the values. </returns>
+        public static long variance(long[] values)
+        {
+            // log.debug("public static long variance(long[] values = " + Arrays.ToString(values) + "): called");
+
+            long mean = mean(values);
+
+            long totalVariance = 0;
+
+            for (long value : values)
+            {
+                long diff = (value - mean);
+
+                // Accumulate the squared difference from the mean.
+                totalVariance += diff * diff;
+            }
+
+            long variance = totalVariance / values.length;
+
+            // log.debug("variance = " + variance);
+
+            return variance;
+        }
+
+        /// <summary>
+        /// Computes the standard deviation of a series of values, as the square root of their variance.
+        /// </summary>
+        /// <param name="values"> The values. </param>
+        ///
+        /// <returns> The standard deviation, with any fractional part discarded. </returns>
+        public static long standardDeviation(long[] values)
+        {
+            // log.debug("public static long standardDeviation(long[] values = " + Arrays.ToString(values) + "): called");
+
+            double root = Math.sqrt(variance(values));
+
+            return (long) root;
+        }
+
+        /// <summary>
+        /// Entry point for testing purposes. Synchronizes against the reference clock over and over, printing the delta
+        /// and the error estimate after each round, until shut down, interrupted, or the reference stops responding.
+        /// </summary>
+        /// <param name="args"> Address of reference clock as arg 1. </param>
+        public static void main(String[] args)
+        {
+            // Describe the single command line option, then parse the arguments against it.
+            String[][] optionSpec = new String[][]
+                {
+                    { "1", "Address of clock reference service.", "address", "true" }
+                };
+            CommandLineParser parser = new CommandLineParser(optionSpec);
+            ParsedProperties options =
+                new ParsedProperties(CommandLineParser.processCommandLine(args, parser, System.getProperties()));
+
+            // Create a clock synchronizer against the address given as option "1".
+            UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(options.getProperty("1"));
+
+            // Register a shutdown hook that asks the loop below to stop.
+            Thread stopHook = new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        doSynch = false;
+                    }
+                });
+            Runtime.getRuntime().addShutdownHook(stopHook);
+
+            // Repeat the clock synching until the user kills the program.
+            while (doSynch)
+            {
+                try
+                {
+                    clockSyncher.synch();
+
+                    // Print out the clock delta and estimate of the error.
+                    System.out.println("Delta = " + clockSyncher.getDelta());
+                    System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+
+                    // Pause between rounds.
+                    Thread.sleep(250);
+                }
+                catch (InterruptedException interrupted)
+                {
+                    // Restore the interrupted status and terminate the loop.
+                    Thread.currentThread().interrupt();
+                    doSynch = false;
+                }
+                catch (ClockSynchFailureException unavailable)
+                {
+                    // Terminate if the reference time service is unavailable.
+                    doSynch = false;
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,493 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using org.apache.log4j.NDC;
+
+using Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties;
+using Apache.Qpid.Integration.Tests.framework.TestUtils;
+using Apache.Qpid.Integration.Tests.framework.clocksynch.ClockSynchThread;
+using Apache.Qpid.Integration.Tests.framework.clocksynch.UDPClockSynchronizer;
+using org.apache.qpid.util.ReflectionUtils;
+using org.apache.qpid.util.ReflectionUtilsException;
+
+using uk.co.thebadgerset.junit.extensions.SleepThrottle;
+using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+using uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+using javax.jms.*;
+
+using java.util.*;
+
+namespace Apache.Qpid.Integration.Tests.framework.distributedtesting
+{
+    /// <summary>
+    /// Implements a test client as described in the interop testing spec
+    /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
+    /// reacts to control message sequences sent by the test <see cref="Coordinator"/>.
+    ///
+    /// <p/><table><caption>Messages Handled by TestClient</caption>
+    /// <tr><th> Message               <th> Action
+    /// <tr><td> Invite(compulsory)    <td> Reply with Enlist.
+    /// <tr><td> Invite(test case)     <td> Reply with Enlist if test case available.
+    /// <tr><td> AssignRole(test case) <td> Reply with Accept Role if matches an enlisted test. Keep test parameters.
+    /// <tr><td> Start                 <td> Send test messages defined by test parameters. Send report on messages sent.
+    /// <tr><td> Status Request        <td> Send report on messages received.
+    /// <tr><td> Terminate             <td> Terminate the test client.
+    /// <tr><td> ClockSynch            <td> Synch clock against the supplied UDP address.
+    /// </table>
+    ///
+    /// <p><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Handle all incoming control messages. <td> <see cref="TestClientControlledTest"/>
+    /// <tr><td> Configure and look up test cases by name. <td> <see cref="TestClientControlledTest"/>
+    /// </table>
+    /// </summary>
+    public class TestClient : MessageListener
+    {
+        /// <summary> Logger used for debug output from this class. </summary>
+        private static ILog log = LogManager.GetLogger(typeof(TestClient));
+
+        /// <summary> Logger named "CONSOLE", used for reporting to the console. </summary>
+        private static ILog console = LogManager.GetLogger("CONSOLE");
+
+        /// <summary> Holds the default identifying name of the test client, used when no name is given on the command line. </summary>
+        public static final string CLIENT_NAME = "java";
+
+        /// <summary> Holds the URL of the broker to run the tests on. Static, but assigned by the instance constructor. </summary>
+        public static string brokerUrl;
+
+        /// <summary> Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. Static, but assigned by the instance constructor. </summary>
+        public static string virtualHost;
+
+        /// <summary>
+        /// Holds the test context properties that provide the default test parameters, plus command line overrides.
+        /// This is initialized with the default test parameters, to which command line overrides may be applied.
+        /// </summary>
+        public static ParsedProperties testContextProperties =
+            TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+        /// <summary> Holds the available test case instances, indexed by test case name. Populated when the client is started. </summary>
+        Map<String, TestClientControlledTest> testCases = new HashMap<String, TestClientControlledTest>();
+
+        /// <summary> Holds the test case most recently matched by an invite; role assignments, starts and status requests are applied to it. </summary>
+        protected TestClientControlledTest currentTestCase;
+
+        /// <summary> Holds the connection to the broker that the test is being coordinated on. </summary>
+        protected Connection connection;
+
+        /// <summary> Holds the message producer used to send test coordination messages and replies. </summary>
+        protected MessageProducer producer;
+
+        /// <summary> Holds the JMS session used for the test coordination. </summary>
+        protected Session session;
+
+        /// <summary> Holds the name of this client, with a default value. </summary>
+        protected string clientName = CLIENT_NAME;
+
+        /// <summary> This flag indicates that the test client should attempt to join the currently running test case on start up. </summary>
+        protected bool join;
+
+        /// <summary> Holds the clock synchronization thread for the test node; replaced each time a clock synch command is received. </summary>
+        ClockSynchThread clockSynchThread;
+
+        /// <summary>
+        /// Creates a new interop test client for the specified broker and virtual host, with the specified client
+        /// identifying name. Only the parameters are stored here; the broker connection is opened when the client is started.
+        /// </summary>
+        /// <param name="pBrokerUrl">   The url of the broker to connect to. </param>
+        /// <param name="pVirtualHost"> The virtual host to connect to. </param>
+        /// <param name="clientName">  The client name to use. </param>
+        /// <param name="join">        Flag to indicate that this client should attempt to join running tests. </param>
+        public TestClient(string pBrokerUrl, string pVirtualHost, string clientName, bool join)
+        {
+            log.debug("public TestClient(string pBrokerUrl = " + pBrokerUrl + ", string pVirtualHost = " + pVirtualHost
+                      + ", string clientName = " + clientName + ", bool join = " + join + "): called");
+
+            // Retain the connection parameters. Note that brokerUrl and virtualHost are static fields, shared by all instances.
+            brokerUrl = pBrokerUrl;
+            virtualHost = pVirtualHost;
+            this.clientName = clientName;
+            this.join = join;
+        }
+
+        /// <summary>
+        /// The entry point for the interop test client. This client accepts the following command line arguments (the parser set up in the method body also declares -o, a directory to output test timings to, and -j, join a running test):
+        ///
+        /// <p/><table>
+        /// <tr><td> -b         <td> The broker URL.       <td> Optional.
+        /// <tr><td> -h         <td> The virtual host.     <td> Optional.
+        /// <tr><td> -n         <td> The test client name. <td> Optional.
+        /// <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. <td> Optional.
+        /// </table>
+        /// </summary>
+        /// <param name="args"> The command line arguments. </param>
+        public static void main(String[] args)
+        {
+            log.debug("public static void main(String[] args = " + Arrays.ToString(args) + "): called");
+            console.info("Qpid Distributed Test Client.");
+
+            // Override the default broker url to be localhost:5672.
+            testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672");
+
+            // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+            // and usage then exit if there are errors). Any options and trailing name=value pairs are also injected into
+            // the test context properties object, to override any defaults that may have been set up.
+            // NOTE(review): the "j" option spec below has three elements where the others have four - confirm this is intended.
+            ParsedProperties options =
+                new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args,
+                                                                                                                   new uk.co.thebadgerset.junit.extensions.util.CommandLineParser(
+                                                                                                                                                                                  new String[][]
+                                                                                                                                                                                  {
+                                                                                                                                                                                      { "b", "The broker URL.", "broker", "false" },
+                                                                                                                                                                                      { "h", "The virtual host to use.", "virtual host", "false" },
+                                                                                                                                                                                      { "o", "The name of the directory to output test timings to.", "dir", "false" },
+                                                                                                                                                                                      { "n", "The name of the test client.", "name", "false" },
+                                                                                                                                                                                      { "j", "Join this test client to running test.", "false" }
+                                                                                                                                                                                  }), testContextProperties));
+
+            // Extract the command line options. These locals shadow the static fields of the same names.
+            string brokerUrl = options.getProperty("b");
+            string virtualHost = options.getProperty("h");
+            string clientName = options.getProperty("n");
+            clientName = (clientName == null) ? CLIENT_NAME : clientName;
+            bool join = options.getPropertyAsBoolean("j");
+
+            // To distinguish logging output set up an NDC on the client name.
+            NDC.push(clientName);
+
+            // Create the test client; it is started below, once the test case classes have been loaded.
+            TestClient client = new TestClient(brokerUrl, virtualHost, clientName, join);
+
+            // The intent is to use a class path scanner to find all the interop test case implementations.
+            // Until the classpath scanner is fixed, the test classes are hard coded here.
+            Collection<Class<? extends TestClientControlledTest>> testCaseClasses =
+                new ArrayList<Class<? extends TestClientControlledTest>>();
+            // ClasspathScanner.getMatches(TestClientControlledTest.class, "^TestCase.*", true);
+            testCaseClasses.addAll(loadTestCases("org.apache.qpid.interop.clienttestcases.TestCase1DummyRun",
+                                                 "org.apache.qpid.interop.clienttestcases.TestCase2BasicP2P",
+                                                 "org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub",
+                                                 "org.apache.qpid.interop.clienttestcases.TestCase4P2PMessageSize",
+                                                 "org.apache.qpid.interop.clienttestcases.TestCase5PubSubMessageSize",
+                                                 "Apache.Qpid.Integration.Tests.framework.distributedcircuit.TestClientCircuitEnd"));
+
+            try
+            {
+                client.start(testCaseClasses);
+            }
+            catch (Exception e)
+            {
+                log.error("The test client was unable to start.", e);
+                console.info(e.getMessage());
+                System.exit(1);
+            }
+        }
+
+        /// <summary>
+        /// Loads each named class by reflection; classes that cannot be found, or are of the wrong type, are skipped with a console warning.
+        /// </summary>
+        /// <param name="classNames"> The names of the classes to load. </param>
+        ///
+        /// <returns> A list of the test case classes that were successfully loaded. </returns>
+        public static IList<Class<? extends TestClientControlledTest>> loadTestCases(String... classNames)
+        {
+            IList<Class<? extends TestClientControlledTest>> testCases =
+                new LinkedList<Class<? extends TestClientControlledTest>>();
+
+            for (string className : classNames)
+            {
+                try
+                {
+                    Class<?> cls = ReflectionUtils.forName(className);
+                    testCases.add((Class<? extends TestClientControlledTest>) cls);
+                }
+                catch (ReflectionUtilsException e)
+                {
+                    // Ignore, class could not be found, so test not available.
+                    console.warn("Requested class " + className + " cannot be found, ignoring it.");
+                }
+                catch (ClassCastException e)
+                {
+                    // Ignore, class was not of correct type to be a test case. NOTE(review): the generic cast above may be unchecked, in which case this handler never fires - confirm.
+                    console.warn("Requested class " + className + " is not an instance of TestClientControlledTest.");
+                }
+            }
+
+            return testCases;
+        }
+
+        /// <summary>
+        /// Starts the interop test client running. This causes it to start listening for incoming test invites.
+        /// </summary>
+        /// <param name="testCaseClasses"> The classes of the available test cases. The test case names from these are used to
+        ///                        match incoming test invites against. </param>
+        ///
+        /// <exception cref="JMSException"> Any underlying JMSExceptions are allowed to fall through. </exception>
+        protected void start(Collection<Class<? extends TestClientControlledTest>> testCaseClasses) throws JMSException
+        {
+            log.debug("protected void start(Collection<Class<? extends TestClientControlledTest>> testCaseClasses = "
+                      + testCaseClasses + "): called");
+
+            // Create all the test case implementations and index them by the test names.
+            for (Class<? extends TestClientControlledTest> nextClass : testCaseClasses)
+            {
+                try
+                {
+                    TestClientControlledTest testCase = nextClass.newInstance();
+                    testCases.put(testCase.getName(), testCase);
+                }
+                catch (InstantiationException e)
+                {
+                    log.warn("Could not instantiate test case class: " + nextClass.getName(), e);
+                    // Ignored; the test case is simply left out of the index.
+                }
+                catch (IllegalAccessException e)
+                {
+                    log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e);
+                    // Ignored; the test case is simply left out of the index.
+                }
+            }
+
+            // Open a connection to communicate with the coordinator on.
+            connection = TestUtils.createConnection(testContextProperties);
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Listen for control messages on this client's private control topic and on the shared control topic.
+            Topic privateControlTopic = session.createTopic("iop.control." + clientName);
+            MessageConsumer consumer = session.createConsumer(privateControlTopic);
+            consumer.setMessageListener(this);
+
+            Topic controlTopic = session.createTopic("iop.control");
+            MessageConsumer consumer2 = session.createConsumer(controlTopic);
+            consumer2.setMessageListener(this);
+
+            // Create a producer to send replies with. No default destination is given; one is supplied on each send.
+            producer = session.createProducer(null);
+
+            // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client
+            // is available to join the current test case, if it supports it. This message may be ignored, or it may result
+            // in this test client receiving a test invite.
+            if (join)
+            {
+                Message joinMessage = session.createMessage();
+
+                joinMessage.setStringProperty("CONTROL_TYPE", "JOIN");
+                joinMessage.setStringProperty("CLIENT_NAME", clientName);
+                joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+                producer.send(controlTopic, joinMessage);
+            }
+
+            // Start the connection, to begin listening for incoming control messages.
+            connection.start();
+        }
+
+        /// <summary>
+        /// Handles all incoming control messages, dispatching on the CONTROL_TYPE property (INVITE, ASSIGN_ROLE, START, STATUS_REQUEST, TERMINATE, CLOCK_SYNCH); unknown types are logged and ignored.
+        /// </summary>
+        /// <param name="message"> The incoming message. </param>
+        public void onMessage(Message message)
+        {
+            NDC.push(clientName);
+            log.debug("public void onMessage(Message message = " + message + "): called");
+
+            try
+            {
+                string controlType = message.getStringProperty("CONTROL_TYPE");
+                string testName = message.getStringProperty("TEST_NAME");
+
+                log.debug("Received control of type '" + controlType + "' for the test '" + testName + "'");
+
+                // Check if the message is a test invite.
+                if ("INVITE".equals(controlType))
+                {
+                    // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
+                    // for which test cases exist and which the test case accepts.
+                    bool enlist = false;
+
+                    if (testName != null)
+                    {
+                        log.debug("Got an invite to test: " + testName);
+
+                        // Check if the requested test case is available.
+                        TestClientControlledTest testCase = testCases.get(testName);
+
+                        if (testCase != null)
+                        {
+                            log.debug("Found implementing class for test '" + testName + "', enlisting for it.");
+
+                            // Check if the test case will accept the invitation.
+                            enlist = testCase.acceptInvite(message);
+
+                            log.debug("The test case "
+                                      + (enlist ? " accepted the invite, enlisting for it."
+                                         : " did not accept the invite, not enlisting."));
+
+                            // Make the requested test case the current test case (done whether or not the invite was accepted).
+                            currentTestCase = testCase;
+                        }
+                        else
+                        {
+                            log.debug("Received an invite to the test '" + testName + "' but this test is not known.");
+                        }
+                    }
+                    else
+                    {
+                        log.debug("Got a compulsory invite, enlisting for it.");
+
+                        enlist = true;
+                    }
+
+                    if (enlist)
+                    {
+                        // Reply with the client name in an Enlist message.
+                        Message enlistMessage = session.createMessage();
+                        enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST");
+                        enlistMessage.setStringProperty("CLIENT_NAME", clientName);
+                        enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+                        enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+                        log.debug("Sending enlist message '" + enlistMessage + "' to " + message.getJMSReplyTo());
+
+                        producer.send(message.getJMSReplyTo(), enlistMessage);
+                    }
+                    else
+                    {
+                        // Reply with the client name in a Decline message.
+                        Message enlistMessage = session.createMessage();
+                        enlistMessage.setStringProperty("CONTROL_TYPE", "DECLINE");
+                        enlistMessage.setStringProperty("CLIENT_NAME", clientName);
+                        enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+                        enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+                        log.debug("Sending decline message '" + enlistMessage + "' to " + message.getJMSReplyTo());
+
+                        producer.send(message.getJMSReplyTo(), enlistMessage);
+                    }
+                }
+                else if ("ASSIGN_ROLE".equals(controlType))
+                {
+                    // Assign the role to the current test case. NOTE(review): currentTestCase is null if no known test was invited first - confirm how that should be handled.
+                    string roleName = message.getStringProperty("ROLE");
+
+                    log.debug("Got a role assignment to role: " + roleName);
+
+                    TestClientControlledTest.Roles role = Enum.valueOf(TestClientControlledTest.Roles.class, roleName);
+
+                    currentTestCase.assignRole(role, message);
+
+                    // Reply by accepting the role in an Accept Role message.
+                    Message acceptRoleMessage = session.createMessage();
+                    acceptRoleMessage.setStringProperty("CLIENT_NAME", clientName);
+                    acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE");
+                    acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+                    log.debug("Sending accept role message '" + acceptRoleMessage + "' to " + message.getJMSReplyTo());
+
+                    producer.send(message.getJMSReplyTo(), acceptRoleMessage);
+                }
+                else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType))
+                {
+                    if ("START".equals(controlType))
+                    {
+                        log.debug("Got a start notification.");
+
+                        // Extract the number of test messages to send from the start notification.
+                        int numMessages;
+
+                        try
+                        {
+                            numMessages = message.getIntProperty("MESSAGE_COUNT");
+                        }
+                        catch (NumberFormatException e)
+                        {
+                            // If the number of messages is not specified, use the default of one.
+                            numMessages = 1;
+                        }
+
+                        // Start the current test case.
+                        currentTestCase.start(numMessages);
+                    }
+                    else
+                    {
+                        log.debug("Got a status request.");
+                    }
+
+                    // Both START and STATUS_REQUEST are answered with the test case's report, sent as a Report message.
+                    Message reportMessage = currentTestCase.getReport(session);
+                    reportMessage.setStringProperty("CLIENT_NAME", clientName);
+                    reportMessage.setStringProperty("CONTROL_TYPE", "REPORT");
+                    reportMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+
+                    log.debug("Sending report message '" + reportMessage + "' to " + message.getJMSReplyTo());
+
+                    producer.send(message.getJMSReplyTo(), reportMessage);
+                }
+                else if ("TERMINATE".equals(controlType))
+                {
+                    console.info("Received termination instruction from coordinator.");
+
+                    // Is a cleaner shutdown needed?
+                    connection.close();
+                    System.exit(0);
+                }
+                else if ("CLOCK_SYNCH".equals(controlType))
+                {
+                    log.debug("Received clock synch command.");
+                    string address = message.getStringProperty("ADDRESS");
+
+                    log.debug("address = " + address);
+
+                    // Terminate any existing clock synch thread, then create and start a new one to synch the clock every ten seconds.
+                    if (clockSynchThread != null)
+                    {
+                        clockSynchThread.terminate();
+                    }
+
+                    SleepThrottle throttle = new SleepThrottle();
+                    throttle.setRate(0.1f);
+
+                    clockSynchThread = new ClockSynchThread(new UDPClockSynchronizer(address), throttle);
+                    clockSynchThread.start();
+                }
+                else
+                {
+                    // Log a warning about this but otherwise ignore it.
+                    log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message);
+                }
+            }
+            catch (JMSException e)
+            {
+                // Log a warning about this, but otherwise ignore it.
+                log.warn("Got JMSException whilst handling message: " + message, e);
+            }
+            // Log any runtimes that fall through this message handler. These are fatal errors for the test client.
+            catch (RuntimeException e)
+            {
+                log.error("The test client message handler got an unhandled exception: ", e);
+                console.info("The message handler got an unhandled exception, terminating the test client.");
+                System.exit(1);
+            }
+            finally
+            {
+                NDC.pop();
+            }
+        }
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+
+using Apache.Qpid.Integration.Tests.framework.*;
+using Apache.Qpid.Integration.Tests.framework.distributedtesting.TestClientControlledTest;
+
+using uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+using uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+using javax.jms.*;
+
+namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit
+{
+    /// <summary>
+    /// A TestClientCircuitEnd is a <see cref="CircuitEnd"/> that may be controlled from a
+    /// <see cref="Apache.Qpid.Integration.Tests.framework.distributedtesting.TestClient"/>, and that forms a single publishing or
+    /// receiving end point in a distributed test <see cref="Apache.Qpid.Integration.Tests.framework.Circuit"/>.
+    ///
+    /// <p/>When operating in the SENDER role, this circuit end is capable of acting as part of the default circuit test
+    /// procedure (described in the class comment for <see cref="Apache.Qpid.Integration.Tests.framework.Circuit"/>). That is, it will
+    /// send the number of test messages required, using the test configuration parameters given in the test invite, and
+    /// return a report on its activities to the circuit controller.
+    ///
+    /// <p/>When operating in the RECEIVER role, this circuit end acts as part of the default circuit test procedure. It will
+    /// receive test messages, on the setup specified in the test configuration parameters, and keep count of the messages
+    /// received, and time taken to receive them. When requested by the circuit controller to provide a report, it will
+    /// return this report of its activities.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities <th> Collaborations
+    /// <tr><td> Provide a message producer for sending messages.
+    ///     <td> <see cref="CircuitEnd"/>, <see cref="LocalCircuitFactory"/>, <see cref="TestUtils"/>
+    /// <tr><td> Provide a message consumer for receiving messages.
+    ///     <td> <see cref="CircuitEnd"/>, <see cref="LocalCircuitFactory"/>, <see cref="TestUtils"/>
+    /// <tr><td> Supply the name of the test case that this implements.
+    /// <tr><td> Accept/Reject invites based on test parameters. <td> <see cref="MessagingTestConfigProperties"/>
+    /// <tr><td> Adapt to assigned roles. <td> <see cref="TestClientControlledTest.Roles"/>
+    /// <tr><td> Perform test case actions. <td> <see cref="MessageMonitor"/>
+    /// <tr><td> Generate test reports. <td> <see cref="MessageMonitor"/>
+    /// </table>
+    /// </summary>
+    public class TestClientCircuitEnd : CircuitEnd, TestClientControlledTest
+    {
+        /// <summary> Used for debugging. </summary>
+        private static ILog log = LogManager.GetLogger(typeof(TestClientCircuitEnd));
+
+        /// <summary> Holds the test parameters; populated by acceptInvite and read by assignRole. </summary>
+        ParsedProperties testProps;
+
+        /// <summary> The number of test messages to send. Currently always set to 1 by assignRole. </summary>
+        private int numMessages;
+
+        /// <summary> The role to be played by the test. </summary>
+        private Roles role;
+
+        /// <summary> The connection to send the test messages on. </summary>
+        private Connection connection;
+
+        /// <summary> Holds the underlying circuit end for this test, to which the CircuitEnd methods delegate. </summary>
+        CircuitEnd circuitEnd;
+
+        /// <summary>
+        /// Holds a message monitor for this circuit end, either the monitor on the consumer when in RECEIVER mode, or
+        /// a monitor updated on every message sent, when acting as a SENDER. </summary>
+        MessageMonitor messageMonitor;
+
+        /// <summary>
+        /// Should provide the name of the test case that this class implements. The exact names are defined in the
+        /// interop testing spec.
+        /// </summary>
+        /// <returns> The name of the test case that this implements. </returns>
+        public string getName()
+        {
+            return "DEFAULT_CIRCUIT_TEST";
+        }
+
+        /// <summary>
+        /// Accepts the test invite, overriding default test properties with any same-named string properties it carries.
+        /// </summary>
+        /// <param name="inviteMessage"> The invitation to accept or reject. </param>
+        /// <returns> Always <c>true</c>; this implementation never rejects an invitation. </returns>
+        ///
+        /// <exception cref="JMSException"> Any JMSException resulting from reading the message are allowed to fall through. </exception>
+        public bool acceptInvite(Message inviteMessage) throws JMSException
+        {
+            log.debug("public bool acceptInvite(Message inviteMessage): called");
+
+            // Start from the default test parameters, then apply any overrides supplied by the invitation.
+            testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+            for (Object key : testProps.keySet())
+            {
+                string propName = (String) key;
+
+                // If the test parameter is overridden by the invitation, use the invitation's value instead.
+                string inviteValue = inviteMessage.getStringProperty(propName);
+
+                if (inviteValue != null)
+                {
+                    testProps.setProperty(propName, inviteValue);
+                    log.debug("Test invite supplied override to " + propName + " of " + inviteValue);
+                }
+
+            }
+
+            // Accept the invitation.
+            return true;
+        }
+
+        /// <summary>
+        /// Assigns the role to be played by this test case. The test parameters are fully specified in the
+        /// assignment message. When this method returns the test case will be ready to execute.
+        /// </summary>
+        /// <param name="role">              The role to be played; sender or receivers. </param>
+        /// <param name="assignRoleMessage"> The role assignment message, contains the full test parameters. </param>
+        ///
+        /// <exception cref="JMSException"> Any JMSException resulting from reading the message are allowed to fall through. </exception>
+        public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+        {
+            log.debug("public void assignRole(Roles role, Message assignRoleMessage): called");
+
+            // Take note of the role to be played.
+            this.role = role;
+
+            // The message count is hard-coded to 1; reading NUM_MESSAGES from the assignment message is disabled.
+            numMessages = 1; // assignRoleMessage.getIntProperty("NUM_MESSAGES");
+
+            // Connect using the test parameters.
+            connection = TestUtils.createConnection(testProps);
+
+            // Create a circuit end that matches the assigned role and test parameters.
+            LocalCircuitFactory circuitFactory = new LocalCircuitFactory();
+
+            switch (role)
+            {
+                // Check if the sender role is being assigned, and set up a message producer if so.
+            case SENDER:
+
+                // Set up the publisher.
+                circuitEnd = circuitFactory.createPublisherCircuitEnd(connection, testProps, 0L);
+
+                // Create a custom message monitor that will be updated on every message sent.
+                messageMonitor = new MessageMonitor();
+
+                break;
+
+                // Otherwise the receivers role is being assigned, so set this up to listen for messages.
+            case RECEIVER:
+
+                // Set up the receiver.
+                circuitEnd = circuitFactory.createReceiverCircuitEnd(connection, testProps, 0L);
+
+                // Use the message monitor from the consumer for stats.
+                messageMonitor = getMessageMonitor();
+
+                break;
+            }
+
+            // Reset all messaging stats for the report. NOTE(review): the switch has no default, so messageMonitor is not assigned above for any other role.
+            messageMonitor.reset();
+
+            connection.start();
+        }
+
+        /// <summary>
+        /// Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
+        /// </summary>
+        /// <param name="numMessages"> The number of test messages to send; shadows the numMessages field, which is not read here. </param>
+        ///
+        /// <exception cref="JMSException"> Any JMSException resulting from creating or sending the test message is allowed to fall through. </exception>
+        ///
+        /// <remarks> Add round robin on destinations where multiple destinations being used.</remarks>
+        ///
+        /// <remarks> Add rate limiting when rate limit specified on publishers.</remarks>
+        ///
+        /// <remarks> Add Max pending message size protection. The receiver will have to send back some acks once in a while,
+        ///       to notify the publisher that its messages are being consumed. This makes the safety valve harder to
+        ///       implement than in the single VM case. For example, if the limit is 1000 messages, might want to get back
+        ///       an ack every 500, to notify the publisher that it can keep sending. What about pub/sub tests? Will it be
+        ///       necessary to wait for an ack from every receiver? This will have the effect of rate limiting to slow
+        ///       consumers too.</remarks>
+        ///
+        /// <remarks> Add commits on every commit batch size boundary.</remarks>
+        public void start(int numMessages) throws JMSException
+        {
+            log.debug("public void start(): called");
+
+            // If in the SENDER role, send the specified number of test messages; in any other role this method does nothing.
+            if (role.equals(Roles.SENDER))
+            {
+                Message testMessage = getSession().createMessage();
+
+                for (int i = 0; i < numMessages; i++)
+                {
+                    getProducer().send(testMessage);
+
+                    // Increment the message count and timings.
+                    messageMonitor.onMessage(testMessage);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets a report on the actions performed by the test case in its assigned role.
+        /// </summary>
+        /// <param name="session"> The controlSession to create the report message in. </param>
+        /// <returns> The report message, carrying the CONTROL_TYPE, MESSAGE_COUNT and TEST_TIME properties. </returns>
+        ///
+        /// <exception cref="JMSException"> Any JMSExceptions resulting from creating the report are allowed to fall through. </exception>
+        public Message getReport(Session session) throws JMSException
+        {
+            Message report = session.createMessage();
+            report.setStringProperty("CONTROL_TYPE", "REPORT");
+
+            // Add the count of messages sent/received to the report.
+            report.setIntProperty("MESSAGE_COUNT", messageMonitor.getNumMessage());
+
+            // Add the time to send/receive messages to the report.
+            report.setLongProperty("TEST_TIME", messageMonitor.getTime());
+
+            // TODO: Add any exceptions detected to the report (not yet implemented).
+
+            return report;
+        }
+
+        /// <summary>
+        /// Gets the message producer at this circuit end point.
+        /// </summary>
+        /// <returns> The message producer at this circuit end point. </returns>
+        public MessageProducer getProducer()
+        {
+            return circuitEnd.getProducer();
+        }
+
+        /// <summary>
+        /// Gets the message consumer at this circuit end point.
+        /// </summary>
+        /// <returns> The message consumer at this circuit end point. </returns>
+        public MessageConsumer getConsumer()
+        {
+            return circuitEnd.getConsumer();
+        }
+
+        /// <summary>
+        /// Send the specified message over the producer at this end point.
+        /// </summary>
+        /// <param name="message"> The message to send. </param>
+        ///
+        /// <exception cref="JMSException"> Any JMS exception occurring during the send is allowed to fall through. </exception>
+        public void send(Message message) throws JMSException
+        {
+            // Send the message on the circuit end's producer.
+            circuitEnd.send(message);
+        }
+
+        /// <summary>
+        /// Gets the JMS Session associated with this circuit end point.
+        /// </summary>
+        /// <returns> The JMS Session associated with this circuit end point. </returns>
+        public Session getSession()
+        {
+            return circuitEnd.getSession();
+        }
+
+        /// <summary>
+        /// Closes the message producers and consumers and the sessions, associated with this circuit end point.
+        /// </summary>
+        /// <exception cref="JMSException"> Any JMSExceptions occurring during the close are allowed to fall through. </exception>
+        public void close() throws JMSException
+        {
+            // Close the producer and consumer. NOTE(review): the connection created in assignRole is not closed here.
+            circuitEnd.close();
+        }
+
+        /// <summary>
+        /// Returns the message monitor for reporting on received messages on this circuit end.
+        /// </summary>
+        /// <returns> The message monitor for this circuit end. </returns>
+        public MessageMonitor getMessageMonitor()
+        {
+            return circuitEnd.getMessageMonitor();
+        }
+
+        /// <summary>
+        /// Returns the exception monitor for reporting on exceptions received on this circuit end.
+        /// </summary>
+        /// <returns> The exception monitor for this circuit end. </returns>
+        public ExceptionMonitor getExceptionMonitor()
+        {
+            return circuitEnd.getExceptionMonitor();
+        }
+    }
+}
\ No newline at end of file

Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx?rev=612874&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx (added)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx Thu Jan 17 09:13:11 2008
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using javax.jms.JMSException;
+using javax.jms.Message;
+using javax.jms.MessageListener;
+using javax.jms.Session;
+
+namespace Apache.Qpid.Integration.Tests.framework.distributedtesting
+{
+    /// <summary>
+    /// TestClientControlledTest provides an interface that classes implementing test cases to run on a <see cref="TestClient"/>
+    /// node can use. Implementations must be Java beans, that is, to provide a default constructor and to implement the
+    /// <see cref="getName"/> method.
+    ///
+    /// <p/>The methods specified in this interface are called when the <see cref="TestClient"/> receives control instructions to
+    /// apply to the test. There are control instructions to present the test case with the test invite, so that it may
+    /// choose whether or not to participate in the test, assign the test to play the sender or receiver role, start the
+    /// test and obtain the test status report.
+    ///
+    /// <p/><table id="crc"><caption>CRC Card</caption>
+    /// <tr><th> Responsibilities
+    /// <tr><td> Supply the name of the test case that this implements.
+    /// <tr><td> Accept/Reject invites based on test parameters.
+    /// <tr><td> Adapt to assigned roles.
+    /// <tr><td> Perform test case actions.
+    /// <tr><td> Generate test reports.
+    /// </table>
+    /// </summary>
+    public interface TestClientControlledTest
+    {
+        /// <summary> Defines the possible test case roles that an interop test case can take on. </summary>
+        public enum Roles
+        {
+            /// <summary> Specifies the sender role. </summary>
+            SENDER,
+
+            /// <summary> Specifies the receivers role. </summary>
+            RECEIVER
+        }
+
+        /// <summary>
+        /// Should provide the name of the test case that this class implements. The exact names are defined in the
+        /// interop testing spec.
+        /// </summary>
+        /// <returns> The name of the test case that this implements. </returns>
+        public string getName();
+
+        /// <summary>
+        /// Determines whether the test invite that matched this test case is acceptable.
+        /// </summary>
+        /// <param name="inviteMessage"> The invitation to accept or reject. </param>
+        ///
+        /// <returns> <c>true</c> to accept the invitation, <c>false</c> to reject it. </returns>
+        ///
+        /// <exception cref="JMSException"> Any JMSException resulting from reading the message are allowed to fall through. </exception>
+        public bool acceptInvite(Message inviteMessage) throws JMSException;
+
+        /// <summary>
+        /// Assigns the role to be played by this test case. The test parameters are fully specified in the
+        /// assignment message. When this method returns the test case will be ready to execute.
+        /// </summary>
+        /// <param name="role">              The role to be played; sender or receivers. </param>
+        /// <param name="assignRoleMessage"> The role assignment message, contains the full test parameters. </param>
+        ///
+        /// <exception cref="JMSException"> Any JMSException resulting from reading the message are allowed to fall through. </exception>
+        public void assignRole(Roles role, Message assignRoleMessage) throws JMSException;
+
+        /// <summary>
+        /// Performs the test case actions. Returning from here, indicates that the sending role has completed its test.
+        /// </summary>
+        /// <param name="numMessages"> The number of test messages to send. </param>
+        ///
+        /// <exception cref="JMSException"> Any JMSException resulting from reading the message are allowed to fall through. </exception>
+        public void start(int numMessages) throws JMSException;
+
+        /// <summary>
+        /// Gets a report on the actions performed by the test case in its assigned role.
+        /// </summary>
+        /// <param name="session"> The controlSession to create the report message in. </param>
+        ///
+        /// <returns> The report message. </returns>
+        ///
+        /// <exception cref="JMSException"> Any JMSExceptions resulting from creating the report are allowed to fall through. </exception>
+        public Message getReport(Session session) throws JMSException;
+    }
+}
\ No newline at end of file



Mime
View raw message