Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 13090 invoked from network); 17 Jan 2008 17:14:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 17 Jan 2008 17:14:19 -0000 Received: (qmail 65240 invoked by uid 500); 17 Jan 2008 17:14:08 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 65217 invoked by uid 500); 17 Jan 2008 17:14:08 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 65208 invoked by uid 99); 17 Jan 2008 17:14:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2008 09:14:08 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2008 17:13:59 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A2A991A9842; Thu, 17 Jan 2008 09:13:48 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r612874 [3/5] - in /incubator/qpid/branches/M2.1/dotnet: Qpid.Buffer.Tests/Properties/ Qpid.Buffer/Properties/ Qpid.Client.Tests/Properties/ Qpid.Client.Transport.Socket.Blocking/Properties/ Qpid.Client/Properties/ Qpid.Codec/Properties/ Qp... 
Date: Thu, 17 Jan 2008 17:13:23 -0000 To: qpid-commits@incubator.apache.org From: rupertlssmith@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080117171348.A2A991A9842@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/alljava.csx URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/alljava.csx?rev=612874&view=auto ============================================================================== --- incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/alljava.csx (added) +++ incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/framework/alljava.csx Thu Jan 17 09:13:11 2008 @@ -0,0 +1,7851 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// Assertion models an assertion on a test . + /// + ///

+ ///
CRC Card
Responsibilities + ///
Indicate whether or not the assertion passes when applied. + ///
+ ///

