Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 13107 invoked from network); 17 Jan 2008 17:14:27 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 17 Jan 2008 17:14:27 -0000 Received: (qmail 65307 invoked by uid 500); 17 Jan 2008 17:14:17 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 65299 invoked by uid 500); 17 Jan 2008 17:14:17 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 65290 invoked by uid 99); 17 Jan 2008 17:14:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2008 09:14:17 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2008 17:14:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 540401A984D; Thu, 17 Jan 2008 09:13:48 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r612874 [4/5] - in /incubator/qpid/branches/M2.1/dotnet: Qpid.Buffer.Tests/Properties/ Qpid.Buffer/Properties/ Qpid.Client.Tests/Properties/ Qpid.Client.Transport.Socket.Blocking/Properties/ Qpid.Client/Properties/ Qpid.Codec/Properties/ Qp... 
Date: Thu, 17 Jan 2008 17:13:23 -0000 To: qpid-commits@incubator.apache.org From: rupertlssmith@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080117171349.540401A984D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchFailureException.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchFailureException represents failure of a to achieve synchronization. For example, + /// this could be because a reference signal is not available, or because a desired accurracy cannot be attained. 
+ /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Represent failure to achieve synchronization. + ///
+ ///

+ public class ClockSynchFailureException extends Exception + { + /// + /// Creates a clock synch failure exception. + /// + /// The detail message (which is saved for later retrieval by the method). + /// The cause (which is saved for later retrieval by the method). (A null + /// value is permitted, and indicates that the cause is nonexistent or unknown.) + public ClockSynchFailureException(string message, Throwable cause) + { + super(message, cause); + } + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchThread.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.ShutdownHookable; +using uk.co.thebadgerset.junit.extensions.Throttle; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchThread is a convenient utility for running a thread that periodically synchronizes the clock against + /// a reference. Supply it with a and a and it will continually keep the + /// clock up-to-date at a rate determined by the throttle. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Continually synchronize the clock at a throttled rate. + ///
+ ///

+ public class ClockSynchThread extends Thread : ShutdownHookable + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(ClockSynchThread)); + + /// Holds the clock syncher for the synch thread. + private ClockSynchronizer clockSyncher; + + /// Holds the throttle to limit the synch rate. + private Throttle throttle; + + /// Flag to indicate that the periodic clock syncher should keep running. + bool doSynch = true; + + /// + /// Creates a clock synchronizer thread from a clock synchronizer and a throttle. + /// + /// The clock synchronizer. + /// The throttle. + public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle) + { + this.clockSyncher = syncher; + this.throttle = throttle; + } + + /// Terminates the synchronization thread. + public void terminate() + { + doSynch = false; + } + + /// Continually updates the clock, until is called. + public void run() + { + while (doSynch) + { + // Perform a clock clockSynch. + try + { + // Wait controlled by the throttle before doing the next synch. + throttle.throttle(); + + clockSyncher.synch(); + log.debug("Clock synched, delta = " + clockSyncher.getDelta() + ", epsilon = " + clockSyncher.getEpsilon() + + "."); + } + // Terminate the synch thread if the synchronization cannot be achieved. + catch (ClockSynchFailureException e) + { + log.debug("Cannot synchronize the clock (reference service may be down). Terminating the synch thread."); + doSynch = false; + } + } + } + + /// + /// Gets the clock synchronizer that is kept continually up to date. + /// + /// The clock synchronizer that is kept continually up to date. + public ClockSynchronizer getClockSyncher() + { + return clockSyncher; + } + + /// + /// Supplies a shutdown hook, that terminates the synching thread. + /// + /// The shut down hook. 
+ public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + doSynch = false; + } + }); + } + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/ClockSynchronizer.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchronizer provides an interface through which two nodes may synchronize their clocks. 
It is expected that one + /// node will act as the reference clock, to which no delta need be applied, and the other node will act as the slave, + /// which must apply a delta to its local clock to get a clock synchronized with the reference. + /// + ///

The slave side will initiate the computation of a clock delta by calling the synch method. This method + /// will not return until the delta has been computed, at which point there is a method to return its value, as well as + /// an estimate of the likely error (usually one standard deviation) in the synchronization. For convenience there is a + /// method to return the value of System.nanoTime() with the delta added in. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Trigger a clock synchronization. + ///
Compute a clock delta to apply to the local clock. + ///
Estimate the error in the synchronization. + ///
+ ///

