flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [2/2] flink git commit: [FLINK-7826][QS] Add support for all types of state to the QS Client.
Date Wed, 18 Oct 2017 08:03:42 GMT
[FLINK-7826][QS] Add support for all types of state to the QS Client.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abc3e1c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abc3e1c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abc3e1c8

Branch: refs/heads/master
Commit: abc3e1c888ceee941d557381a8cb8a7df8af2058
Parents: 717a7dc
Author: kkloudas <kkloudas@gmail.com>
Authored: Tue Oct 10 16:40:57 2017 +0200
Committer: kkloudas <kkloudas@gmail.com>
Committed: Wed Oct 18 09:40:16 2017 +0200

----------------------------------------------------------------------
 .../common/state/FoldingStateDescriptor.java    |   2 +-
 .../client/QueryableStateClient.java            |  57 +-
 .../client/state/ImmutableAggregatingState.java |  71 ++
 .../client/state/ImmutableFoldingState.java     |  70 ++
 .../client/state/ImmutableListState.java        |  70 ++
 .../client/state/ImmutableMapState.java         | 139 ++++
 .../client/state/ImmutableReducingState.java    |  69 ++
 .../client/state/ImmutableState.java            |  29 +
 .../client/state/ImmutableStateBinder.java      |  80 +++
 .../client/state/ImmutableValueState.java       |  69 ++
 .../network/AbstractServerHandler.java          |   2 +-
 .../itcases/AbstractQueryableStateITCase.java   | 644 +++++++++++++++----
 .../state/ImmutableAggregatingStateTest.java    | 114 ++++
 .../state/ImmutableFoldingStateTest.java        |  94 +++
 .../state/ImmutableListStateTest.java           | 112 ++++
 .../state/ImmutableMapStateTest.java            | 189 ++++++
 .../state/ImmutableReducingStateTest.java       |  84 +++
 .../state/ImmutableValueStateTest.java          |  70 ++
 .../streaming/api/datastream/KeyedStream.java   |   6 +-
 .../api/datastream/QueryableStateStream.java    |  28 +-
 .../flink/streaming/api/scala/KeyedStream.scala |   6 +-
 21 files changed, 1817 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index f7609c3..0954047 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -30,7 +30,7 @@ import static java.util.Objects.requireNonNull;
  * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned
  * folding state.
  *
- * @param <T> Type of the values folded int othe state
+ * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
  *
  * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 005c874..70bccf0 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -21,10 +21,12 @@ package org.apache.flink.queryablestate.client;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
 import org.apache.flink.queryablestate.messages.KvStateRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.Client;
