flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/16] flink git commit: [FLINK-3201] Enhance Partitioned State Interface with State Types
Date Wed, 03 Feb 2016 20:12:28 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
new file mode 100644
index 0000000..1181c66
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Generic implementation of {@link ReducingState} based on a wrapped {@link ValueState}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values stored in this {@code ReducingState}.
+ * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
+ * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
+ */
+public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend, W extends ValueState<T> & KvState<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend>>
+	implements ReducingState<T>, KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> {
+
+	private final W wrappedState;
+	private final ReduceFunction<T> reduceFunction;
+
+	@SuppressWarnings("unchecked")
+	public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) {
+		if (!(wrappedState instanceof KvState)) {
+			throw new IllegalArgumentException("Wrapped state must be a KvState.");
+		}
+		this.wrappedState = (W) wrappedState;
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void setCurrentKey(K key) {
+		wrappedState.setCurrentKey(key);
+	}
+
+	@Override
+	public void setCurrentNamespace(N namespace) {
+		wrappedState.setCurrentNamespace(namespace);
+	}
+
+	@Override
+	public KvStateSnapshot<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> snapshot(
+		long checkpointId,
+		long timestamp) throws Exception {
+		KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot = wrappedState.snapshot(
+			checkpointId,
+			timestamp);
+		return new Snapshot<>(wrappedSnapshot, reduceFunction);
+	}
+
+	@Override
+	public void dispose() {
+		wrappedState.dispose();
+	}
+
+	@Override
+	public T get() throws Exception {
+		return wrappedState.value();
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		T currentValue = wrappedState.value();
+		if (currentValue == null) {
+			wrappedState.update(value);
+		} else {
+			wrappedState.update(reduceFunction.reduce(currentValue, value));
+		}
+	}
+
+	@Override
+	public void clear() {
+		wrappedState.clear();
+	}
+
+	private static class Snapshot<K, N, T, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		private final KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot;
+
+		private final ReduceFunction<T> reduceFunction;
+
+		public Snapshot(KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot,
+			ReduceFunction<T> reduceFunction) {
+			this.wrappedSnapshot = wrappedSnapshot;
+			this.reduceFunction = reduceFunction;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> restoreState(
+			Backend stateBackend,
+			TypeSerializer<K> keySerializer,
+			ClassLoader classLoader,
+			long recoveryTimestamp) throws Exception {
+			return new GenericReducingState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), reduceFunction);
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			wrappedSnapshot.discardState();
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			return wrappedSnapshot.getStateSize();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
index ef2c882..7a97dc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 
 /**
  * Key/Value state implementation for user-defined state. The state is backed by a state
@@ -29,19 +30,28 @@ import org.apache.flink.api.common.state.OperatorState;
  * metadata of what is considered part of the checkpoint.
  * 
  * @param <K> The type of the key.
- * @param <V> The type of the value.
+ * @param <N> The type of the namespace.
+ * @param <S> The type of {@link State} this {@code KvState} holds.
+ * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
+ * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
  */
-public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> {
+public interface KvState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> {
 
 	/**
-	 * Sets the current key, which will be used to retrieve values for the next calls to
-	 * {@link #value()} and {@link #update(Object)}.
-	 * 
+	 * Sets the current key, which will be used when using the state access methods.
+	 *
 	 * @param key The key.
 	 */
 	void setCurrentKey(K key);
 
 	/**
+	 * Sets the current namespace, which will be used when using the state access methods.
+	 *
+	 * @param namespace The namespace.
+	 */
+	void setCurrentNamespace(N namespace);
+
+	/**
 	 * Creates a snapshot of this state.
 	 * 
 	 * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
@@ -51,16 +61,7 @@ public interface KvState<K, V, Backend extends StateBackend<Backend>> extends Op
 	 * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
 	 *                   can react to failed snapshots.
 	 */
-	KvStateSnapshot<K, V, Backend> snapshot(long checkpointId, long timestamp) throws Exception;
-
-	/**
-	 * Gets the number of key/value pairs currently stored in the state. Note that is a key
-	 * has been associated with "null", the key is removed from the state an will not
-	 * be counted here.
-	 *
-	 * @return The number of key/value pairs currently stored in the state.
-	 */
-	int size();
+	KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;
 
 	/**
 	 * Disposes the key/value state, releasing all occupied resources.

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
index 682c093..ce72135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
@@ -32,10 +34,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * a file and this snapshot object contains a pointer to that file.
  *
  * @param <K> The type of the key
- * @param <V> The type of the value
+ * @param <N> The type of the namespace
+ * @param <S> The type of the {@link State}
+ * @param <SD> The type of the {@link StateDescriptor}
  * @param <Backend> The type of the backend that can restore the state from this snapshot.
  */
-public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable {
+public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> extends java.io.Serializable {
 
 	/**
 	 * Loads the key/value state back from this snapshot.
@@ -43,21 +47,18 @@ public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> ex
 	 * @param stateBackend The state backend that created this snapshot and can restore the key/value state
 	 *                     from this snapshot.
 	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.   
 	 * @param classLoader The class loader for user-defined types.
-	 * 
+	 * @param recoveryTimestamp The timestamp of the checkpoint we are recovering from.
+	 *
 	 * @return An instance of the key/value state loaded from this snapshot.
 	 * 
 	 * @throws Exception Exceptions can occur during the state loading and are forwarded. 
 	 */
-	KvState<K, V, Backend> restoreState(
-			Backend stateBackend,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<V> valueSerializer,
-			V defaultValue,
-			ClassLoader classLoader,
-			long recoveryTimestamp) throws Exception;
+	KvState<K, N, S, SD, Backend> restoreState(
+		Backend stateBackend,
+		TypeSerializer<K> keySerializer,
+		ClassLoader classLoader,
+		long recoveryTimestamp) throws Exception;
 
 	/**
 	 * Discards the state snapshot, removing any resources occupied by it.

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
deleted file mode 100644
index 2c43125..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.execution.Environment;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A state backend defines how state is stored and snapshotted during checkpoints.
- *
- * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
- *                  type of backend when creating state backed by this backend.
- */
-public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
-
-	private static final long serialVersionUID = 4620413814639220247L;
-
-	// ------------------------------------------------------------------------
-	//  initialization and cleanup
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method is called by the task upon deployment to initialize the state backend for
-	 * data for a specific job.
-	 *
-	 * @param env The {@link Environment} of the task that instantiated the state backend
-	 * @throws Exception Overwritten versions of this method may throw exceptions, in which
-	 *                   case the job that uses the state backend is considered failed during
-	 *                   deployment.
-	 */
-	public abstract void initializeForJob(Environment env) throws Exception;
-
-	/**
-	 * Disposes all state associated with the current job.
-	 *
-	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
-	 */
-	public abstract void disposeAllStateForCurrentJob() throws Exception;
-
-	/**
-	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
-	 * checkpoint data.
-	 *
-	 * @throws Exception Exceptions can be forwarded and will be logged by the system
-	 */
-	public abstract void close() throws Exception;
-
-	// ------------------------------------------------------------------------
-	//  key/value state
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a key/value state backed by this state backend.
-	 *
-	 * @param stateId Unique id that identifies the kv state in the streaming program. 
-	 * @param stateName Name of the created state
-	 * @param keySerializer The serializer for the key.
-	 * @param valueSerializer The serializer for the value.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param <K> The type of the key.
-	 * @param <V> The type of the value.
-	 *
-	 * @return A new key/value state backed by this backend.
-	 *
-	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
-	 */
-	public abstract <K, V> KvState<K, V, Backend> createKvState(String stateId, String stateName,
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-			V defaultValue) throws Exception;
-
-
-	// ------------------------------------------------------------------------
-	//  storing state for a checkpoint
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates an output stream that writes into the state of the given checkpoint. When the stream
-	 * is closes, it returns a state handle that can retrieve the state back.
-	 *
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return An output stream that writes state for the given checkpoint.
-	 *
-	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
-	 */
-	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
-			long checkpointID, long timestamp) throws Exception;
-
-	/**
-	 * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
-	 * When the stream is closes, it returns a state handle that can retrieve the state back.
-	 *
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return An DataOutputView stream that writes state for the given checkpoint.
-	 *
-	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
-	 */
-	public CheckpointStateOutputView createCheckpointStateOutputView(
-			long checkpointID, long timestamp) throws Exception {
-		return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
-	}
-
-	/**
-	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
-	 *
-	 * @param state The state to be checkpointed.
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @param <S> The type of the state.
-	 *
-	 * @return A state handle that can retrieve the checkpoined state.
-	 *
-	 * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
-	 */
-	public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception;
-
-
-	// ------------------------------------------------------------------------
-	//  Checkpoint state output stream
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
-	 */
-	public static abstract class CheckpointStateOutputStream extends OutputStream {
-
-		/**
-		 * Closes the stream and gets a state handle that can create an input stream
-		 * producing the data written to this stream.
-		 *
-		 * @return A state handle that can create an input stream producing the data written to this stream.
-		 * @throws IOException Thrown, if the stream cannot be closed.
-		 */
-		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
-	}
-
-	/**
-	 * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
-	 */
-	public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-
-		private final CheckpointStateOutputStream out;
-
-		public CheckpointStateOutputView(CheckpointStateOutputStream out) {
-			super(out);
-			this.out = out;
-		}
-
-		/**
-		 * Closes the stream and gets a state handle that can create a DataInputView.
-		 * producing the data written to this stream.
-		 *
-		 * @return A state handle that can create an input stream producing the data written to this stream.
-		 * @throws IOException Thrown, if the stream cannot be closed.
-		 */
-		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
-			return new DataInputViewHandle(out.closeAndGetHandle());
-		}
-
-		@Override
-		public void close() throws IOException {
-			out.close();
-		}
-	}
-
-	/**
-	 * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
-	 */
-	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
-
-		private static final long serialVersionUID = 2891559813513532079L;
-
-		private final StreamStateHandle stream;
-
-		private DataInputViewHandle(StreamStateHandle stream) {
-			this.stream = stream;
-		}
-
-		@Override
-		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
-			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			stream.discardState();
-		}
-
-		@Override
-		public long getStateSize() throws Exception {
-			return stream.getStateSize();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index 5b622eb..f17eb6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.Configuration;
  * 
  * @param <T> The type of the state backend created.
  */
-public interface StateBackendFactory<T extends StateBackend<T>> {
+public interface StateBackendFactory<T extends AbstractStateBackend> {
 
 	/**
 	 * Creates the state backend, optionally using the given configuration.
@@ -36,5 +36,5 @@ public interface StateBackendFactory<T extends StateBackend<T>> {
 	 * 
 	 * @throws Exception Exceptions during instantiation can be forwarded.
 	 */
-	StateBackend<T> createFromConfig(Configuration config) throws Exception;
+	AbstractStateBackend createFromConfig(Configuration config) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
deleted file mode 100644
index 8c2b12a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Base class for state that is stored in a file.
- */
-public abstract class AbstractFileState implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 350284443258002355L;
-	
-	/** The path to the file in the filesystem, fully describing the file system */
-	private final Path filePath;
-
-	/** Cached file system handle */
-	private transient FileSystem fs;
-
-	/**
-	 * Creates a new file state for the given file path.
-	 * 
-	 * @param filePath The path to the file that stores the state.
-	 */
-	protected AbstractFileState(Path filePath) {
-		this.filePath = requireNonNull(filePath);
-	}
-
-	/**
-	 * Gets the path where this handle's state is stored.
-	 * @return The path where this handle's state is stored.
-	 */
-	public Path getFilePath() {
-		return filePath;
-	}
-
-	/**
-	 * Discard the state by deleting the file that stores the state. If the parent directory
-	 * of the state is empty after deleting the state file, it is also deleted.
-	 * 
-	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
-	 */
-	public void discardState() throws Exception {
-		getFileSystem().delete(filePath, false);
-
-		// send a call to delete the checkpoint directory containing the file. This will
-		// fail (and be ignored) when some files still exist
-		try {
-			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {}
-	}
-
-	/**
-	 * Gets the file system that stores the file state.
-	 * @return The file system that stores the file state.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	protected FileSystem getFileSystem() throws IOException {
-		if (fs == null) {
-			fs = FileSystem.get(filePath.toUri());
-		}
-		return fs;
-	}
-
-	/**
-	 * Returns the file size in bytes.
-	 *
-	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	protected long getFileSize() throws IOException {
-		return getFileSystem().getFileStatus(filePath).getLen();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
new file mode 100644
index 0000000..00800b2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for state that is stored in a file.
+ */
+public abstract class AbstractFileStateHandle implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 350284443258002355L;
+	
+	/** The path to the file in the filesystem, fully describing the file system */
+	private final Path filePath;
+
+	/** Cached file system handle */
+	private transient FileSystem fs;
+
+	/**
+	 * Creates a new file state for the given file path.
+	 * 
+	 * @param filePath The path to the file that stores the state.
+	 */
+	protected AbstractFileStateHandle(Path filePath) {
+		this.filePath = requireNonNull(filePath);
+	}
+
+	/**
+	 * Gets the path where this handle's state is stored.
+	 * @return The path where this handle's state is stored.
+	 */
+	public Path getFilePath() {
+		return filePath;
+	}
+
+	/**
+	 * Discard the state by deleting the file that stores the state. If the parent directory
+	 * of the state is empty after deleting the state file, it is also deleted.
+	 * 
+	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+	 */
+	public void discardState() throws Exception {
+		getFileSystem().delete(filePath, false);
+
+		// send a call to delete the checkpoint directory containing the file. This will
+		// fail (and be ignored) when some files still exist
+		try {
+			getFileSystem().delete(filePath.getParent(), false);
+		} catch (IOException ignored) {}
+	}
+
+	/**
+	 * Gets the file system that stores the file state.
+	 * @return The file system that stores the file state.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected FileSystem getFileSystem() throws IOException {
+		if (fs == null) {
+			fs = FileSystem.get(filePath.toUri());
+		}
+		return fs;
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	protected long getFileSize() throws IOException {
+		return getFileSystem().getFileStatus(filePath).getLen();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
new file mode 100644
index 0000000..5035953
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractHeapState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.DataOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for partitioned {@link ListState} implementations that are backed by a regular
+ * heap hash map. The concrete implementations define how the state is checkpointed.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S>>
+		extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
+
+	/** The file system state backend backing snapshots of this state */
+	private final FsStateBackend backend;
+
+	public AbstractFsState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc) {
+		super(keySerializer, namespaceSerializer, stateSerializer, stateDesc);
+		this.backend = backend;
+	}
+
+	public AbstractFsState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		HashMap<N, Map<K, SV>> state) {
+		super(keySerializer, namespaceSerializer, stateSerializer, stateDesc, state);
+		this.backend = backend;
+	}
+
+	public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);
+
+	@Override
+	public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
+
+		try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
+
+			// serialize the state to the output stream
+			DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
+			outView.writeInt(state.size());
+			for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
+				N namespace = namespaceState.getKey();
+				namespaceSerializer.serialize(namespace, outView);
+				outView.writeInt(namespaceState.getValue().size());
+				for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
+					keySerializer.serialize(entry.getKey(), outView);
+					stateSerializer.serialize(entry.getValue(), outView);
+				}
+			}
+			outView.flush();
+
+			// create a handle to the state
+//			return new FsHeapValueStateSnapshot<>(getKeySerializer(), getNamespaceSerializer(), stateDesc, out.closeAndGetPath());
+			return createHeapSnapshot(out.closeAndGetPath());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
new file mode 100644
index 0000000..c1e0f12
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A snapshot of a heap key/value state stored in a file.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <N> The type of the namespace in the snapshot state.
+ * @param <SV> The type of the state value.
+ */
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Key Serializer */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Namespace Serializer */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** Serializer for the state value */
+	protected final TypeSerializer<SV> stateSerializer;
+
+	/** StateDescriptor, for sanity checks */
+	protected final SD stateDesc;
+
+	/**
+	 * Creates a new state snapshot with data in the file system.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateSerializer The serializer for the elements in the state HashMap
+	 * @param stateDesc The state identifier
+	 * @param filePath The path where the snapshot data is stored.
+	 */
+	public AbstractFsStateSnapshot(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		Path filePath) {
+		super(filePath);
+		this.stateDesc = stateDesc;
+		this.keySerializer = keySerializer;
+		this.stateSerializer = stateSerializer;
+		this.namespaceSerializer = namespaceSerializer;
+
+	}
+
+	public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap);
+
+	@Override
+	public KvState<K, N, S, SD, FsStateBackend> restoreState(
+		FsStateBackend stateBackend,
+		final TypeSerializer<K> keySerializer,
+		ClassLoader classLoader,
+		long recoveryTimestamp) throws Exception {
+
+		// validity checks
+		if (!this.keySerializer.equals(keySerializer)) {
+			throw new IllegalArgumentException(
+				"Cannot restore the state from the snapshot with the given serializers. " +
+					"State (K/V) was serialized with " +
+					"(" + this.keySerializer + ") " +
+					"now is (" + keySerializer + ")");
+		}
+
+		// state restore
+		try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
+			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(new DataInputStream(inStream));
+
+
+			final int numKeys = inView.readInt();
+			HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
+
+			for (int i = 0; i < numKeys; i++) {
+				N namespace = namespaceSerializer.deserialize(inView);
+				final int numValues = inView.readInt();
+				Map<K, SV> namespaceMap = new HashMap<>(numValues);
+				stateMap.put(namespace, namespaceMap);
+				for (int j = 0; j < numValues; j++) {
+					K key = keySerializer.deserialize(inView);
+					SV value = stateSerializer.deserialize(inView);
+					namespaceMap.put(key, value);
+				}
+			}
+
+//			return new FsHeapValueState<>(stateBackend, keySerializer, namespaceSerializer, stateDesc, stateMap);
+			return createFsState(stateBackend, stateMap);
+		}
+		catch (Exception e) {
+			throw new Exception("Failed to restore state from file system", e);
+		}
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSize();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
index 456f2f2..662678e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -32,7 +32,7 @@ import java.io.Serializable;
  * 
  * @param <T> The type of state pointed to by the state handle.
  */
