flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [4/7] flink git commit: [hotfix][tests] Deduplicate code in SingleInputGateTest
Date Wed, 28 Feb 2018 16:15:59 GMT
[hotfix][tests] Deduplicate code in SingleInputGateTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67a547ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67a547ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67a547ad

Branch: refs/heads/master
Commit: 67a547ad438c33d3fbcbe23cd03f009fdc8dd021
Parents: af8efe9
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Fri Feb 23 11:37:37 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Wed Feb 28 17:12:52 2018 +0100

----------------------------------------------------------------------
 .../partition/consumer/SingleInputGateTest.java | 66 ++++++++------------
 1 file changed, 25 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67a547ad/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 0dd0875..e94411d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -77,14 +77,7 @@ public class SingleInputGateTest {
 	@Test(timeout = 120 * 1000)
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
-		final SingleInputGate inputGate = new SingleInputGate(
-			"Test Task Name", new JobID(),
-			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
-			0, 2,
-			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
-		assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
+		final SingleInputGate inputGate = createInputGate();
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 			new TestInputChannel(inputGate, 0),
@@ -135,14 +128,8 @@ public class SingleInputGateTest {
 			any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
 		// Setup reader with one local and one unknown input channel
-		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new JobID(),
-				resultId, ResultPartitionType.PIPELINED,
-				0, 2,
-				mock(TaskActions.class),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+		final SingleInputGate inputGate = createInputGate();
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -190,14 +177,7 @@ public class SingleInputGateTest {
 	 */
 	@Test
 	public void testUpdateChannelBeforeRequest() throws Exception {
-		SingleInputGate inputGate = new SingleInputGate(
-			"t1",
-			new JobID(),
-			new IntermediateDataSetID(),
-			ResultPartitionType.PIPELINED,
-			0,
-			1,
-			mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+		SingleInputGate inputGate = createInputGate(1);
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -230,15 +210,7 @@ public class SingleInputGateTest {
 		final AtomicReference<Exception> asyncException = new AtomicReference<>();
 
 		// Setup the input gate with a single channel that does nothing
-		final SingleInputGate inputGate = new SingleInputGate(
-			"InputGate",
-			new JobID(),
-			new IntermediateDataSetID(),
-			ResultPartitionType.PIPELINED,
-			0,
-			1,
-			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+		final SingleInputGate inputGate = createInputGate(1);
 
 		InputChannel unknown = new UnknownInputChannel(
 			inputGate,
@@ -410,15 +382,7 @@ public class SingleInputGateTest {
 	 */
 	@Test
 	public void testRequestBuffersWithUnknownInputChannel() throws Exception {
-		final SingleInputGate inputGate = new SingleInputGate(
-			"t1",
-			new JobID(),
-			new IntermediateDataSetID(),
-			ResultPartitionType.PIPELINED_BOUNDED,
-			0,
-			1,
-			mock(TaskActions.class),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+		final SingleInputGate inputGate = createInputGate(1);
 
 		UnknownInputChannel unknown = mock(UnknownInputChannel.class);
 		final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -443,6 +407,26 @@ public class SingleInputGateTest {
 
 	// ---------------------------------------------------------------------------------------------
 
+	private static SingleInputGate createInputGate() {
+		return createInputGate(2);
+	}
+
+	private static SingleInputGate createInputGate(int numberOfInputChannels) {
+		SingleInputGate inputGate = new SingleInputGate(
+			"Test Task Name",
+			new JobID(),
+			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
+			0,
+			numberOfInputChannels,
+			mock(TaskActions.class),
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+		assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
+
+		return inputGate;
+	}
+
 	static void verifyBufferOrEvent(
 		InputGate inputGate,
 		boolean isBuffer,


Mime
View raw message