flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [3/5] flink git commit: [FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared state in incremental checkpoints
Date Sun, 14 May 2017 11:50:10 GMT
[FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared state in incremental
checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4745d0c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4745d0c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4745d0c0

Branch: refs/heads/master
Commit: 4745d0c0822ba1f1c32568d0c4869cb44fa35426
Parents: b54f448
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Wed May 10 17:59:39 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200

----------------------------------------------------------------------
 .../RocksDBIncrementalKeyedStateHandle.java     | 123 ++++++------
 .../state/RocksDBKeyedStateBackend.java         |  64 +++++-
 .../runtime/state/SharedStateRegistry.java      | 196 ++++++++++++++-----
 .../runtime/state/SharedStateRegistryKey.java   |  68 +++++++
 .../runtime/state/SharedStateRegistryTest.java  |  85 +++++---
 .../runtime/state/StateBackendTestBase.java     |  18 +-
 6 files changed, 397 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
index 5ac9e46..961182d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SharedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
@@ -54,17 +53,15 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
 
 	private static final long serialVersionUID = -8328808513197388231L;
 
-	private final JobID jobId;
-
 	private final String operatorIdentifier;
 
 	private final KeyGroupRange keyGroupRange;
 
 	private final long checkpointId;
 
-	private final Map<String, StreamStateHandle> newSstFiles;
+	private final Map<String, StreamStateHandle> unregisteredSstFiles;
 
-	private final Map<String, StreamStateHandle> oldSstFiles;
+	private final Map<String, StreamStateHandle> registeredSstFiles;
 
 	private final Map<String, StreamStateHandle> miscFiles;
 
@@ -81,21 +78,19 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
 	private boolean registered;
 
 	RocksDBIncrementalKeyedStateHandle(
-			JobID jobId,
 			String operatorIdentifier,
 			KeyGroupRange keyGroupRange,
 			long checkpointId,
-			Map<String, StreamStateHandle> newSstFiles,
-			Map<String, StreamStateHandle> oldSstFiles,
+			Map<String, StreamStateHandle> unregisteredSstFiles,
+			Map<String, StreamStateHandle> registeredSstFiles,
 			Map<String, StreamStateHandle> miscFiles,
 			StreamStateHandle metaStateHandle) {
 
-		this.jobId = Preconditions.checkNotNull(jobId);
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.checkpointId = checkpointId;
-		this.newSstFiles = Preconditions.checkNotNull(newSstFiles);
-		this.oldSstFiles = Preconditions.checkNotNull(oldSstFiles);
+		this.unregisteredSstFiles = Preconditions.checkNotNull(unregisteredSstFiles);
+		this.registeredSstFiles = Preconditions.checkNotNull(registeredSstFiles);
 		this.miscFiles = Preconditions.checkNotNull(miscFiles);
 		this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
 		this.registered = false;
@@ -110,12 +105,12 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
 		return checkpointId;
 	}
 
-	Map<String, StreamStateHandle> getNewSstFiles() {
-		return newSstFiles;
+	Map<String, StreamStateHandle> getUnregisteredSstFiles() {
+		return unregisteredSstFiles;
 	}
 
-	Map<String, StreamStateHandle> getOldSstFiles() {
-		return oldSstFiles;
+	Map<String, StreamStateHandle> getRegisteredSstFiles() {
+		return registeredSstFiles;
 	}
 
 	Map<String, StreamStateHandle> getMiscFiles() {
@@ -138,6 +133,8 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
 	@Override
 	public void discardState() throws Exception {
 
+		Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
+
 		try {
 			metaStateHandle.discardState();
 		} catch (Exception e) {
@@ -150,24 +147,23 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
 			LOG.warn("Could not properly discard misc file states.", e);
 		}
 
-		if (!registered) {
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(newSstFiles.values());
-			} catch (Exception e) {
-				LOG.warn("Could not properly discard new sst file states.", e);
-			}
+		try {
+			StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values());
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard new sst file states.", e);
 		}
+
 	}
 
 	@Override
 	public long getStateSize() {
 		long size = StateUtil.getStateSize(metaStateHandle);
 
-		for (StreamStateHandle newSstFileHandle : newSstFiles.values()) {
+		for (StreamStateHandle newSstFileHandle : unregisteredSstFiles.values()) {
 			size += newSstFileHandle.getStateSize();
 		}
 
-		for (StreamStateHandle oldSstFileHandle : oldSstFiles.values()) {
+		for (StreamStateHandle oldSstFileHandle : registeredSstFiles.values()) {
 			size += oldSstFileHandle.getStateSize();
 		}
 
@@ -180,69 +176,66 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com
 
 	@Override
 	public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
 		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
 
-		for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
-			SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+		for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-			int referenceCount = stateRegistry.register(stateHandle);
-			Preconditions.checkState(referenceCount == 1);
+			SharedStateRegistry.Result result =
+				stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+			// We update our reference with the result from the registry, to prevent the following
+			// problem:
+			// A previous checkpoint n has already registered the state. This can happen if a
+			// following checkpoint (n + x) wants to reference the same state before the backend got
+			// notified that checkpoint n completed. In this case, the shared registry did
+			// deduplication and returns the previous reference.
+			newSstFileEntry.setValue(result.getReference());
 		}
 
-		for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
-			SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+		for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+			SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
 
-			int referenceCount = stateRegistry.register(stateHandle);
-			Preconditions.checkState(referenceCount > 1);
+			// Again we update our state handle with the result from the registry, thus replacing
+			// placeholder state handles with the originals.
+			oldSstFileName.setValue(result.getReference());
 		}
 
+		// Migrate state from unregistered to registered, so that it will not count as private state
+		// for #discardState() from now.
+		registeredSstFiles.putAll(unregisteredSstFiles);
+		unregisteredSstFiles.clear();
+
 		registered = true;
 	}
 
 	@Override
 	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
 		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
 
-		for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
-			stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+		for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+			stateRegistry.releaseReference(registryKey);
 		}
 
-		for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
-			stateRegistry.unregister(new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue()));
+		for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : registeredSstFiles.entrySet()) {
+			SharedStateRegistryKey registryKey =
+				createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
+			stateRegistry.releaseReference(registryKey);
 		}
 
 		registered = false;
 	}
 