-public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileState implements StateHandle<T> {
+public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileStateHandle implements StateHandle<T> {
 
 	private static final long serialVersionUID = -657631394290213622L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
index 3b060b5..be9c4cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
 /**
  * A state handle that points to state in a file system, accessible as an input stream.
  */
-public class FileStreamStateHandle extends AbstractFileState implements StreamStateHandle {
+public class FileStreamStateHandle extends AbstractFileStateHandle implements StreamStateHandle {
 	
 	private static final long serialVersionUID = -6826990484549987311L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
deleted file mode 100644
index a1c7782..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvState.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractHeapKvState;
-
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into files.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBackend> {
-	
-	/** The file system state backend backing snapshots of this state */
-	private final FsStateBackend backend;
-	
-	/**
-	 * Creates a new and empty key/value state.
-	 * 
-	 * @param keySerializer The serializer for the key.
-	 * @param valueSerializer The serializer for the value.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param backend The file system state backend backing snapshots of this state
-	 */
-	public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-							V defaultValue, FsStateBackend backend) {
-		super(keySerializer, valueSerializer, defaultValue);
-		this.backend = backend;
-	}
-
-	/**
-	 * Creates a new key/value state with the given state contents.
-	 * This method is used to re-create key/value state with existing data, for example from
-	 * a snapshot.
-	 * 
-	 * @param keySerializer The serializer for the key.
-	 * @param valueSerializer The serializer for the value.
-	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
-	 * @param state The map of key/value pairs to initialize the state with.
-	 * @param backend The file system state backend backing snapshots of this state
-	 */
-	public FsHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-							V defaultValue, HashMap<K, V> state, FsStateBackend backend) {
-		super(keySerializer, valueSerializer, defaultValue, state);
-		this.backend = backend;
-	}
-
-	
-	@Override
-	public FsHeapKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throws Exception {
-		// first, create an output stream to write to
-		try (FsStateBackend.FsCheckpointStateOutputStream out = 
-					backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
-			// serialize the state to the output stream
-			DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out);
-			outView.writeInt(size());
-			writeStateToOutputView(outView);
-			outView.flush();
-			
-			// create a handle to the state
-			return new FsHeapKvStateSnapshot<>(getKeySerializer(), getValueSerializer(), out.closeAndGetPath());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
deleted file mode 100644
index 9c8663a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * A snapshot of a heap key/value state stored in a file.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <V> The type of the value in the snapshot state.
- */
-public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements KvStateSnapshot<K, V, FsStateBackend> {
-	
-	private static final long serialVersionUID = 1L;
-
-	/** Name of the key serializer class */
-	private final String keySerializerClassName;
-
-	/** Name of the value serializer class */
-	private final String valueSerializerClassName;
-
-	/**
-	 * Creates a new state snapshot with data in the file system.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param valueSerializer The serializer for the values.
-	 * @param filePath The path where the snapshot data is stored.
-	 */
-	public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) {
-		super(filePath);
-		this.keySerializerClassName = keySerializer.getClass().getName();
-		this.valueSerializerClassName = valueSerializer.getClass().getName();
-	}
-
-	@Override
-	public FsHeapKvState<K, V> restoreState(
-			FsStateBackend stateBackend,
-			final TypeSerializer<K> keySerializer,
-			final TypeSerializer<V> valueSerializer,
-			V defaultValue,
-			ClassLoader classLoader,
-			long recoveryTimestamp) throws Exception {
-
-		// validity checks
-		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||
-				!valueSerializer.getClass().getName().equals(valueSerializerClassName)) {
-			throw new IllegalArgumentException(
-					"Cannot restore the state from the snapshot with the given serializers. " +
-							"State (K/V) was serialized with (" + valueSerializerClassName +
-							"/" + keySerializerClassName + ")");
-		}
-		
-		// state restore
-		try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
-			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
-			
-			final int numEntries = inView.readInt();
-			HashMap<K, V> stateMap = new HashMap<>(numEntries);
-			
-			for (int i = 0; i < numEntries; i++) {
-				K key = keySerializer.deserialize(inView);
-				V value = valueSerializer.deserialize(inView);
-				stateMap.put(key, value);
-			}
-			
-			return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend);
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to restore state from file system", e);
-		}
-	}
-
-	/**
-	 * Returns the file size in bytes.
-	 *
-	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	@Override
-	public long getStateSize() throws IOException {
-		return getFileSize();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
new file mode 100644
index 0000000..1d5b5f8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
+ * into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class FsListState<K, N, V>
+	extends AbstractFsState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+	implements ListState<V> {
+
+	/**
+	 * Creates a new and empty partitioned state.
+	 *
+	 * @param keySerializer The serializer for the key.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 * and can create a default state value.
+	 * @param backend The file system state backend backing snapshots of this state
+	 */
+	public FsListState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<V> stateDesc) {
+		super(backend, keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc);
+	}
+
+	/**
+	 * Creates a new key/value state with the given state contents.
+	 * This method is used to re-create key/value state with existing data, for example from
+	 * a snapshot.
+	 *
+	 * @param keySerializer The serializer for the key.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param state The map of key/value pairs to initialize the state with.
+	 * @param backend The file system state backend backing snapshots of this state
+	 */
+	public FsListState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<V> stateDesc,
+		HashMap<N, Map<K, ArrayList<V>>> state) {
+		super(backend, keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, state);
+	}
+
+
+	@Override
+	public Iterable<V> get() {
+		if (currentNSState == null) {
+			currentNSState = state.get(currentNamespace);
+		}
+		if (currentNSState != null) {
+			List<V> result = currentNSState.get(currentKey);
+			if (result == null) {
+				return Collections.emptyList();
+			} else {
+				return result;
+			}
+		}
+		return Collections.emptyList();
+	}
+
+	@Override
+	public void add(V value) {
+		if (currentKey == null) {
+			throw new RuntimeException("No key available.");
+		}
+
+		if (currentNSState == null) {
+			currentNSState = new HashMap<>();
+			state.put(currentNamespace, currentNSState);
+		}
+
+
+		ArrayList<V> list = currentNSState.get(currentKey);
+		if (list == null) {
+			list = new ArrayList<>();
+			currentNSState.put(currentKey, list);
+		}
+		list.add(value);
+	}
+	
+	@Override
+	public KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
+		return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, filePath);
+	}
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ArrayList<V>> stateSerializer,
+			ListStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+
+		@Override
+		public KvState<K, N, ListState<V>, ListStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, ArrayList<V>>> stateMap) {
+			return new FsListState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
new file mode 100644
index 0000000..ef721c9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsReducingState.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link org.apache.flink.api.common.state.ReducingState} that is
+ * snapshotted into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class FsReducingState<K, N, V>
+	extends AbstractFsState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
+	implements ReducingState<V> {
+
+	private final ReduceFunction<V> reduceFunction;
+
+	/**
+	 * Creates a new and empty partitioned state.
+	 *
+	 * @param backend The file system state backend backing snapshots of this state
+	 * @param keySerializer The serializer for the key.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 */
+	public FsReducingState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<V> stateDesc) {
+		super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc);
+		this.reduceFunction = stateDesc.getReduceFunction();
+	}
+
+	/**
+	 * Creates a new key/value state with the given state contents.
+	 * This method is used to re-create key/value state with existing data, for example from
+	 * a snapshot.
+	 *
+	 * @param backend The file system state backend backing snapshots of this state
+	 * @param keySerializer The serializer for the key.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+*                           and can create a default state value.
+	 * @param state The map of key/value pairs to initialize the state with.
+	 */
+	public FsReducingState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<V> stateDesc,
+		HashMap<N, Map<K, V>> state) {
+		super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state);
+		this.reduceFunction = stateDesc.getReduceFunction();
+	}
+
+
+	@Override
+	public V get() {
+		if (currentNSState == null) {
+			currentNSState = state.get(currentNamespace);
+		}
+		if (currentNSState != null) {
+			return currentNSState.get(currentKey);
+		}
+		return null;
+	}
+
+	@Override
+	public void add(V value) throws IOException {
+		if (currentKey == null) {
+			throw new RuntimeException("No key available.");
+		}
+
+		if (currentNSState == null) {
+			currentNSState = new HashMap<>();
+			state.put(currentNamespace, currentNSState);
+		}
+//		currentKeyState.merge(currentNamespace, value, new BiFunction<V, V, V>() {
+//			@Override
+//			public V apply(V v, V v2) {
+//				try {
+//					return reduceFunction.reduce(v, v2);
+//				} catch (Exception e) {
+//					return null;
+//				}
+//			}
+//		});
+		V currentValue = currentNSState.get(currentKey);
+		if (currentValue == null) {
+			currentNSState.put(currentKey, value);
+		} else {
+			try {
+				currentNSState.put(currentKey, reduceFunction.reduce(currentValue, value));
+			} catch (Exception e) {
+				throw new RuntimeException("Could not add value to reducing state.", e);
+			}
+		}
+	}
+	@Override
+	public KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
+		return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
+	}
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ReducingStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+
+		@Override
+		public KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
+			return new FsReducingState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index ed28e5e..411b536 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,16 +18,22 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +54,7 @@ import java.util.UUID;
  *
  * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
  */
