flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [11/27] flink git commit: [FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
Date Wed, 31 Aug 2016 17:28:29 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index e09b868..5213fe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class KvStateRegistry {
 
 	/** All registered KvState instances. */
-	private final ConcurrentHashMap<KvStateID, KvState<?, ?, ?, ?, ?>> registeredKvStates =
+	private final ConcurrentHashMap<KvStateID, KvState<?>> registeredKvStates =
 			new ConcurrentHashMap<>();
 
 	/** Registry listener to be notified on registration/unregistration. */
@@ -83,7 +83,7 @@ public class KvStateRegistry {
 			JobVertexID jobVertexId,
 			int keyGroupIndex,
 			String registrationName,
-			KvState<?, ?, ?, ?, ?> kvState) {
+			KvState<?> kvState) {
 
 		KvStateID kvStateId = new KvStateID();
 
@@ -136,7 +136,7 @@ public class KvStateRegistry {
 	 * @param kvStateId KvStateID to identify the KvState instance
 	 * @return KvState instance identified by the KvStateID or <code>null</code>
 	 */
-	public KvState<?, ?, ?, ?, ?> getKvState(KvStateID kvStateId) {
+	public KvState<?> getKvState(KvStateID kvStateId) {
 		return registeredKvStates.get(kvStateId);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index 15f0160..b5c09aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -58,7 +58,7 @@ public class TaskKvStateRegistry {
 	 *                         descriptor used to create the KvState instance)
 	 * @param kvState          The
 	 */
-	public void registerKvState(int keyGroupIndex, String registrationName, KvState<?, ?, ?, ?, ?> kvState) {
+	public void registerKvState(int keyGroupIndex, String registrationName, KvState<?> kvState) {
 		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupIndex, registrationName, kvState);
 		registeredKvStates.add(new KvStateInfo(keyGroupIndex, registrationName, kvStateId));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
index 47f2ad6..8201708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -103,7 +103,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 
 				stats.reportRequest();
 
-				KvState<?, ?, ?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
+				KvState<?> kvState = registry.getKvState(request.getKvStateId());
 
 				if (kvState != null) {
 					// Execute actual query async, because it is possibly
@@ -186,7 +186,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 
 		private final KvStateRequest request;
 
-		private final KvState<?, ?, ?, ?, ?> kvState;
+		private final KvState<?> kvState;
 
 		private final KvStateRequestStats stats;
 
@@ -195,7 +195,7 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 		public AsyncKvStateQueryTask(
 				ChannelHandlerContext ctx,
 				KvStateRequest request,
-				KvState<?, ?, ?, ?, ?> kvState,
+				KvState<?> kvState,
 				KvStateRequestStats stats) {
 
 			this.ctx = Objects.requireNonNull(ctx, "Channel handler context");
@@ -238,6 +238,8 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter {
 
 					success = true;
 				} else {
+					kvState.getSerializedValue(serializedKeyAndNamespace);
+
 					// No data for the key/namespace. This is considered to be
 					// a failure.
 					ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure(

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
deleted file mode 100644
index 6fa4575..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Base class for partitioned {@link ListState} implementations that are backed by a regular
- * heap hash map. The concrete implementations define how the state is checkpointed.
- * 
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <SV> The type of the values in the state.
- * @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
- * @param <Backend> The type of the backend that snapshots this key/value state.
- */
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
-		implements KvState<K, N, S, SD, Backend>, State {
-
-	/** Map containing the actual key/value pairs */
-	protected final Map<N, Map<K, SV>> state;
-
-	/** Serializer for the state value. The state value could be a List<V>, for example. */
-	protected final TypeSerializer<SV> stateSerializer;
-
-	/** The serializer for the keys */
-	protected final TypeSerializer<K> keySerializer;
-
-	/** The serializer for the namespace */
-	protected final TypeSerializer<N> namespaceSerializer;
-
-	/** This holds the name of the state and can create an initial default value for the state. */
-	protected final SD stateDesc;
-
-	/** The current key, which the next value methods will refer to */
-	protected K currentKey;
-
-	/** The current namespace, which the access methods will refer to. */
-	protected N currentNamespace = null;
-
-	/** Cache the state map for the current key. */
-	protected Map<K, SV> currentNSState;
-
-	/**
-	 * Creates a new empty key/value state.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 */
-	protected AbstractHeapState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		TypeSerializer<SV> stateSerializer,
-		SD stateDesc) {
-		this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>());
-	}
-
-	/**
-	 * Creates a new key/value state for the given hash map of key/value pairs.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-	 * @param state The state map to use in this kev/value state. May contain initial state.
-	 */
-	protected AbstractHeapState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		TypeSerializer<SV> stateSerializer,
-		SD stateDesc,
-		Map<N, Map<K, SV>> state) {
-
-		Preconditions.checkNotNull(state, "State map");
-
-		// Make sure that the state map supports concurrent read access for
-		// queries. See also #createNewNamespaceMap for the namespace maps.
-		if (stateDesc.isQueryable()) {
-			this.state = new ConcurrentHashMap<>(state);
-		} else {
-			this.state = state;
-		}
-
-		this.keySerializer = Preconditions.checkNotNull(keySerializer);
-		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
-		this.stateSerializer = stateSerializer;
-		this.stateDesc = stateDesc;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public final void clear() {
-		if (currentNSState != null) {
-			currentNSState.remove(currentKey);
-			if (currentNSState.isEmpty()) {
-				state.remove(currentNamespace);
-				currentNSState = null;
-			}
-		}
-	}
-
-	@Override
-	public final void setCurrentKey(K currentKey) {
-		this.currentKey = Preconditions.checkNotNull(currentKey, "Key");
-	}
-
-	@Override
-	public final void setCurrentNamespace(N namespace) {
-		if (namespace != null && namespace.equals(this.currentNamespace)) {
-			return;
-		}
-		this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
-		this.currentNSState = state.get(currentNamespace);
-	}
-
-	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-
-		Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
-				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
-
-		return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
-	}
-
-	protected abstract byte[] getSerializedValue(K key, N namespace) throws Exception;
-
-	/**
-	 * Returns the number of all state pairs in this state, across namespaces.
-	 */
-	protected final int size() {
-		int size = 0;
-		for (Map<K, SV> namespace: state.values()) {
-			size += namespace.size();
-		}
-		return size;
-	}
-
-	@Override
-	public void dispose() {
-		state.clear();
-	}
-
-	@Override
-	public SD getStateDescriptor() {
-		return stateDesc;
-	}
-
-	/**
-	 * Gets the serializer for the keys.
-	 *
-	 * @return The serializer for the keys.
-	 */
-	public final TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
-	}
-
-	/**
-	 * Gets the serializer for the namespace.
-	 *
-	 * @return The serializer for the namespace.
-	 */
-	public final TypeSerializer<N> getNamespaceSerializer() {
-		return namespaceSerializer;
-	}
-
-	/**
-	 * Creates a new namespace map.
-	 *
-	 * <p>If the state queryable ({@link StateDescriptor#isQueryable()}, this
-	 * will create a concurrent hash map instead of a regular one.
-	 *
-	 * @return A new namespace map.
-	 */
-	protected Map<K, SV> createNewNamespaceMap() {
-		if (stateDesc.isQueryable()) {
-			return new ConcurrentHashMap<>();
-		} else {
-			return new HashMap<>();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the internal state map for testing.
-	 *
-	 * @return The internal state map
-	 */
-	Map<N, Map<K, SV>> getStateMap() {
-		return state;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index b2cde22..e6093a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -18,417 +18,57 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MergingState;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBackend;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * A state backend defines how state is stored and snapshotted during checkpoints.
  */
 public abstract class AbstractStateBackend implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 4620413814639220247L;
-
-	protected transient TypeSerializer<?> keySerializer;
-
-	protected transient ClassLoader userCodeClassLoader;
-
-	protected transient Object currentKey;
-
-	/** For efficient access in setCurrentKey() */
-	private transient KvState<?, ?, ?, ?, ?>[] keyValueStates;
-
-	/** So that we can give out state when the user uses the same key. */
-	protected transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName;
-
-	/** For caching the last accessed partitioned state */
-	private transient String lastName;
-
-	@SuppressWarnings("rawtypes")
-	private transient KvState lastState;
-
-	/** KvStateRegistry helper for this task */
-	protected transient TaskKvStateRegistry kvStateRegistry;
-
-	/** Key group index of this state backend */
-	protected transient int keyGroupIndex;
-
-	// ------------------------------------------------------------------------
-	//  initialization and cleanup
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method is called by the task upon deployment to initialize the state backend for
-	 * data for a specific job.
-	 *
-	 * @param env The {@link Environment} of the task that instantiated the state backend
-	 * @param operatorIdentifier Unique identifier for naming states created by this backend
-	 * @throws Exception Overwritten versions of this method may throw exceptions, in which
-	 *                   case the job that uses the state backend is considered failed during
-	 *                   deployment.
-	 */
-	public void initializeForJob(
-			Environment env,
-			String operatorIdentifier,
-			TypeSerializer<?> keySerializer) throws Exception {
-
-		this.userCodeClassLoader = env.getUserClassLoader();
-		this.keySerializer = keySerializer;
-
-		this.keyGroupIndex = env.getTaskInfo().getIndexOfThisSubtask();
-		this.kvStateRegistry = env.getTaskKvStateRegistry();
-	}
-
-	/**
-	 * Disposes all state associated with the current job.
-	 *
-	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
-	 */
-	public abstract void disposeAllStateForCurrentJob() throws Exception;
-
-	/**
-	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
-	 * checkpoint data.
-	 *
-	 * @throws Exception Exceptions can be forwarded and will be logged by the system
-	 */
-	public abstract void close() throws Exception;
-
-	public void discardState() throws Exception {
-		if (kvStateRegistry != null) {
-			kvStateRegistry.unregisterAll();
-		}
-
-		lastName = null;
-		lastState = null;
-		if (keyValueStates != null) {
-			for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
-				state.dispose();
-			}
-		}
-		keyValueStates = null;
-		keyValueStatesByName = null;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  key/value state
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates and returns a new {@link ValueState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the value that the {@code ValueState} can store.
-	 */
-	protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 *
-	 * @param namespaceSerializer TypeSerializer for the state namespace.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <N> The type of the namespace.
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state	 *
-	 */
-	protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
-	/**
-	 * Sets the current key that is used for partitioned state.
-	 * @param currentKey The current key.
-	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void setCurrentKey(Object currentKey) {
-		this.currentKey = Preconditions.checkNotNull(currentKey, "Key");
-		if (keyValueStates != null) {
-			for (KvState kv : keyValueStates) {
-				kv.setCurrentKey(currentKey);
-			}
-		}
-	}
-
-	public Object getCurrentKey() {
-		return currentKey;
-	}
+	private static final long serialVersionUID = 4620415814639230247L;
 
 	/**
-	 * Creates or retrieves a partitioned state backed by this state backend.
-	 *
-	 * @param stateDescriptor The state identifier for the state. This contains name
-	 *                           and can create a default state value.
-
-	 * @param <N> The type of the namespace.
-	 * @param <S> The type of the state.
+	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+	 * that should end up in a checkpoint.
 	 *
-	 * @return A new key/value state backed by this backend.
-	 *
-	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
+	 * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param operatorIdentifier An identifier of the operator for which we create streams.
 	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
-		Preconditions.checkNotNull(namespace, "Namespace");
-		Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
-
-		if (keySerializer == null) {
-			throw new RuntimeException("State key serializer has not been configured in the config. " +
-					"This operation cannot use partitioned state.");
-		}
-		
-		if (!stateDescriptor.isSerializerInitialized()) {
-			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
-
-		if (keyValueStatesByName == null) {
-			keyValueStatesByName = new HashMap<>();
-		}
-
-		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
-			lastState.setCurrentNamespace(namespace);
-			return (S) lastState;
-		}
-
-		KvState<?, ?, ?, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
-		if (previous != null) {
-			lastState = previous;
-			lastState.setCurrentNamespace(namespace);
-			lastName = stateDescriptor.getName();
-			return (S) previous;
-		}
-
-		// create a new blank key/value state
-		S state = stateDescriptor.bind(new StateBackend() {
-			@Override
-			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractStateBackend.this.createValueState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractStateBackend.this.createListState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-				return AbstractStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
-			}
-
-			@Override
-			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-				return AbstractStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
-			}
-
-		});
-
-		KvState kvState = (KvState) state;
-
-		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
-		keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
-
-		lastName = stateDescriptor.getName();
-		lastState = kvState;
-
-		if (currentKey != null) {
-			kvState.setCurrentKey(currentKey);
-		}
-
-		kvState.setCurrentNamespace(namespace);
-
-		// Publish queryable state
-		if (stateDescriptor.isQueryable()) {
-			if (kvStateRegistry == null) {
-				throw new IllegalStateException("State backend has not been initialized for job.");
-			}
-
-			String name = stateDescriptor.getQueryableStateName();
-			kvStateRegistry.registerKvState(keyGroupIndex, name, kvState);
-		}
-
-		return state;
-	}
-
-	@SuppressWarnings("unchecked,rawtypes")
-	public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
-		if (stateDescriptor instanceof ReducingStateDescriptor) {
-			ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
-			ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
-			ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
-			KvState kvState = (KvState) state;
-			Object result = null;
-			for (N source: sources) {
-				kvState.setCurrentNamespace(source);
-				Object sourceValue = state.get();
-				if (result == null) {
-					result = state.get();
-				} else if (sourceValue != null) {
-					result = reduceFn.reduce(result, sourceValue);
-				}
-				state.clear();
-			}
-			kvState.setCurrentNamespace(target);
-			if (result != null) {
-				state.add(result);
-			}
-		} else if (stateDescriptor instanceof ListStateDescriptor) {
-			ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
-			KvState kvState = (KvState) state;
-			List<Object> result = new ArrayList<>();
-			for (N source: sources) {
-				kvState.setCurrentNamespace(source);
-				Iterable<Object> sourceValue = state.get();
-				if (sourceValue != null) {
-					for (Object o : sourceValue) {
-						result.add(o);
-					}
-				}
-				state.clear();
-			}
-			kvState.setCurrentNamespace(target);
-			for (Object o : result) {
-				state.add(o);
-			}
-		} else {
-			throw new RuntimeException("Cannot merge states for " + stateDescriptor);
-		}
-	}
-
-	public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
-		if (keyValueStates != null) {
-			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
-
-			for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
-				KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
-				snapshots.put(entry.getKey(), snapshot);
-			}
-			return snapshots;
-		}
-
-		return null;
-	}
-
-	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
-		// We check whether the KvStates require notifications
-		if (keyValueStates != null) {
-			for (KvState<?, ?, ?, ?, ?> kvstate : keyValueStates) {
-				if (kvstate instanceof CheckpointListener) {
-					((CheckpointListener) kvstate).notifyCheckpointComplete(checkpointId);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Injects K/V state snapshots for lazy restore.
-	 * @param keyValueStateSnapshots The Map of snapshots
-	 */
-	@SuppressWarnings("unchecked,rawtypes")
-	public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
-		if (keyValueStateSnapshots != null) {
-			if (keyValueStatesByName == null) {
-				keyValueStatesByName = new HashMap<>();
-			}
-
-			for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
-				KvState kvState = state.getValue().restoreState(this,
-					keySerializer,
-					userCodeClassLoader);
-				keyValueStatesByName.put(state.getKey(), kvState);
-
-				try {
-					// Publish queryable state
-					StateDescriptor stateDesc = kvState.getStateDescriptor();
-					if (stateDesc.isQueryable()) {
-						String queryableStateName = stateDesc.getQueryableStateName();
-						kvStateRegistry.registerKvState(keyGroupIndex, queryableStateName, kvState);
-					}
-				} catch (Throwable ignored) {
-				}
-			}
-			keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  storing state for a checkpoint
-	// ------------------------------------------------------------------------
+	public abstract CheckpointStreamFactory createStreamFactory(
+			JobID jobId,
+			String operatorIdentifier) throws IOException;
 
 	/**
-	 * Creates an output stream that writes into the state of the given checkpoint. When the stream
-	 * is closes, it returns a state handle that can retrieve the state back.
-	 *
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return An output stream that writes state for the given checkpoint.
-	 *
-	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+	 * Creates a new {@link KeyedStateBackend} that is responsible for keeping keyed state
+	 * and can be checkpointed to checkpoint streams.
 	 */
-	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
-			long checkpointID, long timestamp) throws Exception;
-
-	// ------------------------------------------------------------------------
-	//  Checkpoint state output stream
-	// ------------------------------------------------------------------------
+	public abstract <K> KeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception;
 
 	/**
-	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+	 * Creates a new {@link KeyedStateBackend} that restores its state from the given list
+	 * {@link KeyGroupsStateHandle KeyGroupStateHandles}.
 	 */
-	public static abstract class CheckpointStateOutputStream extends FSDataOutputStream {
+	public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange,
+			List<KeyGroupsStateHandle> restoredState,
+			TaskKvStateRegistry kvStateRegistry) throws Exception;
 
-		/**
-		 * Closes the stream and gets a state handle that can create an input stream
-		 * producing the data written to this stream.
-		 *
-		 * @return A state handle that can create an input stream producing the data written to this stream.
-		 * @throws IOException Thrown, if the stream cannot be closed.
-		 */
-		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
deleted file mode 100644
index c2fc8a4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.io.IOException;
-
-/**
- * {@link KvStateSnapshot} that asynchronously materializes the state that it represents. Instead
- * of representing a materialized handle to state this would normally hold the (immutable) state
- * internally and materializes it when {@link #materialize()} is called.
- *
- * @param <K> The type of the key
- * @param <N> The type of the namespace
- * @param <S> The type of the {@link State}
- * @param <SD> The type of the {@link StateDescriptor}
- * @param <Backend> The type of the backend that can restore the state from this snapshot.
- */
-public abstract class AsynchronousKvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Materializes the state held by this {@code AsynchronousKvStateSnapshot}.
-	 */
-	public abstract KvStateSnapshot<K, N, S, SD, Backend> materialize() throws Exception;
-
-	@Override
-	public final KvState<K, N, S, SD, Backend> restoreState(
-		Backend stateBackend,
-		TypeSerializer<K> keySerializer,
-		ClassLoader classLoader) throws Exception {
-		throw new RuntimeException("This should never be called and probably points to a bug.");
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		throw new RuntimeException("This should never be called and probably points to a bug.");
-	}
-
-	@Override
-	public long getStateSize() throws Exception {
-		throw new RuntimeException("This should never be called and probably points to a bug.");
-	}
-
-	@Override
-	public void close() throws IOException {
-		throw new RuntimeException("This should never be called and probably points to a bug.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
new file mode 100644
index 0000000..199a856
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CheckpointStreamFactory {
+
+	/**
+	 * Creates an new {@link CheckpointStateOutputStream}. When the stream
+	 * is closed, it returns a state handle that can retrieve the state back.
+	 *
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 *
+	 * @return An output stream that writes state for the given checkpoint.
+	 *
+	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+	 */
+	CheckpointStateOutputStream createCheckpointStateOutputStream(
+			long checkpointID,
+			long timestamp) throws Exception;
+
+	/**
+	 * Closes the stream factory, releasing all internal resources, but does not delete any
+	 * persistent checkpoint data.
+	 *
+	 * @throws Exception Exceptions can be forwarded and will be logged by the system
+	 */
+	void close() throws Exception;
+
+	/**
+	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+	 *
+	 * <p>Note: This is an abstract class and not an interface because {@link OutputStream}
+	 * is an abstract class.
+	 */
+	abstract class CheckpointStateOutputStream extends FSDataOutputStream {
+
+		/**
+		 * Closes the stream and gets a state handle that can create an input stream
+		 * producing the data written to this stream.
+		 *
+		 * @return A state handle that can create an input stream producing the data written to this stream.
+		 * @throws IOException Thrown, if the stream cannot be closed.
+		 */
+		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
new file mode 100644
index 0000000..777ab69
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link Future} that is always done and will just yield the object that was given at creation
+ * time.
+ *
+ * @param <T> The type of object in this {@code Future}.
+ */
+public class DoneFuture<T> implements RunnableFuture<T> {
+	private final T keyGroupsStateHandle;
+
+	public DoneFuture(T keyGroupsStateHandle) {
+		this.keyGroupsStateHandle = keyGroupsStateHandle;
+	}
+
+	@Override
+	public boolean cancel(boolean mayInterruptIfRunning) {
+		return false;
+	}
+
+	@Override
+	public boolean isCancelled() {
+		return false;
+	}
+
+	@Override
+	public boolean isDone() {
+		return true;
+	}
+
+	@Override
+	public T get() throws InterruptedException, ExecutionException {
+		return keyGroupsStateHandle;
+	}
+
+	@Override
+	public T get(
+			long timeout,
+			TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		return get();
+	}
+
+	@Override
+	public void run() {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
index e13ac98..ee2d86d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
@@ -20,25 +20,18 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.io.IOException;
 
 /**
  * Generic implementation of {@link FoldingState} based on a wrapped {@link ValueState}.
  *
- * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
- * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
  * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
  */
-public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend, W extends ValueState<ACC> & KvState<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend>>
-	implements FoldingState<T, ACC>, KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
+public class GenericFoldingState<N, T, ACC, W extends ValueState<ACC> & KvState<N>>
+	implements FoldingState<T, ACC>, KvState<N> {
 
 	private final W wrappedState;
 	private final FoldFunction<T, ACC> foldFunction;
@@ -60,11 +53,6 @@ public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
 	}
 
 	@Override
-	public void setCurrentKey(K key) {
-		wrappedState.setCurrentKey(key);
-	}
-
-	@Override
 	public void setCurrentNamespace(N namespace) {
 		wrappedState.setCurrentNamespace(namespace);
 	}
@@ -75,26 +63,6 @@ public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
 	}
 
 	@Override
-	public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> snapshot(
-		long checkpointId,
-		long timestamp) throws Exception {
-		KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot = wrappedState.snapshot(
-			checkpointId,
-			timestamp);
-		return new Snapshot<>(wrappedSnapshot, foldFunction);
-	}
-
-	@Override
-	public void dispose() {
-		wrappedState.dispose();
-	}
-
-	@Override
-	public FoldingStateDescriptor<T, ACC> getStateDescriptor() {
-		throw new UnsupportedOperationException("Not supported by generic state type");
-	}
-
-	@Override
 	public ACC get() throws Exception {
 		return wrappedState.value();
 	}
@@ -109,42 +77,4 @@ public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
 	public void clear() {
 		wrappedState.clear();
 	}
-
-	private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
-		private static final long serialVersionUID = 1L;
-
-		private final KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot;
-
-		private final FoldFunction<T, ACC> foldFunction;
-
-		public Snapshot(KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot,
-			FoldFunction<T, ACC> foldFunction) {
-			this.wrappedSnapshot = wrappedSnapshot;
-			this.foldFunction = foldFunction;
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restoreState(
-				Backend stateBackend,
-				TypeSerializer<K> keySerializer,
-				ClassLoader classLoader) throws Exception {
-			return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), foldFunction);
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			wrappedSnapshot.discardState();
-		}
-
-		@Override
-		public long getStateSize() throws Exception {
-			return wrappedSnapshot.getStateSize();
-		}
-
-		@Override
-		public void close() throws IOException {
-			wrappedSnapshot.close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index 45460b4..ba81837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -19,25 +19,19 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
 /**
  * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}.
  *
- * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <T> The type of the values stored in this {@code ListState}.
- * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
  * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
  */
-public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W extends ValueState<ArrayList<T>> & KvState<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend>>
-	implements ListState<T>, KvState<K, N, ListState<T>, ListStateDescriptor<T>, Backend> {
+public class GenericListState<N, T, W extends ValueState<ArrayList<T>> & KvState<N>>
+	implements ListState<T>, KvState<N> {
 
 	private final W wrappedState;
 
@@ -56,11 +50,6 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e
 	}
 
 	@Override
-	public void setCurrentKey(K key) {
-		wrappedState.setCurrentKey(key);
-	}
-
-	@Override
 	public void setCurrentNamespace(N namespace) {
 		wrappedState.setCurrentNamespace(namespace);
 	}
@@ -71,26 +60,6 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e
 	}
 
 	@Override
-	public KvStateSnapshot<K, N, ListState<T>, ListStateDescriptor<T>, Backend> snapshot(
-		long checkpointId,
-		long timestamp) throws Exception {
-		KvStateSnapshot<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot = wrappedState.snapshot(
-			checkpointId,
-			timestamp);
-		return new Snapshot<>(wrappedSnapshot);
-	}
-
-	@Override
-	public void dispose() {
-		wrappedState.dispose();
-	}
-
-	@Override
-	public ListStateDescriptor<T> getStateDescriptor() {
-		throw new UnsupportedOperationException("Not supported by generic state type");
-	}
-
-	@Override
 	public Iterable<T> get() throws Exception {
 		return wrappedState.value();
 	}
@@ -112,38 +81,4 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e
 	public void clear() {
 		wrappedState.clear();
 	}
-
-	private static class Snapshot<K, N, T, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, ListState<T>, ListStateDescriptor<T>, Backend> {
-		private static final long serialVersionUID = 1L;
-
-		private final KvStateSnapshot<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot;
-
-		public Snapshot(KvStateSnapshot<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot) {
-			this.wrappedSnapshot = wrappedSnapshot;
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public KvState<K, N, ListState<T>, ListStateDescriptor<T>, Backend> restoreState(
-			Backend stateBackend,
-			TypeSerializer<K> keySerializer,
-			ClassLoader classLoader) throws Exception {
-			return new GenericListState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader));
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			wrappedSnapshot.discardState();
-		}
-
-		@Override
-		public long getStateSize() throws Exception {
-			return wrappedSnapshot.getStateSize();
-		}
-
-		@Override
-		public void close() throws IOException {
-			wrappedSnapshot.close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
index e4bb279..214231e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
@@ -20,24 +20,17 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.io.IOException;
 
 /**
  * Generic implementation of {@link ReducingState} based on a wrapped {@link ValueState}.
  *
- * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <T> The type of the values stored in this {@code ReducingState}.
- * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
  * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
  */
-public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend, W extends ValueState<T> & KvState<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend>>
-	implements ReducingState<T>, KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> {
+public class GenericReducingState<N, T, W extends ValueState<T> & KvState<N>>
+	implements ReducingState<T>, KvState<N> {
 
 	private final W wrappedState;
 	private final ReduceFunction<T> reduceFunction;
@@ -59,11 +52,6 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend,
 	}
 
 	@Override
-	public void setCurrentKey(K key) {
-		wrappedState.setCurrentKey(key);
-	}
-
-	@Override
 	public void setCurrentNamespace(N namespace) {
 		wrappedState.setCurrentNamespace(namespace);
 	}
@@ -74,26 +62,6 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend,
 	}
 
 	@Override
-	public KvStateSnapshot<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> snapshot(
-		long checkpointId,
-		long timestamp) throws Exception {
-		KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot = wrappedState.snapshot(
-			checkpointId,
-			timestamp);
-		return new Snapshot<>(wrappedSnapshot, reduceFunction);
-	}
-
-	@Override
-	public void dispose() {
-		wrappedState.dispose();
-	}
-
-	@Override
-	public ReducingStateDescriptor<T> getStateDescriptor() {
-		throw new UnsupportedOperationException("Not supported by generic state type");
-	}
-
-	@Override
 	public T get() throws Exception {
 		return wrappedState.value();
 	}
@@ -112,42 +80,4 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend,
 	public void clear() {
 		wrappedState.clear();
 	}
-
-	private static class Snapshot<K, N, T, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> {
-		private static final long serialVersionUID = 1L;
-
-		private final KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot;
-
-		private final ReduceFunction<T> reduceFunction;
-
-		public Snapshot(KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot,
-			ReduceFunction<T> reduceFunction) {
-			this.wrappedSnapshot = wrappedSnapshot;
-			this.reduceFunction = reduceFunction;
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> restoreState(
-			Backend stateBackend,
-			TypeSerializer<K> keySerializer,
-			ClassLoader classLoader) throws Exception {
-			return new GenericReducingState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), reduceFunction);
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			wrappedSnapshot.discardState();
-		}
-
-		@Override
-		public long getStateSize() throws Exception {
-			return wrappedSnapshot.getStateSize();
-		}
-
-		@Override
-		public void close() throws IOException {
-			wrappedSnapshot.close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
index de42bdb..9e74036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -191,6 +191,9 @@ public class KeyGroupRange implements Iterable<Integer>, Serializable {
 			int maxParallelism,
 			int parallelism,
 			int operatorIndex) {
+		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
+		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
+		Preconditions.checkArgument(maxParallelism <= Short.MAX_VALUE, "Maximum parallelism must be smaller than Short.MAX_VALUE.");
 
 		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
 		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
new file mode 100644
index 0000000..2d1d25c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * A keyed state backend is responsible for managing keyed state. The state can be checkpointed
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public abstract class KeyedStateBackend<K> {
+
+	/** {@link TypeSerializer} for our key. */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** The currently active key. */
+	protected K currentKey;
+
+	/** The key group of the currently active key */
+	private int currentKeyGroup;
+
+	/** So that we can give out state when the user uses the same key. */
+	protected HashMap<String, KvState<?>> keyValueStatesByName;
+
+	/** For caching the last accessed partitioned state */
+	private String lastName;
+
+	@SuppressWarnings("rawtypes")
+	private KvState lastState;
+
+	/** KeyGroupAssigner which determines the key group for each keys */
+	protected final KeyGroupAssigner<K> keyGroupAssigner;
+
+	/** Range of key-groups for which this backend is responsible */
+	protected final KeyGroupRange keyGroupRange;
+
+	/** KvStateRegistry helper for this task */
+	protected final TaskKvStateRegistry kvStateRegistry;
+
+	public KeyedStateBackend(
+			TaskKvStateRegistry kvStateRegistry,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange) {
+
+		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+	}
+
+	/**
+	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
+	 * checkpoint data.
+	 *
+	 * @throws Exception Exceptions can be forwarded and will be logged by the system
+	 */
+	public void close() throws Exception {
+		if (kvStateRegistry != null) {
+			kvStateRegistry.unregisterAll();
+		}
+
+		lastName = null;
+		lastState = null;
+		keyValueStatesByName = null;
+	}
+
+	/**
+	 * Creates and returns a new {@link ValueState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the value that the {@code ValueState} can store.
+	 */
+	protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ListState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ReducingState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link FoldingState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> Type of the values folded into the state
+	 * @param <ACC> Type of the value in the state	 *
+	 */
+	protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+	/**
+	 * Sets the current key that is used for partitioned state.
+	 * @param newKey The new current key.
+	 */
+	public void setCurrentKey(K newKey) {
+		this.currentKey = newKey;
+		this.currentKeyGroup = keyGroupAssigner.getKeyGroupIndex(newKey);
+	}
+
+	/**
+	 * {@link TypeSerializer} for the state backend key type.
+	 */
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/**
+	 * Used by states to access the current key.
+	 */
+	public K getCurrentKey() {
+		return currentKey;
+	}
+
+	public int getCurrentKeyGroupIndex() {
+		return currentKeyGroup;
+	}
+
+	public int getNumberOfKeyGroups() {
+		return keyGroupAssigner.getNumberKeyGroups();
+	}
+
+	public KeyGroupAssigner<K> getKeyGroupAssigner() {
+		return keyGroupAssigner;
+	}
+
+	/**
+	 * Creates or retrieves a partitioned state backed by this state backend.
+	 *
+	 * @param stateDescriptor The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+
+	 * @param <N> The type of the namespace.
+	 * @param <S> The type of the state.
+	 *
+	 * @return A new key/value state backed by this backend.
+	 *
+	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
+	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+		Preconditions.checkNotNull(namespace, "Namespace");
+		Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
+
+		if (keySerializer == null) {
+			throw new RuntimeException("State key serializer has not been configured in the config. " +
+					"This operation cannot use partitioned state.");
+		}
+		
+		if (!stateDescriptor.isSerializerInitialized()) {
+			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		if (keyValueStatesByName == null) {
+			keyValueStatesByName = new HashMap<>();
+		}
+
+		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
+			lastState.setCurrentNamespace(namespace);
+			return (S) lastState;
+		}
+
+		KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+		if (previous != null) {
+			lastState = previous;
+			lastState.setCurrentNamespace(namespace);
+			lastName = stateDescriptor.getName();
+			return (S) previous;
+		}
+
+		// create a new blank key/value state
+		S state = stateDescriptor.bind(new StateBackend() {
+			@Override
+			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
+				return KeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
+			}
+
+			@Override
+			public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
+				return KeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
+			}
+
+			@Override
+			public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+				return KeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
+			}
+
+			@Override
+			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+				return KeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
+			}
+
+		});
+
+		KvState kvState = (KvState) state;
+
+		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
+
+		lastName = stateDescriptor.getName();
+		lastState = kvState;
+
+		kvState.setCurrentNamespace(namespace);
+
+		// Publish queryable state
+		if (stateDescriptor.isQueryable()) {
+			if (kvStateRegistry == null) {
+				throw new IllegalStateException("State backend has not been initialized for job.");
+			}
+
+			String name = stateDescriptor.getQueryableStateName();
+			// TODO: deal with key group indices here
+			kvStateRegistry.registerKvState(0, name, kvState);
+		}
+
+		return state;
+	}
+
+	@SuppressWarnings("unchecked,rawtypes")
+	public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+		if (stateDescriptor instanceof ReducingStateDescriptor) {
+			ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+			ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
+			ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+			KvState kvState = (KvState) state;
+			Object result = null;
+			for (N source: sources) {
+				kvState.setCurrentNamespace(source);
+				Object sourceValue = state.get();
+				if (result == null) {
+					result = state.get();
+				} else if (sourceValue != null) {
+					result = reduceFn.reduce(result, sourceValue);
+				}
+				state.clear();
+			}
+			kvState.setCurrentNamespace(target);
+			if (result != null) {
+				state.add(result);
+			}
+		} else if (stateDescriptor instanceof ListStateDescriptor) {
+			ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+			KvState kvState = (KvState) state;
+			List<Object> result = new ArrayList<>();
+			for (N source: sources) {
+				kvState.setCurrentNamespace(source);
+				Iterable<Object> sourceValue = state.get();
+				if (sourceValue != null) {
+					for (Object o : sourceValue) {
+						result.add(o);
+					}
+				}
+				state.clear();
+			}
+			kvState.setCurrentNamespace(target);
+			for (Object o : result) {
+				state.add(o);
+			}
+		} else {
+			throw new RuntimeException("Cannot merge states for " + stateDescriptor);
+		}
+	}
+
+	/**
+	 * Snapshots the keyed state by writing it to streams that are provided by a
+	 * {@link CheckpointStreamFactory}.
+	 *
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 *
+	 * @return A future that will yield a {@link KeyGroupsStateHandle} with the index and
+	 * written key group state stream.
+	 */
+	public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
+			long checkpointId,
+			long timestamp,
+			CheckpointStreamFactory streamFactory) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
index a8aa872..aded79f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-
 /**
  * Key/Value state implementation for user-defined state. The state is backed by a state
  * backend, which typically follows one of the following patterns: Either the state is stored
@@ -29,20 +26,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
  * by an external key/value store as the state backend, and checkpoints merely record the
  * metadata of what is considered part of the checkpoint.
  * 
- * @param <K> The type of the key.
  * @param <N> The type of the namespace.
- * @param <S> The type of {@link State} this {@code KvState} holds.
- * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
- * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
  */
-public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {
-
-	/**
-	 * Sets the current key, which will be used when using the state access methods.
-	 *
-	 * @param key The key.
-	 */
-	void setCurrentKey(K key);
+public interface KvState<N> {
 
 	/**
 	 * Sets the current namespace, which will be used when using the state access methods.
@@ -63,27 +49,4 @@ public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>
 	 * @throws Exception Exceptions during serialization are forwarded
 	 */
 	byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
-
-	/**
-	 * Creates a snapshot of this state.
-	 * 
-	 * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return A snapshot handle for this key/value state.
-	 * 
-	 * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
-	 *                   can react to failed snapshots.
-	 */
-	KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;
-
-	/**
-	 * Disposes the key/value state, releasing all occupied resources.
-	 */
-	void dispose();
-
-	/**
-	 * Returns the state descriptor from which the KvState instance was created.
-	 */
-	SD getStateDescriptor();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
deleted file mode 100644
index 5654845..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly
- * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends
- * on the actual implementation. This snapshot defines merely how to restore the state and
- * how to discard the state.
- *
- * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map.
- * 
- * <p>Another possible implementation for this snapshot is that the key/value map is serialized into
- * a file and this snapshot object contains a pointer to that file.
- *
- * @param <K> The type of the key
- * @param <N> The type of the namespace
- * @param <S> The type of the {@link State}
- * @param <SD> The type of the {@link StateDescriptor}
- * @param <Backend> The type of the backend that can restore the state from this snapshot.
- */
-public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> 
-		extends StateObject {
-
-	/**
-	 * Loads the key/value state back from this snapshot.
-	 *
-	 * @param stateBackend The state backend that created this snapshot and can restore the key/value state
-	 *                     from this snapshot.
-	 * @param keySerializer The serializer for the keys.
-	 * @param classLoader The class loader for user-defined types.
-	 *
-	 * @return An instance of the key/value state loaded from this snapshot.
-	 * 
-	 * @throws Exception Exceptions can occur during the state loading and are forwarded. 
-	 */
-	KvState<K, N, S, SD, Backend> restoreState(
-		Backend stateBackend,
-		TypeSerializer<K> keySerializer,
-		ClassLoader classLoader) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index e3538af..c6fd02c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -61,7 +61,7 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements
 	}
 
 	@Override
-	public FSDataInputStream openInputStream() throws Exception {
+	public FSDataInputStream openInputStream() throws IOException {
 		return wrappedStreamStateHandle.openInputStream();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index a43a2c5..47103c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.state;
 
 /**
  * Base of all types that represent checkpointed state. Specializations are for
- * example {@link StateHandle StateHandles} (directly resolve to state) and 
- * {@link KvStateSnapshot key/value state snapshots}.
- * 
+ * example {@link StateHandle StateHandles} (directly resolve to state).
+ *
  * <p>State objects define how to:
  * <ul>
  *     <li><b>Discard State</b>: The {@link #discardState()} method defines how state is permanently

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
index 46e4299..e792e62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 
+import java.io.IOException;
+
 /**
  * A {@link StateObject} that represents state that was written to a stream. The data can be read
  * back via {@link #openInputStream()}.
@@ -30,5 +32,5 @@ public interface StreamStateHandle extends StateObject {
 	 * Returns an {@link FSDataInputStream} that can be used to read back the data that
 	 * was previously written to the stream.
 	 */
-	FSDataInputStream openInputStream() throws Exception;
+	FSDataInputStream openInputStream() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
deleted file mode 100644
index 3cae629..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractHeapState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import java.io.DataOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Base class for partitioned {@link ListState} implementations that are backed by a regular
- * heap hash map. The concrete implementations define how the state is checkpointed.
- * 
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <SV> The type of the values in the state.
- * @param <S> The type of State
- * @param <SD> The type of StateDescriptor for the State S
- */
-public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
-		extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
-
-	/** The file system state backend backing snapshots of this state */
-	private final FsStateBackend backend;
-
-	public AbstractFsState(FsStateBackend backend,
-		TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		TypeSerializer<SV> stateSerializer,
-		SD stateDesc) {
-		super(keySerializer, namespaceSerializer, stateSerializer, stateDesc);
-		this.backend = backend;
-	}
-
-	public AbstractFsState(FsStateBackend backend,
-		TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		TypeSerializer<SV> stateSerializer,
-		SD stateDesc,
-		HashMap<N, Map<K, SV>> state) {
-		super(keySerializer, namespaceSerializer, stateSerializer, stateDesc, state);
-		this.backend = backend;
-	}
-
-	public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);
-
-	@Override
-	public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
-
-		try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
-
-			// serialize the state to the output stream
-			DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
-			outView.writeInt(state.size());
-			for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
-				N namespace = namespaceState.getKey();
-				namespaceSerializer.serialize(namespace, outView);
-				outView.writeInt(namespaceState.getValue().size());
-				for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
-					keySerializer.serialize(entry.getKey(), outView);
-					stateSerializer.serialize(entry.getValue(), outView);
-				}
-			}
-			outView.flush();
-
-			// create a handle to the state
-//			return new FsHeapValueStateSnapshot<>(getKeySerializer(), getNamespaceSerializer(), stateDesc, out.closeAndGetPath());
-			return createHeapSnapshot(out.closeAndGetPath());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
deleted file mode 100644
index 51e8b5a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A snapshot of a heap key/value state stored in a file.
- * 
- * @param <K> The type of the key in the snapshot state.
- * @param <N> The type of the namespace in the snapshot state.
- * @param <SV> The type of the state value.
- */
-public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
-		extends FileStateHandle
-		implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Key Serializer */
-	protected final TypeSerializer<K> keySerializer;
-
-	/** Namespace Serializer */
-	protected final TypeSerializer<N> namespaceSerializer;
-
-	/** Serializer for the state value */
-	protected final TypeSerializer<SV> stateSerializer;
-
-	/** StateDescriptor, for sanity checks */
-	protected final SD stateDesc;
-
-	/**
-	 * Creates a new state snapshot with data in the file system.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateSerializer The serializer for the elements in the state HashMap
-	 * @param stateDesc The state identifier
-	 * @param filePath The path where the snapshot data is stored.
-	 */
-	public AbstractFsStateSnapshot(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		TypeSerializer<SV> stateSerializer,
-		SD stateDesc,
-		Path filePath) {
-		super(filePath);
-		this.stateDesc = stateDesc;
-		this.keySerializer = keySerializer;
-		this.stateSerializer = stateSerializer;
-		this.namespaceSerializer = namespaceSerializer;
-
-	}
-
-	public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap);
-
-	@Override
-	public KvState<K, N, S, SD, FsStateBackend> restoreState(
-		FsStateBackend stateBackend,
-		final TypeSerializer<K> keySerializer,
-		ClassLoader classLoader) throws Exception {
-
-		// validity checks
-		if (!this.keySerializer.equals(keySerializer)) {
-			throw new IllegalArgumentException(
-				"Cannot restore the state from the snapshot with the given serializers. " +
-					"State (K/V) was serialized with " +
-					"(" + this.keySerializer + ") " +
-					"now is (" + keySerializer + ")");
-		}
-
-		// state restore
-		ensureNotClosed();
-
-		try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
-			// make sure the in-progress restore from the handle can be closed 
-			registerCloseable(inStream);
-
-			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);
-
-			final int numKeys = inView.readInt();
-			HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);
-
-			for (int i = 0; i < numKeys; i++) {
-				N namespace = namespaceSerializer.deserialize(inView);
-				final int numValues = inView.readInt();
-				Map<K, SV> namespaceMap = new HashMap<>(numValues);
-				stateMap.put(namespace, namespaceMap);
-				for (int j = 0; j < numValues; j++) {
-					K key = keySerializer.deserialize(inView);
-					SV value = stateSerializer.deserialize(inView);
-					namespaceMap.put(key, value);
-				}
-			}
-
-			return createFsState(stateBackend, stateMap);
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to restore state from file system", e);
-		}
-	}
-
-	/**
-	 * Returns the file size in bytes.
-	 *
-	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	@Override
-	public void discardState() throws Exception {
-		super.discardState();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 871e56c..5ae751b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -65,7 +65,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt
 	}
 
 	@Override
-	public FSDataInputStream openInputStream() throws Exception {
+	public FSDataInputStream openInputStream() throws IOException {
 		ensureNotClosed();
 		FSDataInputStream inputStream = getFileSystem().open(filePath);
 		registerCloseable(inputStream);


Mime
View raw message