flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [2/2] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Date Fri, 28 Jul 2017 13:42:54 GMT
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09caa9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09caa9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09caa9ff

Branch: refs/heads/release-1.3
Commit: 09caa9ffdc8168610c7d0260360c034ea87f904c
Parents: 0225db2
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Fri Jul 28 15:42:28 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  13 +-
 .../checkpoint/CheckpointCoordinator.java       |  32 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |   3 +-
 .../checkpoint/CompletedCheckpointStore.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   6 +-
 .../state/IncrementalKeyedStateHandle.java      |  68 ++-
 .../runtime/state/KeyGroupsStateHandle.java     |   2 +-
 .../runtime/state/MultiStreamStateHandle.java   |  10 +-
 .../runtime/state/SharedStateRegistry.java      |  52 ++-
 .../state/SharedStateRegistryFactory.java       |  35 ++
 .../state/memory/ByteStreamStateHandle.java     |   1 +
 ...tCoordinatorExternalizedCheckpointsTest.java |  22 +-
 .../CheckpointCoordinatorFailureTest.java       |   7 +-
 .../CheckpointCoordinatorMasterHooksTest.java   |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 437 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  10 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  25 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |   7 +-
 .../state/IncrementalKeyedStateHandleTest.java  |  75 +++-
 .../RecoverableCompletedCheckpointStore.java    |  33 +-
 .../streaming/runtime/tasks/StreamTask.java     |   1 -
 ...tractEventTimeWindowCheckpointingITCase.java |  85 +++-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |  51 +++
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 31 files changed, 688 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a6b53ec..7e0910e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -231,7 +231,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.restoredKvStateMetaInfos = new HashMap<>();
 		this.materializedSstFiles = new TreeMap<>();
 		this.backendUID = UUID.randomUUID();
-		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
+
+		LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
+			this.operatorIdentifier,
+			this.backendUID);
 	}
 
 	/**
@@ -835,11 +838,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		void takeSnapshot() throws Exception {
 			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
+			final long lastCompletedCheckpoint;
+
 			// use the last completed checkpoint as the comparison base.
 			synchronized (stateBackend.materializedSstFiles) {
-				baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
+				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
 			}
 
+			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82933ac..fe94d25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
 	@Nullable
 	private CheckpointStatsTracker statsTracker;
 
+	/** A factory for SharedStateRegistry objects */
+	private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
 	/** Registry that tracks state which is shared across (incremental) checkpoints */