@@ -141,15 +143,15 @@ public class QueryableStateClient {
 	 * @param key			            The key we are interested in.
 	 * @param keyTypeHint				A {@link TypeHint} used to extract the type of the key.
 	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
-	 * @return Future holding the result.
+	 * @return Future holding the immutable {@link State} object containing the result.
 	 */
 	@PublicEvolving
-	public <K, V> CompletableFuture<V> getKvState(
+	public <K, S extends State, V> CompletableFuture<S> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
 			final TypeHint<K> keyTypeHint,
-			final StateDescriptor<?, V> stateDescriptor) {
+			final StateDescriptor<S, V> stateDescriptor) {
 
 		Preconditions.checkNotNull(keyTypeHint);
 
@@ -164,15 +166,15 @@ public class QueryableStateClient {
 	 * @param key			            The key we are interested in.
 	 * @param keyTypeInfo				The {@link TypeInformation} of the key.
 	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
-	 * @return Future holding the result.
+	 * @return Future holding the immutable {@link State} object containing the result.
 	 */
 	@PublicEvolving
-	public <K, V> CompletableFuture<V> getKvState(
+	public <K, S extends State, V> CompletableFuture<S> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
 			final TypeInformation<K> keyTypeInfo,
-			final StateDescriptor<?, V> stateDescriptor) {
+			final StateDescriptor<S, V> stateDescriptor) {
 
 		return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
 				keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
@@ -187,48 +189,17 @@ public class QueryableStateClient {
 	 * @param keyTypeInfo				The {@link TypeInformation} of the keys.
 	 * @param namespaceTypeInfo			The {@link TypeInformation} of the namespace.
 	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
-	 * @return Future holding the result.
+	 * @return Future holding the immutable {@link State} object containing the result.
 	 */
 	@PublicEvolving
-	public <K, V, N> CompletableFuture<V> getKvState(
+	public <K, N, S extends State, V> CompletableFuture<S> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
 			final N namespace,
 			final TypeInformation<K> keyTypeInfo,
 			final TypeInformation<N> namespaceTypeInfo,
-			final StateDescriptor<?, V> stateDescriptor) {
-
-		Preconditions.checkNotNull(stateDescriptor);
-
-		// initialize the value serializer based on the execution config.
-		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-		TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer();
-
-		return getKvState(jobId, queryableStateName, key,
-				namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer);
-	}
-
-	/**
-	 * Returns a future holding the request result.
-	 * @param jobId                     JobID of the job the queryable state belongs to.
-	 * @param queryableStateName        Name under which the state is queryable.
-	 * @param key			            The key that the state we request is associated with.
-	 * @param namespace					The namespace of the state.
-	 * @param keyTypeInfo				The {@link TypeInformation} of the keys.
-	 * @param namespaceTypeInfo			The {@link TypeInformation} of the namespace.
-	 * @param stateSerializer			The {@link TypeSerializer} of the state we want to query.
-	 * @return Future holding the result.
-	 */
-	@PublicEvolving
-	public <K, N, V> CompletableFuture<V> getKvState(
-			final JobID jobId,
-			final String queryableStateName,
-			final K key,
-			final N namespace,
-			final TypeInformation<K> keyTypeInfo,
-			final TypeInformation<N> namespaceTypeInfo,
-			final TypeSerializer<V> stateSerializer) {
+			final StateDescriptor<S, V> stateDescriptor) {
 
 		Preconditions.checkNotNull(jobId);
 		Preconditions.checkNotNull(queryableStateName);
@@ -237,7 +208,7 @@ public class QueryableStateClient {
 
 		Preconditions.checkNotNull(keyTypeInfo);
 		Preconditions.checkNotNull(namespaceTypeInfo);
-		Preconditions.checkNotNull(stateSerializer);
+		Preconditions.checkNotNull(stateDescriptor);
 
 		TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
 		TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);
@@ -253,8 +224,8 @@ public class QueryableStateClient {
 		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
 				stateResponse -> {
 					try {
-						return KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer);
-					} catch (IOException e) {
+						return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
+					} catch (Exception e) {
 						throw new FlinkRuntimeException(e);
 					}
 				});

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
new file mode 100644
index 0000000..b853cfc
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link AggregatingState} that <b>does not</b> allow for modifications.
+ *
+ * <p>This is the type of the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing an {@link AggregatingStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {
+
+	private final OUT value;
+
+	private ImmutableAggregatingState(OUT value) {
+		this.value = Preconditions.checkNotNull(value);
+	}
+
+	@Override
+	public OUT get() {
+		return value;
+	}
+
+	@Override
+	public void add(Object newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
+			final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
+			final byte[] serializedValue) throws IOException {
+
+		final ACC accumulator = KvStateSerializer.deserializeValue(
+				serializedValue,
+				stateDescriptor.getSerializer());
+
+		final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
+		return new ImmutableAggregatingState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
new file mode 100644
index 0000000..a12adaa
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link FoldingState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link FoldingStateDescriptor}.
+ */
+@PublicEvolving
+@Deprecated
+public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {
+
+	private final ACC value;
+
+	private ImmutableFoldingState(ACC value) {
+		this.value = Preconditions.checkNotNull(value);
+	}
+
+	@Override
+	public ACC get() {
+		return value;
+	}
+
+	@Override
+	public void add(Object newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
+			final FoldingStateDescriptor<IN, ACC> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final ACC state = KvStateSerializer.deserializeValue(
+				serializedState,
+				stateDescriptor.getSerializer());
+		return new ImmutableFoldingState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
new file mode 100644
index 0000000..8416905
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A read-only {@link ListState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link ListStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {
+
+	private final List<V> listState;
+
+	private ImmutableListState(final List<V> state) {
+		this.listState = Preconditions.checkNotNull(state);
+	}
+
+	@Override
+	public Iterable<V> get() {
+		return listState;
+	}
+
+	@Override
+	public void add(V value) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <V> ImmutableListState<V> createState(
+			final ListStateDescriptor<V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final List<V> state = KvStateSerializer.deserializeList(
+				serializedState,
+				stateDescriptor.getElementSerializer());
+		return new ImmutableListState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
new file mode 100644
index 0000000..c216d5d
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A read-only {@link MapState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link MapStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> {
+
+	private final Map<K, V> state;
+
+	private ImmutableMapState(final Map<K, V> mapState) {
+		this.state = Preconditions.checkNotNull(mapState);
+	}
+
+	@Override
+	public V get(K key) {
+		return state.get(key);
+	}
+
+	@Override
+	public void put(K key, V value) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void putAll(Map<K, V> map) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void remove(K key) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public boolean contains(K key) {
+		return state.containsKey(key);
+	}
+
+	/**
+	 * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}.
+	 *
+	 * @return A read-only iterable view of all the key-value pairs in the state.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	@Override
+	public Iterable<Map.Entry<K, V>> entries() {
+		return Collections.unmodifiableSet(state.entrySet());
+	}
+
+	/**
+	 * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}.
+	 *
+	 * @return A read-only iterable view of all the keys in the state.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	@Override
+	public Iterable<K> keys() {
+		return Collections.unmodifiableSet(state.keySet());
+	}
+
+	/**
+	 * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}.
+	 *
+	 * @return A read-only iterable view of all the values in the state.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	@Override
+	public Iterable<V> values() {
+		return Collections.unmodifiableCollection(state.values());
+	}
+
+	/**
+	 * Iterates over all the mappings in the state. The iterator cannot
+	 * remove elements.
+	 *
+	 * @return A read-only iterator over all the mappings in the state
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	@Override
+	public Iterator<Map.Entry<K, V>> iterator() {
+		return Collections.unmodifiableSet(state.entrySet()).iterator();
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <K, V> ImmutableMapState<K, V> createState(
+			final MapStateDescriptor<K, V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final Map<K, V> state = KvStateSerializer.deserializeMap(
+				serializedState,
+				stateDescriptor.getKeySerializer(),
+				stateDescriptor.getValueSerializer());
+		return new ImmutableMapState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
new file mode 100644
index 0000000..da08c53
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link ReducingState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link ReducingStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> {
+
+	private final V value;
+
+	private ImmutableReducingState(V value) {
+		this.value = Preconditions.checkNotNull(value);
+	}
+
+	@Override
+	public V get() {
+		return value;
+	}
+
+	@Override
+	public void add(V newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <V> ImmutableReducingState<V> createState(
+			final ReducingStateDescriptor<V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final V state = KvStateSerializer.deserializeValue(
+				serializedState,
+				stateDescriptor.getSerializer());
+		return new ImmutableReducingState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
new file mode 100644
index 0000000..863f07b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+/**
+ * A base class for the <b>read-only</b> types of state returned
+ * as results from the Queryable State Client.
+ */
+abstract class ImmutableState {
+
+	protected static final UnsupportedOperationException MODIFICATION_ATTEMPT_ERROR =
+			new UnsupportedOperationException("State is read-only. No modifications allowed.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
new file mode 100644
index 0000000..6ce2787
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateBinder;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link StateBinder} used to deserialize the results returned by the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
+ *
+ * <p>Each {@code create*State} method hands the descriptor and the serialized bytes given at construction
+ * to the matching {@code Immutable*State.createState} factory, yielding an immutable {@link org.apache.flink.api.common.state.State State}.
+ */
+public class ImmutableStateBinder implements StateBinder {
+
+	private final byte[] serializedState;
+
+	public ImmutableStateBinder(final byte[] content) {
+		serializedState = Preconditions.checkNotNull(content);
+	}
+
+	@Override
+	public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
+		return ImmutableValueState.createState(stateDesc, serializedState);
+	}
+
+	@Override
+	public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
+		return ImmutableListState.createState(stateDesc, serializedState);
+	}
+
+	@Override
+	public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+		return ImmutableReducingState.createState(stateDesc, serializedState);
+	}
+
+	@Override
+	public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception {
+		return ImmutableAggregatingState.createState(stateDesc, serializedState);
+	}
+
+	@Override
+	public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+		return ImmutableFoldingState.createState(stateDesc, serializedState);
+	}
+
+	@Override
+	public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception {
+		return ImmutableMapState.createState(stateDesc, serializedState);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
new file mode 100644
index 0000000..7fd6457
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A read-only {@link ValueState}: {@code update} and {@code clear} always throw an {@link UnsupportedOperationException}.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link ValueStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> {
+
+	private final V value;
+
+	private ImmutableValueState(V value) {
+		this.value = Preconditions.checkNotNull(value);
+	}
+
+	@Override
+	public V value() {
+		return value;
+	}
+
+	@Override
+	public void update(V newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <V> ImmutableValueState<V> createState(
+			final ValueStateDescriptor<V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final V state = KvStateSerializer.deserializeValue(
+				serializedState,
+				stateDescriptor.getSerializer());
+		return new ImmutableValueState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index b9bf671..18a88da 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -121,7 +121,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend
 				// Execute actual query async, because it is possibly
 				// blocking (e.g. file I/O).
 				//
-				// A submission failure is not treated as fatal. todo here if there is a shared resource e.g. registry, then I will have to sync on that.
+				// A submission failure is not treated as fatal.
 				queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats));
 
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
index a096f55..69316fa 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
@@ -19,17 +19,27 @@
 package org.apache.flink.queryablestate.itcases;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -49,12 +59,18 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -63,13 +79,16 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
@@ -198,7 +217,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			while (!allNonZero && deadline.hasTimeLeft()) {
 				allNonZero = true;
 
-				final List<CompletableFuture<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys);
+				final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);
 
 				for (int i = 0; i < numKeys; i++) {
 					final int key = i;
@@ -210,7 +229,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 						allNonZero = false;
 					}
 
-					CompletableFuture<Tuple2<Integer, Long>> result = getKvStateWithRetries(
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries(
 							client,
 							jobId,
 							queryName,
@@ -221,9 +240,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 							false,
 							executor);
 
-					result.thenAccept(res -> {
-						counts.set(key, res.f1);
-						assertEquals("Key mismatch", key, res.f0.intValue());
+					result.thenAccept(response -> {
+						try {
+							Tuple2<Integer, Long> res = response.get();
+							counts.set(key, res.f1);
+							assertEquals("Key mismatch", key, res.f0.intValue());
+						} catch (Exception e) {
+							Assert.fail(e.getMessage());
+						}
 					});
 
 					futures.add(result);
@@ -406,7 +430,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			executeQuery(deadline, client, jobId, "hakuna", valueState, numElements);
+			executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -485,7 +509,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			executeQuery(deadline, client, jobId, "hakuna", valueState, expected);
+			executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -502,87 +526,6 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	}
 
 	/**
-	 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
-	 * <tt>expected</tt> equals the value of the result tuple's second field.
-	 */
-	private void executeQuery(
-			final Deadline deadline,
-			final QueryableStateClient client,
-			final JobID jobId,
-			final String queryableStateName,
-			final StateDescriptor<?, Tuple2<Integer, Long>> stateDescriptor,
-			final long expected) throws Exception {
-
-		for (int key = 0; key < maxParallelism; key++) {
-			boolean success = false;
-			while (deadline.hasTimeLeft() && !success) {
-				CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries(
-						client,
-						jobId,
-						queryableStateName,
-						key,
-						BasicTypeInfo.INT_TYPE_INFO,
-						stateDescriptor,
-						QUERY_RETRY_DELAY,
-						false,
-						executor);
-
-				Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-				assertEquals("Key mismatch", key, value.f0.intValue());
-				if (expected == value.f1) {
-					success = true;
-				} else {
-					// Retry
-					Thread.sleep(50L);
-				}
-			}
-
-			assertTrue("Did not succeed query", success);
-		}
-	}
-
-	/**
-	 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
-	 * <tt>expected</tt> equals the value of the result tuple's second field.
-	 */
-	private void executeQuery(
-			final Deadline deadline,
-			final QueryableStateClient client,
-			final JobID jobId,
-			final String queryableStateName,
-			final TypeSerializer<Tuple2<Integer, Long>> valueSerializer,
-			final long expected) throws Exception {
-
-		for (int key = 0; key < maxParallelism; key++) {
-			boolean success = false;
-			while (deadline.hasTimeLeft() && !success) {
-				Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
-						jobId,
-						queryableStateName,
-						key,
-						BasicTypeInfo.INT_TYPE_INFO,
-						valueSerializer,
-						QUERY_RETRY_DELAY,
-						false,
-						executor);
-
-				Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-				assertEquals("Key mismatch", key, value.f0.intValue());
-				if (expected == value.f1) {
-					success = true;
-				} else {
-					// Retry
-					Thread.sleep(50L);
-				}
-			}
-
-			assertTrue("Did not succeed query", success);
-		}
-	}
-
-	/**
 	 * Tests simple value state queryable state instance with a default value
 	 * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
 	 * tuples, the key is mapped to 1 but key 0 is queried which should throw
@@ -647,7 +590,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			// Now query
 			int key = 0;
-			CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries(
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
 					client,
 					jobId,
 					queryableState.getQueryableStateName(),
@@ -730,8 +673,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			executeQuery(deadline, client, jobId, "matata",
-					queryableState.getValueSerializer(), numElements);
+			final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
+					(ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();
+			executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
 		} finally {
 
 			// Free cluster resources
@@ -809,10 +753,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			for (int key = 0; key < maxParallelism; key++) {
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					CompletableFuture<String> future = getKvStateWithRetries(
+					CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
 							client,
 							jobId,
-							queryableState.getQueryableStateName(),
+							"pumba",
 							key,
 							BasicTypeInfo.INT_TYPE_INFO,
 							foldingState,
@@ -820,7 +764,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 							false,
 							executor);
 
-					String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+					String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					// No "Key mismatch" check here: the folding result is a String and has no key field.
 					if (expected.equals(value)) {
 						success = true;
 					} else {
@@ -898,12 +844,150 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			cluster.submitJobDetached(jobGraph);
 
-			// Wait until job is running
+			// Now query
+			long expected = numElements * (numElements + 1L) / 2L;
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"jungle",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							reducingState,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					assertEquals("Key mismatch", key, value.f0.intValue());
+					if (expected == value.f1) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+
+			client.shutdown();
+		}
+	}
+
+	/**
+	 * Tests simple map state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The map state instance sums the values up. The test succeeds
+	 * after each subtask index is queried with result n*(n+1)/2.
+	 */
+	@Test
+	public void testMapState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>(
+					"timon",
+					BasicTypeInfo.INT_TYPE_INFO,
+					source.getType());
+			mapStateDescriptor.setQueryable("timon-queryable");
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+				private static final long serialVersionUID = -805125545438296619L;
+
+				private transient MapState<Integer, Tuple2<Integer, Long>> mapState;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+				}
+
+				@Override
+				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+					Tuple2<Integer, Long> v = mapState.get(value.f0);
+					if (v == null) {
+						v = new Tuple2<>(value.f0, 0L);
+					}
+					mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
+				}
+			});
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
 
 			// Now query
 			long expected = numElements * (numElements + 1L) / 2L;
 
-			executeQuery(deadline, client, jobId, "jungle", reducingState, expected);
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"timon-queryable",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							mapStateDescriptor,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+					assertEquals("Key mismatch", key, value.f0.intValue());
+					if (expected == value.f1) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -920,6 +1004,227 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Tests simple list state queryable state instance. Each source emits
+	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+	 * queried. The list state instance adds the values to the list. The test
+	 * succeeds after each subtask index is queried and the list contains
+	 * the correct number of distinct elements.
+	 */
+	@Test
+	public void testListState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>(
+					"list",
+					BasicTypeInfo.LONG_TYPE_INFO);
+			listStateDescriptor.setQueryable("list-queryable");
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() {
+				private static final long serialVersionUID = -805125545438296619L;
+
+				private transient ListState<Long> listState;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					listState = getRuntimeContext().getListState(listStateDescriptor);
+				}
+
+				@Override
+				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
+					listState.add(value.f1);
+				}
+			});
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+
+			Map<Integer, Set<Long>> results = new HashMap<>();
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<ListState<Long>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"list-queryable",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							listStateDescriptor,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					Iterable<Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					Set<Long> res = new HashSet<>();
+					for (Long v: value) {
+						res.add(v);
+					}
+
+					// the source starts at 0, so +1
+					if (res.size() == numElements + 1L) {
+						success = true;
+						results.put(key, res);
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+
+			for (int key = 0; key < maxParallelism; key++) {
+				Set<Long> values = results.get(key);
+				for (long i = 0L; i <= numElements; i++) {
+					assertTrue(values.contains(i));
+				}
+			}
+
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+
+			client.shutdown();
+		}
+	}
+
+	@Test
+	public void testAggregatingState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		final QueryableStateClient client = new QueryableStateClient(
+				"localhost",
+				Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor =
+					new AggregatingStateDescriptor<>(
+							"aggregates",
+							new SumAggr(),
+							MutableString.class);
+			aggrStateDescriptor.setQueryable("aggr-queryable");
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 8470749712274833552L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).transform(
+					"TestAggregatingOperator",
+					BasicTypeInfo.STRING_TYPE_INFO,
+					new AggregatingTestOperator(aggrStateDescriptor)
+			);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+
+			for (int key = 0; key < maxParallelism; key++) {
+				boolean success = false;
+				while (deadline.hasTimeLeft() && !success) {
+					CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries(
+							client,
+							jobId,
+							"aggr-queryable",
+							key,
+							BasicTypeInfo.INT_TYPE_INFO,
+							aggrStateDescriptor,
+							QUERY_RETRY_DELAY,
+							false,
+							executor);
+
+					String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+					if (Long.parseLong(value) == numElements * (numElements + 1L) / 2L) {
+						success = true;
+					} else {
+						// Retry
+						Thread.sleep(50L);
+					}
+				}
+
+				assertTrue("Did not succeed query", success);
+			}
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+				cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			}
+
+			client.shutdown();
+		}
+	}
+
+	/////				Sources/UDFs Used in the Tests			//////
+
+	/**
 	 * Test source producing (key, 0)..(key, maxValue) with key being the sub
 	 * task index.
 	 *
@@ -980,8 +1285,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	/**
 	 * Test source producing (key, 1) tuples with random key in key range (numKeys).
 	 */
-	protected static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>>
-			implements CheckpointListener {
+	private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener {
+
 		private static final long serialVersionUID = -5744725196953582710L;
 
 		private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
@@ -997,7 +1302,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-				LATEST_CHECKPOINT_ID.set(0);
+				LATEST_CHECKPOINT_ID.set(0L);
 			}
 		}
 
@@ -1012,7 +1317,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 					ctx.collect(record);
 				}
 				// mild slow down
-				Thread.sleep(1);
+				Thread.sleep(1L);
 			}
 		}
 
