flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/5] flink git commit: [hotfix][tests] Add easier way to chain operator in StreamTaskTestHarness
Date Tue, 24 Oct 2017 14:14:13 GMT
[hotfix][tests] Add easier way to chain operator in StreamTaskTestHarness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5bebef1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5bebef1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5bebef1d

Branch: refs/heads/master
Commit: 5bebef1da4a6dbcbda49ce44e148b1dfc36c273f
Parents: 03c1785
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Wed Oct 18 16:01:38 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Oct 24 15:06:43 2017 +0200

----------------------------------------------------------------------
 .../runtime/tasks/OneInputStreamTaskTest.java   |  70 +----------
 .../runtime/tasks/StreamConfigChainer.java      | 118 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |   5 +
 3 files changed, 128 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5bebef1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 8d80d66..3834633 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
@@ -47,7 +46,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -61,7 +59,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -253,71 +250,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 				BasicTypeInfo.STRING_TYPE_INFO,
 				BasicTypeInfo.STRING_TYPE_INFO);
 
-		// ------------------ setup the chain ------------------
-
 		TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator();
-		StreamConfig headOperatorConfig = testHarness.getStreamConfig();
-
 		WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator();
-		StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration());
-
 		TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator();
-		StreamConfig tailOperatorConfig = new StreamConfig(new Configuration());
-
-		headOperatorConfig.setStreamOperator(headOperator);
-		headOperatorConfig.setOperatorID(new OperatorID(42L, 42L));
-		headOperatorConfig.setChainStart();
-		headOperatorConfig.setChainIndex(0);
-		headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
-			new StreamNode(null, 0, null, null, null, null, null),
-			new StreamNode(null, 1, null, null, null, null, null),
-			0,
-			Collections.<String>emptyList(),
-			null,
-			null
-		)));
-
-		watermarkOperatorConfig.setStreamOperator(watermarkOperator);
-		watermarkOperatorConfig.setOperatorID(new OperatorID(4711L, 42L));
-		watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
-		watermarkOperatorConfig.setChainIndex(1);
-		watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
-			new StreamNode(null, 1, null, null, null, null, null),
-			new StreamNode(null, 2, null, null, null, null, null),
-			0,
-			Collections.<String>emptyList(),
-			null,
-			null
-		)));
-
-		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
-		outEdgesInOrder.add(new StreamEdge(
-			new StreamNode(null, 2, null, null, null, null, null),
-			new StreamNode(null, 3, null, null, null, null, null),
-			0,
-			Collections.<String>emptyList(),
-			new BroadcastPartitioner<Object>(),
-			null));
-
-		tailOperatorConfig.setStreamOperator(tailOperator);
-		tailOperatorConfig.setOperatorID(new OperatorID(123L, 123L));
-		tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
-		tailOperatorConfig.setBufferTimeout(0);
-		tailOperatorConfig.setChainIndex(2);
-		tailOperatorConfig.setChainEnd();
-		tailOperatorConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
-		tailOperatorConfig.setNumberOfOutputs(1);
-		tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
-		tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder);
-		tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE);
-
-		Map<Integer, StreamConfig> chainedConfigs = new HashMap<>(2);
-		chainedConfigs.put(1, watermarkOperatorConfig);
-		chainedConfigs.put(2, tailOperatorConfig);
-		headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
-		headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
-
-		// -----------------------------------------------------
+
+		testHarness.setupOperatorChain(new OperatorID(42L, 42L), headOperator)
+			.chain(new OperatorID(4711L, 42L), watermarkOperator, StringSerializer.INSTANCE)
+			.chain(new OperatorID(123L, 123L), tailOperator, StringSerializer.INSTANCE)
+			.finish();
 
 		// --------------------- begin test ---------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5bebef1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
new file mode 100644
index 0000000..74898a4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helper class to build StreamConfig for chain of operators.
+ */
+public class StreamConfigChainer {
+	private final StreamConfig headConfig;
+	private final Map<Integer, StreamConfig> chainedConfigs = new HashMap<>();
+
+	private StreamConfig tailConfig;
+	private int chainIndex = 0;
+
+	public StreamConfigChainer(OperatorID headOperatorID, StreamOperator<?> headOperator, StreamConfig headConfig) {
+		this.headConfig = checkNotNull(headConfig);
+		this.tailConfig = checkNotNull(headConfig);
+
+		head(headOperator, headOperatorID);
+	}
+
+	private void head(StreamOperator<?> headOperator, OperatorID headOperatorID) {
+		headConfig.setStreamOperator(headOperator);
+		headConfig.setOperatorID(headOperatorID);
+		headConfig.setChainStart();
+		headConfig.setChainIndex(chainIndex);
+	}
+
+	public <T> StreamConfigChainer chain(
+			OperatorID operatorID,
+			OneInputStreamOperator<T, T> operator,
+			TypeSerializer<T> typeSerializer) {
+		return chain(operatorID, operator, typeSerializer, typeSerializer);
+	}
+
+	public <IN, OUT> StreamConfigChainer chain(
+			OperatorID operatorID,
+			OneInputStreamOperator<IN, OUT> operator,
+			TypeSerializer<IN> inputSerializer,
+			TypeSerializer<OUT> outputSerializer) {
+		chainIndex++;
+
+		tailConfig.setChainedOutputs(Collections.singletonList(
+			new StreamEdge(
+				new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null),
+				new StreamNode(null, chainIndex, null, null, null, null, null),
+				0,
+				Collections.<String>emptyList(),
+				null,
+				null)));
+		tailConfig = new StreamConfig(new Configuration());
+		tailConfig.setStreamOperator(checkNotNull(operator));
+		tailConfig.setOperatorID(checkNotNull(operatorID));
+		tailConfig.setTypeSerializerIn1(inputSerializer);
+		tailConfig.setTypeSerializerOut(outputSerializer);
+		tailConfig.setChainIndex(chainIndex);
+
+		chainedConfigs.put(chainIndex, tailConfig);
+
+		return this;
+	}
+
+	public void finish() {
+
+		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+		outEdgesInOrder.add(
+			new StreamEdge(
+				new StreamNode(null, chainIndex, null, null, null, null, null),
+				new StreamNode(null, chainIndex , null, null, null, null, null),
+				0,
+				Collections.<String>emptyList(),
+				new BroadcastPartitioner<Object>(),
+				null));
+
+		tailConfig.setBufferTimeout(0);
+		tailConfig.setChainEnd();
+		tailConfig.setOutputSelectors(Collections.emptyList());
+		tailConfig.setNumberOfOutputs(1);
+		tailConfig.setOutEdgesInOrder(outEdgesInOrder);
+		tailConfig.setNonChainedOutputs(outEdgesInOrder);
+		headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
+		headConfig.setOutEdgesInOrder(outEdgesInOrder);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5bebef1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 19d48e1..5b15477 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -370,6 +371,10 @@ public class StreamTaskTestHarness<OUT> {
 		}
 	}
 
+	public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, OneInputStreamOperator<?, ?> headOperator) {
+		return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig());
+	}
+
 	// ------------------------------------------------------------------------
 
 	private class TaskThread extends Thread {


Mime
View raw message