+ public interface ClockSynchronizer + { + /// + /// The slave side should call this to copute a clock delta with the reference. + /// + /// If synchronization cannot be achieved. + public void synch() throws ClockSynchFailureException; + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta(); + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon(); + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime(); + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/LocalClockSynchronizer.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + + /// + /// LocalClockSynchronizer is a fake that simply calls System.nanoTime(). It exists so that + /// the same tests can be run distributed or locally, taking timings against the ClockSynchronizer interface without + /// being aware of how they are being run. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply the local clock with no delta. + ///
+ ///

+ public class LocalClockSynchronizer : ClockSynchronizer + { + /// + /// The slave side should call this to copute a clock delta with the reference. + /// + /// If synchronization cannot be achieved. + public void synch() throws ClockSynchFailureException + { } + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta() + { + return 0L; + } + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon() + { + return 0L; + } + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime() + { + return System.nanoTime(); + } + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/clocksynch/UDPClockSynchronizer.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,453 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.util.CommandLineParser; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using java.io.IOException; +using java.net.*; +using java.nio.ByteBuffer; +using java.util.Arrays; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// UDPClockSynchronizer is a that sends pings as UDP datagrams, and uses the following simple + /// algorithm to perform clock synchronization: + /// + ///
    + ///
  1. Slave initiates synchronization with a Reference clock.
  2. + ///
  3. Slave stamps current local time on a "time request" message and sends to the Reference.
  4. + ///
  5. Upon receipt by Reference, Reference stamps Reference-time and returns.
  6. + ///
  7. Upon receipt by Slave, Slave subtracts sent time from current time and divides by two to compute the half-latency. It + /// subtracts current time from Reference time to determine Slave-Reference time delta and adds in the + /// half-latency to get the correct clock delta.
  8. + ///
  9. The first result is immediately used to update the clock since it will get the local clock into at least + /// the right ballpark.
  10. + ///
  11. The Slave repeats steps 2 through 4, 15 more times.
  12. + ///
  13. The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The + /// median latency is determined by picking the mid-point sample from this ordered list.
  14. + ///
  15. All samples outside 1 standard-deviation from the median are discarded and the remaining samples + /// are averaged using an arithmetic mean.
  16. + ///
+ /// + ///

The use of UDP datagrams, instead of TCP-based communication, eliminates the hidden delays that TCP can introduce, + /// as it can transparently re-order or re-send packets, or introduce delays as packets are held back by Nagle's algorithm. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Trigger a clock synchronization. + ///
Compute a clock delta to apply to the local clock. + ///
Estimate the error in the synchronization. + ///
+ ///

+ public class UDPClockSynchronizer : ClockSynchronizer + { + /// Used for debugging. + // private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer)); + + /// Defines the timeout to use when waiting for responses to time requests. + private static final int TIMEOUT = 50; + + /// The clock delta. + private long delta = 0L; + + /// Holds an estimate of the clock error relative to the reference clock. + private long epsilon = 0L; + + /// Holds the address of the reference clock. + private InetAddress referenceAddress; + + /// Holds the socket to communicate with the reference service over. + private DatagramSocket socket; + + /// Used to control the shutdown in the main test loop. + private static bool doSynch = true; + + /// + /// Creates a clock synchronizer against the specified address for the reference. + /// + /// The address of the reference service. + public UDPClockSynchronizer(string address) + { + try + { + referenceAddress = InetAddress.getByName(address); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + /// + /// The slave side should call this to compute a clock delta with the reference. + /// + /// If synchronization cannot be achieved, due to unavailability of the reference + /// time service. + public void synch() throws ClockSynchFailureException + { + try + { + socket = new DatagramSocket(); + socket.setSoTimeout(TIMEOUT); + + // Synchronize on a single ping, to get the clock into the right ball-park. + synch(1); + + // Synchronize on 15 pings. + synch(15); + + // And again, for greater accuracy, on 31. + synch(31); + + socket.close(); + } + catch (SocketException e) + { + throw new RuntimeException(e); + } + } + + /// + /// Updates the synchronization delta by performing the specified number of reference clock requests. + /// + /// The number of reference clock request cycles to perform. 
+ /// + /// If synchronization cannot be achieved, due to unavailability of the reference + /// time service. + protected void synch(int n) throws ClockSynchFailureException + { + // log.debug("protected void synch(int n = " + n + "): called"); + + // Create an array of deltas by performing n reference pings. + long[] delta = new long[n]; + + for (int i = 0; i < n; i++) + { + delta[i] = ping(); + } + + // Reject any deltas that are larger than 1 s.d. above the median. + long median = median(delta); + long sd = standardDeviation(delta); + + // log.debug("median = " + median); + // log.debug("sd = " + sd); + + long[] tempDeltas = new long[n]; + int count = 0; + + for (int i = 0; i < n; i++) + { + if ((delta[i] <= (median + sd)) && (delta[i] >= (median - sd))) + { + tempDeltas[count] = delta[i]; + count++; + } + else + { + // log.debug("Rejected: " + delta[i]); + } + } + + System.arraycopy(tempDeltas, 0, delta, 0, count); + + // Estimate the delta as the mean of the remaining deltas. + this.delta += mean(delta); + + // Estimate the error as the standard deviation of the remaining deltas. + this.epsilon = standardDeviation(delta); + + // log.debug("this.delta = " + this.delta); + // log.debug("this.epsilon = " + this.epsilon); + } + + /// + /// Performs a single reference clock request cycle and returns the estimated delta relative to the local clock. + /// This is computed as the half-latency of the requst cycle, plus the reference clock, minus the local clock. + /// + /// The estimated clock delta. + /// + /// If the reference service is not responding. + protected long ping() throws ClockSynchFailureException + { + // log.debug("protected long ping(): called"); + + try + { + byte[] buf = new byte[256]; + + bool timedOut = false; + long start = 0L; + long refTime = 0L; + long localTime = 0L; + long latency = 0L; + int failCount = 0; + + // Keep trying the ping until it gets a response, or 10 tries in a row all time out. + do + { + // Start timing the request latency. 
+ start = nanoTime(); + + // Get the reference time. + DatagramPacket packet = + new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT); + socket.send(packet); + packet = new DatagramPacket(buf, buf.length); + + timedOut = false; + + try + { + socket.receive(packet); + } + catch (SocketTimeoutException e) + { + timedOut = true; + failCount++; + + continue; + } + + ByteBuffer bbuf = ByteBuffer.wrap(packet.getData()); + refTime = bbuf.getLong(); + + // Stop timing the request latency. + localTime = nanoTime(); + latency = localTime - start; + + // log.debug("refTime = " + refTime); + // log.debug("localTime = " + localTime); + // log.debug("start = " + start); + // log.debug("latency = " + latency); + // log.debug("delta = " + ((latency / 2) + (refTime - localTime))); + + } + while (timedOut && (failCount < 10)); + + // Fail completely if the fail count is too high. + if (failCount >= 10) + { + throw new ClockSynchFailureException("Clock reference not responding.", null); + } + + // Estimate delta as (ref clock + half-latency) - local clock. + return (latency / 2) + (refTime - localTime); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta() + { + return delta; + } + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon() + { + return epsilon; + } + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime() + { + return System.nanoTime() + delta; + } + + /// + /// Computes the median of a series of values. + /// + /// The values. + /// + /// The median. 
+ public static long median(long[] values) + { + // log.debug("public static long median(long[] values = " + Arrays.ToString(values) + "): called"); + + long median; + + // Order the list of values. + long[] orderedValues = new long[values.length]; + System.arraycopy(values, 0, orderedValues, 0, values.length); + Arrays.sort(orderedValues); + + // Check if the median is computed from a pair of middle value. + if ((orderedValues.length % 2) == 0) + { + int middle = orderedValues.length / 2; + + median = (orderedValues[middle] + orderedValues[middle - 1]) / 2; + } + // The median is computed from a single middle value. + else + { + median = orderedValues[orderedValues.length / 2]; + } + + // log.debug("median = " + median); + + return median; + } + + /// + /// Computes the mean of a series of values. + /// + /// The values. + /// + /// The mean. + public static long mean(long[] values) + { + // log.debug("public static long mean(long[] values = " + Arrays.ToString(values) + "): called"); + + long total = 0L; + + for (long value : values) + { + total += value; + } + + long mean = total / values.length; + + // log.debug("mean = " + mean); + + return mean; + } + + /// + /// Computes the variance of series of values. + /// + /// The values. + /// + /// The variance of the values. + public static long variance(long[] values) + { + // log.debug("public static long variance(long[] values = " + Arrays.ToString(values) + "): called"); + + long mean = mean(values); + + long totalVariance = 0; + + for (long value : values) + { + long diff = (value - mean); + totalVariance += diff/// diff; + } + + long variance = totalVariance / values.length; + + // log.debug("variance = " + variance); + + return variance; + } + + /// + /// Computes the standard deviation of a series of values. + /// + /// The values. + /// + /// The standard deviation. 
+ public static long standardDeviation(long[] values) + { + // log.debug("public static long standardDeviation(long[] values = " + Arrays.ToString(values) + "): called"); + + long sd = Double.valueOf(Math.sqrt(variance(values))).longValue(); + + // log.debug("sd = " + sd); + + return sd; + } + + /// + /// For testing purposes. Supply address of reference clock as arg 1. + /// + /// Address of reference clock as arg 1. + public static void main(String[] args) + { + ParsedProperties options = + new ParsedProperties(CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + { "1", "Address of clock reference service.", "address", "true" } + }), System.getProperties())); + + string address = options.getProperty("1"); + + // Create a clock synchronizer. + UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address); + + // Set up a shutdown hook for it. + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() + { + public void run() + { + doSynch = false; + } + })); + + // Repeat the clock synching until the user kills the progam. + while (doSynch) + { + // Perform a clock clockSynch. + try + { + clockSyncher.synch(); + + // Print out the clock delta and estimate of the error. + System.out.println("Delta = " + clockSyncher.getDelta()); + System.out.println("Epsilon = " + clockSyncher.getEpsilon()); + + try + { + Thread.sleep(250); + } + catch (InterruptedException e) + { + // Restore the interrupted status and terminate the loop. + Thread.currentThread().interrupt(); + doSynch = false; + } + } + // Terminate if the reference time service is unavailable. 
+ catch (ClockSynchFailureException e) + { + doSynch = false; + } + } + } + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClient.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,493 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +using log4net; +using org.apache.log4j.NDC; + +using Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties; +using Apache.Qpid.Integration.Tests.framework.TestUtils; +using Apache.Qpid.Integration.Tests.framework.clocksynch.ClockSynchThread; +using Apache.Qpid.Integration.Tests.framework.clocksynch.UDPClockSynchronizer; +using org.apache.qpid.util.ReflectionUtils; +using org.apache.qpid.util.ReflectionUtilsException; + +using uk.co.thebadgerset.junit.extensions.SleepThrottle; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +using javax.jms.*; + +using java.util.*; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// Implements a test client as described in the interop testing spec + /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that + /// reacts to control message sequences send by the test . + /// + ///

+ ///
Messages Handled by TestClient
Message Action + ///
Invite(compulsory) Reply with Enlist. + ///
Invite(test case) Reply with Enlist if test case available. + ///
AssignRole(test case) Reply with Accept Role if matches an enlisted test. Keep test parameters. + ///
Start Send test messages defined by test parameters. Send report on messages sent. + ///
Status Request Send report on messages received. + ///
Terminate Terminate the test client. + ///
ClockSynch Synch clock against the supplied UDP address. + ///
+ /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Handle all incoming control messages. + ///
Configure and look up test cases by name. + ///
+ ///

+ public class TestClient : MessageListener + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(TestClient)); + + /// Used for reporting to the console. + private static ILog console = LogManager.GetLogger("CONSOLE"); + + /// Holds the default identifying name of the test client. + public static final string CLIENT_NAME = "java"; + + /// Holds the URL of the broker to run the tests on. + public static string brokerUrl; + + /// Holds the virtual host to run the tests on. If null, then the default virtual host is used. + public static string virtualHost; + + /// + /// Holds the test context properties that provides the default test parameters, plus command line overrides. + /// This is initialized with the default test parameters, to which command line overrides may be applied. + /// + public static ParsedProperties testContextProperties = + TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /// Holds all the test cases loaded from the classpath. + Map testCases = new HashMap(); + + /// Holds the test case currently being run by this client. + protected TestClientControlledTest currentTestCase; + + /// Holds the connection to the broker that the test is being coordinated on. + protected Connection connection; + + /// Holds the message producer to hold the test coordination over. + protected MessageProducer producer; + + /// Holds the JMS controlSession for the test coordination. + protected Session session; + + /// Holds the name of this client, with a default value. + protected string clientName = CLIENT_NAME; + + /// This flag indicates that the test client should attempt to join the currently running test case on start up. + protected bool join; + + /// Holds the clock synchronizer for the test node. + ClockSynchThread clockSynchThread; + + /// + /// Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client + /// identifying name. 
+ /// + /// The url of the broker to connect to. + /// The virtual host to conect to. + /// The client name to use. + /// Flag to indicate that this client should attempt to join running tests. + public TestClient(string pBrokerUrl, string pVirtualHost, string clientName, bool join) + { + log.debug("public TestClient(string pBrokerUrl = " + pBrokerUrl + ", string pVirtualHost = " + pVirtualHost + + ", string clientName = " + clientName + ", bool join = " + join + "): called"); + + // Retain the connection parameters. + brokerUrl = pBrokerUrl; + virtualHost = pVirtualHost; + this.clientName = clientName; + this.join = join; + } + + /// + /// The entry point for the interop test coordinator. This client accepts the following command line arguments: + /// + ///

+ ///
-b The broker URL. Optional. + ///
-h The virtual host. Optional. + ///
-n The test client name. Optional. + ///
name=value Trailing arguments define name/value pairs. Added to system properties. Optional. + ///
+ ///

+ /// The command line arguments. + public static void main(String[] args) + { + log.debug("public static void main(String[] args = " + Arrays.ToString(args) + "): called"); + console.info("Qpid Distributed Test Client."); + + // Override the default broker url to be localhost:5672. + testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); + + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + // Any options and trailing name=value pairs are also injected into the test context properties object, + // to override any defaults that may have been set up. + ParsedProperties options = + new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args, + new uk.co.thebadgerset.junit.extensions.util.CommandLineParser( + new String[][] + { + { "b", "The broker URL.", "broker", "false" }, + { "h", "The virtual host to use.", "virtual host", "false" }, + { "o", "The name of the directory to output test timings to.", "dir", "false" }, + { "n", "The name of the test client.", "name", "false" }, + { "j", "Join this test client to running test.", "false" } + }), testContextProperties)); + + // Extract the command line options. + string brokerUrl = options.getProperty("b"); + string virtualHost = options.getProperty("h"); + string clientName = options.getProperty("n"); + clientName = (clientName == null) ? CLIENT_NAME : clientName; + bool join = options.getPropertyAsBoolean("j"); + + // To distinguish logging output set up an NDC on the client name. + NDC.push(clientName); + + // Create a test client and start it running. + TestClient client = new TestClient(brokerUrl, virtualHost, clientName, join); + + // Use a class path scanner to find all the interop test case implementations. + // Hard code the test classes till the classpath scanner is fixed. 
+ Collection> testCaseClasses = + new ArrayList>(); + // ClasspathScanner.getMatches(TestClientControlledTest.class, "^TestCase.*", true); + testCaseClasses.addAll(loadTestCases("org.apache.qpid.interop.clienttestcases.TestCase1DummyRun", + "org.apache.qpid.interop.clienttestcases.TestCase2BasicP2P", + "org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub", + "org.apache.qpid.interop.clienttestcases.TestCase4P2PMessageSize", + "org.apache.qpid.interop.clienttestcases.TestCase5PubSubMessageSize", + "Apache.Qpid.Integration.Tests.framework.distributedcircuit.TestClientCircuitEnd")); + + try + { + client.start(testCaseClasses); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + console.info(e.getMessage()); + System.exit(1); + } + } + + /// + /// Parses a list of class names, and loads them if they are available on the class path. + /// + /// The names of the classes to load. + /// + /// A list of the loaded test case classes. + public static IList> loadTestCases(String... classNames) + { + IList> testCases = + new LinkedList>(); + + for (string className : classNames) + { + try + { + Class cls = ReflectionUtils.forName(className); + testCases.add((Class) cls); + } + catch (ReflectionUtilsException e) + { + // Ignore, class could not be found, so test not available. + console.warn("Requested class " + className + " cannot be found, ignoring it."); + } + catch (ClassCastException e) + { + // Ignore, class was not of correct type to be a test case. + console.warn("Requested class " + className + " is not an instance of TestClientControlledTest."); + } + } + + return testCases; + } + + /// + /// Starts the interop test client running. This causes it to start listening for incoming test invites. + /// + /// The classes of the available test cases. The test case names from these are used to + /// matchin incoming test invites against. + /// + /// Any underlying JMSExceptions are allowed to fall through. 
+ protected void start(Collection> testCaseClasses) throws JMSException + { + log.debug("protected void start(Collection> testCaseClasses = " + + testCaseClasses + "): called"); + + // Create all the test case implementations and index them by the test names. + for (Class nextClass : testCaseClasses) + { + try + { + TestClientControlledTest testCase = nextClass.newInstance(); + testCases.put(testCase.getName(), testCase); + } + catch (InstantiationException e) + { + log.warn("Could not instantiate test case class: " + nextClass.getName(), e); + // Ignored. + } + catch (IllegalAccessException e) + { + log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); + // Ignored. + } + } + + // Open a connection to communicate with the coordinator on. + connection = TestUtils.createConnection(testContextProperties); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set this up to listen for control messages. + Topic privateControlTopic = session.createTopic("iop.control." + clientName); + MessageConsumer consumer = session.createConsumer(privateControlTopic); + consumer.setMessageListener(this); + + Topic controlTopic = session.createTopic("iop.control"); + MessageConsumer consumer2 = session.createConsumer(controlTopic); + consumer2.setMessageListener(this); + + // Create a producer to send replies with. + producer = session.createProducer(null); + + // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client + // is available to join the current test case, if it supports it. This message may be ignored, or it may result + // in this test client receiving a test invite. + if (join) + { + Message joinMessage = session.createMessage(); + + joinMessage.setStringProperty("CONTROL_TYPE", "JOIN"); + joinMessage.setStringProperty("CLIENT_NAME", clientName); + joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." 
+ clientName); + producer.send(controlTopic, joinMessage); + } + + // Start listening for incoming control messages. + connection.start(); + } + + /// + /// Handles all incoming control messages. + /// + /// The incoming message. + public void onMessage(Message message) + { + NDC.push(clientName); + log.debug("public void onMessage(Message message = " + message + "): called"); + + try + { + string controlType = message.getStringProperty("CONTROL_TYPE"); + string testName = message.getStringProperty("TEST_NAME"); + + log.debug("Received control of type '" + controlType + "' for the test '" + testName + "'"); + + // Check if the message is a test invite. + if ("INVITE".equals(controlType)) + { + // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites + // for which test cases exist. + bool enlist = false; + + if (testName != null) + { + log.debug("Got an invite to test: " + testName); + + // Check if the requested test case is available. + TestClientControlledTest testCase = testCases.get(testName); + + if (testCase != null) + { + log.debug("Found implementing class for test '" + testName + "', enlisting for it."); + + // Check if the test case will accept the invitation. + enlist = testCase.acceptInvite(message); + + log.debug("The test case " + + (enlist ? " accepted the invite, enlisting for it." + : " did not accept the invite, not enlisting.")); + + // Make the requested test case the current test case. + currentTestCase = testCase; + } + else + { + log.debug("Received an invite to the test '" + testName + "' but this test is not known."); + } + } + else + { + log.debug("Got a compulsory invite, enlisting for it."); + + enlist = true; + } + + if (enlist) + { + // Reply with the client name in an Enlist message. 
+ Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); + enlistMessage.setStringProperty("CLIENT_NAME", clientName); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending enlist message '" + enlistMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + else + { + // Reply with the client name in an Decline message. + Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "DECLINE"); + enlistMessage.setStringProperty("CLIENT_NAME", clientName); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending decline message '" + enlistMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + } + else if ("ASSIGN_ROLE".equals(controlType)) + { + // Assign the role to the current test case. + string roleName = message.getStringProperty("ROLE"); + + log.debug("Got a role assignment to role: " + roleName); + + TestClientControlledTest.Roles role = Enum.valueOf(TestClientControlledTest.Roles.class, roleName); + + currentTestCase.assignRole(role, message); + + // Reply by accepting the role in an Accept Role message. 
+ Message acceptRoleMessage = session.createMessage(); + acceptRoleMessage.setStringProperty("CLIENT_NAME", clientName); + acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); + acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending accept role message '" + acceptRoleMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), acceptRoleMessage); + } + else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) + { + if ("START".equals(controlType)) + { + log.debug("Got a start notification."); + + // Extract the number of test messages to send from the start notification. + int numMessages; + + try + { + numMessages = message.getIntProperty("MESSAGE_COUNT"); + } + catch (NumberFormatException e) + { + // If the number of messages is not specified, use the default of one. + numMessages = 1; + } + + // Start the current test case. + currentTestCase.start(numMessages); + } + else + { + log.debug("Got a status request."); + } + + // Generate the report from the test case and reply with it as a Report message. + Message reportMessage = currentTestCase.getReport(session); + reportMessage.setStringProperty("CLIENT_NAME", clientName); + reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); + reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending report message '" + reportMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), reportMessage); + } + else if ("TERMINATE".equals(controlType)) + { + console.info("Received termination instruction from coordinator."); + + // Is a cleaner shutdown needed? 
+ connection.close(); + System.exit(0); + } + else if ("CLOCK_SYNCH".equals(controlType)) + { + log.debug("Received clock synch command."); + string address = message.getStringProperty("ADDRESS"); + + log.debug("address = " + address); + + // Re-create (if necessary) and start the clock synch thread to synch the clock every ten seconds. + if (clockSynchThread != null) + { + clockSynchThread.terminate(); + } + + SleepThrottle throttle = new SleepThrottle(); + throttle.setRate(0.1f); + + clockSynchThread = new ClockSynchThread(new UDPClockSynchronizer(address), throttle); + clockSynchThread.start(); + } + else + { + // Log a warning about this but otherwise ignore it. + log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message); + } + } + catch (JMSException e) + { + // Log a warning about this, but otherwise ignore it. + log.warn("Got JMSException whilst handling message: " + message, e); + } + // Log any runtimes that fall through this message handler. These are fatal errors for the test client. 
+ catch (RuntimeException e) + { + log.error("The test client message handler got an unhandled exception: ", e); + console.info("The message handler got an unhandled exception, terminating the test client."); + System.exit(1); + } + finally + { + NDC.pop(); + } + } + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientCircuitEnd.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,312 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.*; +using Apache.Qpid.Integration.Tests.framework.distributedtesting.TestClientControlledTest; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +using javax.jms.*; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// A TestClientCircuitEnd is a that may be controlled from a + /// , and that forms a single publishing or + /// receiving end point in a distributed test . + /// + ///

