From commits-return-15967-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Feb 23 10:24:36 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2B78518076D for ; Fri, 23 Feb 2018 10:24:35 +0100 (CET) Received: (qmail 15201 invoked by uid 500); 23 Feb 2018 09:24:34 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 14977 invoked by uid 99); 23 Feb 2018 09:24:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Feb 2018 09:24:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BBFC9EB4BF; Fri, 23 Feb 2018 09:24:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Fri, 23 Feb 2018 09:24:35 -0000 Message-Id: <196037ca332046dda25e586b2df10c8b@git.apache.org> In-Reply-To: <9381b88a64834927a21b4b83df1854c3@git.apache.org> References: <9381b88a64834927a21b4b83df1854c3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/7] flink git commit: [FLINK-8629] [flip6] Allow JobMaster to rescale jobs [FLINK-8629] [flip6] Allow JobMaster to rescale jobs This commit adds the functionality to rescale a job or parts of it to the JobMaster. In order to rescale a job, the JobMaster does the following: 1. Take a savepoint 2. Create a rescaled ExecutionGraph from the JobGraph 3. Initialize it with the taken savepoint 4. Suspend the old ExecutionGraph 5. Restart the new ExecutionGraph once the old ExecutionGraph has been suspended This closes #5446. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f83e2f77 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f83e2f77 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f83e2f77 Branch: refs/heads/master Commit: f83e2f770a2ba7da9c9333ef536bbd612d744de2 Parents: 7e96a24 Author: Till Rohrmann Authored: Tue Feb 13 16:14:41 2018 +0100 Committer: Till Rohrmann Committed: Thu Feb 22 17:32:37 2018 +0100 ---------------------------------------------------------------------- .../util/function/BiConsumerWithException.java | 50 +++++ .../flink/runtime/checkpoint/Checkpoints.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 219 +++++++++++++++++-- .../jobmaster/JobMasterConfiguration.java | 12 + .../runtime/jobmaster/JobMasterGateway.java | 28 +++ .../runtime/jobmaster/RescalingBehaviour.java | 49 +++++ .../exceptions/JobMasterException.java | 41 ++++ .../exceptions/JobModificationException.java | 39 ++++ .../utils/TestingJobMasterGateway.java | 11 + 9 files changed, 434 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java new file mode 100644 index 0000000..6ab1161 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.function; + +import java.util.function.BiConsumer; + +/** + * A checked extension of the {@link BiConsumer} interface. + * + * @param <T> type of the first argument + * @param <U> type of the second argument + * @param <E> type of the thrown exception + */ +@FunctionalInterface +public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> { + + /** + * Performs this operation on the given arguments. 
+ * + * @param t the first input argument + * @param u the second input argument + * @throws E in case of an error + */ + void acceptWithException(T t, U u) throws E; + + @Override + default void accept(T t, U u) { + try { + acceptWithException(t, u); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index 47efa6f..72b7c53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -246,7 +246,7 @@ public class Checkpoints { try (InputStream in = metadataHandle.openInputStream(); DataInputStream dis = new DataInputStream(in)) { - savepoint = loadCheckpointMetadata(dis, classLoader); + savepoint = loadCheckpointMetadata(dis, classLoader); } Exception exception = null; http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 2a4b881..22c69f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -26,13 +26,15 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.queryablestate.KvStateID; 
+import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -56,10 +58,12 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway; @@ -106,6 +110,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -115,6 +120,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -174,9 +180,6 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final ClassLoader userCodeLoader; - /** The execution graph of this job. */ - private final ExecutionGraph executionGraph; - private final SlotPool slotPool; private final SlotPoolGateway slotPoolGateway; @@ -201,6 +204,11 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast private final Map> registeredTaskManagers; + // -------- Mutable fields --------- + + /** The execution graph of this job. */ + private ExecutionGraph executionGraph; + // ------------------------------------------------------------------------ public JobMaster( @@ -268,8 +276,6 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); this.slotPool = new SlotPool( @@ -289,7 +295,7 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast scheduledExecutorService, slotPool.getSlotProvider(), userCodeLoader, - checkpointRecoveryFactory, + highAvailabilityServices.getCheckpointRecoveryFactory(), rpcTimeout, restartStrategy, jobMetricGroup, @@ -447,6 +453,165 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); + + for (JobVertex jobVertex : jobGraph.getVertices()) { 
+ allOperators.add(jobVertex.getID()); + } + + return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); + } + + @Override + public CompletableFuture<Acknowledge> rescaleOperators( + Collection<JobVertexID> operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + // 1. Check whether we can rescale the job & rescale the respective vertices + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configure + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + try { + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); + + log.info(msg, e); + + return FutureUtils.completedExceptionally( + new JobModificationException(msg, e)); + } + } + + final ExecutionGraph currentExecutionGraph = executionGraph; + + final ExecutionGraph newExecutionGraph; + + try { + newExecutionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration.getConfiguration(), + scheduledExecutorService, + scheduledExecutorService, + slotPool.getSlotProvider(), + userCodeLoader, + highAvailabilityServices.getCheckpointRecoveryFactory(), + rpcTimeout, + currentExecutionGraph.getRestartStrategy(), + jobMetricGroup, + 1, + blobServer, + jobMasterConfiguration.getSlotRequestTimeout(), + log); + } catch (JobExecutionException | JobException e) { + return FutureUtils.completedExceptionally( + new JobModificationException("Could not create rescaled ExecutionGraph.", e)); + } + + // 3. 
disable checkpoint coordinator to suppress subsequent checkpoints + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkpointCoordinator.stopCheckpointScheduler(); + + // 4. take a savepoint + final CompletableFuture<String> savepointFuture = triggerSavepoint( + jobMasterConfiguration.getTmpDirectory(), + timeout); + + final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture + .thenApplyAsync( + (String savepointPath) -> { + try { + newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( + savepointPath, + false, + newExecutionGraph.getAllVertices(), + userCodeLoader); + } catch (Exception e) { + disposeSavepoint(savepointPath); + + throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); + } + + // delete the savepoint file once we reach a terminal state + newExecutionGraph.getTerminationFuture() + .whenCompleteAsync( + (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + return newExecutionGraph; + }, scheduledExecutorService) + .exceptionally( + (Throwable failure) -> { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + checkpointCoordinator.startCheckpointScheduler(); + } + + throw new CompletionException(failure); + }); + + // 5. 
suspend the current job + final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync( + (ExecutionGraph ignored) -> { + currentExecutionGraph.suspend(new FlinkException("Job is being rescaled.")); + return currentExecutionGraph.getTerminationFuture(); + }, + getMainThreadExecutor()); + + final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept( + (JobStatus jobStatus) -> { + if (jobStatus != JobStatus.SUSPENDED) { + final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName()); + log.info(msg); + throw new CompletionException(new JobModificationException(msg)); + } + }); + + // 6. resume the new execution graph from the taken savepoint + final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync( + executionGraphFuture, + (Void ignored, ExecutionGraph restoredExecutionGraph) -> { + // check if the ExecutionGraph is still the same + //noinspection ObjectEquality + if (executionGraph == currentExecutionGraph) { + executionGraph = restoredExecutionGraph; + + scheduleExecutionGraph(); + + return Acknowledge.get(); + } else { + throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the resacling.")); + } + + }, + getMainThreadExecutor()); + + rescalingFuture.whenComplete( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + // fail the newly created execution graph + newExecutionGraph.failGlobal(new FlinkException("Failed to rescale the job " + jobGraph.getJobID() + '.', throwable)); + } + }); + + return rescalingFuture; + } + /** * Updates the task execution state for a given task. 
* @@ -912,15 +1077,7 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast } // start scheduling job in another thread - scheduledExecutorService.execute( - () -> { - try { - executionGraph.scheduleForExecution(); - } - catch (Throwable t) { - executionGraph.failGlobal(t); - } - }); + scheduledExecutorService.execute(this::scheduleExecutionGraph); return Acknowledge.get(); } @@ -963,6 +1120,36 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast return Acknowledge.get(); } + /** + * Schedules the execution of the current {@link ExecutionGraph}. + */ + private void scheduleExecutionGraph() { + try { + executionGraph.scheduleForExecution(); + } + catch (Throwable t) { + executionGraph.failGlobal(t); + } + } + + /** + * Dispose the savepoint stored under the given path. + * + * @param savepointPath path where the savepoint is stored + */ + private void disposeSavepoint(String savepointPath) { + try { + // delete the temporary savepoint + Checkpoints.disposeSavepoint( + savepointPath, + jobMasterConfiguration.getConfiguration(), + userCodeLoader, + log); + } catch (FlinkException | IOException de) { + log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, de); + } + } + //---------------------------------------------------------------------------------------------- private void handleFatalError(final Throwable cause) { http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java index 15a30e2..5a4e3b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -36,16 +37,20 @@ public class JobMasterConfiguration { private final Time slotIdleTimeout; + private final String tmpDirectory; + private final Configuration configuration; public JobMasterConfiguration( Time rpcTimeout, Time slotRequestTimeout, Time slotIdleTimeout, + String tmpDirectory, Configuration configuration) { this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout); this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout); this.slotIdleTimeout = Preconditions.checkNotNull(slotIdleTimeout); + this.tmpDirectory = Preconditions.checkNotNull(tmpDirectory); this.configuration = Preconditions.checkNotNull(configuration); } @@ -61,6 +66,10 @@ public class JobMasterConfiguration { return slotIdleTimeout; } + public String getTmpDirectory() { + return tmpDirectory; + } + public Configuration getConfiguration() { return configuration; } @@ -78,10 +87,13 @@ public class JobMasterConfiguration { final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)); + final String tmpDirectory = ConfigurationUtils.parseTempDirectories(configuration)[0]; + return new JobMasterConfiguration( rpcTimeout, slotRequestTimeout, slotIdleTimeout, + tmpDirectory, configuration); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 0dcf3fb..fb53237 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -73,6 +73,34 @@ public interface JobMasterGateway extends CompletableFuture stop(@RpcTimeout Time timeout); /** + * Triggers rescaling of the executed job. + * + * @param newParallelism new parallelism of the job + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout); + + /** + * Triggers rescaling of the given set of operators. + * + * @param operators set of operators which shall be rescaled + * @param newParallelism new parallelism of the given set of operators + * @param rescalingBehaviour defining how strict the rescaling has to be executed + * @param timeout of this operation + * @return Future which is completed with {@link Acknowledge} once the rescaling was successful + */ + CompletableFuture rescaleOperators( + Collection operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + @RpcTimeout Time timeout); + + /** * Updates the task execution state for a given task. 
* * @param taskExecutionState New task execution state for a given task http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java new file mode 100644 index 0000000..7de9560 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.BiConsumerWithException; + +/** + * Definition of the rescaling behaviour. 
+ */ +public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Integer, FlinkException> { + // rescaling is only executed if the operator can be set to the given parallelism + STRICT { + @Override + public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException { + if (jobVertex.getMaxParallelism() < newParallelism) { + throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() + + " because its maximum parallelism " + jobVertex.getMaxParallelism() + + " is smaller than the new parallelism " + newParallelism + '.'); + } else { + jobVertex.setParallelism(newParallelism); + } + } + }, + // the new parallelism will be the minimum of the given parallelism and the maximum parallelism + RELAXED { + @Override + public void acceptWithException(JobVertex jobVertex, Integer newParallelism) { + jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java new file mode 100644 index 0000000..a7b62e1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.exceptions; + +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.util.FlinkException; + +/** + * Base class for all {@link JobMaster} related exceptions. + */ +public class JobMasterException extends FlinkException { + private static final long serialVersionUID = 2941885469739200908L; + + public JobMasterException(String message) { + super(message); + } + + public JobMasterException(Throwable cause) { + super(cause); + } + + public JobMasterException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java new file mode 100644 index 0000000..e08ec62 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.exceptions; + +/** + * Base class for all exception which originate from a failed job modification. + */ +public class JobModificationException extends JobMasterException { + + private static final long serialVersionUID = 2374146694058970746L; + + public JobModificationException(String message) { + super(message); + } + + public JobModificationException(Throwable cause) { + super(cause); + } + + public JobModificationException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 168b32b..cac7e90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; 
import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; @@ -67,6 +68,16 @@ public class TestingJobMasterGateway implements JobMasterGateway { } @Override + public CompletableFuture rescaleJob(int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture rescaleOperators(Collection operators, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override public CompletableFuture updateTaskExecutionState(TaskExecutionState taskExecutionState) { throw new UnsupportedOperationException(); }