@@ -1030,6 +1335,77 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * An operator that uses {@link AggregatingState}.
+	 *
+	 * <p>This operator exists because an {@link AggregatingState} cannot be
+	 * obtained from the {@link org.apache.flink.api.common.functions.RuntimeContext}.
+	 * If it could, a {@link ProcessFunction} would be sufficient.
+	 */
+	private static class AggregatingTestOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<Tuple2<Integer, Long>, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDescriptor;
+		private transient AggregatingState<Tuple2<Integer, Long>, String> state;
+
+		AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDesc) {
+			this.stateDescriptor = stateDesc;
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+			this.state = getKeyedStateBackend().getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					stateDescriptor);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Integer, Long>> element) throws Exception {
+			state.add(element.getValue());
+		}
+	}
+
+	/**
+	 * Test {@link AggregateFunction} summing the long in the tuple's second field into the number stored as a string in the accumulator.
+	 */
+	private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public MutableString createAccumulator() {
+			return new MutableString();
+		}
+
+		@Override
+		public void add(Tuple2<Integer, Long> value, MutableString accumulator) {
+			long acc = Long.valueOf(accumulator.value);
+			acc += value.f1;
+			accumulator.value = Long.toString(acc);
+		}
+
+		@Override
+		public String getResult(MutableString accumulator) {
+			return accumulator.value;
+		}
+
+		@Override
+		public MutableString merge(MutableString a, MutableString b) {
+			MutableString nValue = new MutableString();
+			nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value));
+			return nValue;
+		}
+	}
+
+	private static final class MutableString {
+		String value = "0";
+	}
+
+	/**
 	 * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument.
 	 */
 	private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> {
@@ -1058,32 +1434,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 	/////				General Utility Methods				//////
 
-	private static <K, V> Future<V> getKvStateWithRetries(
-			final QueryableStateClient client,
-			final JobID jobId,
-			final String queryName,
-			final K key,
-			final TypeInformation<K> keyTypeInfo,
-			final TypeSerializer<V> valueTypeSerializer,
-			final Time retryDelay,
-			final boolean failForUnknownKeyOrNamespace,
-			final ScheduledExecutor executor) {
-
-		return retryWithDelay(
-				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer),
-				NO_OF_RETRIES,
-				retryDelay,
-				executor,
-				failForUnknownKeyOrNamespace);
-	}
-
-	private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+	private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
 			final QueryableStateClient client,
 			final JobID jobId,
 			final String queryName,
 			final K key,
 			final TypeInformation<K> keyTypeInfo,
