flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [14/35] flink git commit: [hotfix][tests] Reduce mockito usage in StreamTaskTest
Date Mon, 19 Feb 2018 14:08:07 GMT
[hotfix][tests] Reduce mockito usage in StreamTaskTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0af22bf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0af22bf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0af22bf2

Branch: refs/heads/master
Commit: 0af22bf284967c8f7e658b8eef3a91d407dbd8eb
Parents: 6b24757
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Wed Feb 7 16:16:29 2018 +0100
Committer: Piotr Nowojski <piotr.nowojski@gmail.com>
Committed: Mon Feb 19 12:21:24 2018 +0100

----------------------------------------------------------------------
 .../operators/testutils/MockEnvironment.java    | 44 +++++++++++++++-----
 .../operators/async/AsyncWaitOperatorTest.java  | 21 ++++------
 .../streaming/runtime/tasks/StreamTaskTest.java | 38 +++++------------
 .../util/AbstractStreamOperatorTestHarness.java |  2 +-
 4 files changed, 53 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index e28eada..4d1037e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -55,8 +56,11 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
@@ -73,7 +77,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private final IOManager ioManager;
 
-	private final TestTaskStateManager taskStateManager;
+	private final TaskStateManager taskStateManager;
 
 	private final InputSplitProvider inputSplitProvider;
 
@@ -99,14 +103,25 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
-	private Throwable failExternallyCause;
+	private Optional<Class<Throwable>> expectedExternalFailureCause = Optional.empty();
+
+	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
+
+	public MockEnvironment() {
+		this(
+			"mock-task",
+			1024 * MemoryManager.DEFAULT_PAGE_SIZE,
+			null,
+			16,
+			new TestTaskStateManager());
+	}
 
 	public MockEnvironment(
 		String taskName,
 		long memorySize,
 		MockInputSplitProvider inputSplitProvider,
 		int bufferSize,
-		TestTaskStateManager taskStateManager) {
+		TaskStateManager taskStateManager) {
 		this(
 			taskName,
 			memorySize,
@@ -123,7 +138,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		MockInputSplitProvider inputSplitProvider,
 		int bufferSize, Configuration taskConfiguration,
 		ExecutionConfig executionConfig,
-		TestTaskStateManager taskStateManager) {
+		TaskStateManager taskStateManager) {
 		this(
 			taskName,
 			memorySize,
@@ -144,7 +159,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 			int bufferSize,
 			Configuration taskConfiguration,
 			ExecutionConfig executionConfig,
-			TestTaskStateManager taskStateManager,
+			TaskStateManager taskStateManager,
 			int maxParallelism,
 			int parallelism,
 			int subtaskIndex) {
@@ -174,7 +189,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 			int parallelism,
 			int subtaskIndex,
 			ClassLoader userCodeClassLoader,
-			TestTaskStateManager taskStateManager) {
+			TaskStateManager taskStateManager) {
 		this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0);
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = taskConfiguration;
@@ -324,7 +339,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 	}
 
 	@Override
-	public TestTaskStateManager getTaskStateManager() {
+	public TaskStateManager getTaskStateManager() {
 		return taskStateManager;
 	}
 
@@ -355,7 +370,12 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	@Override
 	public void failExternally(Throwable cause) {
-		this.failExternallyCause = Preconditions.checkNotNull(cause, "Must give a cause fail fail.");
+		if (!expectedExternalFailureCause.isPresent()) {
+			throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
+		}
+		checkArgument(expectedExternalFailureCause.get().isInstance(checkNotNull(cause)));
+		checkState(!actualExternalFailureCause.isPresent());
+		actualExternalFailureCause = Optional.of(cause);
 	}
 
 	@Override
@@ -371,7 +391,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down.");
 	}
 
-	public Throwable getFailExternallyCause() {
-		return failExternallyCause;
+	public void setExpectedExternalFailureCause(Class<Throwable> expectedThrowableClass) {
+		this.expectedExternalFailureCause = Optional.of(expectedThrowableClass);
+	}
+
+	public Optional<Throwable> getActualExternalFailureCause() {
+		return actualExternalFailureCause;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index d24b55c..507ff0b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -80,7 +81,6 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
@@ -609,6 +609,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			AsyncDataStream.OutputMode.ORDERED);
 
 		final MockEnvironment mockEnvironment = createMockEnvironment();
+		mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 
 		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 			new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment);
@@ -643,14 +644,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
 
-		Throwable failureCause = mockEnvironment.getFailExternallyCause();
-		Assert.assertNotNull(failureCause);
-
-		Assert.assertNotNull(failureCause.getCause());
-		Assert.assertTrue(failureCause.getCause() instanceof ExecutionException);
-
-		Assert.assertNotNull(failureCause.getCause().getCause());
-		Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
+		assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
+		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
 	}
 
 	@Nonnull
