flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [01/10] flink git commit: [FLINK-3779] [runtime] Add QueryableStateClient
Date Tue, 09 Aug 2016 14:47:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master b5d58934d -> 490e7ebb6


[FLINK-3779] [runtime] Add QueryableStateClient

- Adds a client, which works with the network client and location lookup service
  to query KvState instances.

- Furthermore, location information is cached.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/329610d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/329610d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/329610d5

Branch: refs/heads/master
Commit: 329610d571a62c143170f1324d8db3b98755a9da
Parents: 775a787
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon May 30 14:08:24 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue Aug 9 16:42:05 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  14 +
 .../flink/configuration/ConfigConstants.java    |  38 ++
 .../runtime/io/network/NetworkEnvironment.java  |  40 +-
 .../runtime/query/QueryableStateClient.java     | 355 +++++++++++++++++
 .../query/UnknownKvStateKeyGroupLocation.java   |  29 ++
 .../runtime/query/UnknownKvStateLocation.java   |  35 ++
 .../flink/runtime/query/package-info.java       |  60 +++
 .../NetworkEnvironmentConfiguration.scala       |   3 +
 .../flink/runtime/taskmanager/TaskManager.scala |  15 +
 .../io/network/NetworkEnvironmentTest.java      |   5 +-
 .../runtime/query/QueryableStateClientTest.java | 394 +++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 12 files changed, 975 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 5baccfa..41b7ba7 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -313,6 +313,20 @@ of the JobManager, because the same ActorSystem is used. Its not possible
