flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-7044] [qs] Allow to specify namespace and descriptor in query.
Date Thu, 06 Jul 2017 14:18:36 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6eae45ea8 -> 6c05abe34


[FLINK-7044] [qs] Allow to specify namespace and descriptor in query.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c05abe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c05abe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c05abe3

Branch: refs/heads/master
Commit: 6c05abe3433070afe0931fff213bd3ce9da2b116
Parents: 6eae45e
Author: kkloudas <kkloudas@gmail.com>
Authored: Mon Jun 19 15:01:40 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Jul 6 12:10:08 2017 +0200

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java     | 198 ++++++++++++++
 .../runtime/state/VoidNamespaceTypeInfo.java    | 102 +++++++
 .../runtime/query/QueryableStateClientTest.java |  67 +++--
 .../state/VoidNamespaceTypeInfoTest.java        |  33 +++
 .../query/AbstractQueryableStateITCase.java     | 267 ++++++++++++-------
 5 files changed, 550 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c05abe3/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 003d803..306333a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -22,8 +22,14 @@ import akka.actor.ActorSystem;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.Recover;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -38,7 +44,10 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +59,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.net.ConnectException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -95,6 +105,8 @@ public class QueryableStateClient {
 	/** This is != null, if we started the actor system. */
 	private final ActorSystem actorSystem;
 
+	private ExecutionConfig executionConfig;
+
 	/**
 	 * Creates a client from the given configuration.
 	 *
@@ -157,10 +169,21 @@ public class QueryableStateClient {
 		this.lookupService = lookupService;
 		this.kvStateClient = networkClient;
 		this.executionContext = actorSystem.dispatcher();
+		this.executionConfig = new ExecutionConfig();
 
 		this.lookupService.start();
 	}
 
+	/** Gets the {@link ExecutionConfig}. */
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
+	/** Sets the {@link ExecutionConfig}. */
+	public void setExecutionConfig(ExecutionConfig config) {
+		this.executionConfig = config;
+	}
+
 	/**
 	 * Creates a client.
 	 *
@@ -267,6 +290,181 @@ public class QueryableStateClient {
 	}
 
 	/**
+	 * Returns a future holding the request result.
+	 *
+	 * <p>If the server does not serve a KvState instance with the given ID,
+	 * the Future will be failed with a {@link UnknownKvStateID}.
+	 *
+	 * <p>If the KvState instance does not hold any data for the given key
+	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+	 *
+	 * <p>All other failures are forwarded to the Future.
+	 *
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key			            The key we are interested in.
+	 * @param keyTypeHint				A {@link TypeHint} used to extract the type of the key.
+	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
+	 * @return Future holding the result.
+	 */
+	@PublicEvolving
+	public <K, V> Future<V> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final TypeHint<K> keyTypeHint,
+			final StateDescriptor<?, V> stateDescriptor) {
+
+		Preconditions.checkNotNull(keyTypeHint);
+
+		TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
+		return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);
+	}
+
+	/**
+	 * Returns a future holding the request result.
+	 *
+	 * <p>If the server does not serve a KvState instance with the given ID,
+	 * the Future will be failed with a {@link UnknownKvStateID}.
+	 *
+	 * <p>If the KvState instance does not hold any data for the given key
+	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+	 *
+	 * <p>All other failures are forwarded to the Future.
+	 *
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key			            The key we are interested in.
+	 * @param keyTypeInfo				The {@link TypeInformation} of the key.
+	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
+	 * @return Future holding the result.
+	 */
+	@PublicEvolving
+	public <K, V> Future<V> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<?, V> stateDescriptor) {
+
+		Preconditions.checkNotNull(keyTypeInfo);
+
+		return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
+				keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+	}
+
+	/**
+	 * Returns a future holding the request result.
+	 *
+	 * <p>If the server does not serve a KvState instance with the given ID,
+	 * the Future will be failed with a {@link UnknownKvStateID}.
+	 *
+	 * <p>If the KvState instance does not hold any data for the given key
+	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+	 *
+	 * <p>All other failures are forwarded to the Future.
+	 *
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key			            The key that the state we request is associated with.
+	 * @param namespace					The namespace of the state.
+	 * @param keyTypeInfo				The {@link TypeInformation} of the keys.
+	 * @param namespaceTypeInfo			The {@link TypeInformation} of the namespace.
+	 * @param stateDescriptor			The {@link StateDescriptor} of the state we want to query.
+	 * @return Future holding the result.
+	 */
+	@PublicEvolving
+	public <K, V, N> Future<V> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final N namespace,
+			final TypeInformation<K> keyTypeInfo,
+			final TypeInformation<N> namespaceTypeInfo,
+			final StateDescriptor<?, V> stateDescriptor) {
+
+		Preconditions.checkNotNull(stateDescriptor);
+
+		// initialize the value serializer based on the execution config.
+		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+		TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer();
+
+		return getKvState(jobId, queryableStateName, key,
+				namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer);
+	}
+
+	/**
+	 * Returns a future holding the request result.
+	 *
+	 * <p>If the server does not serve a KvState instance with the given ID,
+	 * the Future will be failed with a {@link UnknownKvStateID}.
+	 *
+	 * <p>If the KvState instance does not hold any data for the given key
+	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+	 *
+	 * <p>All other failures are forwarded to the Future.
+	 *
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key			            The key that the state we request is associated with.
+	 * @param namespace					The namespace of the state.
+	 * @param keyTypeInfo				The {@link TypeInformation} of the keys.
+	 * @param namespaceTypeInfo			The {@link TypeInformation} of the namespace.
+	 * @param stateSerializer			The {@link TypeSerializer} of the state we want to query.
+	 * @return Future holding the result.
+	 */
+	@PublicEvolving
+	public <K, V, N> Future<V> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final N namespace,
+			final TypeInformation<K> keyTypeInfo,
+			final TypeInformation<N> namespaceTypeInfo,
+			final TypeSerializer<V> stateSerializer) {
+
+		Preconditions.checkNotNull(queryableStateName);
+
+		Preconditions.checkNotNull(key);
+		Preconditions.checkNotNull(namespace);
+
+		Preconditions.checkNotNull(keyTypeInfo);
+		Preconditions.checkNotNull(namespaceTypeInfo);
+		Preconditions.checkNotNull(stateSerializer);
+
+		if (stateSerializer instanceof ListSerializer) {
+			throw new IllegalArgumentException("ListState is not supported out-of-the-box yet.");
+		}
+
+		TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
+		TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);
+
+		final byte[] serializedKeyAndNamespace;
+		try {
+			serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+					key,
+					keySerializer,
+					namespace,
+					namespaceSerializer);
+		} catch (IOException e) {
+			return Futures.failed(e);
+		}
+
+		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
+				.flatMap(new Mapper<byte[], Future<V>>() {
+					@Override
+					public Future<V> apply(byte[] parameter) {
+						try {
+							return Futures.successful(
+									KvStateRequestSerializer.deserializeValue(parameter, stateSerializer));
+						} catch (IOException e) {
+							return Futures.failed(e);
+						}
+					}
+				}, executionContext);
+	}
+
+	/**
 	 * Returns a future holding the serialized request result.
 	 *
 	 * @param jobId                     JobID of the job the queryable state

http://git-wip-us.apache.org/repos/asf/flink/blob/6c05abe3/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
new file mode 100644
index 0000000..9917410
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * {@link TypeInformation} for {@link VoidNamespace}.
+ */
+@Public
+public class VoidNamespaceTypeInfo extends TypeInformation<VoidNamespace> {
+
+	private static final long serialVersionUID = 5453679706408610586L;
+
+	public static final VoidNamespaceTypeInfo INSTANCE = new VoidNamespaceTypeInfo();
+
+	@Override
+	@PublicEvolving
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	@PublicEvolving
+	public int getArity() {
+		return 0;
+	}
+
+	@Override
+	@PublicEvolving
+	public int getTotalFields() {
+		return 0;
+	}
+
+	@Override
+	@PublicEvolving
+	public Class<VoidNamespace> getTypeClass() {
+		return VoidNamespace.class;
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	@PublicEvolving
+	public TypeSerializer<VoidNamespace> createSerializer(ExecutionConfig config) {
+		return VoidNamespaceSerializer.INSTANCE;
+	}
+
+	@Override
+	@PublicEvolving
+	public String toString() {
+		return "VoidNamespaceTypeInfo";
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean equals(Object obj) {
+		return this == obj || obj instanceof VoidNamespaceTypeInfo;
+	}
+
+	@Override
+	@PublicEvolving
+	public int hashCode() {
+		return 0;
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean canEqual(Object obj) {
+		return obj instanceof VoidNamespaceTypeInfo;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c05abe3/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 8c4e049..0806ba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import akka.dispatch.Futures;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,7 +32,6 @@ import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
@@ -108,11 +108,12 @@ public class QueryableStateClientTest {
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1)))
 					.thenReturn(unknownKvStateLocation);
 
-			Future<byte[]> result = client.getKvState(
+			Future<Integer> result = client.getKvState(
 					jobId,
 					query1,
 					0,
-					new byte[0]);
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
 			try {
 				Await.result(result, timeout);
@@ -134,7 +135,12 @@ public class QueryableStateClientTest {
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2)))
 					.thenReturn(unknownKeyGroupLocation);
 
-			result = client.getKvState(jobId, query2, 0, new byte[0]);
+			result = client.getKvState(
+					jobId,
+					query2,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
 			try {
 				Await.result(result, timeout);
@@ -164,7 +170,12 @@ public class QueryableStateClientTest {
 			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
 					.thenReturn(unknownKvStateId);
 
-			result = client.getKvState(jobId, query3, 0, new byte[0]);
+			result = client.getKvState(
+					jobId,
+					query3,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
 			try {
 				Await.result(result, timeout);
@@ -194,7 +205,12 @@ public class QueryableStateClientTest {
 			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
 					.thenReturn(connectException);
 
-			result = client.getKvState(jobId, query4, 0, new byte[0]);
+			result = client.getKvState(
+					jobId,
+					query4,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
 			try {
 				Await.result(result, timeout);
@@ -213,7 +229,12 @@ public class QueryableStateClientTest {
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5)))
 					.thenReturn(exception);
 
-			client.getKvState(jobId, query5, 0, new byte[0]);
+			client.getKvState(
+					jobId,
+					query5,
+					0,
+					BasicTypeInfo.INT_TYPE_INFO,
+					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
 			verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
 		} finally {
@@ -279,7 +300,7 @@ public class QueryableStateClientTest {
 				// Register state
 				HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
 						descriptor,
-						new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
+						new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
 						IntSerializer.INSTANCE,
 						VoidNamespaceSerializer.INSTANCE);
 
@@ -322,25 +343,25 @@ public class QueryableStateClientTest {
 			client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());
 
 			// Send all queries
-			List<Future<byte[]>> futures = new ArrayList<>(numKeys);
+			List<Future<Integer>> futures = new ArrayList<>(numKeys);
 			for (int key = 0; key < numKeys; key++) {
-				byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+				ValueStateDescriptor<Integer> descriptor =
+						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+				futures.add(client.getKvState(
+						jobId,
+						"choco",
 						key,
-						IntSerializer.INSTANCE,
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				futures.add(client.getKvState(jobId, "choco", key, serializedKeyAndNamespace));
+						BasicTypeInfo.INT_TYPE_INFO,
+						descriptor));
 			}
 
 			// Verify results
-			Future<Iterable<byte[]>> future = Futures.sequence(futures, testActorSystem.dispatcher());
-			Iterable<byte[]> results = Await.result(future, timeout);
+			Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher());
+			Iterable<Integer> results = Await.result(future, timeout);
 
 			int index = 0;
-			for (byte[] buffer : results) {
-				int deserializedValue = KvStateRequestSerializer.deserializeValue(buffer, IntSerializer.INSTANCE);
-				assertEquals(1337 + index, deserializedValue);
+			for (int buffer : results) {
+				assertEquals(1337 + index, buffer);
 				index++;
 			}
 
@@ -411,10 +432,12 @@ public class QueryableStateClientTest {
 				networkClient,
 				testActorSystem.dispatcher());
 
+		ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
+
 		// Queries with same name, but different job IDs should lead to a
 		// single lookup per query and job ID.
-		client.getKvState(jobId1, name, 0, new byte[0]);
-		client.getKvState(jobId2, name, 0, new byte[0]);
+		client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
+		client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
 
 		verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
 		verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));

http://git-wip-us.apache.org/repos/asf/flink/blob/6c05abe3/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java
new file mode 100644
index 0000000..a4329de
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/**
+ * Test for {@link VoidNamespaceTypeInfo}.
+ */
+public class VoidNamespaceTypeInfoTest extends TypeInformationTestBase<VoidNamespaceTypeInfo> {
+
+	@Override
+	protected VoidNamespaceTypeInfo[] getTestData() {
+		return new VoidNamespaceTypeInfo[] { VoidNamespaceTypeInfo.INSTANCE };
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c05abe3/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index f07113d..f012a47 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -29,7 +29,11 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,11 +49,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
 import org.apache.flink.runtime.query.QueryableStateClient;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
@@ -86,14 +89,14 @@ import static org.junit.Assert.fail;
  */
 public abstract class AbstractQueryableStateITCase extends TestLogger {
 
-	private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
-	private final static FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
+	private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
 	private static ActorSystem TEST_ACTOR_SYSTEM;
 
-	private final static int NUM_TMS = 2;
-	private final static int NUM_SLOTS_PER_TM = 4;
-	private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM;
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+	private static final int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM;
 
 	/**
 	 * State backend to use.
@@ -201,6 +204,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 7143749578983540352L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -222,7 +227,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			while (!allNonZero && deadline.hasTimeLeft()) {
 				allNonZero = true;
 
-				final List<Future<byte[]>> futures = new ArrayList<>(numKeys);
+				final List<Future<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys);
 
 				for (int i = 0; i < numKeys; i++) {
 					final int key = i;
@@ -234,38 +239,28 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 						allNonZero = false;
 					}
 
-					final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-							key,
-							queryableState.getKeySerializer(),
-							VoidNamespace.INSTANCE,
-							VoidNamespaceSerializer.INSTANCE);
-
-					Future<byte[]> serializedResult = getKvStateWithRetries(
+					Future<Tuple2<Integer, Long>> result = getKvStateWithRetries(
 							client,
 							jobId,
 							queryName,
 							key,
-							serializedKey,
+							BasicTypeInfo.INT_TYPE_INFO,
+							reducingState,
 							QUERY_RETRY_DELAY,
 							false);
 
-					serializedResult.onSuccess(new OnSuccess<byte[]>() {
+					result.onSuccess(new OnSuccess<Tuple2<Integer, Long>>() {
 						@Override
-						public void onSuccess(byte[] result) throws Throwable {
-							Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-									result,
-									queryableState.getValueSerializer());
-
-							counts.set(key, value.f1);
-
-							assertEquals("Key mismatch", key, value.f0.intValue());
+						public void onSuccess(Tuple2<Integer, Long> result) throws Throwable {
+							counts.set(key, result.f1);
+							assertEquals("Key mismatch", key, result.f0.intValue());
 						}
 					}, TEST_ACTOR_SYSTEM.dispatcher());
 
-					futures.add(serializedResult);
+					futures.add(result);
 				}
 
-				Future<Iterable<byte[]>> futureSequence = Futures.sequence(
+				Future<Iterable<Tuple2<Integer, Long>>> futureSequence = Futures.sequence(
 						futures,
 						TEST_ACTOR_SYSTEM.dispatcher());
 
@@ -330,6 +325,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = -4126824763829132959L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -338,6 +335,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = -6265024000462809436L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -422,6 +421,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 7662520075515707428L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -437,8 +438,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements;
 
-			executeValueQuery(deadline, client, jobId, queryableState,
-				expected);
+			executeQuery(deadline, client, jobId, "hakuna", valueState, expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -490,6 +490,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					private static final long serialVersionUID = 7480503339992214681L;
+
 					@Override
 					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 						return value.f0;
@@ -504,17 +506,18 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			long expected = numElements;
 
 			// query once
-			client.getKvState(jobId, queryableState.getQueryableStateName(), 0,
-				KvStateRequestSerializer.serializeKeyAndNamespace(
+			client.getKvState(
+					jobId,
+					queryableState.getQueryableStateName(),
 					0,
-					queryableState.getKeySerializer(),
 					VoidNamespace.INSTANCE,
-					VoidNamespaceSerializer.INSTANCE));
+					BasicTypeInfo.INT_TYPE_INFO,
+					VoidNamespaceTypeInfo.INSTANCE,
+					valueState);
 
 			cluster.submitJobDetached(jobGraph);
 
-			executeValueQuery(deadline, client, jobId, queryableState,
-				expected);
+			executeQuery(deadline, client, jobId, "hakuna", valueState, expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -534,33 +537,66 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
 	 * <tt>expected</tt> equals the value of the result tuple's second field.
 	 */
-	private void executeValueQuery(final Deadline deadline,
-		final QueryableStateClient client, final JobID jobId,
-		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState,
-		final long expected) throws Exception {
+	private void executeQuery(
+			final Deadline deadline,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryableStateName,
+			final StateDescriptor<?, Tuple2<Integer, Long>> stateDescriptor,
+			final long expected) throws Exception {
 
 		for (int key = 0; key < NUM_SLOTS; key++) {
-			final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-				key,
-				queryableState.getKeySerializer(),
-				VoidNamespace.INSTANCE,
-				VoidNamespaceSerializer.INSTANCE);
-
 			boolean success = false;
 			while (deadline.hasTimeLeft() && !success) {
-				Future<byte[]> future = getKvStateWithRetries(client,
+				Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
 					jobId,
-					queryableState.getQueryableStateName(),
+					queryableStateName,
 					key,
-					serializedKey,
+					BasicTypeInfo.INT_TYPE_INFO,
+					stateDescriptor,
 					QUERY_RETRY_DELAY,
 					false);
 
-				byte[] serializedValue = Await.result(future, deadline.timeLeft());
+				Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft());
+
+				assertEquals("Key mismatch", key, value.f0.intValue());
+				if (expected == value.f1) {
+					success = true;
+				} else {
+					// Retry
+					Thread.sleep(50);
+				}
+			}
+
+			assertTrue("Did not succeed query", success);
+		}
+	}
+
+	/**
+	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+	 * <tt>expected</tt> equals the value of the result tuple's second field.
+	 */
+	private void executeQuery(
+			final Deadline deadline,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryableStateName,
+			final TypeSerializer<Tuple2<Integer, Long>> valueSerializer,
+			final long expected) throws Exception {
+
+		for (int key = 0; key < NUM_SLOTS; key++) {
+			boolean success = false;
+			while (deadline.hasTimeLeft() && !success) {
+				Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
+						jobId,
+						queryableStateName,
+						key,
+						BasicTypeInfo.INT_TYPE_INFO,
+						valueSerializer,
+						QUERY_RETRY_DELAY,
+						false);
 
-				Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-					serializedValue,
-					queryableState.getValueSerializer());
+				Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft());
 
 				assertEquals("Key mismatch", key, value.f0.intValue());
 				if (expected == value.f1) {
@@ -623,6 +659,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 				queryableState =
 				source.keyBy(
 					new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 4509274556892655887L;
+
 						@Override
 						public Integer getKey(
 							Tuple2<Integer, Long> value) throws
@@ -639,18 +677,12 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			// Now query
 			int key = 0;
-			final byte[] serializedKey =
-				KvStateRequestSerializer.serializeKeyAndNamespace(
-					key,
-					queryableState.getKeySerializer(),
-					VoidNamespace.INSTANCE,
-					VoidNamespaceSerializer.INSTANCE);
-
-			Future<byte[]> future = getKvStateWithRetries(client,
+			Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client,
 				jobId,
 				queryableState.getQueryableStateName(),
 				key,
-				serializedKey,
+				BasicTypeInfo.INT_TYPE_INFO,
+				valueState,
 				QUERY_RETRY_DELAY,
 				true);
 
@@ -707,6 +739,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Value state shortcut
 			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 9168901838808830068L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -722,8 +756,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements;
 
-			executeValueQuery(deadline, client, jobId, queryableState,
-				expected);
+			executeQuery(deadline, client, jobId, "matata",
+					queryableState.getValueSerializer(), expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -780,6 +814,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			QueryableStateStream<Integer, String> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = -842809958106747539L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -796,28 +832,18 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			String expected = Integer.toString(numElements * (numElements + 1) / 2);
 
 			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
 				boolean success = false;
 				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
+					Future<String> future = getKvStateWithRetries(client,
 							jobId,
 							queryableState.getQueryableStateName(),
 							key,
-							serializedKey,
+							BasicTypeInfo.INT_TYPE_INFO,
+							foldingState,
 							QUERY_RETRY_DELAY,
 							false);
 
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					String value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
+					String value = Await.result(future, deadline.timeLeft());
 					if (expected.equals(value)) {
 						success = true;
 					} else {
@@ -882,6 +908,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 
 			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						private static final long serialVersionUID = 8470749712274833552L;
+
 						@Override
 						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 							return value.f0;
@@ -899,8 +927,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements * (numElements + 1) / 2;
 
-			executeValueQuery(deadline, client, jobId, queryableState,
-				expected);
+			executeQuery(deadline, client, jobId, "jungle", reducingState, expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -916,23 +943,23 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 		}
 	}
 
-	@SuppressWarnings("unchecked")
-	private static Future<byte[]> getKvStateWithRetries(
+	private static <K, V> Future<V> getKvStateWithRetries(
 			final QueryableStateClient client,
 			final JobID jobId,
 			final String queryName,
-			final int key,
-			final byte[] serializedKey,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final TypeSerializer<V> valueTypeSerializer,
 			final FiniteDuration retryDelay,
-			final boolean failForUknownKeyOrNamespace) {
+			final boolean failForUnknownKeyOrNamespace) {
 
-		return client.getKvState(jobId, queryName, key, serializedKey)
-				.recoverWith(new Recover<Future<byte[]>>() {
+		return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer)
+				.recoverWith(new Recover<Future<V>>() {
 					@Override
-					public Future<byte[]> recover(Throwable failure) throws Throwable {
+					public Future<V> recover(Throwable failure) throws Throwable {
 						if (failure instanceof AssertionError) {
 							return Futures.failed(failure);
-						} else if (failForUknownKeyOrNamespace &&
+						} else if (failForUnknownKeyOrNamespace &&
 								(failure instanceof UnknownKeyOrNamespace)) {
 							return Futures.failed(failure);
 						} else {
@@ -943,17 +970,65 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 									retryDelay,
 									TEST_ACTOR_SYSTEM.scheduler(),
 									TEST_ACTOR_SYSTEM.dispatcher(),
-									new Callable<Future<byte[]>>() {
+									new Callable<Future<V>>() {
 										@Override
-										public Future<byte[]> call() throws Exception {
+										public Future<V> call() throws Exception {
 											return getKvStateWithRetries(
 													client,
 													jobId,
 													queryName,
 													key,
-													serializedKey,
+													keyTypeInfo,
+													valueTypeSerializer,
 													retryDelay,
-													failForUknownKeyOrNamespace);
+													failForUnknownKeyOrNamespace);
+										}
+									});
+						}
+					}
+				}, TEST_ACTOR_SYSTEM.dispatcher());
+
+	}
+
+	private static <K, V> Future<V> getKvStateWithRetries(
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<?, V> stateDescriptor,
+			final FiniteDuration retryDelay,
+			final boolean failForUnknownKeyOrNamespace) {
+
+		return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor)
+				.recoverWith(new Recover<Future<V>>() {
+					@Override
+					public Future<V> recover(Throwable failure) throws Throwable {
+						if (failure instanceof AssertionError) {
+							return Futures.failed(failure);
+						} else if (failForUnknownKeyOrNamespace &&
+								(failure instanceof UnknownKeyOrNamespace)) {
+							return Futures.failed(failure);
+						} else {
+							// At startup some failures are expected
+							// due to races. Make sure that they don't
+							// fail this test.
+							return Patterns.after(
+									retryDelay,
+									TEST_ACTOR_SYSTEM.scheduler(),
+									TEST_ACTOR_SYSTEM.dispatcher(),
+									new Callable<Future<V>>() {
+										@Override
+										public Future<V> call() throws Exception {
+											return getKvStateWithRetries(
+													client,
+													jobId,
+													queryName,
+													key,
+													keyTypeInfo,
+													stateDescriptor,
+													retryDelay,
+													failForUnknownKeyOrNamespace);
 										}
 									});
 						}
@@ -970,10 +1045,12 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	 */
 	private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> {
 
+		private static final long serialVersionUID = 1459935229498173245L;
+
 		private final long maxValue;
 		private volatile boolean isRunning = true;
 
-		public TestAscendingValueSource(long maxValue) {
+		TestAscendingValueSource(long maxValue) {
 			Preconditions.checkArgument(maxValue >= 0);
 			this.maxValue = maxValue;
 		}
@@ -1024,12 +1101,12 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 			implements CheckpointListener {
 		private static final long serialVersionUID = -5744725196953582710L;
 
-		private final static AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
+		private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
 		private final int numKeys;
 		private final ThreadLocalRandom random = ThreadLocalRandom.current();
 		private volatile boolean isRunning = true;
 
-		public TestKeyRangeSource(int numKeys) {
+		TestKeyRangeSource(int numKeys) {
 			this.numKeys = numKeys;
 		}
 


Mime
View raw message