flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [4/7] flink git commit: [FLINK-8629] [flip6] Allow JobMaster to rescale jobs
Date Fri, 23 Feb 2018 09:24:35 GMT
[FLINK-8629] [flip6] Allow JobMaster to rescale jobs

This commit adds the functionality to rescale a job or parts of it to
the JobMaster. In order to rescale a job, the JobMaster does the following:
1. Take a savepoint
2. Create a rescaled ExecutionGraph from the JobGraph
3. Initialize it with the taken savepoint
4. Suspend the old ExecutionGraph
5. Restart the new ExecutionGraph once the old ExecutionGraph has been suspended

This closes #5446.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f83e2f77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f83e2f77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f83e2f77

Branch: refs/heads/master
Commit: f83e2f770a2ba7da9c9333ef536bbd612d744de2
Parents: 7e96a24
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 13 16:14:41 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 22 17:32:37 2018 +0100

----------------------------------------------------------------------
 .../util/function/BiConsumerWithException.java  |  50 +++++
 .../flink/runtime/checkpoint/Checkpoints.java   |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 219 +++++++++++++++++--
 .../jobmaster/JobMasterConfiguration.java       |  12 +
 .../runtime/jobmaster/JobMasterGateway.java     |  28 +++
 .../runtime/jobmaster/RescalingBehaviour.java   |  49 +++++
 .../exceptions/JobMasterException.java          |  41 ++++
 .../exceptions/JobModificationException.java    |  39 ++++
 .../utils/TestingJobMasterGateway.java          |  11 +
 9 files changed, 434 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