@@ -730,8 +725,6 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		synchronized (lock) {
 			operator.close();
 		}
-
-		Assert.assertNull(environment.getFailExternallyCause());
 	}
 
 	/**
@@ -867,6 +860,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			outputMode);
 
 		final MockEnvironment mockEnvironment = createMockEnvironment();
+		mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>(
 			asyncWaitOperator,
@@ -883,7 +877,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			harness.close();
 		}
 
-		Assert.assertNotNull(harness.getEnvironment().getFailExternallyCause());
+		assertTrue(harness.getEnvironment().getActualExternalFailureCause().isPresent());
 	}
 
 	/**
@@ -932,6 +926,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			outputMode);
 
 		final MockEnvironment mockEnvironment = createMockEnvironment();
+		mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> harness = new OneInputStreamOperatorTestHarness<>(
 			asyncWaitOperator,
@@ -949,8 +944,6 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		synchronized (harness.getCheckpointLock()) {
 			harness.close();
 		}
-
-		Assert.assertNotNull(mockEnvironment.getFailExternallyCause());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9ee35ee..52295fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -330,8 +330,7 @@ public class StreamTaskTest extends TestLogger {
 		TaskInfo mockTaskInfo = mock(TaskInfo.class);
 		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
 		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		Environment mockEnvironment = mock(Environment.class);
-		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+		Environment mockEnvironment = new MockEnvironment();
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -401,12 +400,7 @@ public class StreamTaskTest extends TestLogger {
 		final long checkpointId = 42L;
 		final long timestamp = 1L;
 
-		TaskInfo mockTaskInfo = mock(TaskInfo.class);
-		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		Environment mockEnvironment = mock(Environment.class);
-		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
-
+		MockEnvironment mockEnvironment = new MockEnvironment();
 		StreamTask<?, ?> streamTask = spy(new EmptyStreamTask(mockEnvironment));
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 
@@ -458,6 +452,7 @@ public class StreamTaskTest extends TestLogger {
 			new StreamTask.AsyncCheckpointExceptionHandler(streamTask);
 		Whitebox.setInternalState(streamTask, "asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
 
+		mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
 		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
@@ -483,12 +478,6 @@ public class StreamTaskTest extends TestLogger {
 		final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
 		final OneShotLatch completeAcknowledge = new OneShotLatch();
 
-		TaskInfo mockTaskInfo = mock(TaskInfo.class);
-		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		Environment mockEnvironment = mock(Environment.class);
-		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
-
 		CheckpointResponder checkpointResponder = mock(CheckpointResponder.class);
 		doAnswer(new Answer() {
 			@Override
@@ -514,7 +503,12 @@ public class StreamTaskTest extends TestLogger {
 			null,
 			checkpointResponder);
 
-		when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager);
+		MockEnvironment mockEnvironment = new MockEnvironment(
+			"mock-task",
+			1024 * MemoryManager.DEFAULT_PAGE_SIZE,
+			null,
+			16,
+			taskStateManager);
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -606,11 +600,7 @@ public class StreamTaskTest extends TestLogger {
 		final OneShotLatch createSubtask = new OneShotLatch();
 		final OneShotLatch completeSubtask = new OneShotLatch();
 
-		TaskInfo mockTaskInfo = mock(TaskInfo.class);
-		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		Environment mockEnvironment = mock(Environment.class);
-		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+		Environment mockEnvironment = spy(new MockEnvironment());
 
 		whenNew(OperatorSubtaskState.class).
 			withArguments(
@@ -707,12 +697,7 @@ public class StreamTaskTest extends TestLogger {
 		final long checkpointId = 42L;
 		final long timestamp = 1L;
 
-		TaskInfo mockTaskInfo = mock(TaskInfo.class);
-
-		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-
-		Environment mockEnvironment = mock(Environment.class);
+		Environment mockEnvironment = spy(new MockEnvironment());
 
 		// latch blocks until the async checkpoint thread acknowledges
 		final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
@@ -742,7 +727,6 @@ public class StreamTaskTest extends TestLogger {
 			checkpointResponder);
 
 		when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager);
-		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/0af22bf2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 966d205..ced22c0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -175,7 +175,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 
 		this.environment = Preconditions.checkNotNull(env);
 
-		this.taskStateManager = env.getTaskStateManager();
+		this.taskStateManager = (TestTaskStateManager) env.getTaskStateManager();
 		this.internalEnvironment = environmentIsInternal ? Optional.of(environment) : Optional.empty();
 
 		mockTask = mock(StreamTask.class);


Mime
View raw message