flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [10/11] flink git commit: [FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend
Date Tue, 28 Feb 2017 18:36:53 GMT
[FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend

This closes #3411


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3446e66a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3446e66a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3446e66a

Branch: refs/heads/master
Commit: 3446e66aac63a3dfdaf8cfd4a73bd80a7f038379
Parents: 5b7f21d
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Feb 17 17:51:00 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 28 19:02:13 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  12 +-
 .../state/RocksDBStateBackendFactory.java       |  19 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |   2 +-
 .../CheckpointConfigHandlerTest.java            |   3 +
 .../checkpoint/CheckpointCoordinator.java       |  21 ++-
 .../runtime/executiongraph/ExecutionGraph.java  |   4 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  32 +++-
 .../jobgraph/tasks/JobSnapshottingSettings.java |  15 +-
 .../runtime/state/AbstractStateBackend.java     | 173 ++++++++++++++++++-
 .../runtime/state/StateBackendFactory.java      |  16 +-
 .../state/filesystem/FsStateBackend.java        |  31 +++-
 .../state/filesystem/FsStateBackendFactory.java |  22 +--
 .../flink/runtime/state/heap/package-info.java  |  23 +++
 .../runtime/state/internal/package-info.java    |  52 ++++++
 .../state/memory/MemoryStateBackend.java        |   2 +-
 .../checkpoint/CheckpointStatsTrackerTest.java  |   1 +
 .../checkpoint/CoordinatorShutdownTest.java     |   5 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   3 +-
 .../ArchivedExecutionGraphTest.java             |   3 +-
 .../tasks/JobSnapshottingSettingsTest.java      |   6 +
 .../jobmanager/JobManagerHARecoveryTest.java    |   1 +
 .../runtime/jobmanager/JobManagerTest.java      |   5 +
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +-
 .../runtime/state/StateBackendLoadingTest.java  | 164 ++++++++++++++++++
 .../runtime/jobmanager/JobManagerITCase.scala   |   3 +
 .../api/graph/StreamGraphGenerator.java         |   2 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   1 +
 .../streaming/runtime/tasks/StreamTask.java     |  72 ++------
 .../runtime/tasks/BlockingCheckpointsTest.java  |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  56 +++---
 .../streaming/runtime/StateBackendITCase.java   |   2 +-
 31 files changed, 609 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3fd5d0f..dd0e2f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,10 +29,12 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,7 +162,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	private void lazyInitializeForJob(
 			Environment env,
-			String operatorIdentifier) throws Exception {
+			String operatorIdentifier) throws IOException {
 
 		if (isInitialized) {
 			return;
@@ -193,7 +195,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			}
 
 			if (dirs.isEmpty()) {
-				throw new Exception("No local storage directories available. " + errorMessage);
+				throw new IOException("No local storage directories available. " + errorMessage);
 			} else {
 				initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
 			}
@@ -235,7 +237,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws Exception {
+			TaskKvStateRegistry kvStateRegistry) throws IOException {
 
 		// first, make sure that the RocksDB JNI library is loaded
 		// we do this explicitly here to have better error handling
@@ -437,7 +439,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	//  static library loading utilities
 	// ------------------------------------------------------------------------
 
-	private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception {
+	private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
 		synchronized (RocksDBStateBackend.class) {
 			if (!rocksDbInitialized) {
 
@@ -488,7 +490,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 					}
 				}
 
-				throw new Exception("Could not load the native RocksDB library", lastException);
+				throw new IOException("Could not load the native RocksDB library", lastException);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index 5002272..bd9bcaa 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -15,24 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateBackendFactory;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 
 /**
  * A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend}
  * from a configuration.
  */
-public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBackend> {
+public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class);
 
@@ -44,9 +45,11 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBa
 	public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
 
 	@Override
-	public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
-		String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-		String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+	public RocksDBStateBackend createFromConfig(Configuration config) 
+			throws IllegalConfigurationException, IOException {
+
+		final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+		final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
 
 		if (checkpointDirURI == null) {
 			throw new IllegalConfigurationException(
@@ -67,8 +70,8 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBa
 			return backend;
 		}
 		catch (IllegalArgumentException e) {
-			throw new Exception("Cannot initialize RocksDB State Backend with URI '"
-									+ checkpointDirURI + '.', e);
+			throw new IllegalConfigurationException(
+					"Cannot initialize RocksDB State Backend with URI '" + checkpointDirURI + '.', e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index b3b7dfc..1fdac65 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -74,7 +74,7 @@ public class JMXJobManagerMetricTest {
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
 				Collections.<JobVertexID>emptyList(),
-				500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true));
+				500, 500, 50, 5, ExternalizedCheckpointSettings.none(), null, true));
 
 			flink.waitForActorsToBeAlive();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e517c3c..95ced0a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -56,6 +56,7 @@ public class CheckpointConfigHandlerTest {
 			minPause,
 			maxConcurrent,
 			externalized,
+			null,
 			true);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
@@ -92,6 +93,7 @@ public class CheckpointConfigHandlerTest {
 			1212L,
 			12,
 			ExternalizedCheckpointSettings.none(),
+			null,
 			false); // at least once
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
@@ -122,6 +124,7 @@ public class CheckpointConfigHandlerTest {
 			1212L,
 			12,
 			externalizedSettings,
+			null,
 			false); // at least once
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6da6f7d..0592e3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -85,6 +85,12 @@ public class CheckpointCoordinator {
 	/** The job whose checkpoint this coordinator coordinates */
 	private final JobID job;
 
+	/** Default checkpoint properties **/
+	private final CheckpointProperties checkpointProperties;
+
+	/** The executor used for asynchronous calls, like potentially blocking I/O */
+	private final Executor executor;
+	
 	/** Tasks who need to be sent a message when a checkpoint is started */
 	private final ExecutionVertex[] tasksToTrigger;
 
@@ -101,7 +107,9 @@ public class CheckpointCoordinator {
 	 * accessing this don't block the job manager actor and run asynchronously. */
 	private final CompletedCheckpointStore completedCheckpointStore;
 
-	/** Default directory for persistent checkpoints; <code>null</code> if none configured. */
+	/** Default directory for persistent checkpoints; <code>null</code> if none configured.
+	 * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
+	@Nullable
 	private final String checkpointDirectory;
 
 	/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
@@ -154,11 +162,6 @@ public class CheckpointCoordinator {
 	@Nullable
 	private CheckpointStatsTracker statsTracker;
 
-	/** Default checkpoint properties **/
-	private final CheckpointProperties checkpointProperties;
-
-	private final Executor executor;
-
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -173,7 +176,7 @@ public class CheckpointCoordinator {
 			ExecutionVertex[] tasksToCommitTo,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
-			String checkpointDirectory,
+			@Nullable String checkpointDirectory,
 			Executor executor) {
 
 		// sanity checks
@@ -211,6 +214,8 @@ public class CheckpointCoordinator {
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.checkpointDirectory = checkpointDirectory;
+		this.executor = checkNotNull(executor);
+
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
 		this.timer = new Timer("Checkpoint Timer", true);
@@ -229,8 +234,6 @@ public class CheckpointCoordinator {
 		} catch (Throwable t) {
 			throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
 		}
-
-		this.executor = checkNotNull(executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ad4347d..a76a421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -348,7 +349,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return false;
 	}
 
-	public void enableSnapshotCheckpointing(
+	public void enableCheckpointing(
 			long interval,
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
@@ -360,6 +361,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			String checkpointDir,
+			StateBackend metadataStore,
 			CheckpointStatsTracker statsTracker) {
 
 		// simple sanity checks

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c558e43..2a79302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -37,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -71,8 +75,8 @@ public class ExecutionGraphBuilder {
 			MetricGroup metrics,
 			int parallelismForAutoMax,
 			Logger log)
-		throws JobExecutionException, JobException
-	{
+		throws JobExecutionException, JobException {
+
 		checkNotNull(jobGraph, "job graph cannot be null");
 
 		final String jobName = jobGraph.getName();
@@ -191,7 +195,28 @@ public class ExecutionGraphBuilder {
 			String externalizedCheckpointsDir = jobManagerConfig.getString(
 					ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
 
-			executionGraph.enableSnapshotCheckpointing(
+			// load the state backend for checkpoint metadata.
+			// if specified in the application, use from there, otherwise load from configuration
+			final StateBackend metadataBackend;
+
+			final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
+			if (applicationConfiguredBackend != null) {
+				metadataBackend = applicationConfiguredBackend;
+
+				log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
+						applicationConfiguredBackend);
+			}
+			else {
+				try {
+					metadataBackend = AbstractStateBackend
+							.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
+				}
+				catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+					throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
+				}
+			}
+
+			executionGraph.enableCheckpointing(
 					snapshotSettings.getCheckpointInterval(),
 					snapshotSettings.getCheckpointTimeout(),
 					snapshotSettings.getMinPauseBetweenCheckpoints(),
@@ -203,6 +228,7 @@ public class ExecutionGraphBuilder {
 					checkpointIdCounter,
 					completedCheckpoints,
 					externalizedCheckpointsDir,
+					metadataBackend,
 					checkpointStatsTracker);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 561ba89..233aa88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
 
+import javax.annotation.Nullable;
 import java.util.List;
 
 import static java.util.Objects.requireNonNull;
@@ -50,6 +52,10 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 	/** Settings for externalized checkpoints. */
 	private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
 
+	/** The default state backend, if configured by the user in the job */
+	@Nullable
+	private final StateBackend defaultStateBackend;
+
 	/**
 	 * Flag indicating whether exactly once checkpoint mode has been configured.
 	 * If <code>false</code>, at least once mode has been configured. This is
@@ -58,7 +64,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 	 * UI.
 	 */
 	private final boolean isExactlyOnce;
-	
+
 	public JobSnapshottingSettings(
 			List<JobVertexID> verticesToTrigger,
 			List<JobVertexID> verticesToAcknowledge,
@@ -68,6 +74,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
+			@Nullable StateBackend defaultStateBackend,
 			boolean isExactlyOnce) {
 
 		// sanity checks
@@ -84,6 +91,7 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
 		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
+		this.defaultStateBackend = defaultStateBackend;
 		this.isExactlyOnce = isExactlyOnce;
 	}
 	
@@ -121,6 +129,11 @@ public class JobSnapshottingSettings implements java.io.Serializable {
 		return externalizedCheckpointSettings;
 	}
 
+	@Nullable
+	public StateBackend getDefaultStateBackend() {
+		return defaultStateBackend;
+	}
+
 	public boolean isExactlyOnce() {
 		return isExactlyOnce;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index a335e45..2cf20a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,20 +21,50 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An abstract base implementation of the {@link StateBackend} interface.
+ * 
+ * <p>
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
 
 	private static final long serialVersionUID = 4620415814639230247L;
 
+	// ------------------------------------------------------------------------
+	//  Configuration shortcut names
+	// ------------------------------------------------------------------------
+
+	/** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */
+	public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";
+
+	/** The shortcut configuration name for the FileSystem State backend */ 
+	public static final String FS_STATE_BACKEND_NAME = "filesystem";
+
+	/** The shortcut configuration name for the RocksDB State Backend */
+	public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";
+
+	// ------------------------------------------------------------------------
+	//  State Backend - Persisting Byte Storage
+	// ------------------------------------------------------------------------
+
 	@Override
 	public abstract CheckpointStreamFactory createStreamFactory(
 			JobID jobId,
@@ -46,6 +76,10 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 			String operatorIdentifier,
 			@Nullable String targetLocation) throws IOException;
 
+	// ------------------------------------------------------------------------
+	//  State Backend - State-Holding Backends
+	// ------------------------------------------------------------------------
+
 	@Override
 	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
@@ -54,7 +88,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws Exception;
+			TaskKvStateRegistry kvStateRegistry) throws IOException;
 
 	@Override
 	public OperatorStateBackend createOperatorStateBackend(
@@ -63,4 +97,141 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 
 		return new DefaultOperatorStateBackend(env.getUserClassLoader());
 	}
+
+	// ------------------------------------------------------------------------
+	//  Loading the state backend from a configuration 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
+	 * in {@link CoreOptions#STATE_BACKEND}.
+	 * 
+	 * <p>The state backends can be specified either via their shortcut name, or via the class name
+	 * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory
+	 * is instantiated (via its zero-argument constructor) and its
+	 * {@link StateBackendFactory#createFromConfig(Configuration)} method is called.
+	 *
+	 * <p>Recognized shortcut names are '{@value AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}',
+	 * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and
+	 * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'.
+	 * 
+	 * @param config The configuration to load the state backend from
+	 * @param classLoader The class loader that should be used to load the state backend
+	 * @param logger Optionally, a logger to log actions to (may be null)
+	 * 
+	 * @return The instantiated state backend.
+	 * 
+	 * @throws DynamicCodeLoadingException
+	 *             Thrown if a state backend factory is configured and the factory class was not
+	 *             found or the factory could not be instantiated
+	 * @throws IllegalConfigurationException
+	 *             May be thrown by the StateBackendFactory when creating / configuring the state
+	 *             backend in the factory
+	 * @throws IOException
+	 *             May be thrown by the StateBackendFactory when instantiating the state backend
+	 */
+	public static StateBackend loadStateBackendFromConfig(
+			Configuration config,
+			ClassLoader classLoader,
+			@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+		checkNotNull(config, "config");
+		checkNotNull(classLoader, "classLoader");
+
+		final String backendName = config.getString(CoreOptions.STATE_BACKEND);
+		if (backendName == null) {
+			return null;
+		}
+
+		// by default the factory class is the backend name 
+		String factoryClassName = backendName;
+
+		switch (backendName.toLowerCase()) {
+			case MEMORY_STATE_BACKEND_NAME:
+				if (logger != null) {
+					logger.info("State backend is set to heap memory (checkpoint to JobManager)");
+				}
+				return new MemoryStateBackend();
+
+			case FS_STATE_BACKEND_NAME:
+				FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config);
+				if (logger != null) {
+					logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
+							fsBackend.getBasePath());
+				}
+				return fsBackend;
+
+			case ROCKSDB_STATE_BACKEND_NAME:
+				factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
+				// fall through to the 'default' case that uses reflection to load the backend
+				// that way we can keep RocksDB in a separate module
+
+			default:
+				if (logger != null) {
+					logger.info("Loading state backend via factory {}", factoryClassName);
+				}
+
+				StateBackendFactory<?> factory;
+				try {
+					@SuppressWarnings("rawtypes")
+					Class<? extends StateBackendFactory> clazz = 
+							Class.forName(factoryClassName, false, classLoader)
+									.asSubclass(StateBackendFactory.class);
+
+					factory = clazz.newInstance();
+				}
+				catch (ClassNotFoundException e) {
+					throw new DynamicCodeLoadingException(
+							"Cannot find configured state backend factory class: " + backendName, e);
+				}
+				catch (ClassCastException | InstantiationException | IllegalAccessException e) {
+					throw new DynamicCodeLoadingException("The class configured under '" +
+							CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
+							backendName + ')', e);
+				}
+				
+				return factory.createFromConfig(config);
+		}
+	}
+
+	/**
+	 * Loads the state backend from the configuration, from the parameter 'state.backend', as defined
+	 * in {@link CoreOptions#STATE_BACKEND}. If no state backend is configured, this instantiates the
+	 * default state backend (the {@link MemoryStateBackend}). 
+	 *
+	 * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
+	 * how the state backend is loaded from the configuration.
+	 *
+	 * @param config The configuration to load the state backend from
+	 * @param classLoader The class loader that should be used to load the state backend
+	 * @param logger Optionally, a logger to log actions to (may be null)
+	 *
+	 * @return The instantiated state backend.
+	 *
+	 * @throws DynamicCodeLoadingException
+	 *             Thrown if a state backend factory is configured and the factory class was not
+	 *             found or the factory could not be instantiated
+	 * @throws IllegalConfigurationException
+	 *             May be thrown by the StateBackendFactory when creating / configuring the state
+	 *             backend in the factory
+	 * @throws IOException
+	 *             May be thrown by the StateBackendFactory when instantiating the state backend
+	 */
+	public static StateBackend loadStateBackendFromConfigOrCreateDefault(
+			Configuration config,
+			ClassLoader classLoader,
+			@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+		final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
+
+		if (fromConfig != null) {
+			return fromConfig;
+		}
+		else {
+			if (logger != null) {
+				logger.info("No state backend has been configured, using default state backend (Memory / JobManager)");
+			}
+			return new MemoryStateBackend();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index 39e7ed2..78c976a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -18,17 +18,24 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
  * A factory to create a specific state backend. The state backend creation gets a Configuration
  * object that can be used to read further config values.
  * 
+ * <p>The state backend factory is typically specified in the configuration to produce a
+ * configured state backend.
+ * 
  * @param <T> The type of the state backend created.
  */
-public interface StateBackendFactory<T extends AbstractStateBackend> extends Serializable {
+@PublicEvolving
+public interface StateBackendFactory<T extends StateBackend> extends Serializable {
 
 	/**
 	 * Creates the state backend, optionally using the given configuration.
@@ -36,7 +43,10 @@ public interface StateBackendFactory<T extends AbstractStateBackend> extends Ser
 	 * @param config The Flink configuration (loaded by the TaskManager).
 	 * @return The created state backend. 
 	 * 
-	 * @throws Exception Exceptions during instantiation can be forwarded.
+	 * @throws IllegalConfigurationException
+	 *             If the configuration misses critical values, or specifies invalid values
+	 * @throws IOException
+	 *             If the state backend initialization failed due to an I/O exception
 	 */
-	AbstractStateBackend createFromConfig(Configuration config) throws Exception;
+	T createFromConfig(Configuration config) throws IllegalConfigurationException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index b614d98..5e8a15d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -36,6 +36,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * The file state backend is a state backend that stores the state of streaming jobs in a file system.
  *
@@ -139,17 +141,14 @@ public class FsStateBackend extends AbstractStateBackend {
 	 *                             rather than in files
 	 * 
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+	 * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
 	 */
 	public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException {
-		if (fileStateSizeThreshold < 0) {
-			throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
-		}
-		if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
-			throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
-				MAX_FILE_STATE_THRESHOLD);
-		}
+		checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger.");
+		checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, 
+				"The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
+
 		this.fileStateThreshold = fileStateSizeThreshold;
-		
 		this.basePath = validateAndNormalizeUri(checkpointDataUri);
 	}
 
@@ -163,6 +162,19 @@ public class FsStateBackend extends AbstractStateBackend {
 		return basePath;
 	}
 
+	/**
+	 * Gets the threshold below which state is stored as part of the metadata, rather than in files.
+	 * This threshold ensures that the backend does not create a large number of very small files,
+	 * where potentially the file pointers are larger than the state itself.
+	 * 
+	 * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}.
+	 * 
+	 * @return The file size threshold, in bytes.
+	 */
+	public int getMinFileSizeThreshold() {
+		return fileStateThreshold;
+	}
+
 	// ------------------------------------------------------------------------
 	//  initialization and cleanup
 	// ------------------------------------------------------------------------
@@ -189,7 +201,8 @@ public class FsStateBackend extends AbstractStateBackend {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws Exception {
+			TaskKvStateRegistry kvStateRegistry) throws IOException {
+
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
 				keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index 042700c..4c933ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
+import java.io.IOException;
+
 /**
  * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
  * from a configuration.
@@ -35,28 +37,26 @@ public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend
 	/** The key under which the config stores the threshold for state to be store in memory,
 	 * rather than in files */
 	public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold";
-	
-	
+
+
 	@Override
-	public FsStateBackend createFromConfig(Configuration config) throws Exception {
-		String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-		int memoryThreshold = config.getInteger(
+	public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException {
+		final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+		final int memoryThreshold = config.getInteger(
 			MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
-		
+
 		if (checkpointDirURI == null) {
 			throw new IllegalConfigurationException(
 					"Cannot create the file system state backend: The configuration does not specify the " +
 							"checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\'');
 		}
-		
+
 		try {
 			Path path = new Path(checkpointDirURI);
 			return new FsStateBackend(path.toUri(), memoryThreshold);
 		}
-		catch (IllegalArgumentException e) {
-			throw new Exception("Cannot initialize File System State Backend with URI '"
-					+ checkpointDirURI + '.', e);
+		catch (IOException | IllegalArgumentException e) {
+			throw new IllegalConfigurationException("Invalid configuration for the state backend", e);
 		}
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
new file mode 100644
index 0000000..2f34ed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the classes for key/value state backends that store the state
+ * on the JVM heap as objects.
+ */
+package org.apache.flink.runtime.state.heap;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
new file mode 100644
index 0000000..fcc4df9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package holds the classes of the <b>internal state type hierarchy</b>.
+ *
+ * <p>The internal state classes give access to the namespace getters and setters and access to
+ * additional functionality, like raw value access or state merging.
+ *
+ * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
+ * intended to be used by user applications. These internal methods are of limited use to users,
+ * would mostly be confusing to them, and are usually not regarded as stable across releases.
+ *
+ * <p>Each specific type in the internal state hierarchy extends the type from the public
+ * state hierarchy. The following illustrates the relationship between the public and the internal
+ * hierarchy, using a subset of the classes as an example:
+ *
+ * <pre>
+ *             State
+ *               |
+ *               +-------------------InternalKvState
+ *               |                         |
+ *          MergingState                   |
+ *               |                         |
+ *               +-----------------InternalMergingState
+ *               |                         |
+ *      +--------+------+                  |
+ *      |               |                  |
+ * ReducingState    ListState        +-----+-----------------+
+ *      |               |            |                       |
+ *      |               +-----------   -----------------InternalListState
+ *      |                            |
+ *      +------------------InternalReducingState
+ * </pre>
+ */
+package org.apache.flink.runtime.state.internal;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2cc1164..6e6b034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -90,7 +90,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws IOException {
+			TaskKvStateRegistry kvStateRegistry) {
 
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 9a39182..7ab71cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -62,6 +62,7 @@ public class CheckpointStatsTrackerTest {
 			191929L,
 			123,
 			ExternalizedCheckpointSettings.none(),
+			null,
 			false);
 
 		CheckpointStatsTracker tracker = new CheckpointStatsTracker(

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 7949ef0..976da48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
 
 import org.junit.Test;
 
@@ -67,7 +66,7 @@ public class CoordinatorShutdownTest {
 			
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
-					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true));
+					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -126,7 +125,7 @@ public class CoordinatorShutdownTest {
 
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true));
+					5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 47e6826..8f565dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -106,7 +106,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			ClassLoader.getSystemClassLoader(),
 			new UnregisteredMetricsGroup());
 
-		executionGraph.enableSnapshotCheckpointing(
+		executionGraph.enableCheckpointing(
 				100,
 				100,
 				100,
@@ -118,6 +118,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				counter,
 				store,
 				null,
+				null,
 				CheckpointStatsTrackerTest.createTestTracker());
 
 		JobVertex jobVertex = new JobVertex("MockVertex");

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 46ce3f4..3090172 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -112,7 +112,7 @@ public class ArchivedExecutionGraphTest {
 				mock(JobSnapshottingSettings.class),
 				new UnregisteredMetricsGroup());
 
-		runtimeGraph.enableSnapshotCheckpointing(
+		runtimeGraph.enableCheckpointing(
 			100,
 			100,
 			100,
@@ -124,6 +124,7 @@ public class ArchivedExecutionGraphTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
+			null,
 			statsTracker);
 
 		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
index 667dbca..2508d5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
 
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class JobSnapshottingSettingsTest {
 
@@ -42,6 +45,7 @@ public class JobSnapshottingSettingsTest {
 			112,
 			12,
 			ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+			new MemoryStateBackend(),
 			false);
 
 		JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings);
@@ -55,5 +59,7 @@ public class JobSnapshottingSettingsTest {
 		assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
 		assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
 		assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+		assertNotNull(copy.getDefaultStateBackend());
+		assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index cbb077c..115b06c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -225,6 +225,7 @@ public class JobManagerHARecoveryTest {
 					0,
 					1,
 					ExternalizedCheckpointSettings.none(),
+					null,
 					true));
 
 			BlockingStatefulInvokable.initializeStaticHelpers(slots);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index c5f6d99..727fc65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -829,6 +829,7 @@ public class JobManagerTest {
 					0,
 					Integer.MAX_VALUE,
 					ExternalizedCheckpointSettings.none(),
+					null,
 					true);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -954,6 +955,7 @@ public class JobManagerTest {
 				0,
 				Integer.MAX_VALUE,
 				ExternalizedCheckpointSettings.none(),
+				null,
 				true);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1059,6 +1061,7 @@ public class JobManagerTest {
 					0,
 					Integer.MAX_VALUE,
 					ExternalizedCheckpointSettings.none(),
+					null,
 					true);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1161,6 +1164,7 @@ public class JobManagerTest {
 					0,
 					Integer.MAX_VALUE,
 					ExternalizedCheckpointSettings.none(),
+					null,
 					true);
 
 			jobGraph.setSnapshotSettings(snapshottingSettings);
@@ -1207,6 +1211,7 @@ public class JobManagerTest {
 					0,
 					Integer.MAX_VALUE,
 					ExternalizedCheckpointSettings.none(),
+					null,
 					true);
 
 			newJobGraph.setSnapshotSettings(newSnapshottingSettings);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index feb3d4d..529c100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -229,7 +229,7 @@ public class JobSubmitTest {
 
 		JobGraph jg = new JobGraph("test job", jobVertex);
 		jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
-			5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), true));
+			5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), null, true));
 		return jg;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
new file mode 100644
index 0000000..a64faf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.DynamicCodeLoadingException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test validates that state backends are properly loaded from configuration.
+ */
+public class StateBackendLoadingTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	private final ClassLoader cl = getClass().getClassLoader();
+
+	private final String backendKey = CoreOptions.STATE_BACKEND.key();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testNoStateBackendDefined() throws Exception {
+		assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null));
+	}
+
+	@Test
+	public void testInstantiateMemoryBackendByDefault() throws Exception {
+		StateBackend backend = AbstractStateBackend
+				.loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null);
+
+		assertTrue(backend instanceof MemoryStateBackend);
+	}
+
+	/** Validates that the explicit backend name "jobmanager" yields a {@link MemoryStateBackend}. */
+	@Test
+	public void testLoadMemoryStateBackend() throws Exception {
+		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+		// to guard against config-breaking changes of the name
+		final Configuration config = new Configuration();
+		config.setString(backendKey, "jobmanager");
+
+		// load from the prepared config; a fresh Configuration would only exercise the default path
+		StateBackend backend = AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+		assertTrue(backend instanceof MemoryStateBackend);
+	}
+
+	@Test
+	public void testLoadFileSystemStateBackend() throws Exception {
+		final String checkpointDir = new Path(tmp.getRoot().toURI()).toString();
+		final Path expectedPath = new Path(checkpointDir);
+		final int threshold = 1000000;
+
+		// we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME)
+		// to guard against config-breaking changes of the name 
+		final Configuration config1 = new Configuration();
+		config1.setString(backendKey, "filesystem");
+		config1.setString("state.checkpoints.dir", checkpointDir);
+		config1.setString("state.backend.fs.checkpointdir", checkpointDir);
+		config1.setInteger("state.backend.fs.memory-threshold", threshold);
+
+		final Configuration config2 = new Configuration();
+		config2.setString(backendKey, FsStateBackendFactory.class.getName());
+		config2.setString("state.checkpoints.dir", checkpointDir);
+		config2.setString("state.backend.fs.checkpointdir", checkpointDir);
+		config2.setInteger("state.backend.fs.memory-threshold", threshold);
+
+		StateBackend backend1 = AbstractStateBackend
+				.loadStateBackendFromConfigOrCreateDefault(config1, cl, null);
+
+		StateBackend backend2 = AbstractStateBackend
+				.loadStateBackendFromConfigOrCreateDefault(config2, cl, null);
+
+		assertTrue(backend1 instanceof FsStateBackend);
+		assertTrue(backend2 instanceof FsStateBackend);
+
+		FsStateBackend fs1 = (FsStateBackend) backend1;
+		FsStateBackend fs2 = (FsStateBackend) backend2;
+
+		assertEquals(expectedPath, fs1.getBasePath());
+		assertEquals(expectedPath, fs2.getBasePath());
+		assertEquals(threshold, fs1.getMinFileSizeThreshold());
+		assertEquals(threshold, fs2.getMinFileSizeThreshold());
+	}
+
+	/**
+	 * This test makes sure that failures properly manifest when the state backend could not be loaded.
+	 */
+	@Test
+	public void testLoadingFails() throws Exception {
+		final Configuration config = new Configuration();
+
+		// try a value that is neither recognized as a name, nor corresponds to a class
+		config.setString(backendKey, "does.not.exist");
+		try {
+			AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+			fail("should fail with an exception");
+		} catch (DynamicCodeLoadingException ignored) {
+			// expected
+		}
+
+		// try a class that is not a factory
+		config.setString(backendKey, java.io.File.class.getName());
+		try {
+			AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+			fail("should fail with an exception");
+		} catch (DynamicCodeLoadingException ignored) {
+			// expected
+		}
+
+		// a factory that fails
+		config.setString(backendKey, FailingFactory.class.getName());
+		try {
+			AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null);
+			fail("should fail with an exception");
+		} catch (IOException ignored) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	static final class FailingFactory implements StateBackendFactory<StateBackend> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException {
+			throw new IOException("fail!");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 75f1fd4..31e72dd 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -822,6 +822,7 @@ class JobManagerITCase(_system: ActorSystem)
             60000,
             1,
             ExternalizedCheckpointSettings.none,
+            null,
             true))
 
           // Submit job...
@@ -881,6 +882,7 @@ class JobManagerITCase(_system: ActorSystem)
             60000,
             1,
             ExternalizedCheckpointSettings.none,
+            null,
             true))
 
           // Submit job...
@@ -948,6 +950,7 @@ class JobManagerITCase(_system: ActorSystem)
             60000,
             1,
             ExternalizedCheckpointSettings.none,
+            null,
             true))
 
           // Submit job...

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f55ff47..bd018c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -82,7 +82,7 @@ public class StreamGraphGenerator {
 	public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
 
 	// The StreamGraph that is being built, this is initialized at the beginning.
-	private StreamGraph streamGraph;
+	private final StreamGraph streamGraph;
 
 	private final StreamExecutionEnvironment env;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a4bb165..003eff9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -539,6 +539,7 @@ public class StreamingJobGraphGenerator {
 				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				externalizedCheckpointSettings,
+				streamGraph.getStateBackend(),
 				isExactlyOnce);
 
 		jobGraph.setSnapshotSettings(settings);

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 938ffd2..1e208ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,9 +20,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -43,13 +40,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -63,6 +57,7 @@ import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,7 +142,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	private StreamConfig configuration;
 
 	/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
-	private AbstractStateBackend stateBackend;
+	private StateBackend stateBackend;
 
 	/** Keyed state backend for the head operator, if it is keyed. There can only ever be one. */
 	private AbstractKeyedStateBackend<?> keyedStateBackend;
@@ -713,61 +708,26 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	//  State backend
 	// ------------------------------------------------------------------------
 
-	private AbstractStateBackend createStateBackend() throws Exception {
-		AbstractStateBackend stateBackend = configuration.getStateBackend(getUserCodeClassLoader());
+	/**
+	 * Picks the state backend for this task. A backend configured on the job (read from the
+	 * stream configuration) wins; otherwise the backend is resolved from the TaskManager
+	 * configuration through
+	 * {@code AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(...)}.
+	 */
+	private StateBackend createStateBackend() throws Exception {
+		final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());
 
-		if (stateBackend != null) {
+		if (fromJob != null) {
 			// backend has been configured on the environment
-			LOG.info("Using user-defined state backend: {}.", stateBackend);
-		} else {
-			// see if we have a backend specified in the configuration
-			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
-			String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
-
-			if (backendName == null) {
-				LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
-				backendName = "jobmanager";
-			}
-
-			switch (backendName.toLowerCase()) {
-				case "jobmanager":
-					LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
-					stateBackend = new MemoryStateBackend();
-					break;
-
-				case "filesystem":
-					FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
-					LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")",
-						backend.getBasePath());
-					stateBackend = backend;
-					break;
-
-				case "rocksdb":
-					backendName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
-					// fall through to the 'default' case that uses reflection to load the backend
-					// that way we can keep RocksDB in a separate module
-
-				default:
-					try {
-						@SuppressWarnings("rawtypes")
-						Class<? extends StateBackendFactory> clazz =
-								Class.forName(backendName, false, getUserCodeClassLoader()).
-										asSubclass(StateBackendFactory.class);
-
-						stateBackend = clazz.newInstance().createFromConfig(flinkConfig);
-					} catch (ClassNotFoundException e) {
-						throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
-					} catch (ClassCastException e) {
-						throw new IllegalConfigurationException("The class configured under '" +
-								CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
-								backendName + ')');
-					} catch (Throwable t) {
-						throw new IllegalConfigurationException("Cannot create configured state backend", t);
-					}
-			}
+			LOG.info("Using user-defined state backend: {}.", fromJob);
+			return fromJob;
+		}
+		else {
+			return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
+					getEnvironment().getTaskManagerInfo().getConfiguration(),
+					getUserCodeClassLoader(),
+					LOG);
 		}
-
-		return stateBackend;
 	}
 
 	public OperatorStateBackend createOperatorStateBackend(

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 51294ce..e266ea1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -183,7 +183,7 @@ public class BlockingCheckpointsTest {
 		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 				Environment env, JobID jobID, String operatorIdentifier,
 				TypeSerializer<K> keySerializer, int numberOfKeyGroups,
-				KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception {
+				KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) {
 
 			throw new UnsupportedOperationException();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3d01fdd..3826051 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -807,33 +807,39 @@ public class StreamTaskTest extends TestLogger {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
+		public AbstractStateBackend createFromConfig(Configuration config) {
 			AbstractStateBackend stateBackendMock = mock(AbstractStateBackend.class);
 
-			Mockito.when(stateBackendMock.createOperatorStateBackend(
-					Mockito.any(Environment.class),
-					Mockito.any(String.class)))
-				.thenAnswer(new Answer<OperatorStateBackend>() {
-					@Override
-					public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
-						return Mockito.mock(OperatorStateBackend.class);
-					}
-				});
-
-			Mockito.when(stateBackendMock.createKeyedStateBackend(
-					Mockito.any(Environment.class),
-					Mockito.any(JobID.class),
-					Mockito.any(String.class),
-					Mockito.any(TypeSerializer.class),
-					Mockito.any(int.class),
-					Mockito.any(KeyGroupRange.class),
-					Mockito.any(TaskKvStateRegistry.class)))
-				.thenAnswer(new Answer<AbstractKeyedStateBackend>() {
-					@Override
-					public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
-						return Mockito.mock(AbstractKeyedStateBackend.class);
-					}
-				});
+			try {
+				Mockito.when(stateBackendMock.createOperatorStateBackend(
+						Mockito.any(Environment.class),
+						Mockito.any(String.class)))
+					.thenAnswer(new Answer<OperatorStateBackend>() {
+						@Override
+						public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+							return Mockito.mock(OperatorStateBackend.class);
+						}
+					});
+	
+				Mockito.when(stateBackendMock.createKeyedStateBackend(
+						Mockito.any(Environment.class),
+						Mockito.any(JobID.class),
+						Mockito.any(String.class),
+						Mockito.any(TypeSerializer.class),
+						Mockito.any(int.class),
+						Mockito.any(KeyGroupRange.class),
+						Mockito.any(TaskKvStateRegistry.class)))
+					.thenAnswer(new Answer<AbstractKeyedStateBackend>() {
+						@Override
+						public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+							return Mockito.mock(AbstractKeyedStateBackend.class);
+						}
+					});
+			}
+			catch (Exception e) {
+				// this is needed, because the signatures of the mocked methods throw 'Exception'
+				throw new RuntimeException(e);
+			}
 
 			return stateBackendMock;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 79665dd..4677242 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -109,7 +109,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 				TypeSerializer<K> keySerializer,
 				int numberOfKeyGroups,
 				KeyGroupRange keyGroupRange,
-				TaskKvStateRegistry kvStateRegistry) throws Exception {
+				TaskKvStateRegistry kvStateRegistry) throws IOException {
 			throw new SuccessException();
 		}
 	}


Mime
View raw message