flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/5] flink git commit: [FLINK-8662] [tests] Harden FutureUtilsTest#testRetryWithDelay
Date Sat, 17 Feb 2018 10:17:45 GMT
[FLINK-8662] [tests] Harden FutureUtilsTest#testRetryWithDelay

This commit moves the start of the time measurement before the triggering of
the retry with delay operation.

This closes #5494.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1809e9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1809e9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1809e9ff

Branch: refs/heads/master
Commit: 1809e9ff30080f5b65987bcd684d1b5e1056693f
Parents: 16451a3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 15 11:37:58 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sat Feb 17 11:11:44 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   |  5 ++-
 .../runtime/concurrent/FutureUtilsTest.java     | 36 ++++++++------------
 .../ManuallyTriggeredScheduledExecutor.java     |  6 ++++
 3 files changed, 22 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1809e9ff/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 17381a9..e9310c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -187,7 +187,7 @@ public class FutureUtils {
 		if (!resultFuture.isDone()) {
 			final CompletableFuture<T> operationResultFuture = operation.get();
 
-			operationResultFuture.whenCompleteAsync(
+			operationResultFuture.whenComplete(
 				(t, throwable) -> {
 					if (throwable != null) {
 						if (throwable instanceof CancellationException) {
@@ -213,8 +213,7 @@ public class FutureUtils {
 					} else {
 						resultFuture.complete(t);
 					}
-				},
-				scheduledExecutor);
+				});
 
 			resultFuture.whenComplete(
 				(t, throwable) -> operationResultFuture.cancel(false));

http://git-wip-us.apache.org/repos/asf/flink/blob/1809e9ff/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 7d14ff2..cbc8a9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -45,13 +45,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the utility methods in {@link FutureUtils}.
@@ -175,9 +168,11 @@ public class FutureUtilsTest extends TestLogger {
 	@Test
 	public void testRetryWithDelay() throws Exception {
 		final int retries = 4;
-		final Time delay = Time.milliseconds(50L);
+		final Time delay = Time.milliseconds(5L);
 		final AtomicInteger countDown = new AtomicInteger(retries);
 
+		long start = System.currentTimeMillis();
+
 		CompletableFuture<Boolean> retryFuture = FutureUtils.retryWithDelay(
 			() -> {
 				if (countDown.getAndDecrement() == 0) {
@@ -190,8 +185,6 @@ public class FutureUtilsTest extends TestLogger {
 			delay,
 			TestingUtils.defaultScheduledExecutor());
 
-		long start = System.currentTimeMillis();
-
 		Boolean result = retryFuture.get();
 
 		long completionTime = System.currentTimeMillis() - start;
@@ -205,29 +198,28 @@ public class FutureUtilsTest extends TestLogger {
 	 */
 	@Test
 	public void testRetryWithDelayCancellation() {
-		ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
-		ScheduledExecutor scheduledExecutorMock = mock(ScheduledExecutor.class);
-		doReturn(scheduledFutureMock).when(scheduledExecutorMock).schedule(any(Runnable.class),
anyLong(), any(TimeUnit.class));
-		doAnswer(
-			(InvocationOnMock invocation) -> {
-				invocation.getArgumentAt(0, Runnable.class).run();
-				return null;
-			}).when(scheduledExecutorMock).execute(any(Runnable.class));
+		final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
 
 		CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
 			() -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
 			1,
 			TestingUtils.infiniteTime(),
-			scheduledExecutorMock);
+			scheduledExecutor);
 
 		assertFalse(retryFuture.isDone());
 
-		verify(scheduledExecutorMock).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+		final Collection<ScheduledFuture<?>> scheduledTasks = scheduledExecutor.getScheduledTasks();
+
+		assertFalse(scheduledTasks.isEmpty());
+
+		final ScheduledFuture<?> scheduledFuture = scheduledTasks.iterator().next();
+
+		assertFalse(scheduledFuture.isDone());
 
 		retryFuture.cancel(false);
 
 		assertTrue(retryFuture.isCancelled());
-		verify(scheduledFutureMock).cancel(anyBoolean());
+		assertTrue(scheduledFuture.isCancelled());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/1809e9ff/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
index 1fc7705..8a46587 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -62,6 +64,10 @@ public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectE
 		return insertRunnable(command, true);
 	}
 
+	Collection<ScheduledFuture<?>> getScheduledTasks() {
+		return new ArrayList<>(scheduledTasks);
+	}
+
 	/**
 	 * Triggers all registered tasks.
 	 */


Mime
View raw message