-	private final SharedStateRegistry sharedStateRegistry;
+	private SharedStateRegistry sharedStateRegistry;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			@Nullable String checkpointDirectory,
-			Executor executor) {
+			Executor executor,
+			SharedStateRegistryFactory sharedStateRegistryFactory) {
 
 		// sanity checks
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.checkpointDirectory = checkpointDirectory;
 		this.executor = checkNotNull(executor);
-		this.sharedStateRegistry = new SharedStateRegistry(executor);
+		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
+		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -1044,10 +1050,23 @@ public class CheckpointCoordinator {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
 
-			// Recover the checkpoints
-			completedCheckpointStore.recover(sharedStateRegistry);
+			// We create a new shared state registry object, so that all pending async disposal requests from previous
+			// runs will go against the old object (where they can do no harm).
+			// This must happen under the checkpoint lock.
+			sharedStateRegistry.close();
+			sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+			// Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
+			completedCheckpointStore.recover();
+
+			// Now, we re-register all (shared) states from the checkpoint store with the new registry
+			for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
+				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+			}
+
+			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
 
-			// restore from the latest checkpoint
+			// Restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
 
 			if (latest == null) {
@@ -1121,7 +1140,6 @@ public class CheckpointCoordinator {
 		CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
 				job, tasks, savepointPath, userClassLoader, allowNonRestored);
 
-		savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 		completedCheckpointStore.addCheckpoint(savepoint);
 		
 		// Reset the checkpoint ID counter

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 56aa19d..76d1580 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
 
 	private void doDiscard() throws Exception {
 
+		LOG.trace("Executing discard procedure for {}.", this);
+
 		try {
 			// collect exceptions and continue cleanup
 			Exception exception = null;
@@ -225,7 +227,6 @@ public class CompletedCheckpoint implements Serializable {
 			// discard private state objects
 			try {
 				Collection<OperatorState> values = operatorStates.values();
-				LOG.trace("About to discard operator states {}.", values);
 				StateUtil.bestEffortDiscardAllStateObjects(values);
 			} catch (Exception e) {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
 	 *
 	 * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
 	 * available checkpoint.
-	 *
-	 * @param sharedStateRegistry the shared state registry to register recovered states.
 	 */
-	void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+	void recover() throws Exception;
 
 	/**
 	 * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		// Nothing to do
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 * that the history of checkpoints is consistent.
 	 */
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		LOG.info("Recovering checkpoints from ZooKeeper.");
 
 		// Clear local handles in order to prevent duplicates on
 		// recovery. The local handles should reflect the state
 		// of ZooKeeper.
 		completedCheckpoints.clear();
-		sharedStateRegistry.clear();
 
 		// Get all there is first
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			try {
 				completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
 				if (completedCheckpoint != null) {
-					// Re-register all shared states in the checkpoint.
-					completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 					completedCheckpoints.add(completedCheckpoint);
 				}
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f9d2d69..c105d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -66,6 +66,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -74,8 +75,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -459,7 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointIDCounter,
 			checkpointStore,
 			checkpointDir,
-			ioExecutor);
+			ioExecutor,
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// register the master hooks on the checkpoint coordinator
 		for (MasterTriggerRestoreHook<?> hook : masterHooks) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final UUID backendIdentifier;
 
 	/**
-	 * The key-group range covered by this state handle
+	 * The key-group range covered by this state handle.
 	 */
 	private final KeyGroupRange keyGroupRange;
 
 	/**
-	 * The checkpoint Id
+	 * The checkpoint Id.
 	 */
 	private final long checkpointId;
 
 	/**
-	 * Shared state in the incremental checkpoint. This i
+	 * Shared state in the incremental checkpoint.
 	 */
 	private final Map<StateHandleID, StreamStateHandle> sharedState;
 
 	/**
-	 * Private state in the incremental checkpoint
+	 * Private state in the incremental checkpoint.
 	 */
 	private final Map<StateHandleID, StreamStateHandle> privateState;
 
 	/**
-	 * Primary meta data state of the incremental checkpoint
+	 * Primary meta data state of the incremental checkpoint.
 	 */
 	private final StreamStateHandle metaStateHandle;
 
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 
 	@Override
 	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
-			return this;
-		} else {
-			return null;
-		}
+		return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
+			null : this;
 	}
 
 	@Override
 	public void discardState() throws Exception {
 
+		SharedStateRegistry registry = this.sharedStateRegistry;
+		final boolean isRegistered = (registry != null);
+
+		LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
+			isRegistered,
+			checkpointId,
+			backendIdentifier);
+
 		try {
 			metaStateHandle.discardState();
 		} catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		// If this was not registered, we can delete the shared state. We can simply apply this
 		// to all handles, because all handles that have not been created for the first time for this
 		// are only placeholders at this point (disposing them is a NOP).
-		if (sharedStateRegistry == null) {
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
-			} catch (Exception e) {
-				LOG.warn("Could not properly discard new sst file states.", e);
-			}
-		} else {
+		if (isRegistered) {
 			// If this was registered, we only unregister all our referenced shared states
 			// from the registry.
 			for (StateHandleID stateHandleID : sharedState.keySet()) {
-				sharedStateRegistry.unregisterReference(
+				registry.unregisterReference(
 					createSharedStateRegistryKeyFromFileName(stateHandleID));
 			}
+		} else {
+			// Otherwise, we assume to own those handles and dispose them directly.
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+			} catch (Exception e) {
+				LOG.warn("Could not properly discard new sst file states.", e);
+			}
 		}
 	}
 
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-		Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+		// This is a quick check to avoid registering twice with the same registry. However, the code allows
+		// registering again with a different registry. The implication is that ownership is transferred to this new
+		// registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
+		// SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
+		// an old registry object from a previous run is due to be GCed and will never be used for registration again.
+		Preconditions.checkState(
+			sharedStateRegistry != stateRegistry,
+			"The state handle has already registered its shared states to the given registry.");
 
 		sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
+		LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
+			checkpointId,
+			backendIdentifier);
+
 		for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
 			SharedStateRegistryKey registryKey =
 				createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		result = 31 * result + getMetaStateHandle().hashCode();
 		return result;
 	}
+
+	@Override
+	public String toString() {
+		return "IncrementalKeyedStateHandle{" +
+			"backendIdentifier=" + backendIdentifier +
+			", keyGroupRange=" + keyGroupRange +
+			", checkpointId=" + checkpointId +
+			", sharedState=" + sharedState +
+			", privateState=" + privateState +
+			", metaStateHandle=" + metaStateHandle +
+			", registered=" + (sharedStateRegistry != null) +
+			'}';
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
 	public String toString() {
 		return "KeyGroupsStateHandle{" +
 				"groupRangeOffsets=" + groupRangeOffsets +
-				", data=" + stateHandle +
+				", stateHandle=" + stateHandle +
 				'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 	private final List<StreamStateHandle> stateHandles;
 	private final long stateSize;
 
-	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
 		this.stateHandles = Preconditions.checkNotNull(stateHandles);
 		long calculateSize = 0L;
 		for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 		return stateSize;
 	}
 
+	@Override
+	public String toString() {
+		return "MultiStreamStateHandle{" +
+			"stateHandles=" + stateHandles +
+			", stateSize=" + stateSize +
+			'}';
+	}
+
 	static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
 
 		private final TreeMap<Long, StreamStateHandle> stateHandleMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
  * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
  * them.
  */
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
 
+	/** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */
+	public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() {
+		@Override
+		public SharedStateRegistry create(Executor deleteExecutor) {
+			return new SharedStateRegistry(deleteExecutor);
+		}
+	};
+
 	/** All registered state objects by an artificial key */
 	private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
 
+	/** This flag indicates whether the registry is still open (true) or close() has been called (false) */
+	private boolean open;
+
 	/** Executor for async state deletion */
 	private final Executor asyncDisposalExecutor;
 
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
 	public SharedStateRegistry(Executor asyncDisposalExecutor) {
 		this.registeredStates = new HashMap<>();
 		this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
+		this.open = true;
 	}
 
 	/**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
 		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
+
+			Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");
+
 			entry = registeredStates.get(registrationKey);
 
 			if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
 				// delete if this is a real duplicate
 				if (!Objects.equals(state, entry.stateHandle)) {
 					scheduledStateDeletion = state;
+					LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " +
+							"be an unnecessary copy of existing state {} and will be dropped.",
+						registrationKey,
+						state,
+						entry.stateHandle);
 				}
 				entry.increaseReferenceCount();
 			}
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
 	 *
 	 * @param registrationKey the shared state for which we release a reference.
 	 * @return the result of the request, consisting of the reference count after this operation
-	 * and the state handle, or null if the state handle was deleted through this request.
+	 * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry
+	 * was previously closed.
 	 */
 	public Result unregisterReference(SharedStateRegistryKey registrationKey) {
 
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
 		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
+
 			entry = registeredStates.get(registrationKey);
 
 			Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
 		}
 	}
 
+	@Override
+	public String toString() {
+		synchronized (registeredStates) {
+			return "SharedStateRegistry{" +
+				"registeredStates=" + registeredStates +
+				'}';
+		}
+	}
+
 	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
 		// We do the small optimization to not issue discards for placeholders, which are NOPs.
 		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-
 			LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
 			asyncDisposalExecutor.execute(
 				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
 		return stateHandle instanceof PlaceholderStreamStateHandle;
 	}
 
+	@Override
+	public void close() {
+		synchronized (registeredStates) {
+			open = false;
+		}
+	}
+
 	/**
 	 * An entry in the registry, tracking the handle and the corresponding reference count.
 	 */
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
 			}
 		}
 	}
