flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [hotfix] [tests] Code cleanups in ExecutionGraphRestartTest
Date Sun, 23 Jul 2017 16:48:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 605319b55 -> 02850545e


[hotfix] [tests] Code cleanups in ExecutionGraphRestartTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80ba4d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80ba4d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80ba4d6

Branch: refs/heads/master
Commit: d80ba4d6f12658c55b164339ec5930397fd455e3
Parents: 605319b
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Jul 23 14:58:36 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jul 23 14:58:36 2017 +0200

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.java              | 39 +++++++++-----------
 1 file changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d80ba4d6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index b062369..7275e0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -63,11 +64,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Test;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -531,8 +529,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	 */
 	@Test
 	public void testSuspendWhileRestarting() throws Exception {
-		FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
-		Deadline deadline = timeout.fromNow();
+		final Time timeout = Time.of(1, TimeUnit.MINUTES);
 
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 			new ActorTaskManagerGateway(
@@ -571,7 +568,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		instance.markDead();
 
-		Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft());
+		controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 		assertEquals(JobStatus.RESTARTING, eg.getState());
 
@@ -581,7 +578,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		controllableRestartStrategy.unlockRestart();
 
-		Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft());
+		controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 	}
@@ -795,37 +792,37 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static class ControllableRestartStrategy implements RestartStrategy {
 
-		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
-		private Promise<Boolean> doRestart = new Promise.DefaultPromise<>();
-		private Promise<Boolean> restartDone = new Promise.DefaultPromise<>();
+		private final OneShotLatch reachedCanRestart = new OneShotLatch();
+		private final OneShotLatch doRestart = new OneShotLatch();
+		private final OneShotLatch restartDone = new OneShotLatch();
 
-		private volatile Exception exception = null;
+		private final Time timeout;
 
-		private FiniteDuration timeout;
+		private volatile Exception exception;
 
-		public ControllableRestartStrategy(FiniteDuration timeout) {
+		public ControllableRestartStrategy(Time timeout) {
 			this.timeout = timeout;
 		}
 
 		public void unlockRestart() {
-			doRestart.success(true);
+			doRestart.trigger();
 		}
 
 		public Exception getException() {
 			return exception;
 		}
 
-		public Future<Boolean> getReachedCanRestart() {
-			return reachedCanRestart.future();
+		public OneShotLatch getReachedCanRestart() {
+			return reachedCanRestart;
 		}
 
-		public Future<Boolean> getRestartDone() {
-			return restartDone.future();
+		public OneShotLatch getRestartDone() {
+			return restartDone;
 		}
 
 		@Override
 		public boolean canRestart() {
-			reachedCanRestart.success(true);
+			reachedCanRestart.trigger();
 			return true;
 		}
 
@@ -835,13 +832,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 				@Override
 				public void run() {
 					try {
-						Await.ready(doRestart.future(), timeout);
+						doRestart.await(timeout.getSize(), timeout.getUnit());
 						restarter.triggerFullRecovery();
 					} catch (Exception e) {
 						exception = e;
 					}
 
-					restartDone.success(true);
+					restartDone.trigger();
 				}
 			});
 		}


Mime
View raw message