flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [34/39] git commit: [FLINK-909] Remove additional empty (and non empty for iterative broadcast variables) superstep. [FLINK-945] Fix early memory release in iterations
Date Sat, 09 Aug 2014 12:40:05 GMT
[FLINK-909]  Remove additional empty (and non empty for iterative broadcast variables) superstep.
[FLINK-945]  Fix early memory release in iterations


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/76a48df0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/76a48df0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/76a48df0

Branch: refs/heads/travis_test
Commit: 76a48df00f0fc202bdedefb33d576fd9a18a92c0
Parents: 71cccd6
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Aug 7 22:00:51 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Aug 7 22:26:09 2014 +0200

----------------------------------------------------------------------
 .../concurrent/SuperstepKickoffLatch.java       |  65 +++++
 .../concurrent/SuperstepKickoffLatchBroker.java |  32 +++
 .../task/AbstractIterativePactTask.java         |  41 +--
 .../iterative/task/IterationHeadPactTask.java   |  21 +-
 .../task/IterationIntermediatePactTask.java     |  31 ++-
 .../iterative/task/IterationTailPactTask.java   |  38 +--
 .../concurrent/SuperstepKickoffLatchTest.java   | 247 +++++++++++++++++++
 7 files changed, 404 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
new file mode 100644
index 0000000..b53928c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.concurrent;
+
+public class SuperstepKickoffLatch {
+	
+	private final Object monitor = new Object();
+	
+	private int superstepNumber = 1;
+	
+	private boolean terminated;
+	
+	public void triggerNextSuperstep() {
+		synchronized (monitor) {
+			if (terminated) {
+				throw new IllegalStateException("Already teriminated.");
+			}
+			superstepNumber++;
+			monitor.notifyAll();
+		}
+	}
+	
+	public void signalTermination() {
+		synchronized (monitor) {
+			terminated = true;
+			monitor.notifyAll();
+		}
+	}
+	
+	public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException
{
+		while (true) {
+			synchronized (monitor) {
+				if (terminated) {
+					return true;
+				}
+				else if (superstepNumber == superstep) {
+					// reached the superstep. all good!
+					return false;
+				}
+				else if (superstepNumber == superstep - 1) {
+					monitor.wait(2000);
+				}
+				else {
+					throw new IllegalStateException("Error while waiting for start of next superstep. current=
" + superstepNumber + " waitingFor=" + superstep);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
new file mode 100644
index 0000000..41f6985
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.iterative.concurrent;
+
+public class SuperstepKickoffLatchBroker extends Broker<SuperstepKickoffLatch> {
+
+	private static final SuperstepKickoffLatchBroker INSTANCE = new SuperstepKickoffLatchBroker();
+
+	private SuperstepKickoffLatchBroker() {}
+
+
+	public static Broker<SuperstepKickoffLatch> instance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 636c492..4c03278 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -210,28 +210,17 @@ public abstract class AbstractIterativePactTask<S extends Function,
OT> extends
 		return this.iterationAggregators;
 	}
 
-	protected void checkForTerminationAndResetEndOfSuperstepState() throws IOException {
+	protected void verifyEndOfSuperstepState() throws IOException {
 		// sanity check that there is at least one iterative input reader
 		if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length ==
0) {
-			throw new IllegalStateException();
+			throw new IllegalStateException("Error: Iterative task without a single iterative input.");
 		}
 
-		// check whether this step ended due to end-of-superstep, or proper close
-		boolean anyClosed = false;
-		boolean allClosed = true;
-
 		for (int inputNum : this.iterativeInputs) {
 			MutableReader<?> reader = this.inputReaders[inputNum];
 
-			if (reader.isInputClosed()) {
-				anyClosed = true;
-			}
-			else {
-				// check if reader has reached the end of superstep, or if the operation skipped out
early
+			if (!reader.isInputClosed()) {
 				if (reader.hasReachedEndOfSuperstep()) {
-					allClosed = false;
-					
-					// also reset the end-of-superstep state
 					reader.startNextSuperstep();
 				}
 				else {
@@ -241,11 +230,7 @@ public abstract class AbstractIterativePactTask<S extends Function,
OT> extends
 					Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
 					while ((o = inIter.next(o)) != null);
 					
-					if (reader.isInputClosed()) {
-						anyClosed = true;
-					} else {
-						allClosed = false;
-						
+					if (!reader.isInputClosed()) {
 						// also reset the end-of-superstep state
 						reader.startNextSuperstep();
 					}
@@ -256,28 +241,16 @@ public abstract class AbstractIterativePactTask<S extends Function,
OT> extends
 		for (int inputNum : this.iterativeBroadcastInputs) {
 			MutableReader<?> reader = this.broadcastInputReaders[inputNum];
 
-			if (reader.isInputClosed()) {
-				anyClosed = true;
-			}
-			else {
-				// sanity check that the BC input is at the end of teh superstep
+			if (!reader.isInputClosed()) {
+				
+				// sanity check that the BC input is at the end of the superstep
 				if (!reader.hasReachedEndOfSuperstep()) {
 					throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
 				}
 				
-				allClosed = false;
 				reader.startNextSuperstep();
 			}
 		}
-
-		// sanity check whether we saw the same state (end-of-superstep or termination) on all
inputs
-		if (allClosed != anyClosed) {
-			throw new IllegalStateException("Inconsistent state: Iteration termination received on
some, but not all inputs.");
-		}
-
-		if (allClosed) {
-			requestTermination();
-		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index 797bbb6..3dbd47c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -41,6 +41,8 @@ import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
@@ -134,8 +136,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT>
extends Abstrac
 	private BlockingBackChannel initBackChannel() throws Exception {
 
 		/* get the size of the memory available to the backchannel */
-		int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory
-				());
+		int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
 
 		/* allocate the memory available to the backchannel */
 		List<MemorySegment> segments = new ArrayList<MemorySegment>();
@@ -220,6 +221,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT>
extends Abstrac
 
 		try {
 			/* used for receiving the current iteration result from iteration tail */
+			SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
+			SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
+			
 			BlockingBackChannel backChannel = initBackChannel();
 			SuperstepBarrier barrier = initSuperstepBarrier();
 			SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
@@ -316,12 +320,15 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT>
extends Abstrac
 							+ "]"));
 					}
 					requestTermination();
+					nextStepKickoff.signalTermination();
 				} else {
 					incrementIterationCounter();
 
 					String[] globalAggregateNames = barrier.getAggregatorNames();
 					Value[] globalAggregates = barrier.getAggregates();
 					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
+					
+					nextStepKickoff.triggerNextSuperstep();
 				}
 			}
 
@@ -344,12 +351,10 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT>
extends Abstrac
 			// - solution set index
 			IterationAggregatorBroker.instance().remove(brokerKey);
 			BlockingBackChannelBroker.instance().remove(brokerKey);
-			if (isWorksetIteration) {
-				SolutionSetBroker.instance().remove(brokerKey);
-				if (waitForSolutionSetUpdate) {
-					SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
-				}
-			}
+			SuperstepKickoffLatchBroker.instance().remove(brokerKey);
+			SolutionSetBroker.instance().remove(brokerKey);
+			SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
+
 			if (solutionSet != null) {
 				solutionSet.close();
 				solutionSet = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index c23eae1..b12e70b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.util.Collector;
@@ -78,6 +80,8 @@ public class IterationIntermediatePactTask<S extends Function, OT>
extends Abstr
 
 	@Override
 	public void run() throws Exception {
+		
+		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
 
 		while (this.running && !terminationRequested()) {
 
@@ -88,26 +92,31 @@ public class IterationIntermediatePactTask<S extends Function, OT>
extends Abstr
 			super.run();
 
 			// check if termination was requested
-			checkForTerminationAndResetEndOfSuperstepState();
+			verifyEndOfSuperstepState();
 
 			if (isWorksetUpdate && isWorksetIteration) {
 				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
 				worksetAggregator.aggregate(numCollected);
 			}
-
+			
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
 			}
+			
+			// let the successors know that the end of this superstep data is reached
+			sendEndOfSuperstep();
+			
+			if (isWorksetUpdate) {
+				// notify iteration head if responsible for workset update
+				worksetBackChannel.notifyOfEndOfSuperstep();
+			}
+			
+			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration()
+ 1);
 
-			if (!terminationRequested()) {
-				if (isWorksetUpdate) {
-					// notify iteration head if responsible for workset update
-					worksetBackChannel.notifyOfEndOfSuperstep();
-				}
-
-				// send the end-of-superstep
-				sendEndOfSuperstep();
-
+			if (terminated) {
+				requestTermination();
+			}
+			else {
 				incrementIterationCounter();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 90d732c..0d9c903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.util.Collector;
@@ -95,23 +97,19 @@ public class IterationTailPactTask<S extends Function, OT> extends
AbstractItera
 
 	@Override
 	public void run() throws Exception {
+		
+		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+		
 		while (this.running && !terminationRequested()) {
 
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
 			}
 
-			try {
-				super.run();
-			}
-			catch (NullPointerException e) {
-				boolean terminationRequested = terminationRequested();
-				System.out.println("Nullpoint exception when termination requested was " + terminationRequested);
-				e.printStackTrace();
-			}
+			super.run();
 
 			// check if termination was requested
-			checkForTerminationAndResetEndOfSuperstepState();
+			verifyEndOfSuperstepState();
 
 			if (isWorksetUpdate && isWorksetIteration) {
 				// aggregate workset update element count
@@ -123,16 +121,20 @@ public class IterationTailPactTask<S extends Function, OT> extends
AbstractItera
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
 			}
+			
+			if (isWorksetUpdate) {
+				// notify iteration head if responsible for workset update
+				worksetBackChannel.notifyOfEndOfSuperstep();
+			} else if (isSolutionSetUpdate) {
+				// notify iteration head if responsible for solution set update
+				solutionSetUpdateBarrier.notifySolutionSetUpdate();
+			}
 
-			if (!terminationRequested()) {
-				if (isWorksetUpdate) {
-					// notify iteration head if responsible for workset update
-					worksetBackChannel.notifyOfEndOfSuperstep();
-				} else if (isSolutionSetUpdate) {
-					// notify iteration head if responsible for solution set update
-					solutionSetUpdateBarrier.notifySolutionSetUpdate();
-				}
-
+			boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration()
+ 1);
+			if (terminate) {
+				requestTermination();
+			}
+			else {
 				incrementIterationCounter();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76a48df0/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
new file mode 100644
index 0000000..3173570
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.concurrent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SuperstepKickoffLatchTest {
+
+	@Test
+	public void testWaitFromOne() {
+		try {
+			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+			
+			Waiter w = new Waiter(latch, 2);
+			Thread waiter = new Thread(w);
+			waiter.setDaemon(true);
+			waiter.start();
+			
+			WatchDog wd = new WatchDog(waiter, 2000);
+			wd.start();
+			
+			Thread.sleep(100);
+			
+			latch.triggerNextSuperstep();
+			
+			wd.join();
+			if (wd.getError() != null) {
+				throw wd.getError();
+			}
+			
+			if (w.getError() != null) {
+				throw w.getError();
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail("Error: " + t.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWaitAlreadyFulfilled() {
+		try {
+			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+			latch.triggerNextSuperstep();
+			
+			Waiter w = new Waiter(latch, 2);
+			Thread waiter = new Thread(w);
+			waiter.setDaemon(true);
+			waiter.start();
+			
+			WatchDog wd = new WatchDog(waiter, 2000);
+			wd.start();
+			
+			Thread.sleep(100);
+			
+			wd.join();
+			if (wd.getError() != null) {
+				throw wd.getError();
+			}
+			
+			if (w.getError() != null) {
+				throw w.getError();
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail("Error: " + t.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWaitIncorrect() {
+		try {
+			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+			latch.triggerNextSuperstep();
+			latch.triggerNextSuperstep();
+			
+			try {
+				latch.awaitStartOfSuperstepOrTermination(2);
+				Assert.fail("should throw exception");
+			}
+			catch (IllegalStateException e) {
+				// good
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Error: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWaitIncorrectAsync() {
+		try {
+			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+			latch.triggerNextSuperstep();
+			latch.triggerNextSuperstep();
+			
+			Waiter w = new Waiter(latch, 2);
+			Thread waiter = new Thread(w);
+			waiter.setDaemon(true);
+			waiter.start();
+			
+			WatchDog wd = new WatchDog(waiter, 2000);
+			wd.start();
+			
+			Thread.sleep(100);
+			
+			wd.join();
+			if (wd.getError() != null) {
+				throw wd.getError();
+			}
+			
+			if (w.getError() != null) {
+				if (!(w.getError() instanceof IllegalStateException)) {
+					throw new Exception("wrong exception type " + w.getError());
+				}
+			} else {
+				Assert.fail("should cause exception");
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail("Error: " + t.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWaitForTermination() {
+		try {
+			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
+			latch.triggerNextSuperstep();
+			latch.triggerNextSuperstep();
+			
+			Waiter w = new Waiter(latch, 4);
+			Thread waiter = new Thread(w);
+			waiter.setDaemon(true);
+			waiter.start();
+			
+			WatchDog wd = new WatchDog(waiter, 2000);
+			wd.start();
+			
+			latch.signalTermination();
+			
+			wd.join();
+			if (wd.getError() != null) {
+				throw wd.getError();
+			}
+			
+			if (w.getError() != null) {
+				throw w.getError();
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail("Error: " + t.getMessage());
+		}
+	}
+	
+	private static class Waiter implements Runnable {
+
+		private final SuperstepKickoffLatch latch;
+		
+		private final int waitFor;
+		
+		private volatile Throwable error;
+		
+		
+		public Waiter(SuperstepKickoffLatch latch, int waitFor) {
+			this.latch = latch;
+			this.waitFor = waitFor;
+		}
+
+		@Override
+		public void run() {
+			try {
+				latch.awaitStartOfSuperstepOrTermination(waitFor);
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+		}
+		
+		public Throwable getError() {
+			return error;
+		}
+	}
+	
+	private static class WatchDog extends Thread {
+		
+		private final Thread toWatch;
+		
+		private final long timeOut;
+		
+		private volatile Throwable failed;
+		
+		public WatchDog(Thread toWatch, long timeout) {
+			setDaemon(true);
+			setName("Watchdog");
+			this.toWatch = toWatch;
+			this.timeOut = timeout;
+		}
+		
+		@SuppressWarnings("deprecation")
+		@Override
+		public void run() {
+			try {
+				toWatch.join(timeOut);
+				
+				if (toWatch.isAlive()) {
+					this.failed = new Exception("timed out");
+					toWatch.interrupt();
+					
+					toWatch.join(2000);
+					if (toWatch.isAlive()) {
+						toWatch.stop();
+					}
+				}
+			}
+			catch (Throwable t) {
+				failed = t;
+			}
+		}
+		
+		public Throwable getError() {
+			return failed;
+		}
+	}
+}


Mime
View raw message