-
-	/**
-	 * Clears the registry.
-	 */
-	public void clear() {
-		synchronized (registeredStates) {
-			registeredStates.clear();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects.
+ */
+public interface SharedStateRegistryFactory {
+
+	/**
+	 * Factory method for {@link SharedStateRegistry}.
+	 *
+	 * @param deleteExecutor executor used to run (async) deletes.
+	 * @return a SharedStateRegistry object
+	 */
+	SharedStateRegistry create(Executor deleteExecutor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
 	public String toString() {
 		return "ByteStreamStateHandle{" +
 			"handleName='" + handleName + '\'' +
+			", dataBytes=" + data.length +
 			'}';
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * CheckpointCoordinator tests for externalized checkpoints.
  *
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			checkpointDir.getAbsolutePath(),
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 344b340..5cca94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -78,7 +78,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new FailingCompletedCheckpointStore(),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		coord.triggerCheckpoint(triggerTimestamp, false);
 
@@ -113,7 +114,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle);
 		
 		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
-		
+
 		try {
 			coord.receiveAcknowledgeMessage(acknowledgeMessage);
 			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
@@ -136,7 +137,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
 
 		@Override
-		public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+		public void recover() throws Exception {
 			throw new UnsupportedOperationException("Not implemented.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..94063a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -47,14 +47,12 @@ import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -405,7 +403,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..16a89ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -56,6 +54,9 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -140,7 +141,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -200,7 +202,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -251,7 +254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -303,7 +307,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -407,7 +412,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -526,7 +532,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -698,7 +705,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -828,7 +836,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -992,7 +1001,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1019,8 +1029,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(250);
 			}
 			while (!checkpoint.isDiscarded() &&
-					coord.getNumberOfPendingCheckpoints() > 0 &&
-					System.currentTimeMillis() < deadline);
+				coord.getNumberOfPendingCheckpoints() > 0 &&
+				System.currentTimeMillis() < deadline);
 
 			assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1071,7 +1081,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1134,7 +1145,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1274,7 +1286,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 
 			coord.startCheckpointScheduler();
@@ -1296,7 +1309,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			int numCallsSoFar = numCalls.get();
 			Thread.sleep(400);
 			assertTrue(numCallsSoFar == numCalls.get() ||
-					numCallsSoFar+1 == numCalls.get());
+				numCallsSoFar+1 == numCalls.get());
 
 			// start another sequence of periodic scheduling
 			numCalls.set(0);
@@ -1318,7 +1331,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			numCallsSoFar = numCalls.get();
 			Thread.sleep(400);
 			assertTrue(numCallsSoFar == numCalls.get() ||
-					numCallsSoFar + 1 == numCalls.get());
+				numCallsSoFar + 1 == numCalls.get());
 
 			coord.shutdown(JobStatus.FINISHED);
 		}
@@ -1354,19 +1367,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		final long delay = 50;
 
 		final CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				2,           // periodic interval is 2 ms
-				200_000,     // timeout is very long (200 s)
-				delay,       // 50 ms delay between checkpoints
-				1,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex },
-				new ExecutionVertex[] { vertex },
-				new ExecutionVertex[] { vertex },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2),
-				"dummy-path",
-				Executors.directExecutor());
+			jid,
+			2,           // periodic interval is 2 ms
+			200_000,     // timeout is very long (200 s)
+			delay,       // 50 ms delay between checkpoints
+			1,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex },
+			new ExecutionVertex[] { vertex },
+			new ExecutionVertex[] { vertex },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2),
+			"dummy-path",
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		try {
 			coord.startCheckpointScheduler();
@@ -1439,7 +1453,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1596,7 +1611,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			counter,
 			new StandaloneCompletedCheckpointStore(10),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1702,7 +1718,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1715,12 +1732,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while ((now = System.currentTimeMillis()) < minDuration ||
-					(numCalls.get() < maxConcurrentAttempts && now < timeout));
+				(numCalls.get() < maxConcurrentAttempts && now < timeout));
 
 			assertEquals(maxConcurrentAttempts, numCalls.get());
 
 			verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
