flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [3/7] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Date Tue, 15 Aug 2017 12:57:11 GMT
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91a4b276
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91a4b276
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91a4b276

Branch: refs/heads/master
Commit: 91a4b276171afb760bfff9ccf30593e648e91dfb
Parents: b71154a
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Tue Aug 15 14:56:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  13 +-
 .../checkpoint/CheckpointCoordinator.java       |  32 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +
 .../checkpoint/CompletedCheckpointStore.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   6 +-
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   2 +-
 .../state/IncrementalKeyedStateHandle.java      |  68 ++--
 .../runtime/state/KeyGroupsStateHandle.java     |   2 +-
 .../runtime/state/MultiStreamStateHandle.java   |  10 +-
 .../runtime/state/SharedStateRegistry.java      |  52 ++-
 .../state/SharedStateRegistryFactory.java       |  35 ++
 .../state/memory/ByteStreamStateHandle.java     |   1 +
 ...tCoordinatorExternalizedCheckpointsTest.java |  22 +-
 .../CheckpointCoordinatorFailureTest.java       |   7 +-
 .../CheckpointCoordinatorMasterHooksTest.java   |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 341 +++++++++++++++++--
 .../checkpoint/CheckpointStateRestoreTest.java  |  10 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  25 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |   7 +-
 .../state/IncrementalKeyedStateHandleTest.java  |  75 +++-
 .../RecoverableCompletedCheckpointStore.java    |  33 +-
 .../streaming/runtime/tasks/StreamTask.java     |  21 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  76 +++--
 ...ckendEventTimeWindowCheckpointingITCase.java |  49 +++
 27 files changed, 743 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bba5b55..756cfdd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -253,7 +253,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.restoredKvStateMetaInfos = new HashMap<>();
 		this.materializedSstFiles = new TreeMap<>();
 		this.backendUID = UUID.randomUUID();
-		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
+
+		LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
+			this.operatorIdentifier,
+			this.backendUID);
 	}
 
 	/**
@@ -883,11 +886,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		void takeSnapshot() throws Exception {
 			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
+			final long lastCompletedCheckpoint;
+
 			// use the last completed checkpoint as the comparison base.
 			synchronized (stateBackend.materializedSstFiles) {
-				baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
+				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
 			}
 
+			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0b64a73..c98d3aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
 	@Nullable
 	private CheckpointStatsTracker statsTracker;
 
+	/** A factory for SharedStateRegistry objects */
+	private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
 	/** Registry that tracks state which is shared across (incremental) checkpoints */
