flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-5473] Limit max parallelism to 1 for non-parallel operators
Date Tue, 24 Jan 2017 14:52:33 GMT
[FLINK-5473] Limit max parallelism to 1 for non-parallel operators

[FLINK-5473] Better default behaviours for unspecified maximum parallelism

This closes #3182.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acfeeaf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acfeeaf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acfeeaf5

Branch: refs/heads/master
Commit: acfeeaf5e337e56300d10a3a991e79edc827ac7a
Parents: 5f0d8c9
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Jan 16 14:31:22 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Jan 24 15:51:35 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   4 +-
 .../checkpoint/StateAssignmentOperation.java    | 285 ++++++++++---------
 .../checkpoint/savepoint/SavepointLoader.java   |   8 +-
 .../ResultPartitionDeploymentDescriptor.java    |  18 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java      | 115 ++++++--
 .../runtime/executiongraph/ExecutionVertex.java |  31 +-
 .../api/writer/ResultPartitionWriter.java       |   4 +
 .../io/network/partition/ResultPartition.java   |   8 +
 .../flink/runtime/jobgraph/JobVertex.java       |   7 +-
 .../runtime/state/KeyGroupRangeAssignment.java  |  42 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 +
 .../checkpoint/CheckpointCoordinatorTest.java   |   1 +
 .../savepoint/SavepointLoaderTest.java          |   1 +
 ...ResultPartitionDeploymentDescriptorTest.java |   1 +
 .../executiongraph/ExecutionJobVertexTest.java  | 140 +++++++++
 .../ExecutionVertexDeploymentTest.java          |  27 +-
 .../api/writer/ResultPartitionWriterTest.java   |   5 +-
 .../network/partition/ResultPartitionTest.java  |   1 +
 .../consumer/LocalInputChannelTest.java         |   1 +
 .../runtime/jobmanager/JobManagerTest.java      |   2 +
 .../runtime/taskmanager/TaskManagerTest.java    |   5 +-
 .../streaming/api/datastream/KeyedStream.java   |  13 +-
 .../datastream/SingleOutputStreamOperator.java  |  29 +-
 .../environment/StreamExecutionEnvironment.java |  12 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  37 +--
 .../api/graph/StreamGraphGenerator.java         |  26 +-
 .../flink/streaming/api/graph/StreamNode.java   |   2 -
 .../api/graph/StreamingJobGraphGenerator.java   |  13 +-
 .../transformations/StreamTransformation.java   |   5 +
 .../partitioner/KeyGroupStreamPartitioner.java  |   1 +
 .../streaming/runtime/tasks/OperatorChain.java  |  10 +
 .../api/StreamExecutionEnvironmentTest.java     | 119 +++++++-
 .../api/graph/StreamGraphGeneratorTest.java     |   9 +-
 .../test/checkpointing/RescalingITCase.java     |  25 +-
 35 files changed, 724 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9132897..78cad91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -885,8 +885,10 @@ public class CheckpointCoordinator {
 
 			LOG.info("Restoring from latest valid checkpoint: {}.", latest);
 
+			final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates();
+
 			StateAssignmentOperation stateAssignmentOperation =
-					new StateAssignmentOperation(LOG, tasks, latest, allowNonRestoredState);
+					new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
 
 			stateAssignmentOperation.assignStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 6c23f02..6d075db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -45,29 +45,34 @@ public class StateAssignmentOperation {
 
 	private final Logger logger;
 	private final Map<JobVertexID, ExecutionJobVertex> tasks;
-	private final CompletedCheckpoint latest;
+	private final Map<JobVertexID, TaskState> taskStates;
 	private final boolean allowNonRestoredState;
 
 	public StateAssignmentOperation(
 			Logger logger,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
-			CompletedCheckpoint latest,
+			Map<JobVertexID, TaskState> taskStates,
 			boolean allowNonRestoredState) {
 
-		this.logger = logger;
-		this.tasks = tasks;
-		this.latest = latest;
+		this.logger = Preconditions.checkNotNull(logger);
+		this.tasks = Preconditions.checkNotNull(tasks);
+		this.taskStates = Preconditions.checkNotNull(taskStates);
 		this.allowNonRestoredState = allowNonRestoredState;
 	}
 
 	public boolean assignStates() throws Exception {
 
+		// this tracks if we find missing node hash ids and already use secondary mappings
 		boolean expandedToLegacyIds = false;
+
 		Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
 
-		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
+		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
 
 			TaskState taskState = taskGroupStateEntry.getValue();
+
+			//----------------------------------------find vertex for state---------------------------------------------
+
 			ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
 
 			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
@@ -89,8 +94,31 @@ public class StateAssignmentOperation {
 				}
 			}
 
-			// check that the number of key groups have not changed
-			if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
+			checkParallelismPreconditions(taskState, executionJobVertex);
+
+			assignTaskStatesToOperatorInstances(taskState, executionJobVertex);
+		}
+
+		return true;
+	}
+
+	private void checkParallelismPreconditions(TaskState taskState, ExecutionJobVertex executionJobVertex) {
+		//----------------------------------------max parallelism preconditions-------------------------------------
+
+		// check that the number of key groups have not changed or if we need to override it to satisfy the restored state
+		if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
+
+			if (!executionJobVertex.isMaxParallelismConfigured()) {
+				// if the max parallelism was not explicitly specified by the user, we derive it from the state
+
+				if (logger.isDebugEnabled()) {
+					logger.debug("Overriding maximum parallelism for JobVertex " + executionJobVertex.getJobVertexId()
+							+ " from " + executionJobVertex.getMaxParallelism() + " to " + taskState.getMaxParallelism());
+				}
+
+				executionJobVertex.setMaxParallelism(taskState.getMaxParallelism());
+			} else {
+				// if the max parallelism was explicitly specified, we complain on mismatch
 				throw new IllegalStateException("The maximum parallelism (" +
 						taskState.getMaxParallelism() + ") with which the latest " +
 						"checkpoint of the execution job vertex " + executionJobVertex +
@@ -98,159 +126,162 @@ public class StateAssignmentOperation {
 						executionJobVertex.getMaxParallelism() + ") changed. This " +
 						"is currently not supported.");
 			}
+		}
 
-			//-------------------------------------------------------------------
+		//----------------------------------------parallelism preconditions-----------------------------------------
 
-			final int oldParallelism = taskState.getParallelism();
-			final int newParallelism = executionJobVertex.getParallelism();
-			final boolean parallelismChanged = oldParallelism != newParallelism;
-			final boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
+		final int oldParallelism = taskState.getParallelism();
+		final int newParallelism = executionJobVertex.getParallelism();
 
-			if (hasNonPartitionedState && parallelismChanged) {
-				throw new IllegalStateException("Cannot restore the latest checkpoint because " +
-						"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
-						"state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() +
-						" has parallelism " + newParallelism + " whereas the corresponding " +
-						"state object has a parallelism of " + oldParallelism);
-			}
+		if (taskState.hasNonPartitionedState() && (oldParallelism != newParallelism)) {
+			throw new IllegalStateException("Cannot restore the latest checkpoint because " +
+					"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
+					"state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() +
+					" has parallelism " + newParallelism + " whereas the corresponding " +
+					"state object has a parallelism of " + oldParallelism);
+		}
+	}
 