-			final StateDescriptor<?, V> stateDescriptor,
+			final StateDescriptor<S, V> stateDescriptor,
 			final Time retryDelay,
 			final boolean failForUnknownKeyOrNamespace,
 			final ScheduledExecutor executor) {
@@ -1157,4 +1514,45 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 					(t, throwable) -> operationResultFuture.cancel(false));
 		}
 	}
+
+	/**
+	 * Retry a query for state for keys between 0 and {@link #maxParallelism} until
+	 * <tt>expected</tt> equals the value of the result tuple's second field.
+	 */
+	private void executeValueQuery(
+			final Deadline deadline,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryableStateName,
+			final ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor,
+			final long expected) throws Exception {
+
+		for (int key = 0; key < maxParallelism; key++) {
+			boolean success = false;
+			while (deadline.hasTimeLeft() && !success) {
+				CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+						client,
+						jobId,
+						queryableStateName,
+						key,
+						BasicTypeInfo.INT_TYPE_INFO,
+						stateDescriptor,
+						QUERY_RETRY_DELAY,
+						false,
+						executor);
+
+				Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value();
+
+				assertEquals("Key mismatch", key, value.f0.intValue());
+				if (expected == value.f1) {
+					success = true;
+				} else {
+					// Retry
+					Thread.sleep(50L);
+				}
+			}
+
+			assertTrue("Did not succeed query", success);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
new file mode 100644
index 0000000..69b2f61
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingState}.
+ */
+public class ImmutableAggregatingStateTest {
+
+	private final AggregatingStateDescriptor<Long, MutableString, String> aggrStateDesc =
+			new AggregatingStateDescriptor<>(
+					"test",
+					new SumAggr(),
+					MutableString.class);
+
+	private ImmutableAggregatingState<Long, String> aggrState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!aggrStateDesc.isSerializerInitialized()) {
+			aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		final MutableString initValue = new MutableString();
+		initValue.value = "42";
+
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out));
+
+		aggrState = ImmutableAggregatingState.createState(
+				aggrStateDesc,
+				out.toByteArray()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		String value = aggrState.get();
+		assertEquals("42", value);
+
+		aggrState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		String value = aggrState.get();
+		assertEquals("42", value);
+
+		aggrState.clear();
+	}
+
+	/**
+	 * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument.
+	 */
+	private static class SumAggr implements AggregateFunction<Long, MutableString, String> {
+
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public MutableString createAccumulator() {
+			return new MutableString();
+		}
+
+		@Override
+		public void add(Long value, MutableString accumulator) {
+			accumulator.value += ", " + value;
+		}
+
+		@Override
+		public String getResult(MutableString accumulator) {
+			return accumulator.value;
+		}
+
+		@Override
+		public MutableString merge(MutableString a, MutableString b) {
+			MutableString nValue = new MutableString();
+			nValue.value = a.value + ", " + b.value;
+			return nValue;
+		}
+	}
+
+	private static final class MutableString {
+		String value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
new file mode 100644
index 0000000..d2c9535
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableFoldingState}.
+ */
+public class ImmutableFoldingStateTest {
+
+	private final FoldingStateDescriptor<Long, String> foldingStateDesc =
+			new FoldingStateDescriptor<>(
+					"test",
+					"0",
+					new SumFold(),
+					StringSerializer.INSTANCE);
+
+	private ImmutableFoldingState<Long, String> foldingState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!foldingStateDesc.isSerializerInitialized()) {
+			foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out));
+
+		foldingState = ImmutableFoldingState.createState(
+				foldingStateDesc,
+				out.toByteArray()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		String value = foldingState.get();
+		assertEquals("42", value);
+
+		foldingState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		String value = foldingState.get();
+		assertEquals("42", value);
+
+		foldingState.clear();
+	}
+
+	/**
+	 * Test {@link FoldFunction} adding the long passed as argument to the number stored as a string in the accumulator.
+	 */
+	private static class SumFold implements FoldFunction<Long, String> {
+
+		private static final long serialVersionUID = -6249227626701264599L;
+
+		@Override
+		public String fold(String accumulator, Long value) throws Exception {
+			long acc = Long.valueOf(accumulator);
+			acc += value;
+			return Long.toString(acc);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
new file mode 100644
index 0000000..3283295
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableListState;
+import org.apache.flink.runtime.state.heap.HeapListState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableListState}.
+ */
+public class ImmutableListStateTest {
+
+	private final ListStateDescriptor<Long> listStateDesc =
+			new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableListState<Long> listState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!listStateDesc.isSerializerInitialized()) {
+			listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		List<Long> init = new ArrayList<>();
+		init.add(42L);
+
+		byte[] serInit = serializeInitValue(init);
+		listState = ImmutableListState.createState(listStateDesc, serInit);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		List<Long> list = getStateContents();
+		assertEquals(1L, list.size());
+
+		long element = list.get(0);
+		assertEquals(42L, element);
+
+		listState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		List<Long> list = getStateContents();
+		assertEquals(1L, list.size());
+
+		long element = list.get(0);
+		assertEquals(42L, element);
+
+		listState.clear();
+	}
+
+	/**
+	 * Copied from {@link HeapListState#getSerializedValue(Object, Object)}.
+	 */
+	private byte[] serializeInitValue(List<Long> toSerialize) throws IOException {
+		TypeSerializer<Long> serializer = listStateDesc.getElementSerializer();
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
+
+		// write the same as RocksDB writes lists, with one ',' separator
+		for (int i = 0; i < toSerialize.size(); i++) {
+			serializer.serialize(toSerialize.get(i), view);
+			if (i < toSerialize.size() - 1) {
+				view.writeByte(',');
+			}
+		}
+		view.flush();
+
+		return baos.toByteArray();
+	}
+
+	private List<Long> getStateContents() {
+		List<Long> list = new ArrayList<>();
+		for (Long elem: listState.get()) {
+			list.add(elem);
+		}
+		return list;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
new file mode 100644
index 0000000..30a8a50
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.ImmutableMapState;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ImmutableMapState}.
+ */
+public class ImmutableMapStateTest {
+
+	private final MapStateDescriptor<Long, Long> mapStateDesc =
+			new MapStateDescriptor<>(
+					"test",
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableMapState<Long, Long> mapState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!mapStateDesc.isSerializerInitialized()) {
+			mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		Map<Long, Long> initMap = new HashMap<>();
+		initMap.put(1L, 5L);
+		initMap.put(2L, 5L);
+
+		byte[] initSer = KvStateSerializer.serializeMap(
+				initMap.entrySet(),
+				BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+		mapState = ImmutableMapState.createState(mapStateDesc, initSer);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testPut() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		mapState.put(2L, 54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testPutAll() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Map<Long, Long> nMap = new HashMap<>();
+		nMap.put(1L, 7L);
+		nMap.put(2L, 7L);
+
+		mapState.putAll(nMap);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		mapState.put(2L, 54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testIterator() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
+		while (iterator.hasNext()) {
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testIterable() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
+		Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
+		while (iterator.hasNext()) {
+			assertEquals(5L, (long) iterator.next().getValue());
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testKeys() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterator<Long> iterator = mapState.keys().iterator();
+		while (iterator.hasNext()) {
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testValues() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		Iterator<Long> iterator = mapState.values().iterator();
+		while (iterator.hasNext()) {
+			iterator.remove();
+		}
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		assertTrue(mapState.contains(1L));
+		long value = mapState.get(1L);
+		assertEquals(5L, value);
+
+		assertTrue(mapState.contains(2L));
+		value = mapState.get(2L);
+		assertEquals(5L, value);
+
+		mapState.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
new file mode 100644
index 0000000..9b1ecf8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableReducingState}.
+ */
+public class ImmutableReducingStateTest {
+
+	private final ReducingStateDescriptor<Long> reducingStateDesc =
+			new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableReducingState<Long> reduceState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!reducingStateDesc.isSerializerInitialized()) {
+			reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		reduceState = ImmutableReducingState.createState(
+				reducingStateDesc,
+				ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		long value = reduceState.get();
+		assertEquals(42L, value);
+
+		reduceState.add(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		long value = reduceState.get();
+		assertEquals(42L, value);
+
+		reduceState.clear();
+	}
+
+	/**
+	 * Test {@link ReduceFunction} summing up its two arguments.
+	 */
+	private static class SumReduce implements ReduceFunction<Long> {
+
+		private static final long serialVersionUID = 6041237513913189144L;
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return value1 + value2;
+		}
+	}
+}


Mime
View raw message