-					.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
+				.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			// now, once we acknowledge one checkpoint, it should trigger the next one
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
@@ -1775,7 +1792,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1788,7 +1806,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while ((now = System.currentTimeMillis()) < minDuration ||
-					(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+				(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
 
 			// validate that the pending checkpoints are there
 			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1806,7 +1824,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while (coord.getPendingCheckpoints().get(4L) == null &&
-					System.currentTimeMillis() < newTimeout);
+				System.currentTimeMillis() < newTimeout);
 
 			// do the final check
 			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1837,12 +1855,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
 			when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
-					new Answer<ExecutionState>() {
-						@Override
-						public ExecutionState answer(InvocationOnMock invocation){
-							return currentState.get();
-						}
-					});
+				new Answer<ExecutionState>() {
+					@Override
+					public ExecutionState answer(InvocationOnMock invocation){
+						return currentState.get();
+					}
+				});
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
@@ -1857,7 +1875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1874,7 +1893,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while (System.currentTimeMillis() < timeout &&
-					coord.getNumberOfPendingCheckpoints() == 0);
+				coord.getNumberOfPendingCheckpoints() == 0);
 
 			assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
 		}
@@ -1909,7 +1928,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1962,7 +1982,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2006,7 +2027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
 		ExecutionVertex[] arrayExecutionVertices =
