From issues-return-154797-archive-asf-public=cust-asf.ponee.io@flink.apache.org Thu Feb 22 17:27:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2F36F18064E for ; Thu, 22 Feb 2018 17:27:52 +0100 (CET) Received: (qmail 82268 invoked by uid 500); 22 Feb 2018 16:27:51 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 82259 invoked by uid 99); 22 Feb 2018 16:27:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Feb 2018 16:27:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14B0EE95D9; Thu, 22 Feb 2018 16:27:51 +0000 (UTC) From: tillrohrmann To: issues@flink.apache.org Reply-To: issues@flink.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res... 
Content-Type: text/plain Message-Id: <20180222162751.14B0EE95D9@git1-us-west.apache.org> Date: Thu, 22 Feb 2018 16:27:51 +0000 (UTC) Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5487#discussion_r170014907 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -447,6 +453,165 @@ public void postStop() throws Exception { return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + final ArrayList allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); + + for (JobVertex jobVertex : jobGraph.getVertices()) { + allOperators.add(jobVertex.getID()); + } + + return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); + } + + @Override + public CompletableFuture rescaleOperators( + Collection operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + // 1. 
Check whether we can rescale the job & rescale the respective vertices + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configure + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + try { + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); + + log.info(msg, e); + + return FutureUtils.completedExceptionally( + new JobModificationException(msg, e)); + } + } + + final ExecutionGraph currentExecutionGraph = executionGraph; + + final ExecutionGraph newExecutionGraph; + + try { + newExecutionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration.getConfiguration(), + scheduledExecutorService, + scheduledExecutorService, + slotPool.getSlotProvider(), + userCodeLoader, + highAvailabilityServices.getCheckpointRecoveryFactory(), + rpcTimeout, + currentExecutionGraph.getRestartStrategy(), + jobMetricGroup, + 1, + blobServer, + jobMasterConfiguration.getSlotRequestTimeout(), + log); + } catch (JobExecutionException | JobException e) { + return FutureUtils.completedExceptionally( + new JobModificationException("Could not create rescaled ExecutionGraph.", e)); + } + + // 3. disable checkpoint coordinator to suppress subsequent checkpoints + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkpointCoordinator.stopCheckpointScheduler(); + + // 4. 
take a savepoint + final CompletableFuture savepointFuture = triggerSavepoint( + jobMasterConfiguration.getTmpDirectory(), + timeout); + + final CompletableFuture executionGraphFuture = savepointFuture + .thenApplyAsync( + (String savepointPath) -> { + try { + newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( + savepointPath, + false, + newExecutionGraph.getAllVertices(), + userCodeLoader); + } catch (Exception e) { + disposeSavepoint(savepointPath); + + throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); + } + + // delete the savepoint file once we reach a terminal state + newExecutionGraph.getTerminationFuture() + .whenCompleteAsync( + (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + return newExecutionGraph; + }, scheduledExecutorService) + .exceptionally( + (Throwable failure) -> { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + checkpointCoordinator.startCheckpointScheduler(); + } + + throw new CompletionException(failure); + }); + + // 5. suspend the current job + final CompletableFuture terminationFuture = executionGraphFuture.thenComposeAsync( + (ExecutionGraph ignored) -> { + currentExecutionGraph.suspend(new FlinkException("Job is being rescaled.")); + return currentExecutionGraph.getTerminationFuture(); + }, + getMainThreadExecutor()); + + final CompletableFuture suspendedFuture = terminationFuture.thenAccept( + (JobStatus jobStatus) -> { + if (jobStatus != JobStatus.SUSPENDED) { + final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName()); + log.info(msg); + throw new CompletionException(new JobModificationException(msg)); + } + }); + + // 6. 
resume the new execution graph from the taken savepoint + final CompletableFuture rescalingFuture = suspendedFuture.thenCombineAsync( + executionGraphFuture, + (Void ignored, ExecutionGraph restoredExecutionGraph) -> { + // check if the ExecutionGraph is still the same + //noinspection ObjectEquality --- End diff -- There is. The `a == b` comparison is what triggers this `ObjectEquality` inspection warning, but here we deliberately want to check for referential identity. ---