-	private final SharedStateRegistry sharedStateRegistry;
+	private SharedStateRegistry sharedStateRegistry;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			@Nullable String checkpointDirectory,
-			Executor executor) {
+			Executor executor,
+			SharedStateRegistryFactory sharedStateRegistryFactory) {
 
 		// sanity checks
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.checkpointDirectory = checkpointDirectory;
 		this.executor = checkNotNull(executor);
-		this.sharedStateRegistry = new SharedStateRegistry(executor);
+		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
+		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -1043,10 +1049,23 @@ public class CheckpointCoordinator {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
 
-			// Recover the checkpoints
-			completedCheckpointStore.recover(sharedStateRegistry);
+			// We create a new shared state registry object, so that all pending async disposal requests from previous
+			// runs will go against the old object (were they can do no harm).
+			// This must happen under the checkpoint lock.
+			sharedStateRegistry.close();
+			sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+			// Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
+			completedCheckpointStore.recover();
+
+			// Now, we re-register all (shared) states from the checkpoint store with the new registry
+			for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
+				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+			}
+
+			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
 
-			// restore from the latest checkpoint
+			// Restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
 
 			if (latest == null) {
@@ -1120,7 +1139,6 @@ public class CheckpointCoordinator {
 		CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
 				job, tasks, savepointPath, userClassLoader, allowNonRestored);
 
-		savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 		completedCheckpointStore.addCheckpoint(savepoint);
 		
 		// Reset the checkpoint ID counter

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 7c3edee..d3f61e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
 
 	private void doDiscard() throws Exception {
 
+		LOG.trace("Executing discard procedure for {}.", this);
+
 		try {
 			// collect exceptions and continue cleanup
 			Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
 	 *
 	 * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
 	 * available checkpoint.
-	 *
-	 * @param sharedStateRegistry the shared state registry to register recovered states.
 	 */
-	void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+	void recover() throws Exception;
 
 	/**
 	 * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		// Nothing to do
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 * that the history of checkpoints is consistent.
 	 */
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		LOG.info("Recovering checkpoints from ZooKeeper.");
 
 		// Clear local handles in order to prevent duplicates on
 		// recovery. The local handles should reflect the state
 		// of ZooKeeper.
 		completedCheckpoints.clear();
-		sharedStateRegistry.clear();
 
 		// Get all there is first
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			try {
 				completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
 				if (completedCheckpoint != null) {
-					// Re-register all shared states in the checkpoint.
-					completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 					completedCheckpoints.add(completedCheckpoint);
 				}
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 139f484..2e5f3d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedThrowable;
@@ -69,8 +70,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -456,7 +457,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointIDCounter,
 			checkpointStore,
 			checkpointDir,
-			ioExecutor);
+			ioExecutor,
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// register the master hooks on the checkpoint coordinator
 		for (MasterTriggerRestoreHook<?> hook : masterHooks) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 5ee7a9f..e6d49d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 25df19b..d6019db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -89,9 +89,9 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.slf4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final UUID backendIdentifier;
 
 	/**
-	 * The key-group range covered by this state handle
+	 * The key-group range covered by this state handle.
 	 */
 	private final KeyGroupRange keyGroupRange;
 
 	/**
-	 * The checkpoint Id
+	 * The checkpoint Id.
 	 */
 	private final long checkpointId;
 
 	/**
-	 * Shared state in the incremental checkpoint. This i
+	 * Shared state in the incremental checkpoint.
 	 */
 	private final Map<StateHandleID, StreamStateHandle> sharedState;
 
 	/**
-	 * Private state in the incremental checkpoint
+	 * Private state in the incremental checkpoint.
 	 */
 	private final Map<StateHandleID, StreamStateHandle> privateState;
 
 	/**
-	 * Primary meta data state of the incremental checkpoint
+	 * Primary meta data state of the incremental checkpoint.
 	 */
 	private final StreamStateHandle metaStateHandle;
 
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 
 	@Override
 	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
-			return this;
-		} else {
-			return null;
-		}
+		return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
+			null : this;
 	}
 
 	@Override
 	public void discardState() throws Exception {
 
+		SharedStateRegistry registry = this.sharedStateRegistry;
+		final boolean isRegistered = (registry != null);
+
+		LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
+			isRegistered,
+			checkpointId,
+			backendIdentifier);
+
 		try {
 			metaStateHandle.discardState();
 		} catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		// If this was not registered, we can delete the shared state. We can simply apply this
 		// to all handles, because all handles that have not been created for the first time for this
 		// are only placeholders at this point (disposing them is a NOP).
-		if (sharedStateRegistry == null) {
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
-			} catch (Exception e) {
-				LOG.warn("Could not properly discard new sst file states.", e);
-			}
-		} else {
+		if (isRegistered) {
 			// If this was registered, we only unregister all our referenced shared states
 			// from the registry.
 			for (StateHandleID stateHandleID : sharedState.keySet()) {
-				sharedStateRegistry.unregisterReference(
+				registry.unregisterReference(
 					createSharedStateRegistryKeyFromFileName(stateHandleID));
 			}
+		} else {
+			// Otherwise, we assume to own those handles and dispose them directly.
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+			} catch (Exception e) {
+				LOG.warn("Could not properly discard new sst file states.", e);
+			}
 		}
 	}
 
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-		Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+		// This is a quick check to avoid that we register twice with the same registry. However, the code allows to
+		// register again with a different registry. The implication is that ownership is transferred to this new
+		// registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
+		// SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
+		// an old registry object from a previous run is due to be GCed and will never be used for registration again.
+		Preconditions.checkState(
+			sharedStateRegistry != stateRegistry,
+			"The state handle has already registered its shared states to the given registry.");
 
 		sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
+		LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
+			checkpointId,
+			backendIdentifier);
+
 		for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
 			SharedStateRegistryKey registryKey =
 				createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		result = 31 * result + getMetaStateHandle().hashCode();
 		return result;
 	}
