flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-4510] [checkpoint] Always create CheckpointCoordinator
Date Thu, 27 Oct 2016 15:31:37 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 16e7c7867 -> b4207503c


[FLINK-4510] [checkpoint] Always create CheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4207503
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4207503
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4207503

Branch: refs/heads/release-1.1
Commit: b4207503c2e674ce501404cbc6cb264647f17397
Parents: 16e7c78
Author: Jark Wu <wuchong.wc@alibaba-inc.com>
Authored: Mon Oct 17 19:46:15 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Oct 27 17:30:50 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  46 ++++---
 .../flink/runtime/jobmanager/JobManager.scala   |  44 +++---
 .../runtime/jobmanager/JobManagerTest.java      | 138 +++++++++++++++++--
 .../api/graph/StreamingJobGraphGenerator.java   |  68 +++++----
 .../graph/StreamingJobGraphGeneratorTest.java   |  28 +++-
 6 files changed, 234 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6aaa014..409f05b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -411,7 +411,7 @@ public class CheckpointCoordinator {
 			}
 
 			//make sure the minimum interval between checkpoints has passed
-			if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+			if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp && baseInterval
!= Long.MAX_VALUE) {
 				if (currentPeriodicTrigger != null) {
 					currentPeriodicTrigger.cancel();
 					currentPeriodicTrigger = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8cf8354..cc8e75d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -397,27 +397,31 @@ public class ExecutionGraph implements Serializable {
 
 		checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker");
 
-		// create the coordinator that triggers and commits checkpoints and holds the state
-		checkpointCoordinator = new CheckpointCoordinator(
-				jobID,
-				interval,
-				checkpointTimeout,
-				minPauseBetweenCheckpoints,
-				maxConcurrentCheckpoints,
-				numberKeyGroups,
-				tasksToTrigger,
-				tasksToWaitFor,
-				tasksToCommitTo,
-				userClassLoader,
-				checkpointIDCounter,
-				checkpointStore,
-				recoveryMode,
-				checkpointStatsTracker);
-
-		// the periodic checkpoint scheduler is activated and deactivated as a result of
-		// job status changes (running -> on, all other states -> off)
-		registerJobStatusListener(
-				checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+		// an interval of Long.MAX_VALUE indicates that periodic checkpointing is disabled;
+		// the CheckpointCoordinator should be created only if the interval is not the max value
+		if (interval != Long.MAX_VALUE) {
+			// create the coordinator that triggers and commits checkpoints and holds the state
+			checkpointCoordinator = new CheckpointCoordinator(
+					jobID,
+					interval,
+					checkpointTimeout,
+					minPauseBetweenCheckpoints,
+					maxConcurrentCheckpoints,
+					numberKeyGroups,
+					tasksToTrigger,
+					tasksToWaitFor,
+					tasksToCommitTo,
+					userClassLoader,
+					checkpointIDCounter,
+					checkpointStore,
+					recoveryMode,
+					checkpointStatsTracker);
+
+			// the periodic checkpoint scheduler is activated and deactivated as a result of
+			// job status changes (running -> on, all other states -> off)
+			registerJobStatusListener(
+					checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+		}
 
 		// Savepoint Coordinator
 		savepointCoordinator = new SavepointCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a4c2403..f16747a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1360,22 +1360,22 @@ class JobManager(
             val checkpointCoordinator = graph.getCheckpointCoordinator()
             val savepointCoordinator = graph.getSavepointCoordinator()
 
-            if (checkpointCoordinator != null && savepointCoordinator != null) {
+            if (checkpointCoordinator != null || savepointCoordinator != null) {
               future {
                 try {
-                  if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
+                  if (checkpointCoordinator != null &&
+                      checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
                     // OK, this is the common case
+                  } else if (
+                    savepointCoordinator != null &&
+                    !savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
+
+                    // Try the savepoint coordinator if the message was not
+                    // addressed to the periodic checkpoint coordinator.
+                    log.info("Received message for non-existing checkpoint " +
+                      ackMessage.getCheckpointId)
                   }
-                  else {
-                    // Try the savepoint coordinator if the message was not addressed
-                    // to the periodic checkpoint coordinator.
-                    if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
-                      log.info("Received message for non-existing checkpoint " +
-                        ackMessage.getCheckpointId)
-                    }
-                  }
-                }
-                catch {
+                } catch {
                   case t: Throwable =>
                     log.error(s"Error in CheckpointCoordinator while processing $ackMessage",
t)
                 }
@@ -1397,24 +1397,26 @@ class JobManager(
             val checkpointCoordinator = graph.getCheckpointCoordinator()
             val savepointCoordinator = graph.getSavepointCoordinator()
 
-            if (checkpointCoordinator != null && savepointCoordinator != null) {
+            if (checkpointCoordinator != null || savepointCoordinator != null) {
               future {
                 try {
-                  if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
+                  if (checkpointCoordinator != null &&
+                    checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
                     // OK, this is the common case
                   }
-                  else {
-                    // Try the savepoint coordinator if the message was not addressed
+                  else if (
+                    savepointCoordinator != null &&
+                    !savepointCoordinator.receiveDeclineMessage(declineMessage)) {
+
+                    // Try the savepoint coordinator if the message was not addressed
                     // to the periodic checkpoint coordinator.
-                    if (!savepointCoordinator.receiveDeclineMessage(declineMessage)) {
-                      log.info("Received message for non-existing checkpoint " +
-                        declineMessage.getCheckpointId)
-                    }
+                    log.info("Received message for non-existing checkpoint " +
+                      declineMessage.getCheckpointId)
                   }
                 }
                 catch {
                   case t: Throwable =>
-                    log.error(s"Error in CheckpointCoordinator while processing $declineMessage",
t)
+                    log.error(s"Error in SavepointCoordinator while processing $declineMessage",
t)
                 }
               }(context.dispatcher)
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5c25003..5515c00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,52 +18,70 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
-
 import com.typesafe.config.Config;
-
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
+import java.io.File;
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
@@ -73,7 +91,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class JobManagerTest {
+public class JobManagerTest extends TestLogger {
+
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	private static ActorSystem system;
 
@@ -342,4 +363,105 @@ public class JobManagerTest {
 		}};
 	}
 
+	/**
+	 * Tests that we can trigger a savepoint when periodic checkpointing
+	 * is deactivated (checkpoint interval set to Long.MAX_VALUE).
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
+		File defaultSavepointDir = tmpFolder.newFolder();
+
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+		Configuration config = new Configuration();
+		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+					config,
+					actorSystem,
+					Option.apply("jm"),
+					Option.apply("arch"),
+					TestingJobManager.class,
+					TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+					new Configuration(),
+					ResourceID.generate(),
+					actorSystem,
+					"localhost",
+					Option.apply("tm"),
+					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+					true,
+					TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Long.MAX_VALUE, // deactivated checkpointing
+					360000,
+					0,
+					Integer.MAX_VALUE);
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Trigger savepoint
+			msg = new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID());
+			Future<Object> future = jobManager.ask(msg, timeout);
+			Object result = Await.result(future, timeout);
+
+			assertTrue("Did not trigger savepoint", result instanceof JobManagerMessages.TriggerSavepointSuccess);
+			assertEquals(1, defaultSavepointDir.listFiles().length);
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d6819e1..a63b089 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -51,7 +49,6 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -448,47 +445,46 @@ public class StreamingJobGraphGenerator {
 	
 	private void configureCheckpointing() {
 		CheckpointConfig cfg = streamGraph.getCheckpointConfig();
-		
-		if (cfg.isCheckpointingEnabled()) {
-			long interval = cfg.getCheckpointInterval();
-			if (interval < 1) {
-				throw new IllegalArgumentException("The checkpoint interval must be positive");
-			}
-
-			// collect the vertices that receive "trigger checkpoint" messages.
-			// currently, these are all the sources
-			List<JobVertexID> triggerVertices = new ArrayList<>();
-
-			// collect the vertices that need to acknowledge the checkpoint
-			// currently, these are all vertices
-			List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
-
-			// collect the vertices that receive "commit checkpoint" messages
-			// currently, these are all vertices
-			List<JobVertexID> commitVertices = new ArrayList<>();
-			
-			for (JobVertex vertex : jobVertices.values()) {
-				if (vertex.isInputVertex()) {
-					triggerVertices.add(vertex.getID());
-				}
-				// TODO: add check whether the user function implements the checkpointing interface
-				commitVertices.add(vertex.getID());
-				ackVertices.add(vertex.getID());
-			}
-
-			JobSnapshottingSettings settings = new JobSnapshottingSettings(
-					triggerVertices, ackVertices, commitVertices, interval,
-					cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
-					cfg.getMaxConcurrentCheckpoints());
-			jobGraph.setSnapshotSettings(settings);
 
+		long interval = cfg.getCheckpointInterval();
+		if (interval > 0) {
 			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
 			if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
 				// if the user enabled checkpointing, the default number of exec retries is infinite.
 				streamGraph.getExecutionConfig().setRestartStrategy(
 					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
 			}
+		} else {
+			// an interval of Long.MAX_VALUE means periodic checkpointing is disabled
+			interval = Long.MAX_VALUE;
+		}
+
+		// collect the vertices that receive "trigger checkpoint" messages.
+		// currently, these are all the sources
+		List<JobVertexID> triggerVertices = new ArrayList<>();
+
+		// collect the vertices that need to acknowledge the checkpoint
+		// currently, these are all vertices
+		List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
+
+		// collect the vertices that receive "commit checkpoint" messages
+		// currently, these are all vertices
+		List<JobVertexID> commitVertices = new ArrayList<>();
+
+		for (JobVertex vertex : jobVertices.values()) {
+			if (vertex.isInputVertex()) {
+				triggerVertices.add(vertex.getID());
+			}
+			// TODO: add check whether the user function implements the checkpointing interface
+			commitVertices.add(vertex.getID());
+			ackVertices.add(vertex.getID());
 		}
+
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+				triggerVertices, ackVertices, commitVertices, interval,
+				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
+				cfg.getMaxConcurrentCheckpoints());
+		jobGraph.setSnapshotSettings(settings);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b4207503/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7f94aa0..d646212 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,22 +17,24 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest {
@@ -145,4 +147,20 @@ public class StreamingJobGraphGeneratorTest {
 		assertEquals(1, jobGraph.getVerticesAsArray()[0].getParallelism());
 		assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
 	}
+
+	/**
+	 * Tests that disabled checkpointing sets the checkpointing interval to Long.MAX_VALUE.
+	 */
+	@Test
+	public void testDisabledCheckpointing() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamGraph streamGraph = new StreamGraph(env);
+		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+
+		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
+		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
+
+		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
+		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
+	}
 }


Mime
View raw message