flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-5473] Limit max parallelism to 1 for non-parallel operators
Date Tue, 24 Jan 2017 14:52:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5f0d8c9dd -> acfeeaf5e


http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7771064..6d01795 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -94,6 +95,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		try {
 			for (int i = 0; i < outEdgesInOrder.size(); i++) {
 				StreamEdge outEdge = outEdgesInOrder.get(i);
+
 				RecordWriterOutput<?> streamOutput = createStreamOutput(
 						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
 						containingTask.getEnvironment(), containingTask.getName());
@@ -330,6 +332,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		
 		ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
 
+		// we initialize the partitioner here with the number of key groups (aka max. parallelism)
+		if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
+			int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
+			if (0 < numKeyGroups) {
+				((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
+			}
+		}
+
 		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = 
 				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 3fb4513..3fc1344 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -17,19 +17,12 @@
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
@@ -38,10 +31,20 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
-
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class StreamExecutionEnvironmentTest {
 
 	@Test
@@ -124,6 +127,102 @@ public class StreamExecutionEnvironmentTest {
 		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
 	}
 
+	@Test
+	public void testParallelismBounds() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+
+
+		SingleOutputStreamOperator<Object> operator =
+				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(Integer value, Collector<Object> out) throws Exception {
+
+			}
+		});
+
+		// default value for max parallelism
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// bounds for parallelism 1
+		try {
+			operator.setParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for parallelism 2
+		operator.setParallelism(1);
+		Assert.assertEquals(1, operator.getParallelism());
+
+		// bounds for parallelism 3
+		operator.setParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getParallelism());
+
+		// default value after generating
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// configured value after generating
+		env.setMaxParallelism(42);
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
+
+		// bounds configured parallelism 1
+		try {
+			env.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds configured parallelism 2
+		try {
+			env.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 1
+		try {
+			operator.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 2
+		try {
+			operator.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 3
+		operator.setMaxParallelism(1);
+		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
+
+		// bounds for max parallelism 4
+		operator.setMaxParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+
+		// override config
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
+	}
+
 	/////////////////////////////////////////////////////////////
 	// Utilities
 	/////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index aa86304..5fdacd4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -22,13 +22,12 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -46,7 +44,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
-
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -368,13 +365,9 @@ public class StreamGraphGeneratorTest {
 
 		StreamGraph graph = env.getStreamGraph();
 
-		StreamNode keyedResult1Node = graph.getStreamNode(keyedResult1.getId());
-		StreamNode keyedResult2Node = graph.getStreamNode(keyedResult2.getId());
 		StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
 		StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());
 
-		assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult1Node.getMaxParallelism());
-		assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult2Node.getMaxParallelism());
 		assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
 		assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8045d82..073632a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -122,19 +123,29 @@ public class RescalingITCase extends TestLogger {
 
 	@Test
 	public void testSavepointRescalingInKeyedState() throws Exception {
-		testSavepointRescalingKeyedState(false);
+		testSavepointRescalingKeyedState(false, false);
 	}
 
 	@Test
 	public void testSavepointRescalingOutKeyedState() throws Exception {
-		testSavepointRescalingKeyedState(true);
+		testSavepointRescalingKeyedState(true, false);
+	}
+
+	@Test
+	public void testSavepointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+		testSavepointRescalingKeyedState(false, true);
+	}
+
+	@Test
+	public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+		testSavepointRescalingKeyedState(true, true);
 	}
 
 	/**
 	 * Tests that a a job with purely keyed state can be restarted from a savepoint
 	 * with a different parallelism.
 	 */
-	public void testSavepointRescalingKeyedState(boolean scaleOut) throws Exception {
+	public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
 		final int numberKeys = 42;
 		final int numberElements = 1000;
 		final int numberElements2 = 500;
@@ -194,7 +205,9 @@ public class RescalingITCase extends TestLogger {
 
 			jobID = null;
 
-			JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, maxParallelism, numberKeys, numberElements2, true, 100);
+			int restoreMaxParallelism = deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism;
+
+			JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, restoreMaxParallelism, numberKeys, numberElements2, true, 100);
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
@@ -642,7 +655,9 @@ public class RescalingITCase extends TestLogger {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
-		env.getConfig().setMaxParallelism(maxParallelism);
+		if (0 < maxParallelism) {
+			env.getConfig().setMaxParallelism(maxParallelism);
+		}
 		env.enableCheckpointing(checkpointingInterval);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 


Mime
View raw message