+
+	@Override
+	public String toString() {
+		return "IncrementalKeyedStateHandle{" +
+			"backendIdentifier=" + backendIdentifier +
+			", keyGroupRange=" + keyGroupRange +
+			", checkpointId=" + checkpointId +
+			", sharedState=" + sharedState +
+			", privateState=" + privateState +
+			", metaStateHandle=" + metaStateHandle +
+			", registered=" + (sharedStateRegistry != null) +
+			'}';
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
 	public String toString() {
 		return "KeyGroupsStateHandle{" +
 				"groupRangeOffsets=" + groupRangeOffsets +
-				", data=" + stateHandle +
+				", stateHandle=" + stateHandle +
 				'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 	private final List<StreamStateHandle> stateHandles;
 	private final long stateSize;
 
-	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
 		this.stateHandles = Preconditions.checkNotNull(stateHandles);
 		long calculateSize = 0L;
 		for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 		return stateSize;
 	}
 
+	@Override
+	public String toString() {
+		return "MultiStreamStateHandle{" +
+			"stateHandles=" + stateHandles +
+			", stateSize=" + stateSize +
+			'}';
+	}
+
 	static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
 
 		private final TreeMap<Long, StreamStateHandle> stateHandleMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
  * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
  * them.
  */
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
 
+	/** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */
+	public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() {
+		@Override
+		public SharedStateRegistry create(Executor deleteExecutor) {
+			return new SharedStateRegistry(deleteExecutor);
+		}
+	};
+
 	/** All registered state objects by an artificial key */
 	private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
 
+	/** This flag indicates whether or not the registry is open or if close() was called */
+	private boolean open;
+
 	/** Executor for async state deletion */
 	private final Executor asyncDisposalExecutor;
 
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
 	public SharedStateRegistry(Executor asyncDisposalExecutor) {
 		this.registeredStates = new HashMap<>();
 		this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
+		this.open = true;
 	}
 
 	/**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
 		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
+
+			Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");
+
 			entry = registeredStates.get(registrationKey);
 
 			if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
 				// delete if this is a real duplicate
 				if (!Objects.equals(state, entry.stateHandle)) {
 					scheduledStateDeletion = state;
+					LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " +
+							"be an unnecessary copy of existing state {} and will be dropped.",
+						registrationKey,
+						state,
+						entry.stateHandle);
 				}
 				entry.increaseReferenceCount();
 			}
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
 	 *
 	 * @param registrationKey the shared state for which we release a reference.
 	 * @return the result of the request, consisting of the reference count after this operation
-	 * and the state handle, or null if the state handle was deleted through this request.
+	 * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry
+	 * was previously closed.
 	 */
 	public Result unregisterReference(SharedStateRegistryKey registrationKey) {
 
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
 		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
+
 			entry = registeredStates.get(registrationKey);
 
 			Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
 		}
 	}
 
+	@Override
+	public String toString() {
+		synchronized (registeredStates) {
+			return "SharedStateRegistry{" +
+				"registeredStates=" + registeredStates +
+				'}';
+		}
+	}
+
 	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
 		// We do the small optimization to not issue discards for placeholders, which are NOPs.
 		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-
 			LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
 			asyncDisposalExecutor.execute(
 				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
 		return stateHandle instanceof PlaceholderStreamStateHandle;
 	}
 
+	@Override
+	public void close() {
+		synchronized (registeredStates) {
+			open = false;
+		}
+	}
+
 	/**
 	 * An entry in the registry, tracking the handle and the corresponding reference count.
 	 */
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
 			}
 		}
 	}