+ public interface Assertion + { + /// + /// Applies the assertion. + /// + /// true if the assertion passes, false if it fails. + public bool apply(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// AssertionBase is a base class for implenmenting assertions. It provides a mechanism to store error messages, and + /// report all error messages when its method is called. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Collect error messages. + ///
+ ///

+ public abstract class AssertionBase : Assertion + { + /// Holds the error messages. + IList errors = new LinkedList(); + + /// + /// Adds an error message to the assertion. + /// + /// An error message to add to the assertion. + public void addError(string error) + { + errors.add(error); + } + + /// + /// Prints all of the error messages in the assertion into a string. + /// + /// All of the error messages in the assertion as a string. + public string ToString() + { + string result = ""; + + for (string error : errors) + { + result += error; + } + + return result; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// BrokerLifecycleAware is an awareness interface implemented by test cases that can run control the life-cycle of + /// the brokers on which they run. Its purpose is to expose additional instrumentation of brokers during testing, that + /// enables tests to use an automated failure mechanism to simulate broker failures, and to re-start failed brokers. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Indicate whether or not a test case is using an in-vm broker. + ///
Track which in-vm broker is currently in use. + ///
Accept setting of a failure mechanism. CauseFailure. + ///
+ ///

+ /// + /// Need to think about how to present the brokers through this interface. Thinking numbering the available + /// brokers from 1 will do. Then can kill 1 and assume failing onto 2. Restart 1 and kill 2 and fail back onto + /// 1 again? + public interface BrokerLifecycleAware + { + public void setInVmBrokers(); + + /// + /// Indicates whether or not a test case is using in-vm brokers. + /// + /// true if the test is using in-vm brokers, false otherwise. + public bool usingInVmBroker(); + + /// + /// Sets the currently live in-vm broker. + /// + /// The currently live in-vm broker. + public void setLiveBroker(int i); + + /// + /// Reports the currently live in-vm broker. + /// + /// The currently live in-vm broker. + public int getLiveBroker(); + + /// + /// Accepts a failure mechanism. + /// + /// The failure mechanism. + public void setFailureMechanism(CauseFailure failureMechanism); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// CauseFailure provides a method to cause a failure in a messaging broker, usually used in conjunction with fail-over + /// or other failure mode testing. 
In some cases failures may be automated, for example by shutting down an in-vm broker, + /// or by sending a special control signal to a broker over a network connection. In other cases, it may be preferable + /// to ask a user interactively to cause a failure scenario, in which case an implementation may display a prompt or + /// dialog box asking for notification once the failure has been caused. The purpose of this interface is to abstract + /// the exact cause and nature of a failure out of failure test cases. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Cause messaging broker failure. + ///
+ ///

+ public interface CauseFailure + { + /// Causes the active message broker to fail. + void causeFailure(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.CauseFailure; + +using java.io.IOException; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// Causes a message broker failure by interactively prompting the user to cause it. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Cause messaging broker failure. + ///
+ ///

+ public class CauseFailureUserPrompt : CauseFailure + { + /// Causes the active message broker to fail. + public void causeFailure() + { + waitForUser("Cause a broker failure now, then press Return."); + } + + /// + /// Outputs a prompt to the console and waits for the user to press return. + /// + /// The prompt to display on the console. + private void waitForUser(string prompt) + { + System.out.println(prompt); + + try + { + System.in.read(); + } + catch (IOException e) + { + // Ignored. + } + + System.out.println("Continuing."); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A Circuit is the basic test unit against which test cases are to be written. A circuit consists of two 'ends', an + /// instigating 'publisher' end and a more passive 'receivers' end. + /// + ///

Once created, the life-cycle of a circuit may be controlled by starting it, or closing it. + /// Once started, the circuit is ready to send messages over. Once closed the circuit can no longer be used. + /// + ///

The state of the circuit may be taken with the check method, and asserted against by the + /// applyAssertions method. + /// + ///

There is a default test procedure which may be performed against the circuit. The outline of this procedure is: + /// + ///

+    /// Start the circuit.
+    /// Send test messages.
+    /// Request a status report.
+    /// Assert conditions on the publishing end of the circuit.
+    /// Assert conditions on the receiving end of the circuit.
+    /// Close the circuit.
+    /// Pass with no failed assertions or fail with a list of failed assertions.
+    /// 
+ /// + ///

+ ///
CRC Card
Responsibilities + ///
Supply the publishing and receiving ends of a test messaging circuit. + ///
Start the circuit running. + ///
Close the circuit down. + ///
Take a reading of the circuit's state. + ///
Apply assertions against the circuit's state. + ///
Send test messages over the circuit. + ///
Perform the default test procedure on the circuit. + ///
+ ///

+ public interface Circuit + { + /// + /// Gets the interface on the publishing end of the circuit. + /// + /// The publishing end of the circuit. + public Publisher getPublisher(); + + /// + /// Gets the interface on the receiving end of the circuit. + /// + /// The receiving end of the circuit. + public Receiver getReceiver(); + + /// + /// Connects and starts the circuit. After this method is called the circuit is ready to send messages. + public void start(); + + /// + /// Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit, + /// into a report, against which assertions may be checked. + public void check(); + + /// + /// Closes the circuit. All associated resources are closed. + public void close(); + + /// + /// Applied a list of assertions against the test circuit. The method should be called before doing + /// this, to ensure that the circuit has gathered its state into a report to assert against. + /// + /// The list of assertions to apply to the circuit. + /// + /// Any assertions that failed. + public IList applyAssertions(List assertions); + + /// + /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold. + /// + /// The number of messages to send using the default test procedure. + /// The list of assertions to apply. + /// + /// Any assertions that failed. + public IList test(int numMessages, List assertions); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using javax.jms.*; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A CircuitEnd is a pair consisting of one message producer and one message consumer, that represents one end of a + /// test circuit. It is a standard unit of connectivity allowing a full-duplex conversation to be held, provided both + /// the consumer and producer are instantiated and configured. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide a message producer for sending messages. + ///
Provide a message consumer for receiving messages. + ///
+ ///

+ /// + /// Update the so that it accepts these as the basic conversation + /// connection units. + public interface CircuitEnd + { + /// + /// Gets the message producer at this circuit end point. + /// + /// The message producer at with this circuit end point. + public MessageProducer getProducer(); + + /// + /// Gets the message consumer at this circuit end point. + /// + /// The message consumer at this circuit end point. + public MessageConsumer getConsumer(); + + /// + /// Send the specified message over the producer at this end point. + /// + /// The message to send. + /// + /// Any JMS exception occuring during the send is allowed to fall through. + public void send(Message message) throws JMSException; + + /// + /// Gets the JMS Session associated with this circuit end point. + /// + /// The JMS Session associated with this circuit end point. + public Session getSession(); + + /// + /// Closes the message producers and consumers and the sessions, associated with this circuit end point. + /// + /// Any JMSExceptions occurring during the close are allowed to fall through. + public void close() throws JMSException; + + /// + /// Returns the message monitor for reporting on received messages on this circuit end. + /// + /// The message monitor for this circuit end. + public MessageMonitor getMessageMonitor(); + + /// + /// Returns the exception monitor for reporting on exceptions received on this circuit end. + /// + /// The exception monitor for this circuit end. + public ExceptionMonitor getExceptionMonitor(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using javax.jms.*; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A CircuitEndBase is a pair consisting of one message producer and one message consumer, that represents one end of a + /// test circuit. It is a standard unit of connectivity allowing a full-duplex conversation to be held, provided both + /// the consumer and producer are instantiated and configured. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide a message producer for sending messages. + ///
Provide a message consumer for receiving messages. + ///
+ ///

+ public class CircuitEndBase : CircuitEnd + { + /// Holds the single message producer. + MessageProducer producer; + + /// Holds the single message consumer. + MessageConsumer consumer; + + /// Holds the controlSession for the circuit end. + Session session; + + /// Holds the message monitor for the circuit end. + MessageMonitor messageMonitor; + + /// Holds the exception monitor for the circuit end. + ExceptionMonitor exceptionMonitor; + + /// + /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also configured + /// for messages and exceptions received by the circuit end. + /// + /// The message producer for the circuit end point. + /// The message consumer for the circuit end point. + /// The controlSession for the circuit end point. + /// The monitor to notify of all messages received by the circuit end. + /// The monitor to notify of all exceptions received by the circuit end. + public CircuitEndBase(MessageProducer producer, MessageConsumer consumer, Session session, MessageMonitor messageMonitor, + ExceptionMonitor exceptionMonitor) + { + this.producer = producer; + this.consumer = consumer; + this.session = session; + + this.messageMonitor = messageMonitor; + this.exceptionMonitor = exceptionMonitor; + } + + /// + /// Gets the message producer at this circuit end point. + /// + /// The message producer at with this circuit end point. + public MessageProducer getProducer() + { + return producer; + } + + /// + /// Gets the message consumer at this circuit end point. + /// + /// The message consumer at this circuit end point. + public MessageConsumer getConsumer() + { + return consumer; + } + + /// + /// Send the specified message over the producer at this end point. + /// + /// The message to send. + /// Any JMS exception occuring during the send is allowed to fall through. 
+ public void send(Message message) throws JMSException + { + producer.send(message); + } + + /// + /// Gets the JMS Session associated with this circuit end point. + /// + /// The JMS Session associated with this circuit end point. + public Session getSession() + { + return session; + } + + /// + /// Closes the message producers and consumers and the sessions, associated with this circuit end point. + /// + /// Any JMSExceptions occurring during the close are allowed to fall through. + public void close() throws JMSException + { + if (producer != null) + { + producer.close(); + } + + if (consumer != null) + { + consumer.close(); + } + } + + /// + /// Returns the message monitor for reporting on received messages on this circuit end. + /// + /// The message monitor for this circuit end. + public MessageMonitor getMessageMonitor() + { + return messageMonitor; + } + + /// + /// Returns the exception monitor for reporting on exceptions received on this circuit end. + /// + /// The exception monitor for this circuit end. + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchFailureException represents failure of a ClockSynchronizer to achieve synchronization. For example, + /// this could be because a reference signal is not available, or because a desired accuracy cannot be attained. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Represent failure to achieve synchronization. + ///
+ ///

+ public class ClockSynchFailureException extends Exception + { + /// + /// Creates a clock synch failure exception. + /// + /// The detail message (which is saved for later retrieval by the method). + /// The cause (which is saved for later retrieval by the method). (A null + /// value is permitted, and indicates that the cause is nonexistent or unknown.) + public ClockSynchFailureException(string message, Throwable cause) + { + super(message, cause); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchronizer provides an interface through which two nodes may synchronize their clocks. It is expected that one + /// node will act as the reference clock, to which no delta need be applied, and the other node will act as the slave, + /// and which must apply a delta to its local clock to get a clock synchronized with the reference. + /// + ///

The slave side will initiate the computation of a clock delta by calling the synch method. This method + /// will not return until the delta has been computed, at which point there is a method to return its value, as well as + /// an estimate of the likely error (usually one standard deviation), in the synchronization. For convenience there is a + /// method to return the value of System.nanoTime() with the delta added in. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Trigger a clock synchronization. + ///
Compute a clock delta to apply to the local clock. + ///
Estimate the error in the synchronization. + ///
+ ///

+ public interface ClockSynchronizer + { + /// + /// The slave side should call this to copute a clock delta with the reference. + /// + /// If synchronization cannot be achieved. + public void synch() throws ClockSynchFailureException; + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta(); + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon(); + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.ShutdownHookable; +using uk.co.thebadgerset.junit.extensions.Throttle; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchThread is a convenient utility for running a thread that periodically synchronizes the clock against + /// a reference. Supply it with a and a and it will continually keep the + /// clock up-to-date at a rate determined by the throttle. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Continually synchronize the clock at a throttled rate. + ///
+ ///

+ public class ClockSynchThread extends Thread : ShutdownHookable + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(ClockSynchThread)); + + /// Holds the clock syncher for the synch thread. + private ClockSynchronizer clockSyncher; + + /// Holds the throttle to limit the synch rate. + private Throttle throttle; + + /// Flag to indicate that the periodic clock syncher should keep running. + bool doSynch = true; + + /// + /// Creates a clock synchronizer thread from a clock synchronizer and a throttle. + /// + /// The clock synchronizer. + /// The throttle. + public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle) + { + this.clockSyncher = syncher; + this.throttle = throttle; + } + + /// Terminates the synchronization thread. + public void terminate() + { + doSynch = false; + } + + /// Continually updates the clock, until is called. + public void run() + { + while (doSynch) + { + // Perform a clock clockSynch. + try + { + // Wait controlled by the throttle before doing the next synch. + throttle.throttle(); + + clockSyncher.synch(); + log.debug("Clock synched, delta = " + clockSyncher.getDelta() + ", epsilon = " + clockSyncher.getEpsilon() + + "."); + } + // Terminate the synch thread if the synchronization cannot be achieved. + catch (ClockSynchFailureException e) + { + log.debug("Cannot synchronize the clock (reference service may be down). Terminating the synch thread."); + doSynch = false; + } + } + } + + /// + /// Gets the clock synchronizer that is kept continually up to date. + /// + /// The clock synchronizer that is kept continually up to date. + public ClockSynchronizer getClockSyncher() + { + return clockSyncher; + } + + /// + /// Supplies a shutdown hook, that terminates the synching thread. + /// + /// The shut down hook. 
+ public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + doSynch = false; + } + }); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + + /// + /// LocalClockSynchronizer is a fake that simply calls System.nanoTime(). It exists so that + /// the same tests can be run distributed or locally, taking timings against the ClockSynchronizer interface without + /// being aware of how they are being run. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply the local clock with no delta. + ///
+ ///

+ public class LocalClockSynchronizer : ClockSynchronizer + { + /// + /// The slave side should call this to copute a clock delta with the reference. + /// + /// If synchronization cannot be achieved. + public void synch() throws ClockSynchFailureException + { } + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta() + { + return 0L; + } + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon() + { + return 0L; + } + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime() + { + return System.nanoTime(); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.ShutdownHookable; + +using java.io.IOException; +using java.net.*; +using java.nio.ByteBuffer; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// UDPClockReference supplies a refernce clock signal (generated from System.nanoTime()). + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply a reference clock signal. + ///
+ ///

+    ///
+    /// Port hard coded. Make configurable.
+    ///
+    /// Errors rethrown as runtimes, or silently terminate the service. Could add better error handling if needed.
+    public class UDPClockReference : Runnable, ShutdownHookable
+    {
+        /// Used for debugging.
+        // private static ILog log = LogManager.GetLogger(typeof(UDPClockReference));
+
+        /// Defines the timeout to use when polling the socket for time requests.
+        private static final int TIMEOUT = 200;
+
+        /// Defines the port to run the clock reference on.
+        public static final int REFERENCE_PORT = 4444;
+
+        /// Holds the socket to receive clock reference requests on.
+        protected DatagramSocket socket = null;
+
+        /// <summary>
+        /// Flag used to indicate that the time server should keep running. Set to false to terminate.
+        /// Declared volatile because the shutdown hook clears it from a different thread than the one
+        /// polling it in the run loop.
+        /// </summary>
+        protected volatile bool publish = true;
+
+        /// <summary>
+        /// Creates a clock reference service on the standard port.
+        /// </summary>
+        /// <exception cref="RuntimeException">Wrapping any SocketException raised while opening the socket.</exception>
+        public UDPClockReference()
+        {
+            try
+            {
+                socket = new DatagramSocket(REFERENCE_PORT);
+                socket.setSoTimeout(TIMEOUT);
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /// <summary>
+        /// Implements the run loop for this reference time server. This waits for incoming time requests, and replies to
+        /// any, with a message with the local time stamp in it. Periodically (controlled by TIMEOUT), the run
+        /// loop will check if the publish flag has been cleared, and terminate the reference time service if so.
+        /// The socket is closed when the loop ends, however it ends.
+        /// </summary>
+        public void run()
+        {
+            byte[] buf = new byte[256];
+            ByteBuffer bbuf = ByteBuffer.wrap(buf);
+
+            try
+            {
+                while (publish)
+                {
+                    try
+                    {
+                        // Wait for a reference time request.
+                        DatagramPacket packet = new DatagramPacket(buf, buf.length);
+                        bool timedOut = false;
+
+                        try
+                        {
+                            socket.receive(packet);
+                        }
+                        catch (SocketTimeoutException e)
+                        {
+                            // No request arrived within the polling interval; go round again to re-check the flag.
+                            timedOut = true;
+                        }
+
+                        if (!timedOut)
+                        {
+                            // Work out from the received packet, where to reply to.
+                            InetAddress address = packet.getAddress();
+                            int port = packet.getPort();
+
+                            // Respond to the time request by sending back the local clock as the reference time.
+                            // The buffer is reset first so the time stamp always lands at the start of the reply.
+                            bbuf.clear();
+                            bbuf.putLong(System.nanoTime());
+                            bbuf.flip();
+                            packet = new DatagramPacket(bbuf.array(), bbuf.capacity(), address, port);
+
+                            socket.send(packet);
+                        }
+                    }
+                    catch (IOException e)
+                    {
+                        // Deliberate best-effort: an I/O failure silently terminates the service.
+                        publish = false;
+                    }
+                }
+            }
+            finally
+            {
+                // Release the port even if an unexpected runtime error escapes the loop.
+                socket.close();
+            }
+        }
+
+        /// <summary>
+        /// Supplies a shutdown hook that clears the publish flag, causing the run loop to exit.
+        /// </summary>
+        /// <returns>The shut down hook.</returns>
+        public Thread getShutdownHook()
+        {
+            return new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        publish = false;
+                    }
+                });
+        }
+
+        /// <summary>
+        /// For testing purposes. Runs a reference clock on the default port.
+        /// </summary>
+        /// <param name="args">None.</param>
+        public static void main(String[] args)
+        {
+            try
+            {
+                // Create the clock reference service.
+                UDPClockReference clock = new UDPClockReference();
+
+                // Set up a shutdown hook for it.
+                Runtime.getRuntime().addShutdownHook(clock.getShutdownHook());
+
+                // Start the service.
+                clock.run();
+            }
+            catch (Exception e)
+            {
+                e.printStackTrace();
+                System.exit(1);
+            }
+        }
+    }
+}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.util.CommandLineParser; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using java.io.IOException; +using java.net.*; +using java.nio.ByteBuffer; +using java.util.Arrays; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// UDPClockSynchronizer is a that sends pings as UDP datagrams, and uses the following simple + /// algorithm to perform clock synchronization: + /// + ///
    + ///
  1. Slave initiates synchronization with a Reference clock.
  2. + ///
  3. Slave stamps current local time on a "time request" message and sends to the Reference.
  4. + ///
  5. Upon receipt by Reference, Reference stamps Reference-time and returns.
  6. + ///
  7. Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It + /// subtracts current time from Reference time to determine Slave-Reference time delta and adds in the + /// half-latency to get the correct clock delta.
  8. + ///
  9. The first result is immediately used to update the clock since it will get the local clock into at least + /// the right ballpark.
  10. + ///
  11. The Slave repeats steps 2 through 4, 15 more times.
  12. + ///
  13. The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The + /// median latency is determined by picking the mid-point sample from this ordered list.
  14. + ///
  15. All samples outside 1 standard-deviation from the median are discarded and the remaining samples + /// are averaged using an arithmetic mean.
  16. + ///
+ /// + ///

The use of UDP datagrams, instead of TCP based communication, eliminates the hidden delays that TCP can introduce, + /// as it can transparently re-order or re-send packets, or introduce delays as packets are coalesced by Nagle's algorithm. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Trigger a clock synchronization. + ///
Compute a clock delta to apply to the local clock. + ///
Estimate the error in the synchronization. + ///
+ ///

+    public class UDPClockSynchronizer : ClockSynchronizer
+    {
+        /// Used for debugging.
+        // private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer));
+
+        /// Defines the timeout to use when waiting for responses to time requests.
+        private static final int TIMEOUT = 50;
+
+        /// The clock delta.
+        private long delta = 0L;
+
+        /// Holds an estimate of the clock error relative to the reference clock.
+        private long epsilon = 0L;
+
+        /// Holds the address of the reference clock.
+        private InetAddress referenceAddress;
+
+        /// Holds the socket to communicate with the reference service over.
+        private DatagramSocket socket;
+
+        /// <summary>
+        /// Used to control the shutdown in the main test loop. Declared volatile because the shutdown hook
+        /// clears it from a different thread than the one running the loop.
+        /// </summary>
+        private static volatile bool doSynch = true;
+
+        /// <summary>
+        /// Creates a clock synchronizer against the specified address for the reference.
+        /// </summary>
+        /// <param name="address">The address of the reference service.</param>
+        /// <exception cref="RuntimeException">Wrapping an UnknownHostException if the address cannot be resolved.</exception>
+        public UDPClockSynchronizer(string address)
+        {
+            try
+            {
+                referenceAddress = InetAddress.getByName(address);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /// <summary>
+        /// The slave side should call this to compute a clock delta with the reference. Three rounds of 1, 15
+        /// and 31 pings are performed, each one refining the delta accumulated by the previous rounds.
+        /// </summary>
+        /// <exception cref="ClockSynchFailureException">If synchronization cannot be achieved, due to unavailability
+        /// of the reference time service.</exception>
+        public void synch() throws ClockSynchFailureException
+        {
+            try
+            {
+                socket = new DatagramSocket();
+
+                try
+                {
+                    socket.setSoTimeout(TIMEOUT);
+
+                    // Synchronize on a single ping, to get the clock into the right ball-park.
+                    synch(1);
+
+                    // Synchronize on 15 pings.
+                    synch(15);
+
+                    // And again, for greater accuracy, on 31.
+                    synch(31);
+                }
+                finally
+                {
+                    // Always release the socket, including when the reference stops responding part way through.
+                    socket.close();
+                }
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /// <summary>
+        /// Updates the synchronization delta by performing the specified number of reference clock requests.
+        /// Samples further than one standard deviation from the median are discarded; the mean of the retained
+        /// samples is added to the delta and their standard deviation becomes the error estimate.
+        /// </summary>
+        /// <param name="n">The number of reference clock request cycles to perform.</param>
+        /// <exception cref="ClockSynchFailureException">If synchronization cannot be achieved, due to unavailability
+        /// of the reference time service.</exception>
+        protected void synch(int n) throws ClockSynchFailureException
+        {
+            // log.debug("protected void synch(int n = " + n + "): called");
+
+            // Create an array of deltas by performing n reference pings.
+            long[] samples = new long[n];
+
+            for (int i = 0; i < n; i++)
+            {
+                samples[i] = ping();
+            }
+
+            // Reject any deltas that are further than 1 s.d. from the median.
+            long mid = median(samples);
+            long sd = standardDeviation(samples);
+
+            // log.debug("median = " + mid);
+            // log.debug("sd = " + sd);
+
+            long[] accepted = new long[n];
+            int count = 0;
+
+            for (int i = 0; i < n; i++)
+            {
+                if ((samples[i] <= (mid + sd)) && (samples[i] >= (mid - sd)))
+                {
+                    accepted[count] = samples[i];
+                    count++;
+                }
+                else
+                {
+                    // log.debug("Rejected: " + samples[i]);
+                }
+            }
+
+            // Trim to exactly the accepted samples, so that rejected outliers cannot leak into the statistics
+            // below. If nothing at all was accepted, fall back to the complete sample set rather than divide by zero.
+            long[] retained = samples;
+
+            if (count > 0)
+            {
+                retained = new long[count];
+                System.arraycopy(accepted, 0, retained, 0, count);
+            }
+
+            // Estimate the delta as the mean of the remaining deltas.
+            this.delta += mean(retained);
+
+            // Estimate the error as the standard deviation of the remaining deltas.
+            this.epsilon = standardDeviation(retained);
+
+            // log.debug("this.delta = " + this.delta);
+            // log.debug("this.epsilon = " + this.epsilon);
+        }
+
+        /// <summary>
+        /// Performs a single reference clock request cycle and returns the estimated delta relative to the local clock.
+        /// This is computed as the half-latency of the request cycle, plus the reference clock, minus the local clock.
+        /// </summary>
+        /// <returns>The estimated clock delta.</returns>
+        /// <exception cref="ClockSynchFailureException">If the reference service is not responding.</exception>
+        protected long ping() throws ClockSynchFailureException
+        {
+            // log.debug("protected long ping(): called");
+
+            try
+            {
+                byte[] buf = new byte[256];
+
+                bool timedOut = false;
+                long start = 0L;
+                long refTime = 0L;
+                long localTime = 0L;
+                long latency = 0L;
+                int failCount = 0;
+
+                // Keep trying the ping until it gets a response, or 10 tries in a row all time out.
+                do
+                {
+                    // Start timing the request latency.
+                    start = nanoTime();
+
+                    // Get the reference time.
+                    DatagramPacket packet =
+                        new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT);
+                    socket.send(packet);
+                    packet = new DatagramPacket(buf, buf.length);
+
+                    timedOut = false;
+
+                    try
+                    {
+                        socket.receive(packet);
+                    }
+                    catch (SocketTimeoutException e)
+                    {
+                        timedOut = true;
+                        failCount++;
+
+                        // Jumps to the loop condition, which decides whether another attempt is allowed.
+                        continue;
+                    }
+
+                    ByteBuffer bbuf = ByteBuffer.wrap(packet.getData());
+                    refTime = bbuf.getLong();
+
+                    // Stop timing the request latency.
+                    localTime = nanoTime();
+                    latency = localTime - start;
+
+                    // log.debug("refTime = " + refTime);
+                    // log.debug("localTime = " + localTime);
+                    // log.debug("start = " + start);
+                    // log.debug("latency = " + latency);
+                    // log.debug("delta = " + ((latency / 2) + (refTime - localTime)));
+
+                }
+                while (timedOut && (failCount < 10));
+
+                // Fail completely if the fail count is too high.
+                if (failCount >= 10)
+                {
+                    throw new ClockSynchFailureException("Clock reference not responding.", null);
+                }
+
+                // Estimate delta as (ref clock + half-latency) - local clock.
+                return (latency / 2) + (refTime - localTime);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /// <summary>
+        /// Gets the clock delta in nanoseconds.
+        /// </summary>
+        /// <returns>The clock delta in nanoseconds.</returns>
+        public long getDelta()
+        {
+            return delta;
+        }
+
+        /// <summary>
+        /// Gets an estimate of the clock error in nanoseconds.
+        /// </summary>
+        /// <returns>An estimate of the clock error in nanoseconds.</returns>
+        public long getEpsilon()
+        {
+            return epsilon;
+        }
+
+        /// <summary>
+        /// Gets the local clock time with any computed delta added in.
+        /// </summary>
+        /// <returns>The local clock time with any computed delta added in.</returns>
+        public long nanoTime()
+        {
+            return System.nanoTime() + delta;
+        }
+
+        /// <summary>
+        /// Computes the median of a series of values. The input array is not modified; a sorted copy is used.
+        /// For an even number of values the two middle values are averaged using integer division.
+        /// </summary>
+        /// <param name="values">The values. Must contain at least one element.</param>
+        /// <returns>The median.</returns>
+        public static long median(long[] values)
+        {
+            // log.debug("public static long median(long[] values = " + Arrays.ToString(values) + "): called");
+
+            long median;
+
+            // Order the list of values.
+            long[] orderedValues = new long[values.length];
+            System.arraycopy(values, 0, orderedValues, 0, values.length);
+            Arrays.sort(orderedValues);
+
+            // Check if the median is computed from a pair of middle value.
+            if ((orderedValues.length % 2) == 0)
+            {
+                int middle = orderedValues.length / 2;
+
+                median = (orderedValues[middle] + orderedValues[middle - 1]) / 2;
+            }
+            // The median is computed from a single middle value.
+            else
+            {
+                median = orderedValues[orderedValues.length / 2];
+            }
+
+            // log.debug("median = " + median);
+
+            return median;
+        }
+
+        /// <summary>
+        /// Computes the mean of a series of values, using integer division.
+        /// </summary>
+        /// <param name="values">The values. Must contain at least one element.</param>
+        /// <returns>The mean.</returns>
+        public static long mean(long[] values)
+        {
+            // log.debug("public static long mean(long[] values = " + Arrays.ToString(values) + "): called");
+
+            long total = 0L;
+
+            for (long value : values)
+            {
+                total += value;
+            }
+
+            long mean = total / values.length;
+
+            // log.debug("mean = " + mean);
+
+            return mean;
+        }
+
+        /// <summary>
+        /// Computes the variance of a series of values, as the mean of the squared differences from the mean.
+        /// </summary>
+        /// <param name="values">The values. Must contain at least one element.</param>
+        /// <returns>The variance of the values.</returns>
+        public static long variance(long[] values)
+        {
+            // log.debug("public static long variance(long[] values = " + Arrays.ToString(values) + "): called");
+
+            long mean = mean(values);
+
+            long totalVariance = 0;
+
+            for (long value : values)
+            {
+                long diff = (value - mean);
+                totalVariance += diff * diff;
+            }
+
+            long variance = totalVariance / values.length;
+
+            // log.debug("variance = " + variance);
+
+            return variance;
+        }
+
+        /// <summary>
+        /// Computes the standard deviation of a series of values, as the square root of the variance truncated
+        /// to a whole number.
+        /// </summary>
+        /// <param name="values">The values. Must contain at least one element.</param>
+        /// <returns>The standard deviation.</returns>
+        public static long standardDeviation(long[] values)
+        {
+            // log.debug("public static long standardDeviation(long[] values = " + Arrays.ToString(values) + "): called");
+
+            long sd = Double.valueOf(Math.sqrt(variance(values))).longValue();
+
+            // log.debug("sd = " + sd);
+
+            return sd;
+        }
+
+        /// <summary>
+        /// For testing purposes. Supply address of reference clock as arg 1.
+        /// </summary>
+        /// <param name="args">Address of reference clock as arg 1.</param>
+        public static void main(String[] args)
+        {
+            ParsedProperties options =
+                new ParsedProperties(CommandLineParser.processCommandLine(args,
+                        new CommandLineParser(
+                            new String[][]
+                            {
+                                { "1", "Address of clock reference service.", "address", "true" }
+                            }), System.getProperties()));
+
+            string address = options.getProperty("1");
+
+            // Create a clock synchronizer.
+            UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address);
+
+            // Set up a shutdown hook for it.
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
+                    {
+                        public void run()
+                        {
+                            doSynch = false;
+                        }
+                    }));
+
+            // Repeat the clock synching until the user kills the program.
+            while (doSynch)
+            {
+                // Perform a clock clockSynch.
+                try
+                {
+                    clockSyncher.synch();
+
+                    // Print out the clock delta and estimate of the error.
+                    System.out.println("Delta = " + clockSyncher.getDelta());
+                    System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+
+                    try
+                    {
+                        Thread.sleep(250);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Restore the interrupted status and terminate the loop.
+                        Thread.currentThread().interrupt();
+                        doSynch = false;
+                    }
+                }
+                // Terminate if the reference time service is unavailable.
+                catch (ClockSynchFailureException e)
+                {
+                    doSynch = false;
+                }
+            }
+        }
+    }
+}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.*; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.TimingController; +using uk.co.thebadgerset.junit.extensions.TimingControllerAware; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.Destination; +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.Session; + +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// DistributedCircuitImpl is a distributed implementation of the test . Many publishers and receivers + /// accross multiple machines may be combined to form a single test circuit. The test circuit extracts reports from + /// all of its publishers and receivers, and applies its assertions to these reports. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply the publishing and receiving ends of a test messaging circuit. + ///
Start the circuit running. + ///
Close the circuit down. + ///
Take a reading of the circuit's state. + ///
Apply assertions against the circuit's state. + ///
Send test messages over the circuit. + ///
Perform the default test procedure on the circuit. + ///
+ ///

+    ///
+    /// There is a short pause after receiving sender reports before asking for receiver reports, because receivers may
+    /// not have finished receiving all their test messages before the report request arrives. This is going to be a
+    /// problem for taking test timings and needs to be eliminated. Suggested solution: have receiver send back reports
+    /// asynchronously, on test batch size boundaries, and do so automatically rather than having to have the report
+    /// request sent to them. Number each test run, or otherwise uniquely identify it, when a receiver does not get
+    /// any more messages on a test run for more than a timeout, it can assume the test is complete and send a final
+    /// report. On the coordinator end a future will need to be created to wait for all final reports to come in, and
+    /// to register results and timings for the test. This must work in such a way that a new test cycle can be started
+    /// without waiting for the results of the old one to come in.
+    ///
+    /// Add in setting of timing controller, from timing aware test cases.
+    public class DistributedCircuitImpl : Circuit, TimingControllerAware
+    {
+        /// Used for debugging purposes.
+        private static ILog log = LogManager.GetLogger(typeof(DistributedCircuitImpl));
+
+        /// Holds the conversation factory over which to coordinate the test.
+        protected ConversationFactory conversationFactory;
+
+        /// Holds the controlSession over which to hold the control conversation.
+        protected Session controlSession;
+
+        /// Holds the sender nodes in the test circuit.
+        protected IList senders;
+
+        /// Holds the receiver nodes in the test circuit.
+        protected IList receivers;
+
+        /// Holds the sender control conversations.
+        protected ConversationFactory.Conversation[] senderConversation;
+
+        /// Holds the receiver control conversations.
+        protected ConversationFactory.Conversation[] receiverConversation;
+
+        /// Holds the control topics for the senders in the test circuit.
+        protected Destination[] senderControlTopic;
+
+        /// Holds the control topics for the receivers in the test circuit.
+        protected Destination[] receiverControlTopic;
+
+        /// Holds the number of messages to send per test run.
+        protected int numMessages;
+
+        /// <summary>
+        /// Holds the timing controller for the circuit. This is used to log test times asynchronously, when receiver nodes
+        /// return their reports after senders have completed a test case.
+        /// </summary>
+        TimingController timingController;
+
+        /// <summary>
+        /// Creates a distributed test circuit on the specified senders and receivers.
+        /// </summary>
+        /// <param name="session">The controlSession for all control conversations.</param>
+        /// <param name="senders">The senders.</param>
+        /// <param name="receivers">The receivers.</param>
+        /// <param name="senderConversation">A control conversation with the senders.</param>
+        /// <param name="receiverConversation">A control conversation with the receivers.</param>
+        /// <param name="senderControlTopic">The senders control topic.</param>
+        /// <param name="receiverControlTopic">The receivers control topic.</param>
+        protected DistributedCircuitImpl(Session session, IList senders, List receivers,
+            ConversationFactory.Conversation[] senderConversation, ConversationFactory.Conversation[] receiverConversation,
+            Destination[] senderControlTopic, Destination[] receiverControlTopic)
+        {
+            this.controlSession = session;
+            this.senders = senders;
+            this.receivers = receivers;
+            this.senderConversation = senderConversation;
+            this.receiverConversation = receiverConversation;
+            this.senderControlTopic = senderControlTopic;
+            this.receiverControlTopic = receiverControlTopic;
+        }
+
+        /// <summary>
+        /// Creates a distributed test circuit from the specified test parameters, on the senders and receivers
+        /// given. A control conversation is opened with every client, each client is sent its role assignment,
+        /// and the method blocks until every client has replied.
+        /// </summary>
+        /// <param name="testProps">The test parameters.</param>
+        /// <param name="senders">The sender ends in the test circuit.</param>
+        /// <param name="receivers">The receiver ends in the test circuit.</param>
+        /// <param name="conversationFactory">A conversation factory for creating the control conversations with senders and receivers.</param>
+        /// <returns>A connected and ready to start, test circuit.</returns>
+        /// <exception cref="RuntimeException">Wrapping any JMSException raised while setting up the circuit.</exception>
+        public static Circuit createCircuit(ParsedProperties testProps, IList senders,
+            IList receivers, ConversationFactory conversationFactory)
+        {
+            log.debug("public static Circuit createCircuit(ParsedProperties testProps, IList senders, "
+                + " IList receivers, ConversationFactory conversationFactory)");
+
+            try
+            {
+                Session session = conversationFactory.getSession();
+
+                // Create control conversations with each of the senders.
+                ConversationFactory.Conversation[] senderConversation = new ConversationFactory.Conversation[senders.size()];
+                Destination[] senderControlTopic = new Destination[senders.size()];
+
+                for (int i = 0; i < senders.size(); i++)
+                {
+                    TestClientDetails sender = senders.get(i);
+
+                    senderControlTopic[i] = session.createTopic(sender.privateControlKey);
+                    senderConversation[i] = conversationFactory.startConversation();
+                }
+
+                log.debug("Sender conversations created.");
+
+                // Create control conversations with each of the receivers.
+                ConversationFactory.Conversation[] receiverConversation = new ConversationFactory.Conversation[receivers.size()];
+                Destination[] receiverControlTopic = new Destination[receivers.size()];
+
+                for (int i = 0; i < receivers.size(); i++)
+                {
+                    TestClientDetails receiver = receivers.get(i);
+
+                    receiverControlTopic[i] = session.createTopic(receiver.privateControlKey);
+                    receiverConversation[i] = conversationFactory.startConversation();
+                }
+
+                log.debug("Receiver conversations created.");
+
+                // Assign the sender role to each of the sending test clients.
+                for (int i = 0; i < senders.size(); i++)
+                {
+                    Message assignSender = session.createMessage();
+                    TestUtils.setPropertiesOnMessage(assignSender, testProps);
+                    assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+                    assignSender.setStringProperty("ROLE", "SENDER");
+
+                    senderConversation[i].send(senderControlTopic[i], assignSender);
+                }
+
+                log.debug("Sender role assignments sent.");
+
+                // Assign the receivers role to each of the receiving test clients.
+                for (int i = 0; i < receivers.size(); i++)
+                {
+                    Message assignReceiver = session.createMessage();
+                    TestUtils.setPropertiesOnMessage(assignReceiver, testProps);
+                    assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+                    assignReceiver.setStringProperty("ROLE", "RECEIVER");
+
+                    receiverConversation[i].send(receiverControlTopic[i], assignReceiver);
+                }
+
+                log.debug("Receiver role assignments sent.");
+
+                // Wait for the senders and receivers to confirm their roles.
+                for (int i = 0; i < senders.size(); i++)
+                {
+                    senderConversation[i].receive();
+                }
+
+                log.debug("Got all sender role confirmations");
+
+                for (int i = 0; i < receivers.size(); i++)
+                {
+                    receiverConversation[i].receive();
+                }
+
+                log.debug("Got all receiver role confirmations");
+
+                // Package everything up as a circuit.
+                return new DistributedCircuitImpl(session, senders, receivers, senderConversation, receiverConversation,
+                    senderControlTopic, receiverControlTopic);
+            }
+            catch (JMSException e)
+            {
+                // Keep the underlying cause attached, as start() and check() do.
+                throw new RuntimeException("JMSException not handled.", e);
+            }
+        }
+
+        /// <summary>
+        /// Used by tests cases that can supply a TimingController to set the
+        /// controller on an aware test.
+        /// </summary>
+        /// <param name="controller">The timing controller.</param>
+        public void setTimingController(TimingController controller)
+        {
+            this.timingController = controller;
+        }
+
+        /// <summary>
+        /// Gets the interface on the publishing end of the circuit. Not implemented; always throws.
+        /// </summary>
+        /// <returns>The publishing end of the circuit.</returns>
+        public Publisher getPublisher()
+        {
+            throw new RuntimeException("Not Implemented.");
+        }
+
+        /// <summary>
+        /// Gets the interface on the receiving end of the circuit. Not implemented; always throws.
+        /// </summary>
+        /// <returns>The receiving end of the circuit.</returns>
+        public Receiver getReceiver()
+        {
+            throw new RuntimeException("Not Implemented.");
+        }
+
+        /// <summary>
+        /// Connects and starts the circuit. After this method is called the circuit is ready to send messages.
+        /// A single START control message carrying the message count is sent to every sender.
+        /// </summary>
+        public void start()
+        {
+            log.debug("public void start(): called");
+
+            try
+            {
+                // Start the test on each of the senders.
+                Message start = controlSession.createMessage();
+                start.setStringProperty("CONTROL_TYPE", "START");
+                start.setIntProperty("MESSAGE_COUNT", numMessages);
+
+                for (int i = 0; i < senders.size(); i++)
+                {
+                    senderConversation[i].send(senderControlTopic[i], start);
+                }
+
+                log.debug("All senders told to start their tests.");
+            }
+            catch (JMSException e)
+            {
+                throw new RuntimeException("Unhandled JMSException.", e);
+            }
+        }
+
+        /// <summary>
+        /// Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit,
+        /// into a report, against which assertions may be checked. Sender reports are awaited synchronously;
+        /// receiver reports are requested and then gathered on a separate thread.
+        /// </summary>
+        ///
+        /// Replace the asynch receiver report thread with a choice of direct or asynch executor, so that asynch
+        /// or synch logging of test timings is optional. Also need to provide an onMessage method that is capable
+        /// of receiving timing reports that receivers will generate during an ongoing test, on the test sample
+        /// size boundaries. The message timing logging code should be factored out as a common method that can
+        /// be called in response to the final report responses, or the onMessage method. Another alternative is
+        /// to abandon the final report request altogether and just use the onMessage method? I think the two
+        /// differ though, as the final report is used to apply assertions, and the ongoing report is just for
+        /// periodic timing results... In which case, maybe there needs to be a way for the onMessage method
+        /// to process just some of the incoming messages, and forward the rest on to the conversion helper, as
+        /// a sort of pre-conversation helper filter? Make conversation expose its onMessage method (it should
+        /// already) and allow another delivery thread to filter the incoming messages to the conversation.
+        public void check()
+        {
+            log.debug("public void check(): called");
+
+            try
+            {
+                // Wait for all the test senders to return their reports.
+                for (int i = 0; i < senders.size(); i++)
+                {
+                    Message senderReport = senderConversation[i].receive();
+                    log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message count: "
+                        + senderReport.getIntProperty("MESSAGE_COUNT"));
+                    log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message time: "
+                        + senderReport.getLongProperty("TEST_TIME"));
+                }
+
+                log.debug("Got all sender test reports.");
+
+                // Apply sender assertions to pass/fail the tests.
+
+                // Inject a short pause to give the receivers time to finish receiving their test messages.
+                TestUtils.pause(500);
+
+                // Ask the receivers for their reports.
+                Message statusRequest = controlSession.createMessage();
+                statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+                for (int i = 0; i < receivers.size(); i++)
+                {
+                    receiverConversation[i].send(receiverControlTopic[i], statusRequest);
+                }
+
+                log.debug("All receiver test reports requested.");
+
+                // Wait for all receiver reports to come in, but do so asynchronously.
+                Runnable gatherAllReceiverReports =
+                    new Runnable()
+                    {
+                        public void run()
+                        {
+                            try
+                            {
+                                // Wait for all the receivers to send their reports.
+                                for (int i = 0; i < receivers.size(); i++)
+                                {
+                                    Message receiverReport = receiverConversation[i].receive();
+
+                                    string clientName = receiverReport.getStringProperty("CLIENT_NAME");
+                                    int messageCount = receiverReport.getIntProperty("MESSAGE_COUNT");
+                                    long testTime = receiverReport.getLongProperty("TEST_TIME");
+
+                                    log.debug("Receiver " + clientName + " reports message count: " + messageCount);
+                                    log.debug("Receiver " + clientName + " reports message time: " + testTime);
+
+                                    // Apply receiver assertions to pass/fail the tests.
+
+                                    // Log the test timings on the asynchronous test timing controller.
+                                    /*try
+                                    {
+                                        timingController.completeTest(true, messageCount, testTime);
+                                    }
+                                    // The timing controller can throw InterruptedException if the current test is to be
+                                    // interrupted.
+                                    catch (InterruptedException e)
+                                    {
+                                        e.printStackTrace();
+                                    }*/
+                                }
+
+                                log.debug("All receiver test reports received.");
+                            }
+                            catch (JMSException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    };
+
+                Thread receiverReportsThread = new Thread(gatherAllReceiverReports);
+                receiverReportsThread.start();
+
+                // return new Message[] { senderReport, receiverReport };
+
+            }
+            catch (JMSException e)
+            {
+                throw new RuntimeException("Unhandled JMSException.", e);
+            }
+        }
+
+        /// <summary>
+        /// Closes the circuit. All associated resources are closed. Currently this only logs the call.
+        /// </summary>
+        public void close()
+        {
+            log.debug("public void close(): called");
+
+            // End the current test on all senders and receivers.
+        }
+
+        /// <summary>
+        /// Applies a list of assertions against the test circuit. The check method should be called before doing
+        /// this, to ensure that the circuit has gathered its state into a report to assert against.
+        /// </summary>
+        /// <param name="assertions">The list of assertions to apply.</param>
+        /// <returns>Any assertions that failed.</returns>
+        public IList applyAssertions(List assertions)
+        {
+            log.debug("public IList applyAssertions(List assertions = " + assertions + "): called");
+
+            IList failures = new LinkedList();
+
+            for (Assertion assertion : assertions)
+            {
+                if (!assertion.apply())
+                {
+                    failures.add(assertion);
+                }
+            }
+
+            return failures;
+        }
+
+        /// <summary>
+        /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold.
+        /// The procedure is: start, check, apply assertions, close.
+        /// </summary>
+        /// <param name="numMessages">The number of messages to send using the default test procedure.</param>
+        /// <param name="assertions">The list of assertions to apply.</param>
+        /// <returns>Any assertions that failed.</returns>
+        ///
+        /// From check onwards needs to be handled as a future. The future must call back onto the test case to
+        /// report results asynchronously.
+        public IList test(int numMessages, List assertions)
+        {
+            log.debug("public IList test(int numMessages = " + numMessages + ", List assertions = "
+                + assertions + "): called");
+
+            // Keep the number of messages to send per test run, where the send method can reference it.
+            this.numMessages = numMessages;
+
+            // Start the test running on all sender circuit ends.
+            start();
+
+            // Request status reports to be handed in.
+            check();
+
+            // Assert conditions on the publishing end of the circuit.
+            // Assert conditions on the receiving end of the circuit.
+            IList failures = applyAssertions(assertions);
+
+            // Close the circuit ending the current test case.
+            close();
+
+            // Pass with no failed assertions or fail with a list of failed assertions.
+            return failures;
+        }
+    }
+}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.Assertion; +using Apache.Qpid.Integration.Tests.framework.Publisher; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// DistributedPublisherImpl represents the status of the publishing side of a test circuit. Its main purpose is to + /// provide assertions that can be applied to verify the behaviour of a non-local publisher. + /// + ///

+    /// <table>
+    /// <caption>CRC Card</caption>
+    /// <tr><th>Responsibilities</th><th>Collaborations</th></tr>
+    /// <tr><td>Provide assertion that the publishers received no exceptions.</td></tr>
+    /// <tr><td>Provide assertion that the publishers received a no consumers error code on every message.</td></tr>
+    /// <tr><td>Provide assertion that the publishers received a no route error code on every message.</td></tr>
+    /// </table>

+    public class DistributedPublisherImpl : Publisher
+    {
+        /// <summary>
+        /// Provides an assertion that the publisher encountered no exceptions.
+        /// </summary>
+        /// <param name="testProps">The test configuration properties.</param>
+        /// <returns>An assertion that the publisher encountered no exceptions.</returns>
+        public Assertion noExceptionsAssertion(ParsedProperties testProps)
+        {
+            throw new RuntimeException("Not implemented."); // Stub: always throws; testProps is not used.
+        }
+
+        /// <summary>
+        /// Provides an assertion that the publisher got a no consumers exception on every message.
+        /// </summary>
+        /// <returns>An assertion that the publisher got a no consumers exception on every message.</returns>
+        public Assertion noConsumersAssertion()
+        {
+            throw new RuntimeException("Not implemented."); // Stub: always throws.
+        }
+
+        /// <summary>
+        /// Provides an assertion that the publisher got a no route exception on every message.
+        /// </summary>
+        /// <returns>An assertion that the publisher got a no route exception on every message.</returns>
+        public Assertion noRouteAssertion()
+        {
+            throw new RuntimeException("Not implemented."); // Stub: always throws.
+        }
+
+        /// [... 5804 lines stripped ...]