-	private class SstFileStateHandle implements SharedStateHandle {
-
-		private static final long serialVersionUID = 9092049285789170669L;
-
-		private final String fileName;
-
-		private final StreamStateHandle delegateStateHandle;
-
-		private SstFileStateHandle(
-				String fileName,
-				StreamStateHandle delegateStateHandle) {
-			this.fileName = fileName;
-			this.delegateStateHandle = delegateStateHandle;
-		}
-
-		@Override
-		public String getRegistrationKey() {
-			return jobId + "-" + operatorIdentifier + "-" + keyGroupRange + "-" + fileName;
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			delegateStateHandle.discardState();
-		}
-
-		@Override
-		public long getStateSize() {
-			return delegateStateHandle.getStateSize();
-		}
+	private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(String fileName) {
+		return new SharedStateRegistryKey(operatorIdentifier + "-" + keyGroupRange + "-" + fileName);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6af53c3..1080e59 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -62,9 +61,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -72,6 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
@@ -709,16 +710,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final class RocksDBIncrementalSnapshotOperation {
 
+		/** The backend which we snapshot */
 		private final RocksDBKeyedStateBackend<?> stateBackend;
 
+		/** Stream factory that creates the output streams to DFS */
 		private final CheckpointStreamFactory checkpointStreamFactory;
 
+		/** Id for the current checkpoint */
 		private final long checkpointId;
 
+		/** Timestamp for the current checkpoint */
 		private final long checkpointTimestamp;
 
+		/** All sst files that were part of the last previously completed checkpoint */
 		private Map<String, StreamStateHandle> baseSstFiles;
 
+		/** The state meta data */
+		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
 
 		private FileSystem backupFileSystem;
@@ -864,10 +871,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 						if (fileHandle == null) {
 							fileHandle = materializeStateData(filePath);
-
 							newSstFiles.put(fileName, fileHandle);
 						} else {
-							oldSstFiles.put(fileName, fileHandle);
+							// we introduce a placeholder state handle, that is replaced with the
+							// original from the shared state registry (created from a previous checkpoint)
+							oldSstFiles.put(fileName, new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
 						}
 					} else {
 						StreamStateHandle fileHandle = materializeStateData(filePath);
@@ -882,9 +890,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
 
-			return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId,
-				stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
-				checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle);
+			return new RocksDBIncrementalKeyedStateHandle(
+				stateBackend.operatorIdentifier,
+				stateBackend.keyGroupRange,
+				checkpointId,
+				newSstFiles,
+				oldSstFiles,
+				miscFiles,
+				metaStateHandle);
 		}
 
 		void stop() {
@@ -922,6 +935,39 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 		}
+
+		/**
+		 * A placeholder state handle for shared state that will be replaced by an original that was
+		 * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
+		 * case of {@link ByteStreamStateHandle}.
+		 */
+		private static final class PlaceholderStreamStateHandle implements StreamStateHandle {
+
+			private static final long serialVersionUID = 1L;
+
+			/** We remember the size of the original file for which this is a placeholder */
+			private final long originalSize;
+
+			public PlaceholderStreamStateHandle(long originalSize) {
+				this.originalSize = originalSize;
+			}
+
+			@Override
+			public FSDataInputStream openInputStream() {
+				throw new UnsupportedOperationException(
+					"This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
+			}
+
+			@Override
+			public void discardState() throws Exception {
+				// nothing to do.
+			}
+
+			@Override
+			public long getStateSize() {
+				return originalSize;
+			}
+		}
 	}
 
 	@Override
@@ -1260,7 +1306,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				UUID.randomUUID().toString());
 
 			try {
-				Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getNewSstFiles();
+				Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getUnregisteredSstFiles();
 				for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
 					String fileName = newSstFileEntry.getKey();
 					StreamStateHandle remoteFileHandle = newSstFileEntry.getValue();
@@ -1268,7 +1314,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
 				}
 
-				Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getOldSstFiles();
+				Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getRegisteredSstFiles();
 				for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) {
 					String fileName = oldSstFileEntry.getKey();
 					StreamStateHandle remoteFileHandle = oldSstFileEntry.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 2cb43ac..dbf4642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -18,91 +18,137 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
 
 /**
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
  * maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
  */
 public class SharedStateRegistry {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
 
 	/** All registered state objects by an artificial key */
-	private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
+	private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
+
+	/** Executor for async state deletion */
+	private final Executor asyncDisposalExecutor;
 
 	public SharedStateRegistry() {
 		this.registeredStates = new HashMap<>();
+		this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
 	}
 
 	/**
-	 * Register a reference to the given shared state in the registry. This increases the reference
-	 * count for the this shared state by one. Returns the reference count after the update.
+	 * Register a reference to the given (supposedly new) shared state in the registry.
+	 * This does the following: We check if the state handle is actually new by the
+	 * registrationKey. If it is new, we register it with a reference count of 1. If there is
+	 * already a state handle registered under the given key, we dispose the given "new" state
+	 * handle, uptick the reference count of the previously existing state handle and return it as
+	 * a replacement with the result.
+	 *
+	 * <p>IMPORTANT: caller should check the state handle returned by the result, because the
+	 * registry is performing deduplication and could potentially return a handle that is supposed
+	 * to replace the one from the registration request.
 	 *
 	 * @param state the shared state for which we register a reference.
-	 * @return the updated reference count for the given shared state.
+	 * @return the result of this registration request, consisting of the state handle that is
+	 * registered under the key by the end of the operation and its current reference count.
 	 */
-	public int register(SharedStateHandle state) {
-		if (state == null) {
-			return 0;
-		}
+	public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
+
+		Preconditions.checkNotNull(state);
+
+		StreamStateHandle scheduledStateDeletion = null;
+		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
-			SharedStateRegistry.SharedStateEntry entry =
-				registeredStates.get(state.getRegistrationKey());
+			entry = registeredStates.get(registrationKey);
 
 			if (entry == null) {
-				SharedStateRegistry.SharedStateEntry stateEntry =
-					new SharedStateRegistry.SharedStateEntry(state);
-				registeredStates.put(state.getRegistrationKey(), stateEntry);
-				return 1;
+				entry = new SharedStateRegistry.SharedStateEntry(state);
+				registeredStates.put(registrationKey, entry);
 			} else {
+				// delete if this is a real duplicate
+				if (!Objects.equals(state, entry.state)) {
+					scheduledStateDeletion = state;
+				}
 				entry.increaseReferenceCount();
-				return entry.getReferenceCount();
 			}
 		}
+
+		scheduleAsyncDelete(scheduledStateDeletion);
+		return new Result(entry);
 	}
 
 	/**
-	 * Unregister one reference to the given shared state in the registry. This decreases the
-	 * reference count by one. Once the count reaches zero, the shared state is deleted.
+	 * Obtains one reference to the given shared state in the registry. This increases the
+	 * reference count by one.
 	 *
-	 * @param state the shared state for which we unregister a reference.
-	 * @return the reference count for the shared state after the update.
+	 * @param registrationKey the key under which the shared state was registered; a state must
+	 * already be registered under this key.
+	 * @return the result of the request, consisting of the reference count after this operation
+	 * and the state handle.
 	 */
-	public int unregister(SharedStateHandle state) {
-		if (state == null) {
-			return 0;
+	public Result obtainReference(SharedStateRegistryKey registrationKey) {
+
+		Preconditions.checkNotNull(registrationKey);
+
+		synchronized (registeredStates) {
+			SharedStateRegistry.SharedStateEntry entry =
+				Preconditions.checkNotNull(registeredStates.get(registrationKey),
+					"Could not find a state for the given registration key!");
+			entry.increaseReferenceCount();
+			return new Result(entry);
 		}
+	}
+
+	/**
+	 * Releases one reference to the given shared state in the registry. This decreases the
+	 * reference count by one. Once the count reaches zero, the shared state is deleted.
+	 *
+	 * @param registrationKey the shared state for which we release a reference.
+	 * @return the result of the request, consisting of the reference count after this operation
+	 * and the state handle; the handle is null (and the count 0) if the state was deleted through this request.
+	 */
+	public Result releaseReference(SharedStateRegistryKey registrationKey) {
+
+		Preconditions.checkNotNull(registrationKey);
+
+		final Result result;
+		final StreamStateHandle scheduledStateDeletion;
 
 		synchronized (registeredStates) {
-			SharedStateRegistry.SharedStateEntry entry = registeredStates.get(state.getRegistrationKey());
+			SharedStateRegistry.SharedStateEntry entry = registeredStates.get(registrationKey);
 
-			Preconditions.checkState(entry != null, "Cannot unregister a state that is not registered.");
+			Preconditions.checkState(entry != null,
+				"Cannot unregister a state that is not registered.");
 
 			entry.decreaseReferenceCount();
 
-			final int newReferenceCount = entry.getReferenceCount();
-
 			// Remove the state from the registry when it's not referenced any more.
-			if (newReferenceCount <= 0) {
-				registeredStates.remove(state.getRegistrationKey());
-				try {
-					entry.getState().discardState();
-				} catch (Exception e) {
-					LOG.warn("Cannot properly discard the state {}.", entry.getState(), e);
-				}
+			if (entry.getReferenceCount() <= 0) {
+				registeredStates.remove(registrationKey);
+				scheduledStateDeletion = entry.getState();
+				result = new Result(null, 0);
+			} else {
+				scheduledStateDeletion = null;
+				result = new Result(entry);
 			}
-			return newReferenceCount;
 		}
+
+		scheduleAsyncDelete(scheduledStateDeletion);
+		return result;
 	}
 
 	/**
@@ -122,8 +168,6 @@ public class SharedStateRegistry {
 		}
 	}
 
-
-
 	/**
 	 * Unregister all the shared states referenced by the given.
 	 *
@@ -141,20 +185,30 @@ public class SharedStateRegistry {
 		}
 	}
 
+	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
+		if (streamStateHandle != null) {
+			asyncDisposalExecutor.execute(
+				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+		}
+	}
+
+	/**
+	 * An entry in the registry, tracking the handle and the corresponding reference count.
+	 */
 	private static class SharedStateEntry {
 
-		/** The shared object */
-		private final SharedStateHandle state;
+		/** The shared state handle */
+		private final StreamStateHandle state;
 
-		/** The reference count of the object */
+		/** The current reference count of the state handle */
 		private int referenceCount;
 
-		SharedStateEntry(SharedStateHandle value) {
+		SharedStateEntry(StreamStateHandle value) {
 			this.state = value;
 			this.referenceCount = 1;
 		}
 
-		SharedStateHandle getState() {
+		StreamStateHandle getState() {
 			return state;
 		}
 
@@ -171,14 +225,56 @@ public class SharedStateRegistry {
 		}
 	}
 
-	public int getReferenceCount(SharedStateHandle state) {
-		if (state == null) {
-			return 0;
+	/**
+	 * The result of an attempt to (un)reference state.
+	 */
+	public static class Result {
+
+		/** The (un)registered state handle from the request */
+		private final StreamStateHandle reference;
+
+		/** The reference count to the state handle after the request to (un)register */
+		private final int referenceCount;
+
+		private Result(SharedStateEntry sharedStateEntry) {
+			this.reference = sharedStateEntry.getState();
+			this.referenceCount = sharedStateEntry.getReferenceCount();
 		}
 
-		SharedStateRegistry.SharedStateEntry entry =
-			registeredStates.get(state.getRegistrationKey());
+		public Result(StreamStateHandle reference, int referenceCount) {
+			Preconditions.checkArgument(referenceCount >= 0);
 
-		return entry == null ? 0 : entry.getReferenceCount();
+			this.reference = reference;
+			this.referenceCount = referenceCount;
+		}
+
+		public StreamStateHandle getReference() {
+			return reference;
+		}
+
+		public int getReferenceCount() {
+			return referenceCount;
+		}
+	}
+
+	/**
+	 * Encapsulates the operation to delete state handles asynchronously.
+	 */
+	private static final class AsyncDisposalRunnable implements Runnable {
+
+		private final StateObject toDispose;
+
+		public AsyncDisposalRunnable(StateObject toDispose) {
+			this.toDispose = Preconditions.checkNotNull(toDispose);
+		}
+
+		@Override
+		public void run() {
+			try {
+				toDispose.discardState();
+			} catch (Exception e) {
+				LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", toDispose, e);
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
new file mode 100644
index 0000000..9e59359
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a key that uniquely identifies (on a logical level) state handles for
+ * registration in the {@link SharedStateRegistry}. Two files which should logically
+ * be the same should have the same {@link SharedStateRegistryKey}. The meaning of logical
+ * equivalence is up to the application.
+ */
+public class SharedStateRegistryKey implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Uses a String as internal representation */
+	private final String keyString;
+
+	public SharedStateRegistryKey(String keyString) {
+		this.keyString = Preconditions.checkNotNull(keyString);
+	}
+
+	public String getKeyString() {
+		return keyString;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SharedStateRegistryKey that = (SharedStateRegistryKey) o;
+		return keyString.equals(that.keyString);
+	}
+
+	@Override
+	public int hashCode() {
+		return keyString.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		return keyString;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 821bb69..03e2a13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -19,9 +19,14 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class SharedStateRegistryTest {
 
@@ -30,24 +35,50 @@ public class SharedStateRegistryTest {
 	 */
 	@Test
 	public void testRegistryNormal() {
+
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
 		// register one state
 		TestSharedState firstState = new TestSharedState("first");
-		assertEquals(1, sharedStateRegistry.register(firstState));
+		SharedStateRegistry.Result result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(),
firstState);
+		assertEquals(1, result.getReferenceCount());
+		assertTrue(firstState == result.getReference());
+		assertFalse(firstState.isDiscarded());
 
 		// register another state
 		TestSharedState secondState = new TestSharedState("second");
-		assertEquals(1, sharedStateRegistry.register(secondState));
-
-		// register the first state again
-		assertEquals(2, sharedStateRegistry.register(firstState));
+		result = sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), secondState);
+		assertEquals(1, result.getReferenceCount());
+		assertTrue(secondState == result.getReference());
+		assertFalse(firstState.isDiscarded());
+		assertFalse(secondState.isDiscarded());
+
+		// attempt to register state under an existing key
+		TestSharedState firstStatePrime = new TestSharedState(firstState.getRegistrationKey().getKeyString());
+		result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstStatePrime);
+		assertEquals(2, result.getReferenceCount());
+		assertFalse(firstStatePrime == result.getReference());
+		assertTrue(firstState == result.getReference());
+		assertTrue(firstStatePrime.isDiscarded());
+		assertFalse(firstState.isDiscarded());
+
+		// reference the first state again
+		result = sharedStateRegistry.obtainReference(firstState.getRegistrationKey());
+		assertEquals(3, result.getReferenceCount());
+		assertTrue(firstState == result.getReference());
+		assertFalse(firstState.isDiscarded());
 
 		// unregister the second state
-		assertEquals(0, sharedStateRegistry.unregister(secondState));
+		result = sharedStateRegistry.releaseReference(secondState.getRegistrationKey());
+		assertEquals(0, result.getReferenceCount());
+		assertTrue(result.getReference() == null);
+		assertTrue(secondState.isDiscarded());
 
 		// unregister the first state
-		assertEquals(1, sharedStateRegistry.unregister(firstState));
+		result = sharedStateRegistry.releaseReference(firstState.getRegistrationKey());
+		assertEquals(2, result.getReferenceCount());
+		assertTrue(firstState == result.getReference());
+		assertFalse(firstState.isDiscarded());
 	}
 
 	/**
@@ -56,51 +87,47 @@ public class SharedStateRegistryTest {
 	@Test(expected = IllegalStateException.class)
 	public void testUnregisterWithUnexistedKey() {
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-
-		sharedStateRegistry.unregister(new TestSharedState("unexisted"));
+		sharedStateRegistry.releaseReference(new SharedStateRegistryKey("non-existent"));
 	}
 
-	private static class TestSharedState implements SharedStateHandle {
+	private static class TestSharedState implements StreamStateHandle {
 		private static final long serialVersionUID = 4468635881465159780L;
 
-		private String key;
+		private SharedStateRegistryKey key;
+
+		private boolean discarded;
 
 		TestSharedState(String key) {
-			this.key = key;
+			this.key = new SharedStateRegistryKey(key);
+			this.discarded = false;
 		}
 
-		@Override
-		public String getRegistrationKey() {
+		public SharedStateRegistryKey getRegistrationKey() {
 			return key;
 		}
 
 		@Override
 		public void discardState() throws Exception {
-			// nothing to do
+			this.discarded = true;
 		}
 
 		@Override
 		public long getStateSize() {
-			return key.length();
+			return key.toString().length();
 		}
 
 		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-
-			TestSharedState testState = (TestSharedState) o;
-
-			return key.equals(testState.key);
+		public int hashCode() {
+			return key.hashCode();
 		}
 
 		@Override
-		public int hashCode() {
-			return key.hashCode();
+		public FSDataInputStream openInputStream() throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		public boolean isDiscarded() {
+			return discarded;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 658ccde..ca66ffb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -482,6 +482,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	@SuppressWarnings("unchecked")
 	public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception
{
 		CheckpointStreamFactory streamFactory = createStreamFactory();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		Environment env = new DummyEnvironment("test", 1, 0);
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE,
env);
 
@@ -509,6 +510,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
+		snapshot.registerSharedStates(sharedStateRegistry);
 		backend.dispose();
 
 		// ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
@@ -518,8 +520,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
 
-		snapshot.discardState();
-
 		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
 		// initializeSerializerUnlessSet would not pick up our new config
 		kvId = new ValueStateDescriptor<>("id", pojoType);
@@ -536,6 +536,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
+		snapshot2.registerSharedStates(sharedStateRegistry);
+
+		snapshot.unregisterSharedStates(sharedStateRegistry);
+		snapshot.discardState();
+
 		backend.dispose();
 
 		// ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION)
=========
@@ -570,6 +575,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	@Test
 	public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception
{
 		CheckpointStreamFactory streamFactory = createStreamFactory();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		Environment env = new DummyEnvironment("test", 1, 0);
 
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE,
env);
@@ -597,6 +603,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
+		snapshot.registerSharedStates(sharedStateRegistry);
 		backend.dispose();
 
 		// ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
@@ -605,8 +612,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
 
-		snapshot.discardState();
-
 		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
 		// initializeSerializerUnlessSet would not pick up our new config
 		kvId = new ValueStateDescriptor<>("id", pojoType);
@@ -623,6 +628,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
+		snapshot2.registerSharedStates(sharedStateRegistry);
+
+		snapshot.unregisterSharedStates(sharedStateRegistry);
+		snapshot.discardState();
+
 		backend.dispose();
 
 		// ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION)
=========


Mime
View raw message