-
-	/**
-	 * Clears the registry.
-	 */
-	public void clear() {
-		synchronized (registeredStates) {
-			registeredStates.clear();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects.
+ */
+public interface SharedStateRegistryFactory {
+
+	/**
+	 * Factory method for {@link SharedStateRegistry}.
+	 *
+	 * @param deleteExecutor executor used to run (async) deletes.
+	 * @return a SharedStateRegistry object
+	 */
+	SharedStateRegistry create(Executor deleteExecutor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
 	public String toString() {
 		return "ByteStreamStateHandle{" +
 			"handleName='" + handleName + '\'' +
+			", dataBytes=" + data.length +
 			'}';
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * CheckpointCoordinator tests for externalized checkpoints.
  *
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			checkpointDir.getAbsolutePath(),
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 88b95f5..26db772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -79,7 +79,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new FailingCompletedCheckpointStore(),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		coord.triggerCheckpoint(triggerTimestamp, false);
 
@@ -111,7 +112,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		when(subtaskState.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertex.getJobvertexId()))).thenReturn(operatorSubtaskState);
 
 		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
-		
+
 		try {
 			coord.receiveAcknowledgeMessage(acknowledgeMessage);
 			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
@@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
 
 		@Override
-		public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+		public void recover() throws Exception {
 			throw new UnsupportedOperationException("Not implemented.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e23f6a2..2f860e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -46,14 +46,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -404,7 +402,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d9af879..45cbbc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,32 +36,36 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
+import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -139,7 +143,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -199,7 +204,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -250,7 +256,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -302,7 +309,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -406,7 +414,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -525,7 +534,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -692,7 +702,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -822,7 +833,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -986,7 +998,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1063,7 +1076,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1126,7 +1140,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1258,7 +1273,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 
 			coord.startCheckpointScheduler();
@@ -1350,7 +1366,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				"dummy-path",
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 		try {
 			coord.startCheckpointScheduler();
@@ -1423,7 +1440,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1574,7 +1592,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			counter,
 			new StandaloneCompletedCheckpointStore(10),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1680,7 +1699,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1753,7 +1773,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1835,7 +1856,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1887,7 +1909,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1940,7 +1963,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2002,7 +2026,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2116,7 +2141,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2237,7 +2263,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2395,7 +2422,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2686,7 +2714,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			standaloneCompletedCheckpointStore,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2847,7 +2876,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				"fake-directory",
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -3351,7 +3381,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// Periodic
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
@@ -3529,7 +3560,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3567,7 +3599,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		store.addCheckpoint(new CompletedCheckpoint(
 			new JobID(),
@@ -3623,7 +3656,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			completedCheckpointStore,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger a first checkpoint
 		assertTrue(
@@ -3673,4 +3707,245 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			"The latest completed (proper) checkpoint should have been added to the completed checkpoint store.",
 			completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast());
 	}
+
+	@Test
+	public void testSharedStateRegistrationOnRestore() throws Exception {
+
+		final JobID jid = new JobID();
+		final long timestamp = System.currentTimeMillis();
+
+		final JobVertexID jobVertexID1 = new JobVertexID();
+
+		int parallelism1 = 2;
+		int maxParallelism1 = 4;
+
+		final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
+			jobVertexID1,
+			parallelism1,
+			maxParallelism1);
+
+		List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1);
+
+		allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
+
+		ExecutionVertex[] arrayExecutionVertices =
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+
+		RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(10);
+
+		final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			arrayExecutionVertices,
+			new StandaloneCheckpointIDCounter(),
+			store,
+			null,
+			Executors.directExecutor(),
+			new SharedStateRegistryFactory() {
+				@Override
+				public SharedStateRegistry create(Executor deleteExecutor) {
+					SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);
+					createdSharedStateRegistries.add(instance);
+					return instance;
+				}
+			});
+
+		final int numCheckpoints = 3;
+
+		List<KeyGroupRange> keyGroupPartitions1 =
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+
+		for (int i = 0; i < numCheckpoints; ++i) {
+			performIncrementalCheckpoint(jid, coord, jobVertex1, keyGroupPartitions1, timestamp + i, i);
+		}
+
+		List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
+		assertEquals(numCheckpoints, completedCheckpoints.size());
+
+		int sharedHandleCount = 0;
+
+		List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = new ArrayList<>(numCheckpoints);
+
+		for (int i = 0; i < numCheckpoints; ++i) {
+			sharedHandlesByCheckpoint.add(new HashMap<StateHandleID, StreamStateHandle>(2));
+		}
+
+		int cp = 0;
+		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
+			for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
+				for (OperatorSubtaskState subtaskState : taskState.getStates()) {
+					for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
+						// test we are once registered with the current registry
+						verify(keyedStateHandle, times(1)).registerSharedStates(createdSharedStateRegistries.get(0));
+						IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) keyedStateHandle;
+
+						sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState());
+
+						for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getSharedState().values()) {
+							assertTrue(!(streamStateHandle instanceof PlaceholderStreamStateHandle));
+							verify(streamStateHandle, never()).discardState();
+							++sharedHandleCount;
+						}
+
+						for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getPrivateState().values()) {
+							verify(streamStateHandle, never()).discardState();
+						}
+
+						verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()).discardState();
+					}
+
+					verify(subtaskState, never()).discardState();
+				}
+			}
+			++cp;
+		}
+
+		// 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
+		assertEquals(10, sharedHandleCount);
+
+		// discard CP0
+		store.removeOldestCheckpoint();
+
+		// we expect no shared state was discarded because the state of CP0 is still referenced by CP1
+		for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) {
+			for (StreamStateHandle streamStateHandle : cpList.values()) {
+				verify(streamStateHandle, never()).discardState();
+			}
+		}
+
+		// shutdown the store
+		store.shutdown(JobStatus.SUSPENDED);
+
+		// restore the store
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(jobVertexID1, jobVertex1);
+		coord.restoreLatestCheckpointedState(tasks, true, false);
+
+		// validate that all shared states are registered again after the recovery.
+		cp = 0;
+		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
+			for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
+				for (OperatorSubtaskState subtaskState : taskState.getStates()) {
+					for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
+						VerificationMode verificationMode;
+						// test we are once registered with the new registry
+						if (cp > 0) {
+							verificationMode = times(1);
+						} else {
+							verificationMode = never();
+						}
+
+						//check that all are registered with the new registry
+						verify(keyedStateHandle, verificationMode).registerSharedStates(createdSharedStateRegistries.get(1));
+					}
+				}
+			}
+			++cp;
+		}
+
+		// discard CP1
+		store.removeOldestCheckpoint();
+
+		// we expect that all shared state from CP0 is no longer referenced and discarded. CP2 is still live and also
+		// references the state from CP1, so we expect they are not discarded.
+		for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) {
+			for (Map.Entry<StateHandleID, StreamStateHandle> entry : cpList.entrySet()) {
+				String key = entry.getKey().getKeyString();
+				int belongToCP = Integer.parseInt(String.valueOf(key.charAt(key.length() - 1)));
+				if (belongToCP == 0) {
+					verify(entry.getValue(), times(1)).discardState();
+				} else {
+					verify(entry.getValue(), never()).discardState();
+				}
+			}
+		}
+
+		// discard CP2
+		store.removeOldestCheckpoint();
+
+		// we expect all shared state was discarded now, because all CPs are
+		for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) {
+			for (StreamStateHandle streamStateHandle : cpList.values()) {
+				verify(streamStateHandle, times(1)).discardState();
+			}
+		}
+	}
+
+	private void performIncrementalCheckpoint(
+		JobID jid,
+		CheckpointCoordinator coord,
+		ExecutionJobVertex jobVertex1,
+		List<KeyGroupRange> keyGroupPartitions1,
+		long timestamp,
+		int cpSequenceNumber) throws Exception {
+
+		// trigger the checkpoint
+		coord.triggerCheckpoint(timestamp, false);
+
+		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
+		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+
+		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
+
+			KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index);
+
+			Map<StateHandleID, StreamStateHandle> privateState = new HashMap<>();
+			privateState.put(
+				new StateHandleID("private-1"),
+				spy(new ByteStreamStateHandle("private-1", new byte[]{'p'})));
+
+			Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>();
+
+			// let all but the first CP overlap by one shared state.
+			if (cpSequenceNumber > 0) {
+				sharedState.put(
+					new StateHandleID("shared-" + (cpSequenceNumber - 1)),
+					spy(new PlaceholderStreamStateHandle()));
+			}
+
+			sharedState.put(
+				new StateHandleID("shared-" + cpSequenceNumber),
+				spy(new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'})));
+
+			IncrementalKeyedStateHandle managedState =
+				spy(new IncrementalKeyedStateHandle(
+					new UUID(42L, 42L),
+					keyGroupRange,
+					checkpointId,
+					sharedState,
+					privateState,
+					spy(new ByteStreamStateHandle("meta", new byte[]{'m'}))));
+
+			OperatorSubtaskState operatorSubtaskState =
+				spy(new OperatorSubtaskState(null,
+					Collections.<OperatorStateHandle>emptyList(),
+					Collections.<OperatorStateHandle>emptyList(),
+					Collections.<KeyedStateHandle>singletonList(managedState),
+					Collections.<KeyedStateHandle>emptyList()));
+
+			Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
+
+			opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
+
+			TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
+
+			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				taskStateSnapshot);
+
+			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 6ce071b..791bffa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -186,7 +188,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -243,7 +246,8 @@ public class CheckpointStateRestoreTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		StreamStateHandle serializedState = CheckpointCoordinatorTest
 				.generateChainedStateHandle(new SerializableObject())

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 77423c2..dc2b11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
 		// Recover
-		sharedStateRegistry.clear();
-		checkpoints.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		sharedStateRegistry = new SharedStateRegistry();
+		checkpoints.recover();
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-		sharedStateRegistry.clear();
-		store.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		store.recover();
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 	}
@@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
 
 		// Recover again