-			List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
-					executionJobVertex.getMaxParallelism(),
-					newParallelism);
+	private static void assignTaskStatesToOperatorInstances(
+			TaskState taskState, ExecutionJobVertex executionJobVertex) {
 
-			final int chainLength = taskState.getChainLength();
+		final int oldParallelism = taskState.getParallelism();
+		final int newParallelism = executionJobVertex.getParallelism();
 
-			// operator chain idx -> list of the stored op states from all parallel instances for this chain idx
-			@SuppressWarnings("unchecked")
-			List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
-			@SuppressWarnings("unchecked")
-			List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
+		List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
+				executionJobVertex.getMaxParallelism(),
+				newParallelism);
 
-			List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
-			List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
+		final int chainLength = taskState.getChainLength();
 
-			for (int p = 0; p < oldParallelism; ++p) {
-				SubtaskState subtaskState = taskState.getState(p);
+		// operator chain idx -> list of the stored op states from all parallel instances for this chain idx
+		@SuppressWarnings("unchecked")
+		List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
+		@SuppressWarnings("unchecked")
+		List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
 
-				if (null != subtaskState) {
-					collectParallelStatesByChainOperator(
-							parallelOpStatesBackend, subtaskState.getManagedOperatorState());
+		List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
+		List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
 
-					collectParallelStatesByChainOperator(
-							parallelOpStatesStream, subtaskState.getRawOperatorState());
+		for (int p = 0; p < oldParallelism; ++p) {
+			SubtaskState subtaskState = taskState.getState(p);
 
-					KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
-					if (null != keyedStateBackend) {
-						parallelKeyedStatesBackend.add(keyedStateBackend);
-					}
+			if (null != subtaskState) {
+				collectParallelStatesByChainOperator(
+						parallelOpStatesBackend, subtaskState.getManagedOperatorState());
 
-					KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
-					if (null != keyedStateStream) {
-						parallelKeyedStateStream.add(keyedStateStream);
-					}
+				collectParallelStatesByChainOperator(
+						parallelOpStatesStream, subtaskState.getRawOperatorState());
+
+				KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+				if (null != keyedStateBackend) {
+					parallelKeyedStatesBackend.add(keyedStateBackend);
+				}
+
+				KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+				if (null != keyedStateStream) {
+					parallelKeyedStateStream.add(keyedStateStream);
 				}
 			}
+		}
 
-			// operator chain index -> lists with collected states (one collection for each parallel subtasks)
-			@SuppressWarnings("unchecked")
-			List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
+		// operator chain index -> lists with collected states (one collection for each parallel subtasks)
+		@SuppressWarnings("unchecked")
+		List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
 
-			@SuppressWarnings("unchecked")
-			List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
+		@SuppressWarnings("unchecked")
+		List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
 
-			//TODO here we can employ different redistribution strategies for state, e.g. union state.
-			// For now we only offer round robin as the default.
-			OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+		//TODO here we can employ different redistribution strategies for state, e.g. union state.
+		// For now we only offer round robin as the default.
+		OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 
-			for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
+		for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
 
-				List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
-				List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
+			List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
+			List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
 
-				partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
-						opStateRepartitioner,
-						chainOpParallelStatesBackend,
-						oldParallelism,
-						newParallelism);
+			partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
+					opStateRepartitioner,
+					chainOpParallelStatesBackend,
+					oldParallelism,
+					newParallelism);
 
-				partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
-						opStateRepartitioner,
-						chainOpParallelStatesStream,
-						oldParallelism,
-						newParallelism);
-			}
+			partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
+					opStateRepartitioner,
+					chainOpParallelStatesStream,
+					oldParallelism,
+					newParallelism);
+		}
 
-			for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
-				// non-partitioned state
-				ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
+		for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
+			// non-partitioned state
+			ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
 