new file mode 100644
index 0000000..6ab1161
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A checked extension of the {@link BiConsumer} interface.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T,
U> {
+
+	/**
+	 * Performs this operation on the given arguments.
+	 *
+	 * @param t the first input argument
+	 * @param u the second input argument
+	 * @throws E in case of an error
+	 */
+	void acceptWithException(T t, U u) throws E;
+
+	@Override
+	default void accept(T t, U u) {
+		try {
+			acceptWithException(t, u);
+		} catch (Throwable e) {
+			throw new RuntimeException(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 47efa6f..72b7c53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -246,7 +246,7 @@ public class Checkpoints {
 		try (InputStream in = metadataHandle.openInputStream();
 			DataInputStream dis = new DataInputStream(in)) {
 
-				savepoint = loadCheckpointMetadata(dis, classLoader);
+			savepoint = loadCheckpointMetadata(dis, classLoader);
 		}
 
 		Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 2a4b881..22c69f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -26,13 +26,15 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -56,10 +58,12 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
@@ -106,6 +110,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -115,6 +120,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -174,9 +180,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 
 	private final ClassLoader userCodeLoader;
 
-	/** The execution graph of this job. */
-	private final ExecutionGraph executionGraph;
-
 	private final SlotPool slotPool;
 
 	private final SlotPoolGateway slotPoolGateway;
@@ -201,6 +204,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 
 	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>
registeredTaskManagers;
 
+	// -------- Mutable fields ---------
+
+	/** The execution graph of this job. */
+	private ExecutionGraph executionGraph;
+
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
@@ -268,8 +276,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 
 		log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
 
-		CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
-
 		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
 		this.slotPool = new SlotPool(
@@ -289,7 +295,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 			scheduledExecutorService,
 			slotPool.getSlotProvider(),
 			userCodeLoader,
-			checkpointRecoveryFactory,
+			highAvailabilityServices.getCheckpointRecoveryFactory(),
 			rpcTimeout,
 			restartStrategy,
 			jobMetricGroup,
@@ -447,6 +453,165 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMast
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> rescaleJob(
+			int newParallelism,
+			RescalingBehaviour rescalingBehaviour,
+			Time timeout) {
+		final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+		for (JobVertex jobVertex : jobGraph.getVertices()) {
+			allOperators.add(jobVertex.getID());
+		}
+
+		return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> rescaleOperators(
+			Collection<JobVertexID> operators,
+			int newParallelism,
+			RescalingBehaviour rescalingBehaviour,
+			Time timeout) {
+		// 1. Check whether we can rescale the job & rescale the respective vertices
+		for (JobVertexID jobVertexId : operators) {
+			final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+			// update max parallelism in case that it has not been configure
+			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+			if (executionJobVertex != null) {
+				jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+			}
+
+			try {
+				rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+			} catch (FlinkException e) {
+				final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+				log.info(msg, e);
+
+				return FutureUtils.completedExceptionally(
+					new JobModificationException(msg, e));
+			}
+		}
+
+		final ExecutionGraph currentExecutionGraph = executionGraph;
+
+		final ExecutionGraph newExecutionGraph;
+
+		try {
+			newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				jobMasterConfiguration.getConfiguration(),
+				scheduledExecutorService,
+				scheduledExecutorService,
+				slotPool.getSlotProvider(),
+				userCodeLoader,
+				highAvailabilityServices.getCheckpointRecoveryFactory(),
+				rpcTimeout,
+				currentExecutionGraph.getRestartStrategy(),
+				jobMetricGroup,
+				1,
+				blobServer,
+				jobMasterConfiguration.getSlotRequestTimeout(),
+				log);
+		} catch (JobExecutionException | JobException e) {
+			return FutureUtils.completedExceptionally(
+				new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+		}
+
+		// 3. disable checkpoint coordinator to suppress subsequent checkpoints
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		checkpointCoordinator.stopCheckpointScheduler();
+
+		// 4. take a savepoint
+		final CompletableFuture<String> savepointFuture = triggerSavepoint(
+			jobMasterConfiguration.getTmpDirectory(),
+			timeout);
+
+		final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture
+			.thenApplyAsync(
+				(String savepointPath) -> {
+					try {
+						newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+							savepointPath,
+							false,
+							newExecutionGraph.getAllVertices(),
+							userCodeLoader);
+					} catch (Exception e) {
+						disposeSavepoint(savepointPath);
+
+						throw new CompletionException(new JobModificationException("Could not restore from
temporary rescaling savepoint.", e));
+					}
+
+					// delete the savepoint file once we reach a terminal state
+					newExecutionGraph.getTerminationFuture()
+						.whenCompleteAsync(
+							(JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath),
+							scheduledExecutorService);
+
+					return newExecutionGraph;
+				}, scheduledExecutorService)
+			.exceptionally(
+				(Throwable failure) -> {
+					// in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint
+					// coordinator and abort the rescaling operation
+					if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+						checkpointCoordinator.startCheckpointScheduler();
+					}
+
+					throw new CompletionException(failure);
+				});
+
+		// 5. suspend the current job
+		final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
+			(ExecutionGraph ignored) -> {
+				currentExecutionGraph.suspend(new FlinkException("Job is being rescaled."));
+				return currentExecutionGraph.getTerminationFuture();
+			},
+			getMainThreadExecutor());
+
+		final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept(
+			(JobStatus jobStatus) -> {
+				if (jobStatus != JobStatus.SUSPENDED) {
+					final String msg = String.format("Job %s rescaling failed because we could not suspend
the execution graph.", jobGraph.getName());
+					log.info(msg);
+					throw new CompletionException(new JobModificationException(msg));
+				}
+			});
+
+		// 6. resume the new execution graph from the taken savepoint
+		final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync(
+			executionGraphFuture,
+			(Void ignored, ExecutionGraph restoredExecutionGraph) -> {
+				// check if the ExecutionGraph is still the same
+				//noinspection ObjectEquality
+				if (executionGraph == currentExecutionGraph) {
+					executionGraph = restoredExecutionGraph;
+
+					scheduleExecutionGraph();
+
+					return Acknowledge.get();
+				} else {
+					throw new CompletionException(new JobModificationException("Detected concurrent modification
of ExecutionGraph. Aborting the resacling."));
+				}
+
+			},
+			getMainThreadExecutor());
+
+		rescalingFuture.whenComplete(
+			(Acknowledge ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					// fail the newly created execution graph
+					newExecutionGraph.failGlobal(new FlinkException("Failed to rescale the job " + jobGraph.getJobID()
+ '.', throwable));
+				}
+			});
+
+		return rescalingFuture;
+	}
+
 	/**
 	 * Updates the task execution state for a given task.
 	 *
@@ -912,15 +1077,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMast
 		}
 
 		// start scheduling job in another thread
-		scheduledExecutorService.execute(
-			() -> {
-				try {
-					executionGraph.scheduleForExecution();
-				}
-				catch (Throwable t) {
-					executionGraph.failGlobal(t);
-				}
-			});
+		scheduledExecutorService.execute(this::scheduleExecutionGraph);
 
 		return Acknowledge.get();
 	}
@@ -963,6 +1120,36 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
implements JobMast
 		return Acknowledge.get();
 	}
 
+	/**
+	 * Schedules the execution of the current {@link ExecutionGraph}.
+	 */
+	private void scheduleExecutionGraph() {
+		try {
+			executionGraph.scheduleForExecution();
+		}
+		catch (Throwable t) {
+			executionGraph.failGlobal(t);
+		}
+	}
+
+	/**
+	 * Dispose the savepoint stored under the given path.
+	 *
+	 * @param savepointPath path where the savepoint is stored
+	 */
+	private void disposeSavepoint(String savepointPath) {
+		try {
+			// delete the temporary savepoint
+			Checkpoints.disposeSavepoint(
+				savepointPath,
+				jobMasterConfiguration.getConfiguration(),
+				userCodeLoader,
+				log);
+		} catch (FlinkException | IOException de) {
+			log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, de);
+		}
+	}
+
 	//----------------------------------------------------------------------------------------------
 
 	private void handleFatalError(final Throwable cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index 15a30e2..5a4e3b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -36,16 +37,20 @@ public class JobMasterConfiguration {
 
 	private final Time slotIdleTimeout;
 
+	private final String tmpDirectory;
+
 	private final Configuration configuration;
 
 	public JobMasterConfiguration(
 			Time rpcTimeout,
 			Time slotRequestTimeout,
 			Time slotIdleTimeout,
+			String tmpDirectory,
 			Configuration configuration) {
 		this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
 		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
 		this.slotIdleTimeout = Preconditions.checkNotNull(slotIdleTimeout);
+		this.tmpDirectory = Preconditions.checkNotNull(tmpDirectory);
 		this.configuration = Preconditions.checkNotNull(configuration);
 	}
 
@@ -61,6 +66,10 @@ public class JobMasterConfiguration {
 		return slotIdleTimeout;
 	}
 
+	public String getTmpDirectory() {
+		return tmpDirectory;
+	}
+
 	public Configuration getConfiguration() {
 		return configuration;
 	}
@@ -78,10 +87,13 @@ public class JobMasterConfiguration {
 		final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 		final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
 
+		final String tmpDirectory = ConfigurationUtils.parseTempDirectories(configuration)[0];
+
 		return new JobMasterConfiguration(
 			rpcTimeout,
 			slotRequestTimeout,
 			slotIdleTimeout,
+			tmpDirectory,
 			configuration);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0dcf3fb..fb53237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -73,6 +73,34 @@ public interface JobMasterGateway extends
 	CompletableFuture<Acknowledge> stop(@RpcTimeout Time timeout);
 
 	/**
+	 * Triggers rescaling of the executed job.
+	 *
+	 * @param newParallelism new parallelism of the job
+	 * @param rescalingBehaviour defining how strict the rescaling has to be executed
+	 * @param timeout of this operation
+	 * @return Future which is completed with {@link Acknowledge} once the rescaling was successful
+	 */
+	CompletableFuture<Acknowledge> rescaleJob(
+		int newParallelism,
+		RescalingBehaviour rescalingBehaviour,
+		@RpcTimeout Time timeout);
+
+	/**
+	 * Triggers rescaling of the given set of operators.
+	 *
+	 * @param operators set of operators which shall be rescaled
+	 * @param newParallelism new parallelism of the given set of operators
+	 * @param rescalingBehaviour defining how strict the rescaling has to be executed
+	 * @param timeout of this operation
+	 * @return Future which is completed with {@link Acknowledge} once the rescaling was successful
+	 */
+	CompletableFuture<Acknowledge> rescaleOperators(
+		Collection<JobVertexID> operators,
+		int newParallelism,
+		RescalingBehaviour rescalingBehaviour,
+		@RpcTimeout Time timeout);
+
+	/**
 	 * Updates the task execution state for a given task.
 	 *
 	 * @param taskExecutionState New task execution state for a given task

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
new file mode 100644
index 0000000..7de9560
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+/**
+ * Definition of the rescaling behaviour.
+ */
+public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Integer,
FlinkException> {
+	// rescaling is only executed if the operator can be set to the given parallelism
+	STRICT {
+		@Override
+		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException
{
+			if (jobVertex.getMaxParallelism() < newParallelism) {
+				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
+					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
+					" is smaller than the new parallelism " + newParallelism + '.');
+			} else {
+				jobVertex.setParallelism(newParallelism);
+			}
+		}
+	},
+	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
+	RELAXED {
+		@Override
+		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
+			jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
new file mode 100644
index 0000000..a7b62e1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.exceptions;
+
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for all {@link JobMaster} related exceptions.
+ */
+public class JobMasterException extends FlinkException {
+	private static final long serialVersionUID = 2941885469739200908L;
+
+	public JobMasterException(String message) {
+		super(message);
+	}
+
+	public JobMasterException(Throwable cause) {
+		super(cause);
+	}
+
+	public JobMasterException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
new file mode 100644
index 0000000..e08ec62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.exceptions;
+
+/**
+ * Base class for all exception which originate from a failed job modification.
+ */
+public class JobModificationException extends JobMasterException {
+
+	private static final long serialVersionUID = 2374146694058970746L;
+
+	public JobModificationException(String message) {
+		super(message);
+	}
+
+	public JobModificationException(Throwable cause) {
+		super(cause);
+	}
+
+	public JobModificationException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 168b32b..cac7e90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -67,6 +68,16 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> rescaleJob(int newParallelism, RescalingBehaviour
rescalingBehaviour, Time timeout) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> rescaleOperators(Collection<JobVertexID>
operators, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
 		throw new UnsupportedOperationException();
 	}


Mime
View raw message