-		sharedStateRegistry.clear();
-		store.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		store.recover();
 
 		CompletedCheckpoint recovered = store.getLatestCheckpoint();
 		assertEquals(checkpoint, recovered);
@@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			checkpointStore.addCheckpoint(checkpoint);
 		}
 
-		sharedStateRegistry.clear();
-		checkpointStore.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		checkpointStore.recover();
 
 		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
 
@@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
 		// recover the checkpoint by a different checkpoint store
-		sharedStateRegistry.clear();
-		zkCheckpointStore2.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		sharedStateRegistry = new SharedStateRegistry();
+		zkCheckpointStore2.recover();
 
 		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
 		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 91bab85..3171f1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			stateStorage,
 			Executors.directExecutor());
 
-		SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
-		zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
-
-		verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
-		verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+		zooKeeperCompletedCheckpointStore.recover();
 
 		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index c1b3ccd..9f6f88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+
 import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest {
 	@Test
 	public void testSharedStateDeRegistration() throws Exception {
 
-		Random rnd = new Random(42);
-
 		SharedStateRegistry registry = spy(new SharedStateRegistry());
 
 		// Create two state handles with overlapping shared state
@@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest {
 		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
 	}
 
+	/**
+	 * This tests that re-registration of shared state with another registry works as expected. This simulates a
+	 * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers
+	 * all live checkpoint states.
+	 */
+	@Test
+	public void testSharedStateReRegistration() throws Exception {
+
+		SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
+
+		IncrementalKeyedStateHandle stateHandleX = create(new Random(1));
+		IncrementalKeyedStateHandle stateHandleY = create(new Random(2));
+		IncrementalKeyedStateHandle stateHandleZ = create(new Random(3));
+
+		// Now we register first time ...
+		stateHandleX.registerSharedStates(stateRegistryA);
+		stateHandleY.registerSharedStates(stateRegistryA);
+		stateHandleZ.registerSharedStates(stateRegistryA);
+
+		try {
+			// Second attempt should fail
+			stateHandleX.registerSharedStates(stateRegistryA);
+			fail("Should not be able to register twice with the same registry.");
+		} catch (IllegalStateException ignore) {
+		}
+
+		// Everything should be discarded for this handle
+		stateHandleZ.discardState();
+		verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		// Close the first registry
+		stateRegistryA.close();
+
+		// Attempt to register to closed registry should trigger exception
+		try {
+			create(new Random(4)).registerSharedStates(stateRegistryA);
+			fail("Should not be able to register new state to closed registry.");
+		} catch (IllegalStateException ignore) {
+		}
+
+		// All state should still get discarded
+		stateHandleY.discardState();
+		verify(stateHandleY.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		// This should still be unaffected
+		verify(stateHandleX.getMetaStateHandle(), never()).discardState();
+		for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+			verify(stateHandle, never()).discardState();
+		}
+
+		// We re-register the handle with a new registry
+		SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistry());
+		stateHandleX.registerSharedStates(sharedStateRegistryB);
+		stateHandleX.discardState();
+
+		// Should be completely discarded because it is tracked through the new registry
+		verify(stateHandleX.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		sharedStateRegistryB.close();
+	}
+
 	private static IncrementalKeyedStateHandle create(Random rnd) {
 		return new IncrementalKeyedStateHandle(
 			UUID.nameUUIDFromBytes("test".getBytes()),

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a0c4412..037ecd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 	private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
 
+	private final int maxRetainedCheckpoints;
+
+	public RecoverableCompletedCheckpointStore() {
+		this(1);
+	}
+
+	public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+		Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+		this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+	}
+
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		checkpoints.addAll(suspended);
 		suspended.clear();
-
-		for (CompletedCheckpoint checkpoint : checkpoints) {
-			checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
-		}
 	}
 
 	@Override
@@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 		checkpoints.addLast(checkpoint);
 
-
-		if (checkpoints.size() > 1) {
-			CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-			checkpointToSubsume.discardOnSubsume();
+		if (checkpoints.size() > maxRetainedCheckpoints) {
+			removeOldestCheckpoint();
 		}
 	}
 
+	public void removeOldestCheckpoint() throws Exception {
+		CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
+		checkpointToSubsume.discardOnSubsume();
+	}
+
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() throws Exception {
 		return checkpoints.isEmpty() ? null : checkpoints.getLast();
@@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 	@Override
 	public int getMaxNumberOfRetainedCheckpoints() {
-		return 1;
+		return maxRetainedCheckpoints;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cb8639b..1ba5fb1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -779,9 +780,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
+
+		TaskInfo taskInfo = getEnvironment().getTaskInfo();
 		return operator.getClass().getSimpleName() +
-				"_" + vertexId +
-				"_" + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
+			"_" + operator.getOperatorID() +
+			"_(" + taskInfo.getIndexOfThisSubtask() + "/" + taskInfo.getNumberOfParallelSubtasks() + ")";
 	}
 
 	/**
@@ -892,18 +895,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
 						CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
 
+					TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null;
+
 					// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
 					// to stateless tasks on restore. This enables simple job modifications that only concern
 					// stateless without the need to assign them uids to match their (always empty) states.
 					owner.getEnvironment().acknowledgeCheckpoint(
 						checkpointMetaData.getCheckpointId(),
 						checkpointMetrics,
-						hasState ? taskOperatorSubtaskStates : null);
+						acknowledgedState);
+
+					LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
+						owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
+
+					LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
+						owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState);
 
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
-							owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-					}
 				} else {
 					LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
 						owner.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 22ed847..c525a37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
@@ -48,21 +49,22 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.curator.test.TestingServer;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -87,6 +89,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	private static TestStreamEnvironment env;
 
+	private static TestingServer zkServer;
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -101,11 +105,27 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	}
 
 	enum StateBackendEnum {
-		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC
+		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
 	}
 
-	@BeforeClass
-	public static void startTestCluster() {
+	@Before
+	public void startTestCluster() throws Exception {
+
+		// print a message when starting a test method to avoid Travis' <tt>"Maven produced no
+		// output for xxx seconds."</tt> messages
+		System.out.println(
+			"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+
+		// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+		if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+			zkServer = new TestingServer();
+			zkServer.start();
+		}
+
+		TemporaryFolder temporaryFolder = new TemporaryFolder();
+		temporaryFolder.create();
+		final File haDir = temporaryFolder.newFolder();
+
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
@@ -113,28 +133,18 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
 		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
 
+		if (zkServer != null) {
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+		}
+
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 
 		env = new TestStreamEnvironment(cluster, PARALLELISM);
 		env.getConfig().setUseSnapshotCompression(true);
-	}
-
-	@AfterClass
-	public static void stopTestCluster() {
-		if (cluster != null) {
-			cluster.stop();
-		}
-	}
-
-	@Before
-	public void beforeTest() throws IOException {
-		// print a message when starting a test method to avoid Travis' <tt>"Maven produced no
-		// output for xxx seconds."</tt> messages
-		System.out.println(
-			"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
 
-		// init state back-end
 		switch (stateBackendEnum) {
 			case MEM:
 				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -159,7 +169,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				this.stateBackend = rdb;
 				break;
 			}
-			case ROCKSDB_INCREMENTAL: {
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
 				String backups = tempFolder.newFolder().getAbsolutePath();
 				// we use the fs backend with small threshold here to test the behaviour with file
@@ -173,16 +184,25 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				this.stateBackend = rdb;
 				break;
 			}
-
+			default:
+				throw new IllegalStateException("No backend selected.");
 		}
 	}
 
-	/**
-	 * Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
-	 * for xxx seconds."</tt> messages.
-	 */
 	@After
-	public void afterTest() {
+	public void stopTestCluster() throws IOException {
+		if (cluster != null) {
+			cluster.stop();
+			cluster = null;
+		}
+
+		if (zkServer != null) {
+			zkServer.stop();
+			zkServer = null;
+		}
+
+		//Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
+		// for xxx seconds."</tt> messages.
 		System.out.println(
 			"Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
 	}


Mime
View raw message