-				if (!parallelismChanged) {
-					if (taskState.getState(subTaskIdx) != null) {
-						nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
-					}
+			if (oldParallelism == newParallelism) {
+				if (taskState.getState(subTaskIdx) != null) {
+					nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
 				}
+			}
 
-				// partitionable state
-				@SuppressWarnings("unchecked")
-				Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
-				@SuppressWarnings("unchecked")
-				Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
-				List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
-				List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
-
-				for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
-					List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
-							partitionedParallelStatesBackend[chainIdx];
+			// partitionable state
+			@SuppressWarnings("unchecked")
+			Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
+			@SuppressWarnings("unchecked")
+			Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
+			List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
+			List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
 
-					List<Collection<OperatorStateHandle>> redistributedOpStateStream =
-							partitionedParallelStatesStream[chainIdx];
+			for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
+				List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
+						partitionedParallelStatesBackend[chainIdx];
 
-					if (redistributedOpStateBackend != null) {
-						operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
-					}
+				List<Collection<OperatorStateHandle>> redistributedOpStateStream =
+						partitionedParallelStatesStream[chainIdx];
 
-					if (redistributedOpStateStream != null) {
-						operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
-					}
+				if (redistributedOpStateBackend != null) {
+					operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
 				}
 
-				Execution currentExecutionAttempt = executionJobVertex
-						.getTaskVertices()[subTaskIdx]
-						.getCurrentExecutionAttempt();
+				if (redistributedOpStateStream != null) {
+					operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
+				}
+			}
 
-				List<KeyGroupsStateHandle> newKeyedStatesBackend;
-				List<KeyGroupsStateHandle> newKeyedStateStream;
-				if (parallelismChanged) {
-					KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
-					newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
-					newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+			Execution currentExecutionAttempt = executionJobVertex
+					.getTaskVertices()[subTaskIdx]
+					.getCurrentExecutionAttempt();
+
+			List<KeyGroupsStateHandle> newKeyedStatesBackend;
+			List<KeyGroupsStateHandle> newKeyedStateStream;
+			if (oldParallelism == newParallelism) {
+				SubtaskState subtaskState = taskState.getState(subTaskIdx);
+				if (subtaskState != null) {
+					KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+					KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+					newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
+							oldKeyedStatesBackend) : null;
+					newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
+							oldKeyedStatesStream) : null;
 				} else {
-					SubtaskState subtaskState = taskState.getState(subTaskIdx);
-					if (subtaskState != null) {
-						KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
-						KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
-						newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
-								oldKeyedStatesBackend) : null;
-						newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
-								oldKeyedStatesStream) : null;
-					} else {
-						newKeyedStatesBackend = null;
-						newKeyedStateStream = null;
-					}
+					newKeyedStatesBackend = null;
+					newKeyedStateStream = null;
 				}
+			} else {
+				KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
+				newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
+				newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+			}
 
-				TaskStateHandles taskStateHandles = new TaskStateHandles(
-						nonPartitionableState,
-						operatorStateFromBackend,
-						operatorStateFromStream,
-						newKeyedStatesBackend,
-						newKeyedStateStream);
+			TaskStateHandles taskStateHandles = new TaskStateHandles(
+					nonPartitionableState,
+					operatorStateFromBackend,
+					operatorStateFromStream,
+					newKeyedStatesBackend,
+					newKeyedStateStream);
 
-				currentExecutionAttempt.setInitialState(taskStateHandles);
-			}
+			currentExecutionAttempt.setInitialState(taskStateHandles);
 		}