to use
 
 - `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory
where the Flink logs are saved. It has to be an absolute path.
 
+## Queryable State
+
+### Server
+
+- `query.server.port`: Port to bind the queryable state server to (Default: `0`, binds to a random free port).
+- `query.server.network-threads`: Number of network (Netty's event loop) threads for the queryable state server (Default: `0`, picks number of slots).
+- `query.server.query-threads`: Number of query threads for the queryable state server (Default: `0`, picks number of slots).
+
+### Client
+
+- `query.client.network-threads`: Number of network (Netty's event loop) threads for the queryable state client (Default: `0`, picks number of available cores as returned by `Runtime.getRuntime().availableProcessors()`).
+- `query.client.lookup.num-retries`: Number of retries on KvState lookup failure due to an unavailable JobManager (Default: `3`).
+- `query.client.lookup.retry-delay`: Retry delay in milliseconds on KvState lookup failure due to an unavailable JobManager (Default: `1000`).
+
 ## Metrics
 
 - `metrics.reporters`: The list of named reporters, i.e. "foo,bar".

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 928497f..98a843d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1066,6 +1066,44 @@ public final class ConfigConstants {
 	/** ZooKeeper default leader port. */
 	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
+	// ------------------------- Queryable state ------------------------------
+
+	/** Port to bind KvState server to. */
+	public static final String QUERYABLE_STATE_SERVER_PORT = "query.server.port";
+
+	/** Number of network (event loop) threads for the KvState server. */
+	public static final String QUERYABLE_STATE_SERVER_NETWORK_THREADS = "query.server.network-threads";
+
+	/** Number of query threads for the KvState server. */
+	public static final String QUERYABLE_STATE_SERVER_QUERY_THREADS = "query.server.query-threads";
+
+	/** Default port to bind KvState server to (0 => pick random free port). */
+	public static final int DEFAULT_QUERYABLE_STATE_SERVER_PORT = 0;
+
+	/** Default number of network (event loop) threads for the KvState server (0 => #slots). */
+	public static final int DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS = 0;
+
+	/** Default number of query threads for the KvState server (0 => #slots). */
+	public static final int DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS = 0;
+
+	/** Number of network (event loop) threads for the KvState client. */
+	public static final String QUERYABLE_STATE_CLIENT_NETWORK_THREADS = "query.client.network-threads";
+
+	/** Number of retries on location lookup failures. */
+	public static final String QUERYABLE_STATE_CLIENT_LOOKUP_RETRIES = "query.client.lookup.num-retries";
+
+	/** Retry delay on location lookup failures (millis). */
+	public static final String QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY = "query.client.lookup.retry-delay";
+
+	/** Default number of network (event loop) threads for the KvState client (0 => #cores). */
+	public static final int DEFAULT_QUERYABLE_STATE_CLIENT_NETWORK_THREADS = 0;
+
+	/** Default number of retries on location lookup failures. */
+	public static final int DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRIES = 3;
+
+	/** Default retry delay on location lookup failures (millis). */
+	public static final int DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY = 1000;
+
 	// ----------------------------- Environment Variables ----------------------------
 
 	/** The environment variable name which contains the location of the configuration directory
*/

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 283d804..844bc2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -45,11 +45,11 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -242,21 +242,33 @@ public class NetworkEnvironment {
 				try {
 					kvStateRegistry = new KvStateRegistry();
 
-					kvStateServer = new KvStateServer(
-							connectionInfo.address(),
-							0,
-							1,
-							10,
-							kvStateRegistry,
-							new AtomicKvStateRequestStats());
+					if (nettyConfig.isDefined()) {
+						int numNetworkThreads = configuration.queryServerNetworkThreads();
+						if (numNetworkThreads == 0) {
+							numNetworkThreads = nettyConfig.get().getNumberOfSlots();
+						}
 
-					kvStateServer.start();
+						int numQueryThreads = configuration.queryServerQueryThreads();
+						if (numQueryThreads == 0) {
+							numQueryThreads = nettyConfig.get().getNumberOfSlots();
+						}
 
-					KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
-							jobManagerGateway,
-							kvStateServer.getAddress());
+						kvStateServer = new KvStateServer(
+								connectionInfo.address(),
+								configuration.queryServerPort(),
+								numNetworkThreads,
+								numQueryThreads,
+								kvStateRegistry,
+								new DisabledKvStateRequestStats());
 
-					kvStateRegistry.registerListener(listener);
+						kvStateServer.start();
+
+						KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
+								jobManagerGateway,
+								kvStateServer.getAddress());
+
+						kvStateRegistry.registerListener(listener);
+					}
 				} catch (Throwable t) {
 					throw new IOException("Failed to instantiate KvState management components: "
 							+ t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
new file mode 100644
index 0000000..0e1ea57
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory;
+import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateClient;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
+import org.apache.flink.runtime.query.netty.UnknownKvStateID;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.ConnectException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Client for queryable state.
+ *
+ * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}.
+ * The state instance created from this descriptor will be published for queries
+ * when it's created on the TaskManagers and the location will be reported to
+ * the JobManager.
+ *
+ * <p>The client resolves the location of the requested KvState via the
+ * JobManager. Resolved locations are cached. When the server address of the
+ * requested KvState instance is determined, the client sends out a request to
+ * the server.
+ */
+public class QueryableStateClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
+
+	/**
+	 * {@link KvStateLocation} lookup to resolve the address of KvState instances.
+	 */
+	private final KvStateLocationLookupService lookupService;
+
+	/**
+	 * Network client for queries against {@link KvStateServer} instances.
+	 */
+	private final KvStateClient kvStateClient;
+
+	/**
+	 * Execution context used to run the callbacks of the returned futures.
+	 */
+	private final ExecutionContext executionContext;
+
+	/**
+	 * Cache for {@link KvStateLocation} lookup futures keyed by job and name.
+	 *
+	 * <p>A cached lookup that has failed is replaced by a new lookup on the
+	 * next non-forced access, so that transient failures are not cached forever.
+	 */
+	private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache =
+			new ConcurrentHashMap<>();
+
+	/** This is != null, iff we started the actor system. */
+	private final ActorSystem actorSystem;
+
+	/**
+	 * Creates a client from the given configuration.
+	 *
+	 * <p>This will create multiple thread pools: one for the started actor
+	 * system and another for the network client. If the setup fails after the
+	 * actor system has been started, the components created so far are shut down.
+	 *
+	 * @param config Configuration to use.
+	 * @throws Exception Failures are forwarded
+	 */
+	public QueryableStateClient(Configuration config) throws Exception {
+		Preconditions.checkNotNull(config, "Configuration");
+
+		// Create a leader retrieval service
+		LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils
+				.createLeaderRetrievalService(config);
+
+		// Get the ask timeout
+		String askTimeoutString = config.getString(
+				ConfigConstants.AKKA_ASK_TIMEOUT,
+				ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+		Duration timeout = FiniteDuration.apply(askTimeoutString);
+		if (!timeout.isFinite()) {
+			throw new IllegalConfigurationException(ConfigConstants.AKKA_ASK_TIMEOUT
+					+ " is not a finite timeout ('" + askTimeoutString + "')");
+		}
+
+		FiniteDuration askTimeout = (FiniteDuration) timeout;
+
+		int lookupRetries = config.getInteger(
+				ConfigConstants.QUERYABLE_STATE_CLIENT_LOOKUP_RETRIES,
+				ConfigConstants.DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRIES);
+
+		int lookupRetryDelayMillis = config.getInteger(
+				ConfigConstants.QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY,
+				ConfigConstants.DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY);
+
+		// Retries if no JobManager is around
+		LookupRetryStrategyFactory retryStrategy = new FixedDelayLookupRetryStrategyFactory(
+				lookupRetries,
+				FiniteDuration.apply(lookupRetryDelayMillis, "ms"));
+
+		int numEventLoopThreads = config.getInteger(
+				ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS,
+				ConfigConstants.DEFAULT_QUERYABLE_STATE_CLIENT_NETWORK_THREADS);
+
+		if (numEventLoopThreads == 0) {
+			numEventLoopThreads = Runtime.getRuntime().availableProcessors();
+		}
+
+		// Create the actor system
+		@SuppressWarnings("unchecked")
+		Option<Tuple2<String, Object>> remoting = new Some(new Tuple2<>("", 0));
+		this.actorSystem = AkkaUtils.createActorSystem(config, remoting);
+
+		KvStateClient networkClient = null;
+		try {
+			this.lookupService = new AkkaKvStateLocationLookupService(
+					leaderRetrievalService,
+					actorSystem,
+					askTimeout,
+					retryStrategy);
+
+			// Create the network client
+			networkClient = new KvStateClient(
+					numEventLoopThreads,
+					new DisabledKvStateRequestStats());
+
+			this.kvStateClient = networkClient;
+			this.executionContext = actorSystem.dispatcher();
+
+			this.lookupService.start();
+		} catch (Throwable t) {
+			// Don't leak the threads of the components created so far
+			if (networkClient != null) {
+				try {
+					networkClient.shutDown();
+				} catch (Throwable suppressed) {
+					t.addSuppressed(suppressed);
+				}
+			}
+
+			actorSystem.shutdown();
+			throw t;
+		}
+	}
+
+	/**
+	 * Creates a client.
+	 *
+	 * @param lookupService    Location lookup service
+	 * @param kvStateClient    Network client for queries
+	 * @param executionContext Execution context for futures
+	 */
+	public QueryableStateClient(
+			KvStateLocationLookupService lookupService,
+			KvStateClient kvStateClient,
+			ExecutionContext executionContext) {
+
+		this.lookupService = Preconditions.checkNotNull(lookupService, "KvStateLocationLookupService");
+		this.kvStateClient = Preconditions.checkNotNull(kvStateClient, "KvStateClient");
+		this.executionContext = Preconditions.checkNotNull(executionContext, "ExecutionContext");
+		this.actorSystem = null;
+
+		this.lookupService.start();
+	}
+
+	/**
+	 * Returns the execution context of this client.
+	 *
+	 * @return The execution context used by the client.
+	 */
+	public ExecutionContext getExecutionContext() {
+		return executionContext;
+	}
+
+	/**
+	 * Shuts down the client and all components.
+	 *
+	 * <p>Failures while shutting down a component are logged and do not
+	 * prevent the remaining components from being shut down.
+	 */
+	public void shutDown() {
+		try {
+			lookupService.shutDown();
+		} catch (Throwable t) {
+			LOG.error("Failed to shut down KvStateLookupService", t);
+		}
+
+		try {
+			kvStateClient.shutDown();
+		} catch (Throwable t) {
+			LOG.error("Failed to shut down KvStateClient", t);
+		}
+
+		if (actorSystem != null) {
+			try {
+				actorSystem.shutdown();
+			} catch (Throwable t) {
+				LOG.error("Failed to shut down ActorSystem", t);
+			}
+		}
+	}
+
+	/**
+	 * Returns a future holding the serialized request result.
+	 *
+	 * <p>If the server does not serve a KvState instance with the given ID,
+	 * the Future will be failed with a {@link UnknownKvStateID}.
+	 *
+	 * <p>If the KvState instance does not hold any data for the given key
+	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+	 *
+	 * <p>Failures that hint at an out-of-sync cached location ({@link UnknownKvStateID},
+	 * {@link UnknownKvStateKeyGroupLocation}, {@link UnknownKvStateLocation} or
+	 * {@link ConnectException}) are retried once with a forced location lookup.
+	 * All other failures are forwarded to the Future.
+	 *
+	 * @param jobId                     JobID of the job the queryable state
+	 *                                  belongs to
+	 * @param queryableStateName        Name under which the state is queryable
+	 * @param keyHashCode               Integer hash code of the key (result of
+	 *                                  a call to {@link Object#hashCode()})
+	 * @param serializedKeyAndNamespace Serialized key and namespace to query
+	 *                                  KvState instance with
+	 * @return Future holding the serialized result
+	 */
+	@SuppressWarnings("unchecked")
+	public Future<byte[]> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final int keyHashCode,
+			final byte[] serializedKeyAndNamespace) {
+
+		return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false)
+				.recoverWith(new Recover<Future<byte[]>>() {
+					@Override
+					public Future<byte[]> recover(Throwable failure) throws Throwable {
+						if (failure instanceof UnknownKvStateID ||
+								failure instanceof UnknownKvStateKeyGroupLocation ||
+								failure instanceof UnknownKvStateLocation ||
+								failure instanceof ConnectException) {
+							// These failures are likely to be caused by out-of-sync
+							// KvStateLocation. Therefore we retry this query and
+							// force look up the location.
+							return getKvState(
+									jobId,
+									queryableStateName,
+									keyHashCode,
+									serializedKeyAndNamespace,
+									true);
+						} else {
+							return Futures.failed(failure);
+						}
+					}
+				}, executionContext);
+	}
+
+	/**
+	 * Returns a future holding the serialized request result.
+	 *
+	 * @param jobId                     JobID of the job the queryable state
+	 *                                  belongs to
+	 * @param queryableStateName        Name under which the state is queryable
+	 * @param keyHashCode               Integer hash code of the key (result of
+	 *                                  a call to {@link Object#hashCode()})
+	 * @param serializedKeyAndNamespace Serialized key and namespace to query
+	 *                                  KvState instance with
+	 * @param forceLookup               Flag to force lookup of the {@link KvStateLocation}
+	 * @return Future holding the serialized result
+	 */
+	private Future<byte[]> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final int keyHashCode,
+			final byte[] serializedKeyAndNamespace,
+			boolean forceLookup) {
+
+		return getKvStateLookupInfo(jobId, queryableStateName, forceLookup)
+				.flatMap(new Mapper<KvStateLocation, Future<byte[]>>() {
+					@Override
+					public Future<byte[]> apply(KvStateLocation lookup) {
+						int keyGroupIndex = MathUtils.murmurHash(keyHashCode) % lookup.getNumKeyGroups();
+
+						KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
+						if (serverAddress == null) {
+							return Futures.failed(new UnknownKvStateKeyGroupLocation());
+						} else {
+							// Query server
+							KvStateID kvStateId = lookup.getKvStateID(keyGroupIndex);
+							return kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace);
+						}
+					}
+				}, executionContext);
+	}
+
+	/**
+	 * Lookup the {@link KvStateLocation} for the given job and queryable state
+	 * name.
+	 *
+	 * <p>The JobManager will be queried for the location only if forced, if no
+	 * cached location can be found, or if the cached lookup has failed. Failed
+	 * lookups are not handed out again, because their cause (for example an
+	 * unreachable JobManager) may be gone and a new lookup may succeed.
+	 *
+	 * @param jobId              JobID the state instance belongs to.
+	 * @param queryableStateName Name under which the state instance has been published.
+	 * @param forceUpdate        Flag to indicate whether to force an update via the lookup service.
+	 * @return Future holding the KvStateLocation
+	 */
+	private Future<KvStateLocation> getKvStateLookupInfo(
+			JobID jobId,
+			final String queryableStateName,
+			boolean forceUpdate) {
+
+		Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
+
+		if (forceUpdate) {
+			Future<KvStateLocation> lookupFuture = lookupService
+					.getKvStateLookupInfo(jobId, queryableStateName);
+			lookupCache.put(cacheKey, lookupFuture);
+			return lookupFuture;
+		}
+
+		Future<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
+
+		if (cachedFuture == null) {
+			Future<KvStateLocation> lookupFuture = lookupService
+					.getKvStateLookupInfo(jobId, queryableStateName);
+
+			Future<KvStateLocation> previous = lookupCache.putIfAbsent(cacheKey, lookupFuture);
+			return previous == null ? lookupFuture : previous;
+		} else if (cachedFuture.isCompleted() && cachedFuture.value().get().isFailure()) {
+			// Issue a new lookup instead of handing out the stale failure
+			Future<KvStateLocation> lookupFuture = lookupService
+					.getKvStateLookupInfo(jobId, queryableStateName);
+
+			if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
+				return lookupFuture;
+			}
+
+			// Another thread replaced the failed lookup concurrently; use its result
+			Future<KvStateLocation> current = lookupCache.get(cacheKey);
+			return current != null ? current : lookupFuture;
+		} else {
+			return cachedFuture;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
new file mode 100644
index 0000000..8f62be5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * Exception thrown if there is no KvState server address available for the
+ * key group a queried key maps to in a {@link KvStateLocation} instance.
+ */
+class UnknownKvStateKeyGroupLocation extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateLocation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateLocation.java
new file mode 100644
index 0000000..38cc1cc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateLocation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * Thrown if there is no {@link KvStateLocation} found for the requested
+ * registration name.
+ *
+ * <p>This indicates that the requested KvState instance is not registered
+ * under this name (yet).
+ */
+public class UnknownKvStateLocation extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	public UnknownKvStateLocation(String registrationName) {
+		super("No KvStateLocation found for KvState instance with name '" + registrationName + "'.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
new file mode 100644
index 0000000..07a4396
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains all KvState query related classes.
+ *
+ * <h2>TaskManager and JobManager</h2>
+ *
+ * <p>State backends register queryable state instances at the {@link
+ * org.apache.flink.runtime.query.KvStateRegistry}.
+ * There is one registry per TaskManager. Registered KvState instances are
+ * reported to the JobManager, where they are aggregated at the {@link
+ * org.apache.flink.runtime.query.KvStateLocationRegistry}.
+ *
+ * <p>Instances of {@link org.apache.flink.runtime.query.KvStateLocation} contain
+ * all information needed for a client to query a KvState instance.
+ *
+ * <p>See also:
+ * <ul>
+ * <li>{@link org.apache.flink.runtime.query.KvStateRegistry}</li>
+ * <li>{@link org.apache.flink.runtime.query.TaskKvStateRegistry}</li>
+ * <li>{@link org.apache.flink.runtime.query.KvStateLocation}</li>
+ * <li>{@link org.apache.flink.runtime.query.KvStateLocationRegistry}</li>
+ * </ul>
+ *
+ * <h2>Client</h2>
+ *
+ * <p>The {@link org.apache.flink.runtime.query.QueryableStateClient} is used
+ * to query KvState instances. The client takes care of {@link
+ * org.apache.flink.runtime.query.KvStateLocation} lookup and caching. Queries
+ * are then dispatched via the network client.
+ *
+ * <h3>JobManager Communication</h3>
+ *
+ * <p>The JobManager is queried for {@link org.apache.flink.runtime.query.KvStateLocation}
+ * instances via the {@link org.apache.flink.runtime.query.KvStateLocationLookupService}.
+ * The client caches resolved locations and dispatches queries directly to the
+ * TaskManager.
+ *
+ * <h3>TaskManager Communication</h3>
+ *
+ * <p>After the location has been resolved, the TaskManager is queried via the
+ * {@link org.apache.flink.runtime.query.netty.KvStateClient}.
+ */
+package org.apache.flink.runtime.query;

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 065211c..0788d7c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -27,5 +27,8 @@ case class NetworkEnvironmentConfiguration(
   networkBufferSize: Int,
   memoryType: MemoryType,
   ioMode: IOMode,
+  queryServerPort: Int,
+  queryServerNetworkThreads: Int,
+  queryServerQueryThreads: Int,
   nettyConfig: Option[NettyConfig] = None,
   partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7c4b867..e732214 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2097,11 +2097,26 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
+    val queryServerPort =  configuration.getInteger(
+      ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT)
+
+    val queryServerNetworkThreads =  configuration.getInteger(
+      ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS)
+
+    val queryServerQueryThreads =  configuration.getInteger(
+      ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS)
+
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
       memType,
       ioMode,
+      queryServerPort,
+      queryServerNetworkThreads,
+      queryServerQueryThreads,
       nettyConfig)
 
     // ----> timeouts, library caching, profiling

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 938e661..4597e3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -80,7 +80,7 @@ public class NetworkEnvironmentTest {
 			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE,
1, new Configuration());
 			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
 					NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
-					IOManager.IOMode.SYNC, new Some<>(nettyConf),
+					IOManager.IOMode.SYNC, 0, 0, 0, new Some<>(nettyConf),
 					new Tuple2<>(0, 0));
 
 			NetworkEnvironment env = new NetworkEnvironment(
@@ -174,6 +174,9 @@ public class NetworkEnvironmentTest {
 				1024,
 				MemoryType.HEAP,
 				IOManager.IOMode.SYNC,
+				0,
+				0,
+				0,
 				Some.<NettyConfig>empty(),
 				new Tuple2<>(0, 0));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
new file mode 100644
index 0000000..36f2b45
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateClient;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.netty.UnknownKvStateID;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemValueState;
+import org.apache.flink.util.MathUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link QueryableStateClient}.
+ */
+public class QueryableStateClientTest {
+
+	private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+	private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+
+	/**
+	 * Tests that failures indicating an out-of-sync {@link KvStateLocation}
+	 * lead to a retry with a forced location lookup.
+	 *
+	 * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation
+	 * and ConnectException are checked explicitly as these indicate an
+	 * out-of-sync KvStateLocation. Any other failure must not be retried.
+	 */
+	@Test
+	public void testForceLookupOnOutdatedLocation() throws Exception {
+		KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+		KvStateClient networkClient = mock(KvStateClient.class);
+
+		QueryableStateClient client = new QueryableStateClient(
+				lookupService,
+				networkClient,
+				testActorSystem.dispatcher());
+
+		try {
+			JobID jobId = new JobID();
+			int numKeyGroups = 4;
+
+			//
+			// UnknownKvStateLocation
+			//
+			String query1 = "lucky";
+
+			Future<KvStateLocation> unknownKvStateLocation = Futures.failed(
+					new UnknownKvStateLocation(query1));
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1)))
+					.thenReturn(unknownKvStateLocation);
+
+			Future<byte[]> result = client.getKvState(
+					jobId,
+					query1,
+					0,
+					new byte[0]);
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected UnknownKvStateLocation exception");
+			} catch (UnknownKvStateLocation ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
+
+			//
+			// UnknownKvStateKeyGroupLocation
+			//
+			String query2 = "unlucky";
+
+			// No KvState is registered for any key group of this location
+			Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful(
+					new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2));
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2)))
+					.thenReturn(unknownKeyGroupLocation);
+
+			result = client.getKvState(jobId, query2, 0, new byte[0]);
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected UnknownKvStateKeyGroupLocation exception");
+			} catch (UnknownKvStateKeyGroupLocation ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
+
+			//
+			// UnknownKvStateID
+			//
+			String query3 = "water";
+			KvStateID kvStateId = new KvStateID();
+			Future<byte[]> unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId));
+
+			KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
+			KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
+			for (int i = 0; i < numKeyGroups; i++) {
+				location.registerKvState(i, kvStateId, serverAddress);
+			}
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
+					.thenReturn(Futures.successful(location));
+
+			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
+					.thenReturn(unknownKvStateId);
+
+			result = client.getKvState(jobId, query3, 0, new byte[0]);
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected UnknownKvStateID exception");
+			} catch (UnknownKvStateID ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
+
+			//
+			// ConnectException
+			//
+			String query4 = "space";
+			Future<byte[]> connectException = Futures.failed(new ConnectException());
+			kvStateId = new KvStateID();
+
+			serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
+			location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
+			for (int i = 0; i < numKeyGroups; i++) {
+				location.registerKvState(i, kvStateId, serverAddress);
+			}
+
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
+					.thenReturn(Futures.successful(location));
+
+			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
+					.thenReturn(connectException);
+
+			result = client.getKvState(jobId, query4, 0, new byte[0]);
+
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected ConnectException exception");
+			} catch (ConnectException ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
+
+			//
+			// Other exceptions don't lead to a retry
+			//
+			String query5 = "universe";
+			Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception"));
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5)))
+					.thenReturn(exception);
+
+			result = client.getKvState(jobId, query5, 0, new byte[0]);
+
+			// Wait for the query to complete so that any (possibly asynchronous)
+			// retry would already have happened before the lookups are counted.
+			try {
+				Await.result(result, timeout);
+				fail("Did not throw expected RuntimeException");
+			} catch (RuntimeException ignored) {
+				// Expected
+			}
+
+			verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
+		} finally {
+			client.shutDown();
+		}
+	}
+
+	/**
+	 * Tests queries against multiple servers.
+	 *
+	 * <p>The servers are populated with different keys and the client queries
+	 * all available keys from all servers.
+	 */
+	@Test
+	public void testIntegrationWithKvStateServer() throws Exception {
+		// Config
+		int numServers = 2;
+		int numKeys = 1024;
+
+		JobID jobId = new JobID();
+		JobVertexID jobVertexId = new JobVertexID();
+
+		KvStateServer[] servers = new KvStateServer[numServers];
+		AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+
+		QueryableStateClient client = null;
+		KvStateClient networkClient = null;
+		AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();
+
+		try {
+			KvStateRegistry[] registries = new KvStateRegistry[numServers];
+			KvStateID[] kvStateIds = new KvStateID[numServers];
+			List<MemValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>();
+
+			// Start the servers (one key group per server)
+			for (int i = 0; i < numServers; i++) {
+				registries[i] = new KvStateRegistry();
+				serverStats[i] = new AtomicKvStateRequestStats();
+				servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
+				servers[i].start();
+
+				// Register state
+				MemValueState<Integer, VoidNamespace, Integer> kvState = new MemValueState<>(
+						IntSerializer.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE,
+						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null));
+
+				kvStates.add(kvState);
+
+				kvStateIds[i] = registries[i].registerKvState(
+						jobId,
+						new JobVertexID(),
+						i, // key group index
+						"choco",
+						kvState);
+			}
+
+			int[] expectedRequests = new int[numServers];
+
+			// Write each key into the state of the key group it hashes to
+			for (int key = 0; key < numKeys; key++) {
+				int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers;
+				expectedRequests[targetKeyGroupIndex]++;
+
+				MemValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex);
+
+				kvState.setCurrentKey(key);
+				kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+				kvState.update(1337 + key);
+			}
+
+			// Location lookup service
+			KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
+			for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
+				location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
+			}
+
+			KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+			when(lookupService.getKvStateLookupInfo(eq(jobId), eq("choco")))
+					.thenReturn(Futures.successful(location));
+
+			// The client
+			networkClient = new KvStateClient(1, networkClientStats);
+
+			client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());
+
+			// Send all queries
+			List<Future<byte[]>> futures = new ArrayList<>(numKeys);
+			for (int key = 0; key < numKeys; key++) {
+				byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
+						key,
+						IntSerializer.INSTANCE,
+						VoidNamespace.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE);
+
+				futures.add(client.getKvState(jobId, "choco", key, serializedKeyAndNamespace));
+			}
+
+			// Verify results (the sequenced results keep the order of the queries)
+			Future<Iterable<byte[]>> future = Futures.sequence(futures, testActorSystem.dispatcher());
+			Iterable<byte[]> results = Await.result(future, timeout);
+
+			int index = 0;
+			for (byte[] buffer : results) {
+				int deserializedValue = KvStateRequestSerializer.deserializeValue(buffer, IntSerializer.INSTANCE);
+				assertEquals(1337 + index, deserializedValue);
+				index++;
+			}
+
+			// Verify requests. Server-side statistics may presumably lag behind the
+			// responses already received by the client, hence the bounded retries.
+			for (int i = 0; i < numServers; i++) {
+				int numRetries = 10;
+				for (int retry = 0; retry < numRetries; retry++) {
+					try {
+						assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests());
+						assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
+						assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed());
+						break;
+					} catch (AssertionError e) {
+						if (retry == numRetries - 1) {
+							throw e;
+						}
+						Thread.sleep(100);
+					}
+				}
+			}
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			if (networkClient != null) {
+				networkClient.shutDown();
+			}
+
+			for (KvStateServer server : servers) {
+				if (server != null) {
+					server.shutDown();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Tests that the QueryableStateClient caches location lookups keyed by
+	 * both job ID and state name.
+	 *
+	 * <p>This guards against a previous bug where cache entries were keyed by
+	 * name only. That is a problem because the same client can be used to
+	 * query multiple jobs.
+	 */
+	@Test
+	public void testLookupMultipleJobIds() throws Exception {
+		String name = "unique-per-job";
+
+		// Exact contents don't matter here
+		KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
+		location.registerKvState(0, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+
+		JobID jobId1 = new JobID();
+		JobID jobId2 = new JobID();
+
+		KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
+
+		when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString()))
+				.thenReturn(Futures.successful(location));
+
+		KvStateClient networkClient = mock(KvStateClient.class);
+		when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class)))
+				.thenReturn(Futures.successful(new byte[0]));
+
+		QueryableStateClient client = new QueryableStateClient(
+				lookupService,
+				networkClient,
+				testActorSystem.dispatcher());
+
+		try {
+			// Queries with the same name, but different job IDs, should lead to a
+			// single lookup per query name and job ID. Each job is queried twice so
+			// that the verification below also checks that repeated queries are
+			// served from the cache instead of triggering another lookup.
+			for (int i = 0; i < 2; i++) {
+				Await.result(client.getKvState(jobId1, name, 0, new byte[0]), timeout);
+				Await.result(client.getKvState(jobId2, name, 0, new byte[0]), timeout);
+			}
+
+			verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
+			verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
+		} finally {
+			client.shutDown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/329610d5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index ca7157a..147a3e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -98,8 +98,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 					config);
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
-					new Tuple2<Integer, Integer>(0, 0));
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
+					Option.<NettyConfig>empty(), new Tuple2<Integer, Integer>(0, 0));
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(),
10000);
 


Mime
View raw message