flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-4510] [checkpoint] Always create CheckpointCoordinator
Date Wed, 19 Oct 2016 11:50:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master b05c3c1b0 -> 398bd9b31


[FLINK-4510] [checkpoint] Always create CheckpointCoordinator

This closes #2453.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/398bd9b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/398bd9b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/398bd9b3

Branch: refs/heads/master
Commit: 398bd9b3198e70f199a1277a9f5074e81cbee3c8
Parents: b05c3c1
Author: Jark Wu <wuchong.wc@alibaba-inc.com>
Authored: Mon Oct 17 19:46:15 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Oct 19 13:50:08 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  10 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   6 +-
 .../runtime/jobmanager/JobManagerTest.java      | 102 +++++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   |  89 ++++++++--------
 .../graph/StreamingJobGraphGeneratorTest.java   |  33 ++++--
 5 files changed, 178 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aa9406c..101bdba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -389,9 +389,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				checkpointDir,
 				checkpointStatsTracker);
 
-		// the periodic checkpoint scheduler is activated and deactivated as a result of
-		// job status changes (running -> on, all other states -> off)
-		registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+		// interval of max long value indicates disable periodic checkpoint,
+		// the CheckpointActivatorDeactivator should be created only if the interval is not max value
+		if (interval != Long.MAX_VALUE) {
+			// the periodic checkpoint scheduler is activated and deactivated as a result of
+			// job status changes (running -> on, all other states -> off)
+			registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2a20c6c..bbe10d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2683,7 +2683,7 @@ public class CheckpointCoordinatorTest {
 				null,
 				true);
 
-		assertEquals(true, triggerResult.isFailure());
+		assertTrue(triggerResult.isFailure());
 		assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
 
 		// Not periodic
@@ -2693,7 +2693,7 @@ public class CheckpointCoordinatorTest {
 				null,
 				false);
 
-		assertEquals(false, triggerResult.isFailure());
+		assertFalse(triggerResult.isFailure());
 	}
 
 	private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
@@ -2851,7 +2851,7 @@ public class CheckpointCoordinatorTest {
 		String targetDirectory = "xjasdkjakshdmmmxna";
 
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory, false);
-		assertEquals(true, triggerResult.isSuccess());
+		assertTrue(triggerResult.isSuccess());
 
 		// validate that we have a pending checkpoint
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 3c6ae9d..b5e5d45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -702,6 +703,107 @@ public class JobManagerTest {
 				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
 		}
+	}
+
+	/**
+	 * Tests that we can trigger a
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
+		File defaultSavepointDir = tmpFolder.newFolder();
 
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+					config,
+					actorSystem,
+					Option.apply("jm"),
+					Option.apply("arch"),
+					TestingJobManager.class,
+					TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+					config,
+					ResourceID.generate(),
+					actorSystem,
+					"localhost",
+					Option.apply("tm"),
+					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+					true,
+					TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Long.MAX_VALUE, // deactivated checkpointing
+					360000,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none());
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Cancel with savepoint
+			File targetDirectory = tmpFolder.newFolder();
+
+			msg = new TriggerSavepoint(jobGraph.getJobID(), Option.apply(targetDirectory.getAbsolutePath()));
+			Future<Object> future = jobManager.ask(msg, timeout);
+			Object result = Await.result(future, timeout);
+
+			assertTrue("Did not trigger savepoint", result instanceof JobManagerMessages.TriggerSavepointSuccess);
+			assertEquals(1, targetDirectory.listFiles().length);
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0b7dc2a..824e375 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -464,59 +464,58 @@ public class StreamingJobGraphGenerator {
 	
 	private void configureCheckpointing() {
 		CheckpointConfig cfg = streamGraph.getCheckpointConfig();
-		
-		if (cfg.isCheckpointingEnabled()) {
-			long interval = cfg.getCheckpointInterval();
-			if (interval < 1) {
-				throw new IllegalArgumentException("The checkpoint interval must be positive");
-			}
-
-			// collect the vertices that receive "trigger checkpoint" messages.
-			// currently, these are all the sources
-			List<JobVertexID> triggerVertices = new ArrayList<>();
-
-			// collect the vertices that need to acknowledge the checkpoint
-			// currently, these are all vertices
-			List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
-
-			// collect the vertices that receive "commit checkpoint" messages
-			// currently, these are all vertices
-			List<JobVertexID> commitVertices = new ArrayList<>();
-			
-			for (JobVertex vertex : jobVertices.values()) {
-				if (vertex.isInputVertex()) {
-					triggerVertices.add(vertex.getID());
-				}
-				commitVertices.add(vertex.getID());
-				ackVertices.add(vertex.getID());
-			}
-
-			ExternalizedCheckpointSettings externalizedCheckpointSettings;
-			if (cfg.isExternalizedCheckpointsEnabled()) {
-				CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
-				// Sanity check
-				if (cleanup == null) {
-					throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
-				}
-				externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
-			} else {
-				externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
-			}
-
-			JobSnapshottingSettings settings = new JobSnapshottingSettings(
-					triggerVertices, ackVertices, commitVertices, interval,
-					cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
-					cfg.getMaxConcurrentCheckpoints(),
-					externalizedCheckpointSettings);
-			jobGraph.setSnapshotSettings(settings);
 
+		long interval = cfg.getCheckpointInterval();
+		if (interval > 0) {
 			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
 			if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
 				// if the user enabled checkpointing, the default number of exec retries is infinite.
 				streamGraph.getExecutionConfig().setRestartStrategy(
 					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
 			}
+		} else {
+			// interval of max value means disable periodic checkpoint
+			interval = Long.MAX_VALUE;
+		}
+
+		// collect the vertices that receive "trigger checkpoint" messages.
+		// currently, these are all the sources
+		List<JobVertexID> triggerVertices = new ArrayList<>();
+
+		// collect the vertices that need to acknowledge the checkpoint
+		// currently, these are all vertices
+		List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
+
+		// collect the vertices that receive "commit checkpoint" messages
+		// currently, these are all vertices
+		List<JobVertexID> commitVertices = new ArrayList<>();
+
+		for (JobVertex vertex : jobVertices.values()) {
+			if (vertex.isInputVertex()) {
+				triggerVertices.add(vertex.getID());
+			}
+			commitVertices.add(vertex.getID());
+			ackVertices.add(vertex.getID());
 		}
+
+		ExternalizedCheckpointSettings externalizedCheckpointSettings;
+		if (cfg.isExternalizedCheckpointsEnabled()) {
+			CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
+			// Sanity check
+			if (cleanup == null) {
+				throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
+			}
+			externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
+		} else {
+			externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
+		}
+
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+				triggerVertices, ackVertices, commitVertices, interval,
+				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
+				cfg.getMaxConcurrentCheckpoints(),
+				externalizedCheckpointSettings);
+		jobGraph.setSnapshotSettings(settings);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/398bd9b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6259598..b817c93 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,29 +17,25 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
@@ -159,4 +155,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
 	}
 
+	/**
+	 * Tests that disabled checkpointing sets the checkpointing interval to Long.MAX_VALUE.
+	 */
+	@Test
+	public void testDisabledCheckpointing() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamGraph streamGraph = new StreamGraph(env);
+		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+
+		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
+		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
+
+		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
+		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
+	}
 }


Mime
View raw message