-
-		return true;
 	}
 
 	/**
@@ -298,7 +329,7 @@ public class StateAssignmentOperation {
 
 	/**
 	 * @param chainParallelOpStates array = chain ops, array[idx] = parallel states for this chain op.
-	 * @param chainOpState
+	 * @param chainOpState the operator chain
 	 */
 	private static void collectParallelStatesByChainOperator(
 			List<OperatorStateHandle>[] chainParallelOpStates, ChainedStateHandle<OperatorStateHandle> chainOpState) {
@@ -359,4 +390,4 @@ public class StateAssignmentOperation {
 			return repackStream;
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index d6be482..950a9a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -81,10 +81,11 @@ public class SavepointLoader {
 			}
 
 			if (executionJobVertex != null) {
-				if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) {
+
+				if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()
+						|| !executionJobVertex.isMaxParallelismConfigured()) {
 					taskStates.put(taskState.getJobVertexID(), taskState);
-				}
-				else {
+				} else {
 					String msg = String.format("Failed to rollback to savepoint %s. " +
 									"Max parallelism mismatch between savepoint state and new program. " +
 									"Cannot map operator %s with max parallelism %d to new program with " +
@@ -106,6 +107,7 @@ public class SavepointLoader {
 								"you want to allow to skip this, you can set the --allowNonRestoredState " +
 								"option on the CLI.",
 						savepointPath, taskState.getJobVertexID());
+
 				throw new IllegalStateException(msg);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 14c7d2a..061f925 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 import java.io.Serializable;
 
@@ -36,6 +37,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ResultPartitionDeploymentDescriptor implements Serializable {
 
+	private static final long serialVersionUID = 6343547936086963705L;
+
 	/** The ID of the result this partition belongs to. */
 	private final IntermediateDataSetID resultId;
 
@@ -47,6 +50,9 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 	/** The number of subpartitions. */
 	private final int numberOfSubpartitions;
+
+	/** The maximum parallelism */
+	private final int maxParallelism;
 	
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
 	private final boolean sendScheduleOrUpdateConsumersMessage;
@@ -56,14 +62,17 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 			IntermediateResultPartitionID partitionId,
 			ResultPartitionType partitionType,
 			int numberOfSubpartitions,
+			int maxParallelism,
 			boolean lazyScheduling) {
 
 		this.resultId = checkNotNull(resultId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
 
+		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
 		checkArgument(numberOfSubpartitions >= 1);
 		this.numberOfSubpartitions = numberOfSubpartitions;
+		this.maxParallelism = maxParallelism;
 		this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
 	}
 
@@ -83,6 +92,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return numberOfSubpartitions;
 	}
 
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
 	public boolean sendScheduleOrUpdateConsumersMessage() {
 		return sendScheduleOrUpdateConsumersMessage;
 	}
@@ -96,7 +109,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 	// ------------------------------------------------------------------------
 
-	public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) {
+	public static ResultPartitionDeploymentDescriptor from(
+			IntermediateResultPartition partition, int maxParallelism, boolean lazyScheduling) {
 
 		final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
 		final IntermediateResultPartitionID partitionId = partition.getPartitionId();
@@ -118,6 +132,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		}
 
 		return new ResultPartitionDeploymentDescriptor(
-				resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling);
+				resultId, partitionId, partitionType, numberOfSubpartitions, maxParallelism, lazyScheduling);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9092bae..2069638 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -678,12 +678,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 
 			// create the execution job vertex and attach it to the graph
-			ExecutionJobVertex ejv = null;
-			try {
-				ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
-			} catch (IOException e) {
-				throw new JobException("Could not create a execution job vertex for " + jobVertex.getID() + '.', e);
-			}
+			ExecutionJobVertex ejv =
+					new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
 			ejv.connectToPredecessors(this.intermediateResults);
 
 			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index fbab572..e8664f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -57,8 +57,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
-	
-	private final SerializableObject stateMonitor = new SerializableObject();
+
+	public static final int VALUE_NOT_SET = -1;
+
+	private final Object stateMonitor = new Object();
 	
 	private final ExecutionGraph graph;
 	
@@ -66,30 +68,32 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	
 	private final ExecutionVertex[] taskVertices;
 
-	private IntermediateResult[] producedDataSets;
+	private final IntermediateResult[] producedDataSets;
 	
 	private final List<IntermediateResult> inputs;
 	
 	private final int parallelism;
 
-	private final int maxParallelism;
-	
 	private final boolean[] finishedSubtasks;
-			
-	private volatile int numSubtasksInFinalState;
-	
+
 	private final SlotSharingGroup slotSharingGroup;
-	
+
 	private final CoLocationGroup coLocationGroup;
-	
+
 	private final InputSplit[] inputSplits;
 
+	private final boolean maxParallelismConfigured;
+
+	private int maxParallelism;
+
+	private volatile int numSubtasksInFinalState;
+
 	/**
 	 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 	 * serialize the same information multiple times in order to create the
 	 * TaskDeploymentDescriptors.
 	 */
-	private final SerializedValue<TaskInformation> serializedTaskInformation;
+	private SerializedValue<TaskInformation> serializedTaskInformation;
 
 	private InputSplitAssigner splitAssigner;
 	
@@ -97,7 +101,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
-		Time timeout) throws JobException, IOException {
+		Time timeout) throws JobException {
 
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
 	}
@@ -107,7 +111,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		JobVertex jobVertex,
 		int defaultParallelism,
 		Time timeout,
-		long createTimestamp) throws JobException, IOException {
+		long createTimestamp) throws JobException {
 
 		if (graph == null || jobVertex == null) {
 			throw new NullPointerException();
@@ -121,24 +125,19 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 		this.parallelism = numTaskVertices;
 
-		int maxP = jobVertex.getMaxParallelism();
+		final int configuredMaxParallelism = jobVertex.getMaxParallelism();
+
+		this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);
 
-		Preconditions.checkArgument(maxP >= parallelism, "The maximum parallelism (" +
-			maxP + ") must be greater or equal than the parallelism (" + parallelism +
-			").");
-		this.maxParallelism = maxP;
+		// if no max parallelism was configured by the user, we calculate and set a default
+		setMaxParallelismInternal(maxParallelismConfigured ?
+				configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism));
 
-		this.serializedTaskInformation = new SerializedValue<>(new TaskInformation(
-			jobVertex.getID(),
-			jobVertex.getName(),
-			parallelism,
-			maxParallelism,
-			jobVertex.getInvokableClassName(),
-			jobVertex.getConfiguration()));
+		this.serializedTaskInformation = null;
 
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
 		
-		this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
+		this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 		
 		// take the sharing group
 		this.slotSharingGroup = jobVertex.getSlotSharingGroup();
@@ -212,6 +211,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		finishedSubtasks = new boolean[parallelism];
 	}
 
+	public void setMaxParallelism(int maxParallelismDerived) {
+
+		Preconditions.checkState(!maxParallelismConfigured,
+				"Attempt to override a configured max parallelism. Configured: " + this.maxParallelism
+						+ ", argument: " + maxParallelismDerived);
+
+		setMaxParallelismInternal(maxParallelismDerived);
+	}
+
+	private void setMaxParallelismInternal(int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism > 0
+						&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+				"Overriding max parallelism is not in valid bounds (1.." +
+						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism);
+
+		this.maxParallelism = maxParallelism;
+	}
+
 	public ExecutionGraph getGraph() {
 		return graph;
 	}
@@ -235,6 +252,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return maxParallelism;
 	}
 
+	public boolean isMaxParallelismConfigured() {
+		return maxParallelismConfigured;
+	}
+
 	public JobID getJobId() {
 		return graph.getJobID();
 	}
@@ -269,24 +290,56 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return inputs;
 	}
 
