From: uce@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: flink git commit: [FLINK-4510] [checkpoint] Always create CheckpointCoordinator
Date: Wed, 19 Oct 2016 11:50:19 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master b05c3c1b0 -> 398bd9b31


[FLINK-4510] [checkpoint] Always create CheckpointCoordinator

This closes #2453.
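Note on the change (not part of the committed diff): before this commit, JobSnapshottingSettings and the CheckpointCoordinator were only created when checkpointing was enabled. The commit always creates the coordinator and uses a checkpoint interval of Long.MAX_VALUE as the marker for "periodic checkpointing disabled", so that savepoints can still be triggered when periodic checkpoints are off (which the new JobManagerTest below exercises). The sketch below only illustrates that convention; the class and helper names are made up for illustration and are not Flink API.

// Illustrative sketch of the interval convention introduced by this commit (hypothetical
// helper, not Flink API): Long.MAX_VALUE means "coordinator exists, but no periodic
// checkpoints are scheduled", which still allows manually triggered savepoints.
public final class CheckpointIntervalConvention {

	/** Sentinel value used to mark periodic checkpointing as disabled. */
	public static final long DISABLED_INTERVAL = Long.MAX_VALUE;

	/** Normalizes the user-configured interval: a non-positive value means "disabled". */
	public static long normalizeInterval(long configuredInterval) {
		return configuredInterval > 0 ? configuredInterval : DISABLED_INTERVAL;
	}

	/** The periodic scheduler (ActivatorDeactivator) is only registered for real intervals. */
	public static boolean schedulePeriodicCheckpoints(long interval) {
		return interval != DISABLED_INTERVAL;
	}

	public static void main(String[] args) {
		System.out.println(schedulePeriodicCheckpoints(normalizeInterval(0)));      // false
		System.out.println(schedulePeriodicCheckpoints(normalizeInterval(60_000))); // true
	}
}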
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/398bd9b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/398bd9b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/398bd9b3

Branch: refs/heads/master
Commit: 398bd9b3198e70f199a1277a9f5074e81cbee3c8
Parents: b05c3c1
Author: Jark Wu
Authored: Mon Oct 17 19:46:15 2016 +0200
Committer: Ufuk Celebi
Committed: Wed Oct 19 13:50:08 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  10 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   6 +-
 .../runtime/jobmanager/JobManagerTest.java      | 102 +++++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   |  89 ++++++++--------
 .../graph/StreamingJobGraphGeneratorTest.java   |  33 ++++--
 5 files changed, 178 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aa9406c..101bdba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -389,9 +389,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
-		// the periodic checkpoint scheduler is activated and deactivated as a result of
-		// job status changes (running -> on, all other states -> off)
-		registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+		// interval of max long value indicates disable periodic checkpoint,
+		// the CheckpointActivatorDeactivator should be created only if the interval is not max value
+		if (interval != Long.MAX_VALUE) {
+			// the periodic checkpoint scheduler is activated and deactivated as a result of
+			// job status changes (running -> on, all other states -> off)
+			registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2a20c6c..bbe10d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2683,7 +2683,7 @@ public class CheckpointCoordinatorTest {
 			null,
 			true);
 
-		assertEquals(true, triggerResult.isFailure());
+		assertTrue(triggerResult.isFailure());
 		assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
 
 		// Not periodic
@@ -2693,7 +2693,7 @@ public class CheckpointCoordinatorTest {
 			null,
 			false);
 
-		assertEquals(false, triggerResult.isFailure());
+		assertFalse(triggerResult.isFailure());
 	}
 
 	private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
@@ -2851,7 +2851,7 @@ public class CheckpointCoordinatorTest {
 		String targetDirectory = "xjasdkjakshdmmmxna";
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory, false);
 
-		assertEquals(true, triggerResult.isSuccess());
+		assertTrue(triggerResult.isSuccess());
 
 		// validate that we have a pending checkpoint
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 3c6ae9d..b5e5d45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -702,6 +703,107 @@ public class JobManagerTest {
 				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
 		}
+	}
+
+	/**
+	 * Tests that we can trigger a savepoint while periodic checkpointing is deactivated.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
+		File defaultSavepointDir = tmpFolder.newFolder();
+
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+				config,
+				actorSystem,
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+				config,
+				ResourceID.generate(),
+				actorSystem,
+				"localhost",
+				Option.apply("tm"),
+				Option.apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+				true,
+				TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				Long.MAX_VALUE, // deactivated checkpointing
+				360000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none());
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Cancel with savepoint
+			File targetDirectory = tmpFolder.newFolder();
+
+			msg = new TriggerSavepoint(jobGraph.getJobID(), Option.apply(targetDirectory.getAbsolutePath()));
+			Future<Object> future = jobManager.ask(msg, timeout);
+			Object result = Await.result(future, timeout);
+
+			assertTrue("Did not trigger savepoint", result instanceof JobManagerMessages.TriggerSavepointSuccess);
+			assertEquals(1, targetDirectory.listFiles().length);
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0b7dc2a..824e375 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -464,59 +464,58 @@ public class StreamingJobGraphGenerator {
 
 	private void configureCheckpointing() {
 		CheckpointConfig cfg = streamGraph.getCheckpointConfig();
-
-		if (cfg.isCheckpointingEnabled()) {
-			long interval = cfg.getCheckpointInterval();
-			if (interval < 1) {
-				throw new IllegalArgumentException("The checkpoint interval must be positive");
-			}
-
-			// collect the vertices that receive "trigger checkpoint" messages.
-			// currently, these are all the sources
-			List<JobVertexID> triggerVertices = new ArrayList<>();
-
-			// collect the vertices that need to acknowledge the checkpoint
-			// currently, these are all vertices
-			List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
-
-			// collect the vertices that receive "commit checkpoint" messages
-			// currently, these are all vertices
-			List<JobVertexID> commitVertices = new ArrayList<>();
-
-			for (JobVertex vertex : jobVertices.values()) {
-				if (vertex.isInputVertex()) {
-					triggerVertices.add(vertex.getID());
-				}
-				commitVertices.add(vertex.getID());
-				ackVertices.add(vertex.getID());
-			}
-
-			ExternalizedCheckpointSettings externalizedCheckpointSettings;
-			if (cfg.isExternalizedCheckpointsEnabled()) {
-				CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
-				// Sanity check
-				if (cleanup == null) {
-					throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
-				}
-				externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
-			} else {
-				externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
-			}
-
-			JobSnapshottingSettings settings = new JobSnapshottingSettings(
-				triggerVertices, ackVertices, commitVertices, interval,
-				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
-				cfg.getMaxConcurrentCheckpoints(),
-				externalizedCheckpointSettings);
-			jobGraph.setSnapshotSettings(settings);
+		long interval = cfg.getCheckpointInterval();
+		if (interval > 0) {
 			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
 			if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
 				// if the user enabled checkpointing, the default number of exec retries is infinite.
 				streamGraph.getExecutionConfig().setRestartStrategy(
 					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
 			}
+		} else {
+			// interval of max value means disable periodic checkpoint
+			interval = Long.MAX_VALUE;
+		}
+
+		// collect the vertices that receive "trigger checkpoint" messages.
+		// currently, these are all the sources
+		List<JobVertexID> triggerVertices = new ArrayList<>();
+
+		// collect the vertices that need to acknowledge the checkpoint
+		// currently, these are all vertices
+		List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
+
+		// collect the vertices that receive "commit checkpoint" messages
+		// currently, these are all vertices
+		List<JobVertexID> commitVertices = new ArrayList<>();
+
+		for (JobVertex vertex : jobVertices.values()) {
+			if (vertex.isInputVertex()) {
+				triggerVertices.add(vertex.getID());
+			}
+			commitVertices.add(vertex.getID());
+			ackVertices.add(vertex.getID());
 		}
+
+		ExternalizedCheckpointSettings externalizedCheckpointSettings;
+		if (cfg.isExternalizedCheckpointsEnabled()) {
+			CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
+			// Sanity check
+			if (cleanup == null) {
+				throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
+			}
+			externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
+		} else {
+			externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
+		}
+
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+			triggerVertices, ackVertices, commitVertices, interval,
+			cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
+			cfg.getMaxConcurrentCheckpoints(),
+			externalizedCheckpointSettings);
+		jobGraph.setSnapshotSettings(settings);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6259598..b817c93 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,29 +17,25 @@ package org.apache.flink.streaming.api.graph;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
 
@@ -159,4 +155,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
 	}
 
+	/**
+	 * Tests that disabled checkpointing sets the checkpointing interval to Long.MAX_VALUE.
+	 */
+	@Test
+	public void testDisabledCheckpointing() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamGraph streamGraph = new StreamGraph(env);
+		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+
+		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
+		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
+
+		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
+		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
+	}
 }
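For context (a sketch, not part of the commit): from a user program's point of view, a job built without calling enableCheckpointing(...) now still carries JobSnapshottingSettings, with the interval set to Long.MAX_VALUE, which is what keeps savepoints possible while periodic checkpoints stay off. The probe class below is hypothetical; it only calls the Flink classes and getters that the tests above already use, and the printed interval is the value expected under this commit.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical probe mirroring testDisabledCheckpointing: build a job without
// enableCheckpointing() and inspect the snapshot settings of the generated JobGraph.
public class DisabledCheckpointingProbe {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// enableCheckpointing(...) is intentionally NOT called
		env.fromElements(1, 2, 3).print();

		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
		JobSnapshottingSettings settings = jobGraph.getSnapshotSettings();

		// Expected with this commit: settings is non-null and the interval is Long.MAX_VALUE.
		System.out.println("checkpoint interval = " + settings.getCheckpointInterval());
	}
}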