flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [FLINK-3512] [runtime] Savepoint backend should not revert to 'jobmanager'
Date Thu, 25 Feb 2016 19:04:13 GMT
[FLINK-3512] [runtime] Savepoint backend should not revert to 'jobmanager'

This closes #1712.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9cba277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9cba277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9cba277

Branch: refs/heads/master
Commit: c9cba2771c038eab35522324c78a40ab2bb3d1c9
Parents: d90672f
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Feb 25 16:55:38 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Feb 25 20:03:57 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/SavepointStoreFactory.java       | 63 +++++++-------------
 .../checkpoint/SavepointStoreFactoryTest.java   | 24 +++++---
 2 files changed, 37 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9cba277/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
index 04a3227..6d25e18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +32,7 @@ public class SavepointStoreFactory {
 
 	public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend";
 	public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";
+	public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager";
 
 	public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class);
 
@@ -52,55 +53,33 @@ public class SavepointStoreFactory {
 			Configuration config) throws Exception {
 
 		// Try a the savepoint-specific configuration first.
-		String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, null);
+		String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND);
 
 		if (savepointBackend == null) {
 			LOG.info("No savepoint state backend configured. " +
 					"Using job manager savepoint state backend.");
 			return createJobManagerSavepointStore();
-		}
-		else if (savepointBackend.equals("jobmanager")) {
+		} else if (savepointBackend.equals("jobmanager")) {
 			LOG.info("Using job manager savepoint state backend.");
 			return createJobManagerSavepointStore();
-		}
-		else if (savepointBackend.equals("filesystem")) {
-			// Sanity check that the checkpoints are not stored on the job manager only
-			String checkpointBackend = config.getString(
-					ConfigConstants.STATE_BACKEND, "jobmanager");
-
-			if (checkpointBackend.equals("jobmanager")) {
-				LOG.warn("The combination of file system backend for savepoints and " +
-						"jobmanager backend for checkpoints does not work. The savepoint " +
-						"will *not* be recoverable after the job manager shuts down. " +
-						"Falling back to job manager savepoint state backend.");
-
-				return createJobManagerSavepointStore();
+		} else if (savepointBackend.equals("filesystem")) {
+			String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null);
+
+			if (rootPath == null) {
+				throw new IllegalConfigurationException("Using filesystem as savepoint state backend,
" +
+						"but did not specify directory. Please set the " +
+						"following configuration key: '" + SAVEPOINT_DIRECTORY_KEY +
+						"' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
+						"Falling back to job manager savepoint backend.");
+			} else {
+				LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath);
+
+				return createFileSystemSavepointStore(rootPath);
 			}
-			else {
-				String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null);
-
-				if (rootPath == null) {
-					LOG.warn("Using filesystem as savepoint state backend, " +
-							"but did not specify directory. Please set the " +
-							"following configuration key: '" + SAVEPOINT_DIRECTORY_KEY +
-							"' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
-							"Falling back to job manager savepoint backend.");
-
-					return createJobManagerSavepointStore();
-				}
-				else {
-					LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath);
-
-					return createFileSystemSavepointStore(rootPath);
-				}
-			}
-		}
-		else {
-			// Fallback
-			LOG.warn("Unexpected savepoint backend configuration '{}'. " +
-					"Falling back to job manager savepoint state backend.", savepointBackend);
-
-			return createJobManagerSavepointStore();
+		} else {
+			throw new IllegalConfigurationException("Unexpected savepoint backend " +
+					"configuration '" + savepointBackend + "'. " +
+					"Falling back to job manager savepoint state backend.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9cba277/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
index 69b6f81..c0605f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class SavepointStoreFactoryTest {
 
@@ -61,28 +63,34 @@ public class SavepointStoreFactoryTest {
 	@Test
 	public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception
{
 		Configuration config = new Configuration();
-
+		String rootPath = System.getProperty("java.io.tmpdir");
 		// This combination does not make sense, because the checkpoints will be
 		// lost after the job manager shuts down.
 		config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
 		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+		config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
+
 		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		assertTrue(store.getStateStore() instanceof HeapStateStore);
+		assertTrue(store.getStateStore() instanceof FileSystemStateStore);
+
+		FileSystemStateStore<CompletedCheckpoint> stateStore = (FileSystemStateStore<CompletedCheckpoint>)
+				store.getStateStore();
+		assertEquals(new Path(rootPath), stateStore.getRootPath());
 	}
 
-	@Test
+	@Test(expected = IllegalConfigurationException.class)
 	public void testSavepointBackendFileSystemButNoDirectory() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		assertTrue(store.getStateStore() instanceof HeapStateStore);
+		SavepointStoreFactory.createFromConfig(config);
+		fail("Did not throw expected Exception");
 	}
 
-	@Test
+	@Test(expected = IllegalConfigurationException.class)
 	public void testUnexpectedSavepointBackend() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "unexpected");
-		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
-		assertTrue(store.getStateStore() instanceof HeapStateStore);
+		SavepointStoreFactory.createFromConfig(config);
+		fail("Did not throw expected Exception");
 	}
 }


Mime
View raw message