-	public SerializedValue<TaskInformation> getSerializedTaskInformation() {
+	public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException {
+
+		if (null == serializedTaskInformation) {
+
+			int parallelism = getParallelism();
+			int maxParallelism = getMaxParallelism();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Creating task information for " + generateDebugString());
+			}
+
+			serializedTaskInformation = new SerializedValue<>(
+					new TaskInformation(
+							jobVertex.getID(),
+							jobVertex.getName(),
+							parallelism,
+							maxParallelism,
+							jobVertex.getInvokableClassName(),
+							jobVertex.getConfiguration()));
+		}
+
 		return serializedTaskInformation;
 	}
-	
+
 	public boolean isInFinalState() {
 		return numSubtasksInFinalState == parallelism;
 	}
-	
+
 	@Override
 	public ExecutionState getAggregateState() {
 		int[] num = new int[ExecutionState.values().length];
 		for (ExecutionVertex vertex : this.taskVertices) {
 			num[vertex.getExecutionState().ordinal()]++;
 		}
-		
+
 		return getAggregateJobVertexState(num, parallelism);
 	}
 
+	private String generateDebugString() {
+
+		return "ExecutionJobVertex" +
+				"(" + jobVertex.getName() + " | " + jobVertex.getID() + ")" +
+				"{" +
+				"parallelism=" + parallelism +
+				", maxParallelism=" + getMaxParallelism() +
+				", maxParallelismConfigured=" + maxParallelismConfigured +
+				'}';
+	}
+
+
 	//---------------------------------------------------------------------------------------------
 	
 	public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3b5a6cc..6084ad6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -40,13 +40,16 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -599,7 +602,24 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
 		for (IntermediateResultPartition partition : resultPartitions.values()) {
-			producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
+
+			List<List<ExecutionEdge>> consumers = partition.getConsumers();
+
+			if (consumers.isEmpty()) {
+				//TODO this case only exists for test, currently there has to be exactly one consumer in real jobs!
+				producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
+						partition,
+						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+						lazyScheduling));
+			} else {
+				Preconditions.checkState(1 == consumers.size(),
+						"Only one consumer supported in the current implementation! Found: " + consumers.size());
+
+				List<ExecutionEdge> consumer = consumers.get(0);
+				ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex();
+				int maxParallelism = vertex.getMaxParallelism();
+				producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling));
+			}
 		}
 		
 		
@@ -620,7 +640,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 
 		SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();