-				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();
 
@@ -2024,7 +2045,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2051,11 +2073,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			SubtaskState subtaskState = mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
 
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					subtaskState);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				subtaskState);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2064,11 +2086,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			SubtaskState subtaskState = mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index));
 
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					subtaskState);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				subtaskState);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2150,7 +2172,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2167,11 +2190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2182,11 +2205,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2251,7 +2274,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
 		ExecutionVertex[] arrayExecutionVertices =
-				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2267,7 +2290,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2277,22 +2301,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
-					jobVertexID1, keyGroupPartitions1.get(index), false);
+				jobVertexID1, keyGroupPartitions1.get(index), false);
 
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2302,15 +2326,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID2, index);
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
-					jobVertexID2, keyGroupPartitions2.get(index), false);
+				jobVertexID2, keyGroupPartitions2.get(index), false);
 
 			SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2390,13 +2414,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		int newParallelism2 = scaleOut ? 13 : 2;
 
 		final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
-				jobVertexID1,
-				parallelism1,
-				maxParallelism1);
+			jobVertexID1,
+			parallelism1,
+			maxParallelism1);
 		final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(
-				jobVertexID2,
-				parallelism2,
-				maxParallelism2);
+			jobVertexID2,
+			parallelism2,
+			maxParallelism2);
 
 		List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
 
@@ -2404,7 +2428,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
 		ExecutionVertex[] arrayExecutionVertices =
-				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2420,7 +2444,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2430,9 +2455,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		//vertex 1
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
@@ -2443,11 +2468,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2463,14 +2488,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			expectedOpStatesBackend.add(opStateBackend);
 			expectedOpStatesRaw.add(opStateRaw);
 			SubtaskState checkpointStateHandles =
-					new SubtaskState(new ChainedStateHandle<>(
-							Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
+				new SubtaskState(new ChainedStateHandle<>(
+					Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2482,18 +2507,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 
 		List<KeyGroupRange> newKeyGroupPartitions2 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
 
 		final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
-				jobVertexID1,
-				parallelism1,
-				maxParallelism1);
+			jobVertexID1,
+			parallelism1,
+			maxParallelism1);
 
 		// rescale vertex 2
 		final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
-				jobVertexID2,
-				newParallelism2,
-				maxParallelism2);
+			jobVertexID2,
+			newParallelism2,
+			maxParallelism2);
 
 		tasks.put(jobVertexID1, newJobVertex1);
 		tasks.put(jobVertexID2, newJobVertex2);
