flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6188) Some setParallelism() methods can't cope with default parallelism
Date Wed, 31 Oct 2018 15:40:07 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670256#comment-16670256 ]

ASF GitHub Bot commented on FLINK-6188:
---------------------------------------

aljoscha closed pull request #3616: [FLINK-6188] Correctly handle PARALLELISM_DEFAULT in stream operator
URL: https://github.com/apache/flink/pull/3616
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index c18db529d79..ed9eb2cc0c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -76,13 +76,6 @@
 	 */
 	public static final int PARALLELISM_DEFAULT = -1;
 
-	/**
-	 * The flag value indicating an unknown or unset parallelism. This value is
-	 * not a valid parallelism and indicates that the parallelism should remain
-	 * unchanged.
-	 */
-	public static final int PARALLELISM_UNKNOWN = -2;
-
 	/**
 	 * The default lower bound for max parallelism if nothing was configured by the user. We have
 	 * this to allow users some degree of scale-up in case they forgot to configure maximum
@@ -293,7 +286,6 @@ public int getParallelism() {
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
 		checkArgument(
 				parallelism >= 1 || parallelism == PARALLELISM_DEFAULT,
 				"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " +
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 9c5d9f072b8..705e3946aef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -134,9 +134,6 @@ public String getName() {
 	 * @return The operator with set parallelism.
 	 */
 	public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0,
-				"The parallelism of an operator must be at least 1.");
-
 		Preconditions.checkArgument(canBeParallel() || parallelism == 1,
 				"The parallelism of non parallel operator must be 1.");
 
@@ -156,9 +153,6 @@ public String getName() {
 	 */
 	@PublicEvolving
 	public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0,
-				"The maximum parallelism must be greater than 0.");
-
 		Preconditions.checkArgument(canBeParallel() || maxParallelism == 1,
 				"The maximum parallelism of non parallel operator must be 1.");
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a99efb1c64f..645cc4e2e42 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -657,7 +657,8 @@ public JobGraph getJobGraph() {
 							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism);
+		StreamingJobGraphGenerator jobgraphGenerator =
+				new StreamingJobGraphGenerator(this, defaultParallelism, executionConfig.getMaxParallelism());
 
 		return jobgraphGenerator.createJobGraph();
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index df10ae4476f..48dbf0980e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -151,16 +151,6 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
 
 		LOG.debug("Transforming " + transform);
 
-		if (transform.getMaxParallelism() <= 0) {
-
-			// if the max parallelism hasn't been set, then first use the job wide max parallelism
-			// from theExecutionConfig.
-			int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
-			if (globalMaxParallelismFromConfig > 0) {
-				transform.setMaxParallelism(globalMaxParallelismFromConfig);
-			}
-		}
-
 		// call at least once to trigger exceptions about MissingTypeInfo
 		transform.getOutputType();
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0896eb75c18..92ce314d9ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -94,12 +94,14 @@
 	private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
 	private final int defaultParallelism;
+	private final int defaultMaxParallelism;
 
-	public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) {
+	public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism, int defaultMaxParallelism) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
 		this.defaultParallelism = defaultParallelism;
+		this.defaultMaxParallelism = defaultMaxParallelism;
 	}
 
 	private void init() {
@@ -341,14 +343,15 @@ private StreamConfig createJobVertex(
 		jobVertex.setInvokableClass(streamNode.getJobVertexClass());
 
 		int parallelism = streamNode.getParallelism();
-
+		int maxParallelism = streamNode.getMaxParallelism();
 		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
 			parallelism = defaultParallelism;
 		}
-
+		if (maxParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+			maxParallelism = defaultMaxParallelism;
+		}
 		jobVertex.setParallelism(parallelism);
-
-		jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
+		jobVertex.setMaxParallelism(maxParallelism);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index e86b3e8f04f..0da77357e4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -19,17 +19,18 @@
 
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -126,7 +127,7 @@ public static int getNewNodeId() {
 	 * The maximum parallelism for this stream transformation. It defines the upper limit for
 	 * dynamic scaling and the number of key groups used for partitioned state.
 	 */
-	private int maxParallelism = -1;
+	private int maxParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	/**
 	 *  The minimum resources for this stream transformation. It defines the lower limit for
@@ -202,7 +203,16 @@ public int getParallelism() {
 	 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
 	 */
 	public void setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+		checkArgument(
+				parallelism >= 1 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
+				"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " +
+						"(use system default).");
+		checkArgument(
+				maxParallelism == -1 || parallelism <= maxParallelism,
+				"The specified parallelism must be smaller or equal to the maximum parallelism.");
+		checkArgument(
+				maxParallelism == -1 || parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
+				"Default parallelism cannot be specified when maximum parallelism is specified");
 		this.parallelism = parallelism;
 	}
 
@@ -221,10 +231,19 @@ public int getMaxParallelism() {
 	 * @param maxParallelism Maximum parallelism for this stream transformation.
 	 */
 	public void setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0
-						&& maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
-				"Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
-						+ ". Found: " + maxParallelism);
+		checkArgument(
+				parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
+				"A maximum parallelism can only be specified with an explicitly specified " +
+						"parallelism.");
+		checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+		checkArgument(
+				maxParallelism >= parallelism,
+				"The maximum parallelism must be larger than the parallelism. (parallelism = " +
+						parallelism + " max-parallelism = " + maxParallelism + ")");
+		checkArgument(
+				maxParallelism > 0 && maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
+				"maxParallelism is out of bounds 0 < maxParallelism <= " +
+						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
 		this.maxParallelism = maxParallelism;
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TimestampAssignerTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TimestampAssignerTranslationTest.java
new file mode 100644
index 00000000000..dc053d3110d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TimestampAssignerTranslationTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator;
+import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * These tests verify that the api calls on {@link DataStream} correctly instantiate
+ * timestamp/watermark assignment operators.
+ *
+ * <p>We also create a test harness and push one element into the operator to verify
+ * that we get some output.
+ */
+@SuppressWarnings("serial")
+public class TimestampAssignerTranslationTest {
+
+	/**
+	 * When the upstream operator has the default parallelism it has parallelism {@code -1}. This
+	 * test makes sure that the API code can deal with that.
+	 */
+	@Test
+	public void testPunctuatedAssignerWorksWithDefaultParallelism() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Tuple2<String, Integer>> source =
+				env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = source
+				.map(new IdentityMap())
+				.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
+				.assignTimestampsAndWatermarks(new DummyPunctuatedAssigner());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+		assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, transform.getParallelism());
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		assertTrue(operator instanceof TimestampsAndPunctuatedWatermarksOperator);
+		TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, Integer>> assignerOperator =
+				(TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+		processElementAndEnsureOutput(assignerOperator, new Tuple2<>("hello", 1));
+	}
+
+	/**
+	 * When the upstream operator has the default parallelism it has parallelism {@code -1}. This
+	 * test makes sure that the API code can deal with that.
+	 */
+	@Test
+	public void testPeriodicAssignerWorksWithDefaultParallelism() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Tuple2<String, Integer>> source =
+				env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = source
+				.map(new IdentityMap())
+				.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
+				.assignTimestampsAndWatermarks(new DummyPeriodicAssigner());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+		assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, transform.getParallelism());
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		assertTrue(operator instanceof TimestampsAndPeriodicWatermarksOperator);
+		TimestampsAndPeriodicWatermarksOperator<Tuple2<String, Integer>> assignerOperator =
+				(TimestampsAndPeriodicWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+		processElementAndEnsureOutput(assignerOperator, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testPunctuatedAssignerPicksUpUpstreamParallelism() throws Exception {
+		final int parallelism = 13;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Tuple2<String, Integer>> source =
+				env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = source
+				.map(new IdentityMap())
+				.setParallelism(parallelism)
+				.assignTimestampsAndWatermarks(new DummyPunctuatedAssigner());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+		assertEquals(parallelism, transform.getParallelism());
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof TimestampsAndPunctuatedWatermarksOperator);
+		TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, Integer>> assignerOperator =
+				(TimestampsAndPunctuatedWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+		processElementAndEnsureOutput(assignerOperator, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testPeriodicAssignerPicksUpUpstreamParallelism() throws Exception {
+		final int parallelism = 13;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Tuple2<String, Integer>> source =
+				env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		SingleOutputStreamOperator<Tuple2<String, Integer>> assigner = source
+				.map(new IdentityMap())
+				.setParallelism(parallelism)
+				.assignTimestampsAndWatermarks(new DummyPeriodicAssigner());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) assigner.getTransformation();
+
+		assertEquals(parallelism, transform.getParallelism());
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof TimestampsAndPeriodicWatermarksOperator);
+		TimestampsAndPeriodicWatermarksOperator<Tuple2<String, Integer>> assignerOperator =
+				(TimestampsAndPeriodicWatermarksOperator<Tuple2<String, Integer>>) operator;
+
+		processElementAndEnsureOutput(assignerOperator, new Tuple2<>("hello", 1));
+	}
+
+	/**
+	 * Ensure that we get some output from the given operator when pushing in an element and
+	 * setting watermark and processing time to {@code Long.MAX_VALUE}.
+	 */
+	private static <IN, OUT> void processElementAndEnsureOutput(
+			OneInputStreamOperator<IN, OUT> operator,
+			IN element) throws Exception {
+
+		OneInputStreamOperatorTestHarness<IN, OUT> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		if (operator instanceof OutputTypeConfigurable) {
+			// use a dummy type since window functions just need the ExecutionConfig
+			// this is also only needed for Fold, which we're getting rid of soon.
+			((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+		}
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0);
+		testHarness.processWatermark(Long.MIN_VALUE);
+
+		testHarness.processElement(new StreamRecord<>(element, 0));
+
+		// provoke any processing-time/event-time triggers
+		testHarness.setProcessingTime(Long.MAX_VALUE);
+		testHarness.processWatermark(Long.MAX_VALUE);
+
+		// we at least get the record and the passed-through Long.MAX_VALUE watermark
+		assertTrue(testHarness.getOutput().size() >= 2);
+
+		testHarness.close();
+	}
+
+
+	private static class IdentityMap implements MapFunction<Tuple2<String,Integer>, Tuple2<String, Integer>> {
+		@Override
+		public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class DummyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(
+				Tuple2<String, Integer> lastElement, long extractedTimestamp) {
+			return null;
+		}
+
+		@Override
+		public long extractTimestamp(
+				Tuple2<String, Integer> element,
+				long previousElementTimestamp) {
+			return 0;
+		}
+	}
+
+	private static class DummyPeriodicAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, Integer>> {
+		@Override
+		public long extractTimestamp(Tuple2<String, Integer> element,
+				long previousElementTimestamp) {
+			return 0;
+		}
+
+		@Nullable
+		@Override
+		public Watermark getCurrentWatermark() {
+			return null;
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index fd271791711..fa07709bfc9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -258,12 +258,6 @@ public void flatMap(Integer value, Collector<Object> out) throws Exception {
 		env.getStreamGraph().getJobGraph();
 		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
 
-		// configured value after generating
-		env.setParallelism(21);
-		env.setMaxParallelism(42);
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
-
 		// bounds configured parallelism 1
 		try {
 			env.setMaxParallelism(0);
@@ -293,6 +287,7 @@ public void flatMap(Integer value, Collector<Object> out) throws Exception {
 		}
 
 		// bounds for max parallelism 3
+		operator.setParallelism(1);
 		operator.setMaxParallelism(1);
 		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index fbbb5d23146..544f69ac89b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -270,49 +270,6 @@ public Integer getKey(Integer value) throws Exception {
 		StreamPartitioner<?> streamPartitioner = keyedResultNode.getInEdges().get(0).getPartitioner();
 	}
 
-	/**
-	 * Tests that the global and operator-wide max parallelism setting is respected
-	 */
-	@Test
-	public void testMaxParallelismForwarding() {
-		int globalMaxParallelism = 42;
-		int keyedResult2MaxParallelism = 17;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().setParallelism(12);
-		env.getConfig().setMaxParallelism(globalMaxParallelism);
-
-		DataStream<Integer> source = env.fromElements(1, 2, 3);
-
-		DataStream<Integer> keyedResult1 = source.keyBy(new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 9205556348021992189L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		}).map(new NoOpIntMap());
-
-		DataStream<Integer> keyedResult2 = keyedResult1.keyBy(new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1250168178707154838L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		}).map(new NoOpIntMap()).setMaxParallelism(keyedResult2MaxParallelism);
-
-		keyedResult2.addSink(new DiscardingSink<Integer>());
-
-		StreamGraph graph = env.getStreamGraph();
-
-		StreamNode keyedResult1Node = graph.getStreamNode(keyedResult1.getId());
-		StreamNode keyedResult2Node = graph.getStreamNode(keyedResult2.getId());
-
-		assertEquals(globalMaxParallelism, keyedResult1Node.getMaxParallelism());
-		assertEquals(keyedResult2MaxParallelism, keyedResult2Node.getMaxParallelism());
-	}
-
 	/**
 	 * Tests that the max parallelism is automatically set to the parallelism if it has not been
 	 * specified.
@@ -321,7 +278,7 @@ public Integer getKey(Integer value) throws Exception {
 	public void testAutoMaxParallelism() {
 		int globalParallelism = 42;
 		int mapParallelism = 17;
-		int maxParallelism = 21;
+		int maxParallelism = 84;
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(globalParallelism);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index abf51abef80..d8e1c046779 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,6 +38,7 @@
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -50,6 +52,83 @@
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
 
+	/**
+	 * Verify that the default parallelism and max parallelism are manifested in
+	 * the generated job graph when no parallelism is set on operator or execution environment.
+	 */
+	@Test
+	public void testDefaultParallelismManifestation() {
+		final int customParallelism = 5;
+		final int customMaxParallelism = 10;
+		final int defaultParallelism = 20;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+		env.disableOperatorChaining();
+
+		assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, env.getParallelism());
+
+		env
+				.fromElements("a")
+				.map(new IdentityMap()).name("map1").setParallelism(customParallelism).setMaxParallelism(customMaxParallelism)
+				.map(new IdentityMap()).name("map2");
+
+		StreamGraph streamGraph = env.getStreamGraph();
+		streamGraph.setJobName("test job");
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size());
+
+		JobVertex map1Vertex = getOnlyVertex(jobGraph, "map1");
+		JobVertex map2Vertex = getOnlyVertex(jobGraph, "map2");
+
+		assertEquals(customParallelism, map1Vertex.getParallelism());
+		assertEquals(defaultParallelism, map2Vertex.getParallelism());
+
+		assertEquals(customMaxParallelism, map1Vertex.getMaxParallelism());
+		assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, map2Vertex.getMaxParallelism());
+	}
+
+	/**
+	 * Verify that the parallelism and max parallelism set on the execution environment are
+	 * manifested in the generated job graph for operators that do not set them explicitly.
+	 */
+	@Test
+	public void testEnvironmentParallelismManifestation() {
+		final int customParallelism = 5;
+		final int customMaxParallelism = 10;
+		final int defaultParallelism = 20;
+		final int envParallelism = 25;
+		final int envMaxParallelism = 30;
+
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+		env.disableOperatorChaining();
+		env.setParallelism(envParallelism);
+		env.setMaxParallelism(envMaxParallelism);
+
+		env
+				.fromElements("a")
+				.map(new IdentityMap()).name("map1").setParallelism(customParallelism).setMaxParallelism(customMaxParallelism)
+				.map(new IdentityMap()).name("map2");
+
+		// test once, then change the env parallelism and generate again
+		StreamGraph streamGraph = env.getStreamGraph();
+		streamGraph.setJobName("test job");
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size());
+
+		JobVertex map1Vertex = getOnlyVertex(jobGraph, "map1");
+		JobVertex map2Vertex = getOnlyVertex(jobGraph, "map2");
+
+		assertEquals(customParallelism, map1Vertex.getParallelism());
+		assertEquals(envParallelism, map2Vertex.getParallelism());
+
+		assertEquals(customMaxParallelism, map1Vertex.getMaxParallelism());
+		assertEquals(envMaxParallelism, map2Vertex.getMaxParallelism());
+	}
+
+
 	@Test
 	public void testParallelismOneNotChained() {
 
@@ -115,7 +194,8 @@ public void testDisabledCheckpointing() throws Exception {
 		StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */);
 		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
+		StreamingJobGraphGenerator jobGraphGenerator =
+				new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */, 1 /* max parallelism */);
 		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
 		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
@@ -137,7 +217,9 @@ public Integer map(Integer value) throws Exception {
 				}
 			})
 			.print();
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
+
+		JobGraph jobGraph = new StreamingJobGraphGenerator(
+				env.getStreamGraph(), 1 /* default parallelism */, -1 /* default max parallelism */).createJobGraph();
 
 		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +306,8 @@ public void invoke(Tuple2<Integer, Integer> value) throws Exception {
 		});
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(
+				env.getStreamGraph(), 1 /* default parallelism */, -1 /* default max parallelism */).createJobGraph();
 
 		JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 		JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +374,8 @@ public void invoke(Integer value) throws Exception {
 		}).disableChaining().name("test_sink");
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(
+				env.getStreamGraph(), 1 /* default parallelism */, -1 /* default max parallelism */).createJobGraph();
 
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			if (jobVertex.getName().contains("test_source")) {
@@ -307,4 +391,33 @@ public void invoke(Integer value) throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Returns the only vertex whose name contains the given name. Throws an {@link AssertionError}
+	 * if more than one vertex matches.
+	 */
+	private static JobVertex getOnlyVertex(JobGraph jobGraph, String name) {
+		JobVertex result = null;
+		for (JobVertex v : jobGraph.getVertices()) {
+			if (v.getName().equals(name)) {
+				if (result != null) {
+					Assert.fail("More than one vertex matches the name.");
+				}
+				result = v;
+			}
+		}
+		if (result == null) {
+			Assert.fail("No vertex matches the name.");
+		}
+		return result;
+	}
+
+	private static class IdentityMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 471891682418382583L;
+
+		@Override
+		public String map(String value) {
+			return value;
+		}
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Some setParallelism() methods can't cope with default parallelism
> -----------------------------------------------------------------
>
>                 Key: FLINK-6188
>                 URL: https://issues.apache.org/jira/browse/FLINK-6188
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.1, 1.3.0
>            Reporter: Aljoscha Krettek
>            Priority: Major
>              Labels: pull-request-available
>
> Recent changes done for FLINK-5808 move default parallelism manifestation from eager to lazy, that is, the
> parallelism of operations that don't have an explicit parallelism is only set when generating the JobGraph.
> Some {{setParallelism()}} calls, such as {{SingleOutputStreamOperator.setParallelism()}}, cannot deal with the
> fact that the parallelism of an operation might be {{-1}} (which indicates that it should take the default
> parallelism when generating the JobGraph).
> We should either revert the changes that fixed another user-facing bug for version 1.2.1, or fix the methods.
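
For illustration, here is a minimal sketch of the failure mode described above. It assumes a Flink 1.2/1.3-era
build on the classpath; only the Flink API calls are real, while the class name and the identity map are
illustrative:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DefaultParallelismSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// No parallelism is configured anywhere: the transformation carries
		// ExecutionConfig.PARALLELISM_DEFAULT (-1) until the JobGraph is generated.
		SingleOutputStreamOperator<String> mapped = env
				.fromElements("a", "b")
				.map(new MapFunction<String, String>() {
					@Override
					public String map(String value) {
						return value;
					}
				});

		// Forwarding the default sentinel: before this PR, the "parallelism must be
		// at least 1" precondition in SingleOutputStreamOperator#setParallelism rejected -1;
		// with the PR it is accepted and resolved to the job-wide default later.
		mapped.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);

		// The lazy manifestation happens here, not at setParallelism() time.
		env.getStreamGraph().getJobGraph();
	}
}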



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