-		SerializedValue<TaskInformation> serializedJobVertexInformation = jobVertex.getSerializedTaskInformation();
+		SerializedValue<TaskInformation> serializedJobVertexInformation = null;
+
+		try {
+			serializedJobVertexInformation = jobVertex.getSerializedTaskInformation();
+		} catch (IOException e) {
+			throw new ExecutionGraphException(
+					"Could not create a serialized JobVertexInformation for " + jobVertex.getJobVertexId(), e);
+		}
 
 		return new TaskDeploymentDescriptor(
 			serializedJobInformation,

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 4ca7616..57c7098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -60,6 +60,10 @@ public class ResultPartitionWriter implements EventListener<TaskEvent> {
 		return partition.getNumberOfSubpartitions();
 	}
 
+	public int getNumTargetKeyGroups() {
+		return partition.getNumTargetKeyGroups();
+	}
+
 	// ------------------------------------------------------------------------
 	// Data processing
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 474c25c..3d92584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -95,6 +95,8 @@ public class ResultPartition implements BufferPoolOwner {
 
 	private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
+	public final int numTargetKeyGroups;
+
 	private final boolean sendScheduleOrUpdateConsumersMessage;
 
 	// - Runtime state --------------------------------------------------------
@@ -131,6 +133,7 @@ public class ResultPartition implements BufferPoolOwner {
 		ResultPartitionID partitionId,
 		ResultPartitionType partitionType,
 		int numberOfSubpartitions,
+		int numTargetKeyGroups,
 		ResultPartitionManager partitionManager,
 		ResultPartitionConsumableNotifier partitionConsumableNotifier,
 		IOManager ioManager,
@@ -142,6 +145,7 @@ public class ResultPartition implements BufferPoolOwner {
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
 		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+		this.numTargetKeyGroups = numTargetKeyGroups;
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
@@ -356,6 +360,10 @@ public class ResultPartition implements BufferPoolOwner {
 		return cause;
 	}
 
+	public int getNumTargetKeyGroups() {
+		return numTargetKeyGroups;
+	}
+
 	/**
 	 * Releases buffers held by this result partition.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index d24100e..9dcaeeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -40,7 +40,6 @@ public class JobVertex implements java.io.Serializable {
 
 	private static final String DEFAULT_NAME = "(unnamed vertex)";
 
-
 	// --------------------------------------------------------------------------------------------
 	// Members that define the structure / topology of the graph
 	// --------------------------------------------------------------------------------------------
@@ -60,7 +59,7 @@ public class JobVertex implements java.io.Serializable {
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	/** Maximum number of subtasks to split this taks into a runtime. */
-	private int maxParallelism = Short.MAX_VALUE;
+	private int maxParallelism = -1;
 
 	/** Custom configuration passed to the assigned task at runtime. */
 	private Configuration configuration;
@@ -276,10 +275,6 @@ public class JobVertex implements java.io.Serializable {
 	 * @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE.
 	 */
 	public void setMaxParallelism(int maxParallelism) {
-		org.apache.flink.util.Preconditions.checkArgument(
-				maxParallelism > 0 && maxParallelism <= (1 << 15),
-				"The max parallelism must be at least 1 and smaller than 2^15.");
-
 		this.maxParallelism = maxParallelism;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 894f721..62bf3f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -23,7 +23,14 @@ import org.apache.flink.util.Preconditions;
 
 public final class KeyGroupRangeAssignment {
 
-	public static final int DEFAULT_MAX_PARALLELISM = 128;
+	/**
+	 * The default lower bound for max parallelism if nothing was configured by the user. We have this to allow users
+	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+	 */
+	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+	/** The (inclusive) upper bound for max parallelism */
+	public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
 
 	private KeyGroupRangeAssignment() {
 		throw new AssertionError();
@@ -79,9 +86,12 @@ public final class KeyGroupRangeAssignment {
 			int maxParallelism,
 			int parallelism,
 			int operatorIndex) {
-		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
-		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
-		Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
+
+		checkParallelismPreconditions(parallelism);
+		checkParallelismPreconditions(maxParallelism);
+
+		Preconditions.checkArgument(maxParallelism >= parallelism,
+				"Maximum parallelism must not be smaller than parallelism.");
 
 		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
 		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
@@ -105,4 +115,28 @@ public final class KeyGroupRangeAssignment {
 	public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
 		return keyGroupId * parallelism / maxParallelism;
 	}
+
+	/**
+	 * Computes a default maximum parallelism from the operator parallelism. This is used in case the user has not
+	 * explicitly configured a maximum parallelism to still allow a certain degree of scale-up.
+	 *
+	 * @param operatorParallelism the operator parallelism as basis for computation.
+	 * @return the computed default maximum parallelism.
+	 */
+	public static int computeDefaultMaxParallelism(int operatorParallelism) {
+
+		checkParallelismPreconditions(operatorParallelism);
+
+		return Math.min(
+				Math.max(
+						MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
+						DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+				UPPER_BOUND_MAX_PARALLELISM);
+	}
+
+	public static void checkParallelismPreconditions(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0
+						&& parallelism <= UPPER_BOUND_MAX_PARALLELISM,
+				"Operator parallelism not within bounds: " + parallelism);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index a408990..6925d0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -351,6 +351,7 @@ public class Task implements Runnable, TaskActions {
 				partitionId,
 				desc.getPartitionType(),
 				desc.getNumberOfSubpartitions(),
+				desc.getMaxParallelism(),
 				networkEnvironment.getResultPartitionManager(),
 				resultPartitionConsumableNotifier,
 				ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ca9dbc2..6ba557b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2607,6 +2607,7 @@ public class CheckpointCoordinatorTest {
 		when(executionJobVertex.getTaskVertices()).thenReturn(executionVertices);
 		when(executionJobVertex.getParallelism()).thenReturn(parallelism);
 		when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
+		when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
 
 		return executionJobVertex;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 67575d6..6471d6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -87,6 +87,7 @@ public class SavepointLoaderTest {
 
 		// 2) Load and validate: max parallelism mismatch
 		when(vertex.getMaxParallelism()).thenReturn(222);
+		when(vertex.isMaxParallelismConfigured()).thenReturn(true);
 
 		try {
 			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 3ed8236..aac2e13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -46,6 +46,7 @@ public class ResultPartitionDeploymentDescriptorTest {
 						partitionId,
 						partitionType,
 						numberOfSubpartitions,
+						numberOfSubpartitions,
 						true);
 
 		ResultPartitionDeploymentDescriptor copy =

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
new file mode 100644
index 0000000..f0f6248
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ExecutionJobVertexTest {
+
+	private static final int NOT_CONFIGURED = -1;
+
+	@Test
+	public void testMaxParallelismDefaulting() throws Exception {
+
+		// default minimum
+		ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, NOT_CONFIGURED);
+		Assert.assertEquals(128, executionJobVertex.getMaxParallelism());
+
+		// test round up part 1
+		executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED);
+		Assert.assertEquals(256, executionJobVertex.getMaxParallelism());
+
+		// test round up part 2
+		executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED);
+		Assert.assertEquals(512, executionJobVertex.getMaxParallelism());
+
+		// test round up limit
+		executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED);
+		Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
+
+		// test upper bound
+		try {
+			executionJobVertex = createExecutionJobVertex(1 + (1 << 15), NOT_CONFIGURED);
+			executionJobVertex.getMaxParallelism();
+			Assert.fail();
+		} catch (IllegalArgumentException ignore) {
+		}
+
+		// test configured / trumps computed default
+		executionJobVertex = createExecutionJobVertex(172, 4);
+		Assert.assertEquals(4, executionJobVertex.getMaxParallelism());
+
+
+		// test configured / trumps computed default
+		executionJobVertex = createExecutionJobVertex(4, 1 << 15);
+		Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
+
+		// test upper bound configured
+		try {
+			executionJobVertex = createExecutionJobVertex(4, 1 + (1 << 15));
+			Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+		} catch (IllegalArgumentException ignore) {
+		}
+
+		// test lower bound configured
+		try {
+			executionJobVertex = createExecutionJobVertex(4, 0);
+			Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+		} catch (IllegalArgumentException ignore) {
+		}
+
+		// test that an explicitly set derived value overrides the computed default
+		executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
+		executionJobVertex.setMaxParallelism(7);
+		Assert.assertEquals(7, executionJobVertex.getMaxParallelism());
+
+		// test lower bound with derived value
+		executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
+		try {
+			executionJobVertex.setMaxParallelism(0);
+			Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+		} catch (IllegalArgumentException ignore) {
+		}
+
+		// test upper bound with derived value
+		executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
+		try {
+			executionJobVertex.setMaxParallelism(1 + (1 << 15));
+			Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+		} catch (IllegalArgumentException ignore) {
+		}
+
+		// test complain on setting derived value in presence of a configured value
+		executionJobVertex = createExecutionJobVertex(4, 16);
+		try {
+			executionJobVertex.setMaxParallelism(7);
+			Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+		} catch (IllegalStateException ignore) {
+		}
+
+	}
+
+	//------------------------------------------------------------------------------------------------------
+
+	private static ExecutionJobVertex createExecutionJobVertex(
+			int parallelism,
+			int preconfiguredMaxParallelism) throws JobException, IOException {
+
+		JobVertex jobVertex = new JobVertex("testVertex");
+		jobVertex.setInvokableClass(AbstractInvokable.class);
+		jobVertex.setParallelism(parallelism);
+
+		if (NOT_CONFIGURED != preconfiguredMaxParallelism) {
+			jobVertex.setMaxParallelism(preconfiguredMaxParallelism);
+		}
+
+		ExecutionGraph executionGraphMock = mock(ExecutionGraph.class);
+		when(executionGraphMock.getFutureExecutor()).thenReturn(Executors.directExecutor());
+		ExecutionJobVertex executionJobVertex =
+				new ExecutionJobVertex(executionGraphMock, jobVertex, 1, Time.seconds(10));
+
+		return executionJobVertex;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index f3db5d8..c04ebc6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -356,8 +356,17 @@ public class ExecutionVertexDeploymentTest {
 	public void testTddProducedPartitionsLazyScheduling() throws Exception {
 		TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
 		ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context);
-		IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
-		ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+
+		IntermediateResult result =
+				new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
+
+		ExecutionVertex vertex =
+				new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+
+		ExecutionEdge mockEdge = createMockExecutionEdge(1);
+
+		result.getPartitions()[0].addConsumerGroup();
+		result.getPartitions()[0].addConsumer(mockEdge, 0);
 
 		AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
 		when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
@@ -381,4 +390,18 @@ public class ExecutionVertexDeploymentTest {
 			assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
 		}
 	}
+
+
+
+	private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
+		ExecutionVertex targetVertex = mock(ExecutionVertex.class);
+		ExecutionJobVertex targetJobVertex = mock(ExecutionJobVertex.class);
+
+		when(targetVertex.getJobVertex()).thenReturn(targetJobVertex);
+		when(targetJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
+
+		ExecutionEdge edge = mock(ExecutionEdge.class);
+		when(edge.getTarget()).thenReturn(targetVertex);
+		return edge;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
index 104bc87..2e5816d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
@@ -28,16 +28,12 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.ArrayDeque;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -67,6 +63,7 @@ public class ResultPartitionWriterTest {
 			new ResultPartitionID(),
 			ResultPartitionType.PIPELINED,
 			2,
+			2,
 			mock(ResultPartitionManager.class),
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(IOManager.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4eb4fd1..f6562a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -83,6 +83,7 @@ public class ResultPartitionTest {
 			new ResultPartitionID(),
 			type,
 			1,
+			1,
 			mock(ResultPartitionManager.class),
 			notifier,
 			mock(IOManager.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 37ec751..e05fb56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -120,6 +120,7 @@ public class LocalInputChannelTest {
 				partitionIds[i],
 				ResultPartitionType.PIPELINED,
 				parallelism,
+				parallelism,
 				partitionManager,
 				partitionConsumableNotifier,
 				ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index ed07e88..b522745 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -626,10 +626,12 @@ public class JobManagerTest {
 		JobGraph jobGraph = new JobGraph("croissant");
 		JobVertex jobVertex1 = new JobVertex("cappuccino");
 		jobVertex1.setParallelism(4);
+		jobVertex1.setMaxParallelism(16);
 		jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
 
 		JobVertex jobVertex2 = new JobVertex("americano");
 		jobVertex2.setParallelism(4);
+		jobVertex2.setMaxParallelism(16);
 		jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
 
 		jobGraph.addVertex(jobVertex1);

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index cc661ea..770aa35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -633,7 +633,7 @@ public class TaskManagerTest extends TestLogger {
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
 				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
@@ -778,7 +778,7 @@ public class TaskManagerTest extends TestLogger {
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
 				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
@@ -1488,6 +1488,7 @@ public class TaskManagerTest extends TestLogger {
 				new IntermediateResultPartitionID(),
 				ResultPartitionType.PIPELINED,
 				1,
+				1,
 				true);
 
 			final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 3e3afd3..7f33275 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.RichProcessFunction;
@@ -40,17 +39,18 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
 import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -113,8 +113,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 			dataStream.getExecutionEnvironment(),
 			new PartitionTransformation<>(
 				dataStream.getTransformation(),
-				new KeyGroupStreamPartitioner<>(
-					keySelector, KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM)));
+				new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
 		this.keySelector = keySelector;
 		this.keyType = keyType;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 3fe21fb..9dd60b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -115,18 +115,18 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 
 	/**
 	 * Sets the parallelism for this operator. The degree must be 1 or more.
-	 * 
+	 *
 	 * @param parallelism
 	 *            The parallelism for this operator.
 	 * @return The operator with set parallelism.
 	 */
 	public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
-		if (parallelism < 1) {
-			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
-		}
-		if (nonParallel && parallelism > 1) {
-			throw new IllegalArgumentException("The parallelism of non parallel operator must be 1.");
-		}
+		Preconditions.checkArgument(parallelism > 0,
+				"The parallelism of an operator must be at least 1.");
+
+		Preconditions.checkArgument(canBeParallel() || parallelism == 1,
+				"The parallelism of non parallel operator must be 1.");
+
 		transformation.setParallelism(parallelism);
 
 		return this;
@@ -143,15 +143,23 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	 */
 	@PublicEvolving
 	public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+		Preconditions.checkArgument(maxParallelism > 0,
+				"The maximum parallelism must be greater than 0.");
+
+		Preconditions.checkArgument(canBeParallel() || maxParallelism == 1,
+				"The maximum parallelism of non parallel operator must be 1.");
 
 		transformation.setMaxParallelism(maxParallelism);
 
 		return this;
 	}
 
+	private boolean canBeParallel() {
+		return !nonParallel;
+	}
+
 	/**
-	 * Sets the parallelism of this operator to one.
+	 * Sets the parallelism and maximum parallelism of this operator to one.
 	 * And mark this operator cannot set a non-1 degree of parallelism.
 	 *
 	 * @return The operator with only one parallelism.
@@ -159,6 +167,7 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	@PublicEvolving
 	public SingleOutputStreamOperator<T> forceNonParallel() {
 		transformation.setParallelism(1);
+		transformation.setMaxParallelism(1);
 		nonParallel = true;
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5b4b901..dab0a06 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -48,6 +48,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -166,16 +167,19 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the maximum degree of parallelism defined for the program.
+	 * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is Short.MAX_VALUE.
 	 *
 	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
 	 * defines the number of key groups used for partitioned state.
 	 *
-	 * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15
+	 * @param maxParallelism Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15 - 1
 	 */
 	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0 && maxParallelism <= (1 << 15),
-				"maxParallelism is out of bounds 0 < maxParallelism <= 2^15. Found: " + maxParallelism);
+		Preconditions.checkArgument(maxParallelism > 0 &&
+						maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+				"maxParallelism is out of bounds 0 < maxParallelism <= " +
+						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+
 		config.setMaxParallelism(maxParallelism);
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a4a8dc7..2f80764 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -17,6 +17,18 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -38,7 +50,6 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -51,18 +62,6 @@ import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Class representing the streaming topology. It contains all the information
  * necessary to build the jobgraph for the execution.
@@ -358,18 +357,6 @@ public class StreamGraph extends StreamingPlan {
 			if (partitioner == null) {
 				partitioner = virtuaPartitionNodes.get(virtualId).f1;
 			}
-
-			if (partitioner instanceof ConfigurableStreamPartitioner) {
-				StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
-				ConfigurableStreamPartitioner configurableStreamPartitioner = (ConfigurableStreamPartitioner) partitioner;
-
-				// Configure the partitioner with the max parallelism. This is necessary if the
-				// partitioner has been created before the maximum parallelism has been set. The
-				// maximum parallelism is necessary for the key group mapping.
-				configurableStreamPartitioner.configure(downstreamNode.getMaxParallelism());
-			}
-
 			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
 		} else {
 			StreamNode upstreamNode = getStreamNode(upStreamVertexID);

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7ab7494..ddd0515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +78,9 @@ public class StreamGraphGenerator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
 
+	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+	public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+
 	// The StreamGraph that is being built, this is initialized at the beginning.
 	private StreamGraph streamGraph;
 
@@ -149,25 +151,11 @@ public class StreamGraphGenerator {
 		if (transform.getMaxParallelism() <= 0) {
 
 			// if the max parallelism hasn't been set, then first use the job wide max parallelism
-			// from theExecutionConfig. If this value has not been specified either, then use the
-			// parallelism of the operator.
-			int maxParallelism = env.getConfig().getMaxParallelism();
-
-			if (maxParallelism <= 0) {
-
-				int parallelism = transform.getParallelism();
-
-				if(parallelism <= 0) {
-					parallelism = 1;
-					transform.setParallelism(parallelism);
-				}
-
-				maxParallelism = Math.max(
-						MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)),
-						KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM);
+			// from the ExecutionConfig.
+			int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
+			if (globalMaxParallelismFromConfig > 0) {
+				transform.setMaxParallelism(globalMaxParallelismFromConfig);
 			}
-
-			transform.setMaxParallelism(maxParallelism);
 		}
 
 		// call at least once to trigger exceptions about MissingTypeInfo

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0d58ed2..19a3699 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -163,7 +162,6 @@ public class StreamNode implements Serializable {
 	 * @param maxParallelism Maximum parallelism to be set
 	 */
 	void setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be at least 1.");
 		this.maxParallelism = maxParallelism;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e306f30..8877c80 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -310,18 +310,7 @@ public class StreamingJobGraphGenerator {
 			parallelism = jobVertex.getParallelism();
 		}
 
-		int maxParallelism = streamNode.getMaxParallelism();
-
-		// the maximum parallelism specifies the upper bound for the parallelism
-		if (parallelism > maxParallelism) {
-			// the parallelism should always be smaller or equal than the max parallelism
-			throw new IllegalStateException("The maximum parallelism (" + maxParallelism + ") of " +
-				"the stream node " + streamNode + " is smaller than the parallelism (" +
-				parallelism + "). Increase the maximum parallelism or decrease the parallelism of " +
-				"this operator.");
-		} else {
-			jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
-		}
+		jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index f7aecdb..5e1b3e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.util.Preconditions;
 
@@ -205,6 +206,10 @@ public abstract class StreamTransformation<T> {
 	 * @param maxParallelism Maximum parallelism for this stream transformation.
 	 */
 	public void setMaxParallelism(int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism > 0
+						&& maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
+				"Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
+						+ ". Found: " + maxParallelism);
 		this.maxParallelism = maxParallelism;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/acfeeaf5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 256fee1..ddbdaea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -76,6 +76,7 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
 
 	@Override
 	public void configure(int maxParallelism) {
+		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
 		this.maxParallelism = maxParallelism;
 	}
 }


Mime
View raw message