@@ -2534,7 +2559,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 		return new Tuple2<>(jobVertexID, operatorID);
 	}
-	
+
 	/**
 	 * old topology
 	 * [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2
@@ -2575,7 +2600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
 			operatorStates.put(id.f1, taskState);
 			for (int index = 0; index < taskState.getParallelism(); index++) {
-				StreamStateHandle subNonPartitionedState = 
+				StreamStateHandle subNonPartitionedState =
 					generateStateForVertex(id.f0, index)
 						.get(0);
 				OperatorStateHandle subManagedOperatorState =
@@ -2673,15 +2698,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			spy(new StandaloneCompletedCheckpointStore(1));
 
 		CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
-				jobID,
-				2,
-				System.currentTimeMillis(),
-				System.currentTimeMillis() + 3000,
-				operatorStates,
-				Collections.<MasterState>emptyList(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				null);
+			jobID,
+			2,
+			System.currentTimeMillis(),
+			System.currentTimeMillis() + 3000,
+			operatorStates,
+			Collections.<MasterState>emptyList(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null);
 
 		when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
 
@@ -2699,7 +2724,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			standaloneCompletedCheckpointStore,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2832,7 +2858,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				"fake-directory",
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2862,14 +2889,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 		List<Collection<OperatorStateHandle>> repartitionedStates =
-				repartitioner.repartitionState(Collections.singletonList(osh), 3);
+			repartitioner.repartitionState(Collections.singletonList(osh), 3);
 
 		Map<String, Integer> checkCounts = new HashMap<>(3);
 
 		for (Collection<OperatorStateHandle> operatorStateHandles : repartitionedStates) {
 			for (OperatorStateHandle operatorStateHandle : operatorStateHandles) {
 				for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo :
-						operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
+					operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
 
 					String stateName = stateNameToMetaInfo.getKey();
 					Integer count = checkCounts.get(stateName);
@@ -2900,8 +2927,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	public static KeyGroupsStateHandle generateKeyGroupState(
-			JobVertexID jobVertexID,
-			KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
+		JobVertexID jobVertexID,
+		KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
 
 		List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
 
@@ -2918,27 +2945,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static KeyGroupsStateHandle generateKeyGroupState(
-			KeyGroupRange keyGroupRange,
-			List<? extends Serializable> states) throws IOException {
+		KeyGroupRange keyGroupRange,
+		List<? extends Serializable> states) throws IOException {
 
 		Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == states.size());
 
 		Tuple2<byte[], List<long[]>> serializedDataWithOffsets =
-				serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
+			serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
 
 		KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
 
 		ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare(
-				String.valueOf(UUID.randomUUID()),
-				serializedDataWithOffsets.f0);
+			String.valueOf(UUID.randomUUID()),
+			serializedDataWithOffsets.f0);
 		KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(
-				keyGroupRangeOffsets,
-				allSerializedStatesHandle);
+			keyGroupRangeOffsets,
+			allSerializedStatesHandle);
 		return keyGroupsStateHandle;
 	}
 
 	public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(
-			List<List<? extends Serializable>> serializables) throws IOException {
+		List<List<? extends Serializable>> serializables) throws IOException {
 
 		List<long[]> offsets = new ArrayList<>(serializables.size());
 		List<byte[]> serializedGroupValues = new ArrayList<>();
@@ -2962,19 +2989,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		runningGroupsOffset = 0;
 		for (byte[] serializedGroupValue : serializedGroupValues) {
 			System.arraycopy(
-					serializedGroupValue,
-					0,
-					allSerializedValuesConcatenated,
-					runningGroupsOffset,
-					serializedGroupValue.length);
+				serializedGroupValue,
+				0,
+				allSerializedValuesConcatenated,
+				runningGroupsOffset,
+				serializedGroupValue.length);
 			runningGroupsOffset += serializedGroupValue.length;
 		}
 		return new Tuple2<>(allSerializedValuesConcatenated, offsets);
 	}
 
 	public static ChainedStateHandle<StreamStateHandle> generateStateForVertex(
-			JobVertexID jobVertexID,
-			int index) throws IOException {
+		JobVertexID jobVertexID,
+		int index) throws IOException {
 
 		Random random = new Random(jobVertexID.hashCode() + index);
 		int value = random.nextInt();
@@ -2982,17 +3009,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static ChainedStateHandle<StreamStateHandle> generateChainedStateHandle(
-			Serializable value) throws IOException {
+		Serializable value) throws IOException {
 		return ChainedStateHandle.wrapSingleHandle(
-				TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
+			TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
 	}
 
 	public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
-			JobVertexID jobVertexID,
-			int index,
-			int namedStates,
-			int partitionsPerState,
-			boolean rawState) throws IOException {
+		JobVertexID jobVertexID,
+		int index,
+		int namedStates,
+		int partitionsPerState,
+		boolean rawState) throws IOException {
 
 		Map<String, List<? extends Serializable>> statesListsMap = new HashMap<>(namedStates);
 
@@ -3015,7 +3042,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	private static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
-			Map<String, List<? extends Serializable>> states) throws IOException {
+		Map<String, List<? extends Serializable>> states) throws IOException {
 
 		List<List<? extends Serializable>> namedStateSerializables = new ArrayList<>(states.size());
 
@@ -3030,26 +3057,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		int idx = 0;
 		for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) {
 			offsetsMap.put(
-					entry.getKey(),
-					new OperatorStateHandle.StateMetaInfo(
-							serializationWithOffsets.f1.get(idx),
-							OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+				entry.getKey(),
+				new OperatorStateHandle.StateMetaInfo(
+					serializationWithOffsets.f1.get(idx),
+					OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
 			++idx;
 		}
 
 		ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare(
-				String.valueOf(UUID.randomUUID()),
-				serializationWithOffsets.f0);
+			String.valueOf(UUID.randomUUID()),
+			serializationWithOffsets.f0);
 
 		OperatorStateHandle operatorStateHandle =
-				new OperatorStateHandle(offsetsMap, streamStateHandle);
+			new OperatorStateHandle(offsetsMap, streamStateHandle);
 		return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
 	}
 
 	static ExecutionJobVertex mockExecutionJobVertex(
-			JobVertexID jobVertexID,
-			int parallelism,
-			int maxParallelism) {
+		JobVertexID jobVertexID,
+		int parallelism,
+		int maxParallelism) {
 
 		return mockExecutionJobVertex(
 			jobVertexID,
@@ -3131,7 +3158,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
 		when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-		
+
 		when(vertex.getJobVertex()).thenReturn(jobVertex);
 
 		return vertex;
@@ -3158,8 +3185,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static void verifyStateRestore(
-			JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
-			List<KeyGroupRange> keyGroupPartitions) throws Exception {
+		JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
+		List<KeyGroupRange> keyGroupPartitions) throws Exception {
 
 		for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
 
@@ -3168,28 +3195,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ChainedStateHandle<StreamStateHandle> expectNonPartitionedState = generateStateForVertex(jobVertexID, i);
 			ChainedStateHandle<StreamStateHandle> actualNonPartitionedState = taskStateHandles.getLegacyOperatorState();
 			assertTrue(CommonTestUtils.isSteamContentEqual(
-					expectNonPartitionedState.get(0).openInputStream(),
-					actualNonPartitionedState.get(0).openInputStream()));
+				expectNonPartitionedState.get(0).openInputStream(),
+				actualNonPartitionedState.get(0).openInputStream()));
 
 			ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend =
-					generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
+				generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
 
 			List<Collection<OperatorStateHandle>> actualPartitionableState = taskStateHandles.getManagedOperatorState();
 
 			assertTrue(CommonTestUtils.isSteamContentEqual(
-					expectedOpStateBackend.get(0).openInputStream(),
-					actualPartitionableState.get(0).iterator().next().openInputStream()));
+				expectedOpStateBackend.get(0).openInputStream(),
+				actualPartitionableState.get(0).iterator().next().openInputStream()));
 
 			KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState(
-					jobVertexID, keyGroupPartitions.get(i), false);
+				jobVertexID, keyGroupPartitions.get(i), false);
 			Collection<KeyedStateHandle> actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState();
 			compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState);
 		}
 	}
 
 	public static void compareKeyedState(
-			Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
-			Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
+		Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
+		Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
 
 		KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();
 		int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
@@ -3207,7 +3234,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
 				inputStream.seek(offset);
 				int expectedKeyGroupState =
-						InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
+					InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
 				for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) {
 
 					assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle);
@@ -3218,7 +3245,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 						try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
 							actualInputStream.seek(actualOffset);
 							int actualGroupState = InstantiationUtil.
-									deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
+								deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
 							assertEquals(expectedKeyGroupState, actualGroupState);
 						}
 					}
@@ -3228,8 +3255,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static void comparePartitionableState(
-			List<ChainedStateHandle<OperatorStateHandle>> expected,
-			List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
+		List<ChainedStateHandle<OperatorStateHandle>> expected,
+		List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
 
 		List<String> expectedResult = new ArrayList<>();
 		for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle : expected) {
@@ -3263,7 +3290,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				for (long offset : entry.getValue().getOffsets()) {
 					in.seek(offset);
 					Integer state = InstantiationUtil.
-							deserializeObject(in, Thread.currentThread().getContextClassLoader());
+						deserializeObject(in, Thread.currentThread().getContextClassLoader());
 					resultCollector.add(opIdx + " : " + entry.getKey() + " : " + state);
 				}
 			}
@@ -3308,24 +3335,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// Periodic
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
-				System.currentTimeMillis(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				true);
+			System.currentTimeMillis(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			true);
 
 		assertTrue(triggerResult.isFailure());
 		assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
 
 		// Not periodic
 		triggerResult = coord.triggerCheckpoint(
-				System.currentTimeMillis(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				false);
+			System.currentTimeMillis(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			false);
 
 		assertFalse(triggerResult.isFailure());
 	}
@@ -3352,12 +3380,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			int maxPartitionsPerState = 1 + r.nextInt(9);
 
 			doTestPartitionableStateRepartitioning(
-					r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
+				r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
 		}
 	}
 
 	private void doTestPartitionableStateRepartitioning(
-			Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
+		Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
 
 		List<OperatorStateHandle> previousParallelOpInstanceStates = new ArrayList<>(oldParallelism);
 
@@ -3374,15 +3402,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				}
 
 				OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ?
-						OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+					OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
 				namedStatesToOffsets.put(
-						"State-" + s,
-						new OperatorStateHandle.StateMetaInfo(offs, mode));
+					"State-" + s,
+					new OperatorStateHandle.StateMetaInfo(offs, mode));
 
 			}
 
 			previousParallelOpInstanceStates.add(
-					new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+				new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
 		}
 
 		Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>();
@@ -3395,7 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 				long[] offs = e.getValue().getOffsets();
 				int replication = e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ?
-						newParallelism : 1;
+					newParallelism : 1;
 
 				expectedTotalPartitions += replication * offs.length;
 				List<Long> offsList = new ArrayList<>(offs.length);
@@ -3413,7 +3441,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 
 		List<Collection<OperatorStateHandle>> pshs =
-				repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
+			repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
 
 		Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>();
 
@@ -3486,7 +3514,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3524,7 +3553,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		store.addCheckpoint(new CompletedCheckpoint(
 			new JobID(),
@@ -3580,7 +3610,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			completedCheckpointStore,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger a first checkpoint
 		assertTrue(

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7d24568..0888cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -183,7 +185,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -240,7 +243,8 @@ public class CheckpointStateRestoreTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		StreamStateHandle serializedState = CheckpointCoordinatorTest
 				.generateChainedStateHandle(new SerializableObject())


Mime
View raw message