When operating in the SENDER role, this circuit end is capable of acting as part of the default circuit test + /// procedure (described in the class comment for ). That is, it will + /// send the number of test messages required, using the test configuration parameters given in the test invite, and + /// return a report on its activities to the circuit controller. + /// + ///

When operating in the RECEIVER role, this circuit end acts as part of the default circuit test procedure. It will + /// receive test messages, on the setup specified in the test configuration parameters, and keep count of the messages + /// received, and time taken to receive them. When requested by the circuit controller to provide a report, it will + /// return this report of its activities. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide a message producer for sending messages. + /// , , + ///
Provide a message consumer for receiving messages. + /// , , + ///
Supply the name of the test case that this implements. + ///
Accept/Reject invites based on test parameters. + ///
Adapt to assigned roles. + ///
Perform test case actions. + ///
Generate test reports. + ///
+ ///

+ public class TestClientCircuitEnd : CircuitEnd, TestClientControlledTest + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(TestClientCircuitEnd)); + + /// Holds the test parameters. + ParsedProperties testProps; + + /// The number of test messages to send. + private int numMessages; + + /// The role to be played by the test. + private Roles role; + + /// The connection to send the test messages on. + private Connection connection; + + /// Holds the circuit end for this test. + CircuitEnd circuitEnd; + + /// + /// Holds a message monitor for this circuit end, either the monitor on the consumer when in RECEIVER more, or + /// a monitor updated on every message sent, when acting as a SENDER. + MessageMonitor messageMonitor; + + /// + /// Should provide the name of the test case that this class implements. The exact names are defined in the + /// interop testing spec. + /// + /// The name of the test case that this implements. + public string getName() + { + return "DEFAULT_CIRCUIT_TEST"; + } + + /// + /// Determines whether the test invite that matched this test case is acceptable. + /// + /// The invitation to accept or reject. + /// true to accept the invitation, false to reject it. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public bool acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public bool acceptInvite(Message inviteMessage): called"); + + // Populate the test parameters from the invitation. + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + for (Object key : testProps.keySet()) + { + string propName = (String) key; + + // If the test parameters is overridden by the invitation, use it instead. 
+ string inviteValue = inviteMessage.getStringProperty(propName); + + if (inviteValue != null) + { + testProps.setProperty(propName, inviteValue); + log.debug("Test invite supplied override to " + propName + " of " + inviteValue); + } + + } + + // Accept the invitation. + return true; + } + + /// + /// Assigns the role to be played by this test case. The test parameters are fully specified in the + /// assignment message. When this method return the test case will be ready to execute. + /// + /// The role to be played; sender or receivers. + /// The role assingment message, contains the full test parameters. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role, Message assignRoleMessage): called"); + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numMessages = 1; // assignRoleMessage.getIntProperty("NUM_MESSAGES"); + + // Connect using the test parameters. + connection = TestUtils.createConnection(testProps); + + // Create a circuit end that matches the assigned role and test parameters. + LocalCircuitFactory circuitFactory = new LocalCircuitFactory(); + + switch (role) + { + // Check if the sender role is being assigned, and set up a message producer if so. + case SENDER: + + // Set up the publisher. + circuitEnd = circuitFactory.createPublisherCircuitEnd(connection, testProps, 0L); + + // Create a custom message monitor that will be updated on every message sent. + messageMonitor = new MessageMonitor(); + + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages. + case RECEIVER: + + // Set up the receiver. + circuitEnd = circuitFactory.createReceiverCircuitEnd(connection, testProps, 0L); + + // Use the message monitor from the consumer for stats. 
+ messageMonitor = getMessageMonitor(); + + break; + } + + // Reset all messaging stats for the report. + messageMonitor.reset(); + + connection.start(); + } + + /// + /// Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + /// + /// The number of test messages to send. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + /// + /// Add round robin on destinations where multiple destinations being used. + /// + /// Add rate limiting when rate limit specified on publishers. + /// + /// Add Max pending message size protection. The receiver will have to send back some acks once in a while, + /// to notify the publisher that its messages are being consumed. This makes the safety valve harder to + /// implement than in the single VM case. For example, if the limit is 1000 messages, might want to get back + /// an ack every 500, to notify the publisher that it can keep sending. What about pub/sub tests? Will it be + /// necessary to wait for an ack from every receiver? This will have the effect of rate limiting to slow + /// consumers too. + /// + /// Add commits on every commit batch size boundary. + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // If in the SENDER role, send the specified number of test messages to the circuit destinations. + if (role.equals(Roles.SENDER)) + { + Message testMessage = getSession().createMessage(); + + for (int i = 0; i < numMessages; i++) + { + getProducer().send(testMessage); + + // Increment the message count and timings. + messageMonitor.onMessage(testMessage); + } + } + } + + /// + /// Gets a report on the actions performed by the test case in its assigned role. + /// + /// The controlSession to create the report message in. + /// The report message. + /// + /// Any JMSExceptions resulting from creating the report are allowed to fall through. 
+ public Message getReport(Session session) throws JMSException + { + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + + // Add the count of messages sent/received to the report. + report.setIntProperty("MESSAGE_COUNT", messageMonitor.getNumMessage()); + + // Add the time to send/receive messages to the report. + report.setLongProperty("TEST_TIME", messageMonitor.getTime()); + + // Add any exceptions detected to the report. + + return report; + } + + /// + /// Gets the message producer at this circuit end point. + /// + /// The message producer at with this circuit end point. + public MessageProducer getProducer() + { + return circuitEnd.getProducer(); + } + + /// + /// Gets the message consumer at this circuit end point. + /// + /// The message consumer at this circuit end point. + public MessageConsumer getConsumer() + { + return circuitEnd.getConsumer(); + } + + /// + /// Send the specified message over the producer at this end point. + /// + /// The message to send. + /// + /// Any JMS exception occuring during the send is allowed to fall through. + public void send(Message message) throws JMSException + { + // Send the message on the circuit ends producer. + circuitEnd.send(message); + } + + /// + /// Gets the JMS Session associated with this circuit end point. + /// + /// The JMS Session associated with this circuit end point. + public Session getSession() + { + return circuitEnd.getSession(); + } + + /// + /// Closes the message producers and consumers and the sessions, associated with this circuit end point. + /// + /// Any JMSExceptions occurring during the close are allowed to fall through. + public void close() throws JMSException + { + // Close the producer and consumer. + circuitEnd.close(); + } + + /// + /// Returns the message monitor for reporting on received messages on this circuit end. + /// + /// The message monitor for this circuit end. 
+ public MessageMonitor getMessageMonitor() + { + return circuitEnd.getMessageMonitor(); + } + + /// + /// Returns the exception monitor for reporting on exceptions received on this circuit end. + /// + /// The exception monitor for this circuit end. + public ExceptionMonitor getExceptionMonitor() + { + return circuitEnd.getExceptionMonitor(); + } + } +} \ No newline at end of file Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/distributedcircuit/TestClientControlledTest.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.MessageListener; +using javax.jms.Session; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// TestClientControlledTest provides an interface that classes implementing test cases to run on a + /// node can use. Implementations must be Java beans, that is, to provide a default constructor and to implement the + /// method. + /// + ///

The methods specified in this interface are called when the TestClient receives control instructions to + /// apply to the test. There are control instructions to present the test case with the test invite, so that it may + /// choose whether or not to participate in the test, assign the test to play the sender or receiver role, start the + /// test and obtain the test status report. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Supply the name of the test case that this implements. + ///
Accept/Reject invites based on test parameters. + ///
Adapt to assigned roles. + ///
Perform test case actions. + ///
Generate test reports. + ///
+ ///

+ public interface TestClientControlledTest + { + /// Defines the possible test case roles that an interop test case can take on. + public enum Roles + { + /// Specifies the sender role. + SENDER, + + /// Specifies the receivers role. + RECEIVER + } + + /// + /// Should provide the name of the test case that this class implements. The exact names are defined in the + /// interop testing spec. + /// + /// The name of the test case that this implements. + public string getName(); + + /// + /// Determines whether the test invite that matched this test case is acceptable. + /// + /// The invitation to accept or reject. + /// + /// true to accept the invitation, false to reject it. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public bool acceptInvite(Message inviteMessage) throws JMSException; + + /// + /// Assigns the role to be played by this test case. The test parameters are fully specified in the + /// assignment message. When this method return the test case will be ready to execute. + /// + /// The role to be played; sender or receivers. + /// The role assingment message, contains the full test parameters. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException; + + /// + /// Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + /// + /// The number of test messages to send. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public void start(int numMessages) throws JMSException; + + /// + /// Gets a report on the actions performed by the test case in its assigned role. + /// + /// The controlSession to create the report message in. + /// + /// The report message. + /// + /// Any JMSExceptions resulting from creating the report are allowed to fall through. 
+ public Message getReport(Session session) throws JMSException; + } +} \ No newline at end of file