-public class FsStateBackend extends StateBackend<FsStateBackend> {
+public class FsStateBackend extends AbstractStateBackend {
 
 	private static final long serialVersionUID = -8191916350224044011L;
 
@@ -264,7 +270,11 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void initializeForJob(Environment env) throws Exception {
+	public void initializeForJob(Environment env,
+		String operatorIdentifier,
+		TypeSerializer<?> keySerializer) throws Exception {
+		super.initializeForJob(env, operatorIdentifier, keySerializer);
+
 		Path dir = new Path(basePath, env.getJobID().toString());
 
 		LOG.info("Initializing file state backend to URI " + dir);
@@ -298,12 +308,22 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public <K, V> FsHeapKvState<K, V> createKvState(String stateId, String stateName,
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
-		return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this);
+	public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
+		return new FsValueState<>(this, keySerializer, namespaceSerializer, stateDesc);
+	}
+
+	@Override
+	public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+		return new FsListState<>(this, keySerializer, namespaceSerializer, stateDesc);
 	}
 
 	@Override
+	public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+		return new FsReducingState<>(this, keySerializer, namespaceSerializer, stateDesc);
+	}
+
+
+	@Override
 	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
 			S state, long checkpointID, long timestamp) throws Exception
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
new file mode 100644
index 0000000..1a53980
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link org.apache.flink.api.common.state.ValueState} that is snapshotted
+ * into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the value.
+ */
+public class FsValueState<K, N, V>
+	extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
+	implements ValueState<V> {
+
+	/**
+	 * Creates a new and empty key/value state.
+	 * 
+	 * @param keySerializer The serializer for the key.
+     * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 * and can create a default state value.
+	 * @param backend The file system state backend backing snapshots of this state
+	 */
+	public FsValueState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc) {
+		super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc);
+	}
+
+	/**
+	 * Creates a new key/value state with the given state contents.
+	 * This method is used to re-create key/value state with existing data, for example from
+	 * a snapshot.
+	 * 
+	 * @param keySerializer The serializer for the key.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param state The map of key/value pairs to initialize the state with.
+	 * @param backend The file system state backend backing snapshots of this state
+	 */
+	public FsValueState(FsStateBackend backend,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc,
+		HashMap<N, Map<K, V>> state) {
+		super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state);
+	}
+
+	@Override
+	public V value() {
+		if (currentNSState == null) {
+			currentNSState = state.get(currentNamespace);
+		}
+		if (currentNSState != null) {
+			V value = currentNSState.get(currentKey);
+			return value != null ? value : stateDesc.getDefaultValue();
+		}
+		return stateDesc.getDefaultValue();
+	}
+
+	@Override
+	public void update(V value) {
+		if (currentKey == null) {
+			throw new RuntimeException("No key available.");
+		}
+
+		if (currentNSState == null) {
+			currentNSState = new HashMap<>();
+			state.put(currentNamespace, currentNSState);
+		}
+
+		currentNSState.put(currentKey, value);
+	}
+
+	@Override
+	public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
+		return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath);
+	}
+
+	public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<V> stateSerializer,
+			ValueStateDescriptor<V> stateDescs,
+			Path filePath) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
+		}
+
+		@Override
+		public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
+			return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
new file mode 100644
index 0000000..816c883
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.AbstractHeapState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for partitioned {@link ListState} implementations that are backed by a regular
+ * heap hash map. The concrete implementations define how the state is checkpointed.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ */
+public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S>>
+		extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> {
+
+	public AbstractMemState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc) {
+		super(keySerializer, namespaceSerializer, stateSerializer, stateDesc);
+	}
+
+	public AbstractMemState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		HashMap<N, Map<K, SV>> state) {
+		super(keySerializer, namespaceSerializer, stateSerializer, stateDesc, state);
+	}
+
+	public abstract KvStateSnapshot<K, N, S, SD, MemoryStateBackend> createHeapSnapshot(byte[] bytes);
+
+	@Override
+	public KvStateSnapshot<K, N, S, SD, MemoryStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
+
+		DataOutputSerializer out = new DataOutputSerializer(Math.max(size() * 16, 16));
+
+		out.writeInt(state.size());
+		for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
+			N namespace = namespaceState.getKey();
+			namespaceSerializer.serialize(namespace, out);
+			out.writeInt(namespaceState.getValue().size());
+			for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
+				keySerializer.serialize(entry.getKey(), out);
+				stateSerializer.serialize(entry.getValue(), out);
+			}
+		}
+
+		byte[] bytes = out.getCopyOfBuffer();
+
+		return createHeapSnapshot(bytes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
new file mode 100644
index 0000000..d2efd53
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A snapshot of a {@link MemValueState} for a checkpoint. The data is stored in a heap byte
+ * array, in serialized form.
+ * 
+ * @param <K> The type of the key in the snapshot state.
+ * @param <N> The type of the namespace in the snapshot state.
+ * @param <SV> The type of the value in the snapshot state.
+ */
+public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Key Serializer */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Namespace Serializer */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** Serializer for the state value */
+	protected final TypeSerializer<SV> stateSerializer;
+
+	/** StateDescriptor, for sanity checks */
+	protected final SD stateDesc;
+
+	/** The serialized data of the state key/value pairs */
+	private final byte[] data;
+
+	/**
+	 * Creates a new heap memory state snapshot.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateSerializer The serializer for the elements in the state HashMap
+	 * @param stateDesc The state identifier
+	 * @param data The serialized data of the state key/value pairs
+	 */
+	public AbstractMemStateSnapshot(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		TypeSerializer<SV> stateSerializer,
+		SD stateDesc,
+		byte[] data) {
+		this.keySerializer = keySerializer;
+		this.namespaceSerializer = namespaceSerializer;
+		this.stateSerializer = stateSerializer;
+		this.stateDesc = stateDesc;
+		this.data = data;
+	}
+
+	public abstract KvState<K, N, S, SD, MemoryStateBackend> createMemState(HashMap<N, Map<K, SV>> stateMap);
+
+	@Override
+	public KvState<K, N, S, SD, MemoryStateBackend> restoreState(
+		MemoryStateBackend stateBackend,
+		final TypeSerializer<K> keySerializer,
+		ClassLoader classLoader, long recoveryTimestamp) throws Exception {
+
+		// validity checks
+		if (!this.keySerializer.equals(keySerializer)) {
+			throw new IllegalArgumentException(
+				"Cannot restore the state from the snapshot with the given serializers. " +
+					"State (K/V) was serialized with " +
+					"(" + this.keySerializer + ") " +
+					"now is (" + keySerializer + ")");
+		}
+		
+		// restore state
+		DataInputDeserializer inView = new DataInputDeserializer(data, 0, data.length);
+
+		final int numKeys = inView.readInt();
+		HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
+
+		for (int i = 0; i < numKeys; i++) {
+			N namespace = namespaceSerializer.deserialize(inView);
+			final int numValues = inView.readInt();
+			Map<K, SV> namespaceMap = new HashMap<>(numValues);
+			stateMap.put(namespace, namespaceMap);
+			for (int j = 0; j < numValues; j++) {
+				K key = keySerializer.deserialize(inView);
+				SV value = stateSerializer.deserialize(inView);
+				namespaceMap.put(key, value);
+			}
+		}
+
+		return createMemState(stateMap);
+	}
+
+	/**
+	 * Discarding the heap state is a no-op.
+	 */
+	@Override
+	public void discardState() {}
+
+	@Override
+	public long getStateSize() {
+		return data.length;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
deleted file mode 100644
index 082cb9a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemHeapKvState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.runtime.state.AbstractHeapKvState;
-
-import java.util.HashMap;
-
-/**
- * Heap-backed key/value state that is snapshotted into a serialized memory copy.
- *
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- */
-public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateBackend> {
-	
-	public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
-		super(keySerializer, valueSerializer, defaultValue);
-	}
-
-	public MemHeapKvState(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
-							V defaultValue, HashMap<K, V> state) {
-		super(keySerializer, valueSerializer, defaultValue, state);
-	}
-	
-	@Override
-	public MemoryHeapKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throws Exception {
-		DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16));
-		writeStateToOutputView(ser);
-		byte[] bytes = ser.getCopyOfBuffer();
-		
-		return new MemoryHeapKvStateSnapshot<K, V>(getKeySerializer(), getValueSerializer(), bytes, size());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
new file mode 100644
index 0000000..d5e4dfd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
+ * into a serialized memory copy.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the values in the list state.
+ */
+public class MemListState<K, N, V>
+	extends AbstractMemState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+	implements ListState<V> {
+
+	public MemListState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ListStateDescriptor<V> stateDesc) {
+		super(keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc);
+	}
+
+	public MemListState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ListStateDescriptor<V> stateDesc, HashMap<N, Map<K, ArrayList<V>>> state) {
+		super(keySerializer, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()), stateDesc, state);
+	}
+
+	@Override
+	public Iterable<V> get() {
+		if (currentNSState == null) {
+			currentNSState = state.get(currentNamespace);
+		}
+		if (currentNSState != null) {
+			List<V> result = currentNSState.get(currentKey);
+			if (result == null) {
+				return Collections.emptyList();
+			} else {
+				return result;
+			}
+		}
+		return Collections.emptyList();
+	}
+
+	@Override
+	public void add(V value) {
+		if (currentKey == null) {
+			throw new RuntimeException("No key available.");
+		}
+
+		if (currentNSState == null) {
+			currentNSState = new HashMap<>();
+			state.put(currentNamespace, currentNSState);
+		}
+
+
+		ArrayList<V> list = currentNSState.get(currentKey);
+		if (list == null) {
+			list = new ArrayList<>();
+			currentNSState.put(currentKey, list);
+		}
+		list.add(value);
+	}
+
+	@Override
+	public KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) {
+		return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes);
+	}
+
+	public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<ArrayList<V>> stateSerializer,
+			ListStateDescriptor<V> stateDescs, byte[] data) {
+			super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data);
+		}
+
+		@Override
+		public KvState<K, N, ListState<V>, ListStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, Map<K, ArrayList<V>>> stateMap) {
+			return new MemListState<>(keySerializer, namespaceSerializer, stateDesc, stateMap);
+		}
+	}
+
+}


Mime
View raw message