flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [13/14] flink git commit: [FLINK-7770][QS] Hide the queryable state behind a proxy.
Date Wed, 11 Oct 2017 15:46:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 27257d7..005c874 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -25,306 +25,117 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.ConnectException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * Client for queryable state.
+ * Client for querying Flink's managed state.
  *
  * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}.
- * The state instance created from this descriptor will be published for queries
- * when it's created on the TaskManagers and the location will be reported to
- * the JobManager.
+ * The state instance created from this descriptor will be published for queries when it's
+ * created on the Task Managers and the location will be reported to the Job Manager.
  *
- * <p>The client resolves the location of the requested KvState via the
- * JobManager. Resolved locations are cached. When the server address of the
- * requested KvState instance is determined, the client sends out a request to
- * the server.
+ * <p>The client connects to a {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
+ * running on a given Task Manager. The proxy is the entry point of the client to the Flink cluster.
+ * It forwards the requests of the client to the Job Manager and the required Task Manager, and forwards
+ * the final response back to the client.
+ *
+ * <p>The proxy initially resolves the location of the requested KvState via the Job Manager. Resolved
+ * locations are cached. When the server address of the requested KvState instance is determined, the
+ * proxy sends out a request to the server. The returned final answer is then forwarded to the client.
  */
+@PublicEvolving
 public class QueryableStateClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
 
-	/**
-	 * {@link KvStateLocation} lookup to resolve the address of KvState instances.
-	 */
-	private final KvStateLocationLookupService lookupService;
-
-	/**
-	 * Network client for queries against {@link KvStateServer} instances.
-	 */
-	private final KvStateClient kvStateClient;
-
-	/**
-	 * Execution context.
-	 */
-	private final ExecutionContext executionContext;
+	/** The client that forwards the requests to the proxy. */
+	private final Client<KvStateRequest, KvStateResponse> client;
 
-	/**
-	 * Cache for {@link KvStateLocation} instances keyed by job and name.
-	 */
-	private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache =
-			new ConcurrentHashMap<>();
-
-	/** This is != null, if we started the actor system. */
-	private final ActorSystem actorSystem;
+	/** The address of the proxy this client is connected to. */
+	private final KvStateServerAddress remoteAddress;
 
-	private ExecutionConfig executionConfig;
+	/** The execution configuration used to instantiate the different (de-)serializers; defaults to a fresh instance. */
+	private ExecutionConfig executionConfig = new ExecutionConfig();
 
 	/**
-	 * Creates a client from the given configuration.
-	 *
-	 * <p>This will create multiple Thread pools: one for the started actor
-	 * system and another for the network client.
-	 *
-	 * @param config Configuration to use.
-	 * @throws Exception Failures are forwarded
+	 * Create the Queryable State Client.
+	 * @param remoteHostname the hostname of the {@link org.apache.flink.runtime.query.KvStateClientProxy proxy}
+	 *                       to connect to.
+	 * @param remotePort the port of the proxy to connect to.
 	 */
-	public QueryableStateClient(Configuration config) throws Exception {
-		this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+	public QueryableStateClient(final String remoteHostname, final int remotePort) throws UnknownHostException {
+		this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), remotePort);
 	}
 
 	/**
-	 * Creates a client from the given configuration.
-	 *
-	 * <p>This will create multiple Thread pools: one for the started actor
-	 * system and another for the network client.
-	 *
-	 * @param config Configuration to use.
-	 * @param highAvailabilityServices Service factory for high availability services
-	 * @throws Exception Failures are forwarded
-	 *
-	 * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the
-	 * {@link #QueryableStateClient(Configuration)} instead.
+	 * Create the Queryable State Client.
+	 * @param remoteAddress the {@link InetAddress address} of the
+	 *                      {@link org.apache.flink.runtime.query.KvStateClientProxy proxy} to connect to.
+	 * @param remotePort the port of the proxy to connect to; must be within 0-65535.
 	 */
-	@Deprecated
-	public QueryableStateClient(
-			Configuration config,
-			HighAvailabilityServices highAvailabilityServices) throws Exception {
-		Preconditions.checkNotNull(config, "Configuration");
-
-		// Create a leader retrieval service
-		LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-
-		// Get the ask timeout
-		String askTimeoutString = config.getString(AkkaOptions.ASK_TIMEOUT);
-
-		Duration timeout = FiniteDuration.apply(askTimeoutString);
-		if (!timeout.isFinite()) {
-			throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key()
-					+ " is not a finite timeout ('" + askTimeoutString + "')");
-		}
-
-		FiniteDuration askTimeout = (FiniteDuration) timeout;
-
-		int lookupRetries = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES);
-		int lookupRetryDelayMillis = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY);
-
-		// Retries if no JobManager is around
-		AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy =
-				new AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory(
-						lookupRetries,
-						FiniteDuration.apply(lookupRetryDelayMillis, "ms"));
+	public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) {
+		Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65535,
+				"Remote Port " + remotePort + " is out of valid port range (0-65535).");

-		// Create the actor system
-		@SuppressWarnings("unchecked")
-		Option<Tuple2<String, Object>> remoting = new Some(new Tuple2<>("", 0));
-		this.actorSystem = AkkaUtils.createActorSystem(config, remoting);
+		this.remoteAddress = new KvStateServerAddress(remoteAddress, remotePort);

-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				actorSystem,
-				askTimeout,
-				retryStrategy);
+		final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<>(
+						new KvStateRequest.KvStateRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());

-		int numEventLoopThreads = config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS);
-
-		if (numEventLoopThreads == 0) {
-			numEventLoopThreads = Runtime.getRuntime().availableProcessors();
-		}
-
-		// Create the network client
-		KvStateClient networkClient = new KvStateClient(
-				numEventLoopThreads,
+		this.client = new Client<>(
+				"Queryable State Client",
+				Hardware.getNumberCPUCores(),
+				messageSerializer,
 				new DisabledKvStateRequestStats());
-
-		this.lookupService = lookupService;
-		this.kvStateClient = networkClient;
-		this.executionContext = actorSystem.dispatcher();
-		this.executionConfig = new ExecutionConfig();
-
-		this.lookupService.start();
-	}
-
-	/** Gets the {@link ExecutionConfig}. */
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
 	}
 
-	/** Sets the {@link ExecutionConfig}. */
-	public void setExecutionConfig(ExecutionConfig config) {
-		this.executionConfig = config;
-	}
-
-	/**
-	 * Creates a client.
-	 *
-	 * @param lookupService    Location lookup service
-	 * @param kvStateClient    Network client for queries
-	 * @param executionContext Execution context for futures
-	 */
-	public QueryableStateClient(
-			KvStateLocationLookupService lookupService,
-			KvStateClient kvStateClient,
-			ExecutionContext executionContext) {
-
-		this.lookupService = Preconditions.checkNotNull(lookupService, "KvStateLocationLookupService");
-		this.kvStateClient = Preconditions.checkNotNull(kvStateClient, "KvStateClient");
-		this.executionContext = Preconditions.checkNotNull(executionContext, "ExecutionContext");
-		this.actorSystem = null;
-
-		this.lookupService.start();
+	/** Shuts down the underlying network {@link Client}. NOTE(review): replaces the removed {@code shutDown()}, breaking its callers. */
+	public void shutdown() {
+		client.shutdown();
 	}
 
 	/**
-	 * Returns the execution context of this client.
-	 *
-	 * @return The execution context used by the client.
+	 * Gets the {@link ExecutionConfig}.
 	 */
-	public ExecutionContext getExecutionContext() {
-		return executionContext;
-	}
-
-	/**
-	 * Shuts down the client and all components.
-	 */
-	public void shutDown() {
-		try {
-			lookupService.shutDown();
-		} catch (Throwable t) {
-			LOG.error("Failed to shut down KvStateLookupService", t);
-		}
-
-		try {
-			kvStateClient.shutDown();
-		} catch (Throwable t) {
-			LOG.error("Failed to shut down KvStateClient", t);
-		}
-
-		if (actorSystem != null) {
-			try {
-				actorSystem.shutdown();
-			} catch (Throwable t) {
-				LOG.error("Failed to shut down ActorSystem", t);
-			}
-		}
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
 	}
 
 	/**
-	 * Returns a future holding the serialized request result.
-	 *
-	 * <p>If the server does not serve a KvState instance with the given ID,
-	 * the Future will be failed with a {@link UnknownKvStateID}.
-	 *
-	 * <p>If the KvState instance does not hold any data for the given key
-	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-	 *
-	 * <p>All other failures are forwarded to the Future.
-	 *
-	 * @param jobId                     JobID of the job the queryable state
-	 *                                  belongs to
-	 * @param queryableStateName        Name under which the state is queryable
-	 * @param keyHashCode               Integer hash code of the key (result of
-	 *                                  a call to {@link Object#hashCode()}
-	 * @param serializedKeyAndNamespace Serialized key and namespace to query
-	 *                                  KvState instance with
-	 * @return Future holding the serialized result
-	 */
-	@SuppressWarnings("unchecked")
-	public Future<byte[]> getKvState(
-			final JobID jobId,
-			final String queryableStateName,
-			final int keyHashCode,
-			final byte[] serializedKeyAndNamespace) {
-
-		return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false)
-				.recoverWith(new Recover<Future<byte[]>>() {
-					@Override
-					public Future<byte[]> recover(Throwable failure) throws Throwable {
-						if (failure instanceof UnknownKvStateID ||
-								failure instanceof UnknownKvStateKeyGroupLocation ||
-								failure instanceof UnknownKvStateLocation ||
-								failure instanceof ConnectException) {
-							// These failures are likely to be caused by out-of-sync
-							// KvStateLocation. Therefore we retry this query and
-							// force look up the location.
-							return getKvState(
-									jobId,
-									queryableStateName,
-									keyHashCode,
-									serializedKeyAndNamespace,
-									true);
-						} else {
-							return Futures.failed(failure);
-						}
-					}
-				}, executionContext);
+	 * Replaces the existing {@link ExecutionConfig} (possibly {@code null}) with the provided one.
+	 * @param config The new {@link ExecutionConfig}.
+	 * @return The previously configured {@link ExecutionConfig}, possibly {@code null}.
+	 */
+	public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
+		ExecutionConfig prev = executionConfig;
+		this.executionConfig = config;
+		return prev;
 	}
 
 	/**
-	 * Returns a future holding the request result.
-	 *
-	 * <p>If the server does not serve a KvState instance with the given ID,
-	 * the Future will be failed with a {@link UnknownKvStateID}.
-	 *
-	 * <p>If the KvState instance does not hold any data for the given key
-	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-	 *
-	 * <p>All other failures are forwarded to the Future.
-	 *
+	 * Returns a future holding the request result.
 	 * @param jobId                     JobID of the job the queryable state belongs to.
 	 * @param queryableStateName        Name under which the state is queryable.
 	 * @param key			            The key we are interested in.
@@ -333,7 +144,7 @@ public class QueryableStateClient {
 	 * @return Future holding the result.
 	 */
 	@PublicEvolving
-	public <K, V> Future<V> getKvState(
+	public <K, V> CompletableFuture<V> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
@@ -347,16 +158,7 @@ public class QueryableStateClient {
 	}
 
 	/**
-	 * Returns a future holding the request result.
-	 *
-	 * <p>If the server does not serve a KvState instance with the given ID,
-	 * the Future will be failed with a {@link UnknownKvStateID}.
-	 *
-	 * <p>If the KvState instance does not hold any data for the given key
-	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-	 *
-	 * <p>All other failures are forwarded to the Future.
-	 *
+	 * Returns a future holding the request result.
 	 * @param jobId                     JobID of the job the queryable state belongs to.
 	 * @param queryableStateName        Name under which the state is queryable.
 	 * @param key			            The key we are interested in.
@@ -365,30 +167,19 @@ public class QueryableStateClient {
 	 * @return Future holding the result.
 	 */
 	@PublicEvolving
-	public <K, V> Future<V> getKvState(
+	public <K, V> CompletableFuture<V> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
 			final TypeInformation<K> keyTypeInfo,
 			final StateDescriptor<?, V> stateDescriptor) {
 
-		Preconditions.checkNotNull(keyTypeInfo);
-
 		return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
 				keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
 	}
 
 	/**
 	 * Returns a future holding the request result.
-	 *
-	 * <p>If the server does not serve a KvState instance with the given ID,
-	 * the Future will be failed with a {@link UnknownKvStateID}.
-	 *
-	 * <p>If the KvState instance does not hold any data for the given key
-	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-	 *
-	 * <p>All other failures are forwarded to the Future.
-	 *
 	 * @param jobId                     JobID of the job the queryable state belongs to.
 	 * @param queryableStateName        Name under which the state is queryable.
 	 * @param key			            The key that the state we request is associated with.
@@ -399,7 +190,7 @@ public class QueryableStateClient {
 	 * @return Future holding the result.
 	 */
 	@PublicEvolving
-	public <K, V, N> Future<V> getKvState(
+	public <K, V, N> CompletableFuture<V> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
@@ -420,15 +211,6 @@ public class QueryableStateClient {
 
 	/**
 	 * Returns a future holding the request result.
-	 *
-	 * <p>If the server does not serve a KvState instance with the given ID,
-	 * the Future will be failed with a {@link UnknownKvStateID}.
-	 *
-	 * <p>If the KvState instance does not hold any data for the given key
-	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-	 *
-	 * <p>All other failures are forwarded to the Future.
-	 *
 	 * @param jobId                     JobID of the job the queryable state belongs to.
 	 * @param queryableStateName        Name under which the state is queryable.
 	 * @param key			            The key that the state we request is associated with.
@@ -439,7 +221,7 @@ public class QueryableStateClient {
 	 * @return Future holding the result.
 	 */
 	@PublicEvolving
-	public <K, V, N> Future<V> getKvState(
+	public <K, N, V> CompletableFuture<V> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final K key,
@@ -448,8 +230,8 @@ public class QueryableStateClient {
 			final TypeInformation<N> namespaceTypeInfo,
 			final TypeSerializer<V> stateSerializer) {
 
+		Preconditions.checkNotNull(jobId);
 		Preconditions.checkNotNull(queryableStateName);
-
 		Preconditions.checkNotNull(key);
 		Preconditions.checkNotNull(namespace);
 
@@ -457,36 +239,25 @@ public class QueryableStateClient {
 		Preconditions.checkNotNull(namespaceTypeInfo);
 		Preconditions.checkNotNull(stateSerializer);
 
-		if (stateSerializer instanceof ListSerializer) {
-			throw new IllegalArgumentException("ListState is not supported out-of-the-box yet.");
-		}
-
 		TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
 		TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);
 
 		final byte[] serializedKeyAndNamespace;
 		try {
-			serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
-					key,
-					keySerializer,
-					namespace,
-					namespaceSerializer);
+			serializedKeyAndNamespace = KvStateSerializer
+					.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
 		} catch (IOException e) {
-			return Futures.failed(e);
+			return FutureUtils.getFailedFuture(e);
 		}
 
-		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
-				.flatMap(new Mapper<byte[], Future<V>>() {
-					@Override
-					public Future<V> apply(byte[] parameter) {
-						try {
-							return Futures.successful(
-									KvStateSerializer.deserializeValue(parameter, stateSerializer));
-						} catch (IOException e) {
-							return Futures.failed(e);
-						}
+		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
+				stateResponse -> {
+					try {
+						return KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer);
+					} catch (IOException e) {
+						throw new FlinkRuntimeException(e);
 					}
-				}, executionContext);
+				});
 	}
 
 	/**
@@ -499,92 +270,20 @@ public class QueryableStateClient {
 	 *                                  a call to {@link Object#hashCode()}
 	 * @param serializedKeyAndNamespace Serialized key and namespace to query
 	 *                                  KvState instance with
-	 * @param forceLookup               Flag to force lookup of the {@link KvStateLocation}
 	 * @return Future holding the serialized result
 	 */
-	private Future<byte[]> getKvState(
+	private CompletableFuture<KvStateResponse> getKvState(
 			final JobID jobId,
 			final String queryableStateName,
 			final int keyHashCode,
-			final byte[] serializedKeyAndNamespace,
-			boolean forceLookup) {
-
-		return getKvStateLookupInfo(jobId, queryableStateName, forceLookup)
-				.flatMap(new Mapper<KvStateLocation, Future<byte[]>>() {
-					@Override
-					public Future<byte[]> apply(KvStateLocation lookup) {
-						int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups());
-
-						KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
-						if (serverAddress == null) {
-							return Futures.failed(new UnknownKvStateKeyGroupLocation());
-						} else {
-							// Query server
-							KvStateID kvStateId = lookup.getKvStateID(keyGroupIndex);
-							return kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace);
-						}
-					}
-				}, executionContext);
-	}
-
-	/**
-	 * Lookup the {@link KvStateLocation} for the given job and queryable state
-	 * name.
-	 *
-	 * <p>The job manager will be queried for the location only if forced or no
-	 * cached location can be found. There are no guarantees about
-	 *
-	 * @param jobId              JobID the state instance belongs to.
-	 * @param queryableStateName Name under which the state instance has been published.
-	 * @param forceUpdate        Flag to indicate whether to force a update via the lookup service.
-	 * @return Future holding the KvStateLocation
-	 */
-	private Future<KvStateLocation> getKvStateLookupInfo(
-			JobID jobId,
-			final String queryableStateName,
-			boolean forceUpdate) {
-
-		if (forceUpdate) {
-			Future<KvStateLocation> lookupFuture = lookupService
-					.getKvStateLookupInfo(jobId, queryableStateName);
-			lookupCache.put(new Tuple2<>(jobId, queryableStateName), lookupFuture);
-			return lookupFuture;
-		} else {
-			Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
-			final Future<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
-
-			if (cachedFuture == null) {
-				Future<KvStateLocation> lookupFuture = lookupService
-						.getKvStateLookupInfo(jobId, queryableStateName);
-
-				Future<KvStateLocation> previous = lookupCache.putIfAbsent(cacheKey, lookupFuture);
-				if (previous == null) {
-					return lookupFuture;
-				} else {
-					return previous;
-				}
-			} else {
-				// do not retain futures which failed as they will remain in
-				// the cache even if the error cause is not present any more
-				// and a new lookup may succeed
-				if (cachedFuture.isCompleted() &&
-						cachedFuture.value().get().isFailure()) {
-					// issue a new lookup
-					Future<KvStateLocation> lookupFuture = lookupService
-							.getKvStateLookupInfo(jobId, queryableStateName);
-
-					// replace the existing one if it has not been replaced yet
-					// otherwise return the one in the cache
-					if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
-						return lookupFuture;
-					} else {
-						return lookupCache.get(cacheKey);
-					}
-				} else {
-					return cachedFuture;
-				}
-			}
+			final byte[] serializedKeyAndNamespace) {
+		LOG.debug("Sending State Request to {}.", remoteAddress);
+		try {
+			KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
+			return client.sendRequest(remoteAddress, request);
+		} catch (Exception e) {
+			LOG.error("Unable to send KvStateRequest to {}.", remoteAddress, e);
+			return FutureUtils.getFailedFuture(e);
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
new file mode 100644
index 0000000..d7191b6
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocationException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This handler acts as an internal (to the Flink cluster) client that receives
+ * the requests from external clients, executes them by contacting the Job Manager (if necessary) and
+ * the Task Manager holding the requested state, and forwards the answer back to the client.
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);
+
+	/** Timeout for a single state location lookup at the job manager. */
+	private static final FiniteDuration LOCATION_LOOKUP_TIMEOUT = FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS);
+
+	/** The proxy using this handler. */
+	private final KvStateClientProxy proxy;
+
+	/** A cache to hold the location of different states for which we have already seen requests. */
+	private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache =
+			new ConcurrentHashMap<>();
+
+	/**
+	 * Network client to forward queries to {@link KvStateServerImpl state server}
+	 * instances inside the cluster.
+	 */
+	private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;
+
+	/**
+	 * Create the handler used by the {@link KvStateClientProxyImpl}.
+	 *
+	 * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler.
+	 * @param queryExecutorThreads the number of threads handed to the internal {@link Client}
+	 *                             that forwards requests to the state servers.
+	 * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+	 * @param stats server statistics collector.
+	 */
+	public KvStateClientProxyHandler(
+			final KvStateClientProxyImpl proxy,
+			final int queryExecutorThreads,
+			final MessageSerializer<KvStateRequest, KvStateResponse> serializer,
+			final KvStateRequestStats stats) {
+
+		super(proxy, serializer, stats);
+		this.proxy = Preconditions.checkNotNull(proxy);
+		this.kvStateClient = createInternalClient(queryExecutorThreads);
+	}
+
+	/**
+	 * Creates the network client used to forward {@link KvStateInternalRequest internal requests}
+	 * to the state servers inside the cluster.
+	 *
+	 * @param threads the number of threads handed to the client.
+	 * @return the internal client; it is created with {@link DisabledKvStateRequestStats disabled statistics}.
+	 */
+	private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<>(
+						new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		return new Client<>(
+				"Queryable State Proxy Client",
+				threads,
+				messageSerializer,
+				new DisabledKvStateRequestStats());
+	}
+
+	/**
+	 * Handles a request of an external client. The request is executed asynchronously;
+	 * the returned future completes with the state server's response or with the failure.
+	 */
+	@Override
+	public CompletableFuture<KvStateResponse> handleRequest(
+			final long requestId,
+			final KvStateRequest request) {
+		CompletableFuture<KvStateResponse> response = new CompletableFuture<>();
+		executeActionAsync(response, request, false);
+		return response;
+	}
+
+	/**
+	 * Executes the given request and completes {@code result} with its outcome.
+	 *
+	 * <p>Failures indicating an out-of-sync state location trigger a new attempt that forces
+	 * a fresh location lookup. NOTE(review): the number of such retries is not bounded; the
+	 * request is retried until it succeeds, fails for another reason, or {@code result} is
+	 * completed (e.g. cancelled) by someone else.
+	 *
+	 * @param result the future to complete with the response.
+	 * @param request the request to execute.
+	 * @param update whether to bypass the cached state location.
+	 */
+	private void executeActionAsync(
+			final CompletableFuture<KvStateResponse> result,
+			final KvStateRequest request,
+			final boolean update) {
+
+		if (!result.isDone()) {
+			final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);
+			operationFuture.whenCompleteAsync(
+					(t, throwable) -> {
+						if (throwable != null) {
+							if (throwable instanceof CancellationException) {
+								result.completeExceptionally(throwable);
+							} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+									throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
+									throwable.getCause() instanceof UnknownKvStateLocation ||
+									throwable.getCause() instanceof ConnectException) {
+
+								// These failures are likely to be caused by out-of-sync
+								// KvStateLocation. Therefore we retry this query and
+								// force look up the location.
+
+								executeActionAsync(result, request, true);
+							} else {
+								result.completeExceptionally(throwable);
+							}
+						} else {
+							result.complete(t);
+						}
+					}, queryExecutor);
+
+			// cancel the pending operation once the result has been completed in any way
+			result.whenComplete(
+					(t, throwable) -> operationFuture.cancel(false));
+		}
+	}
+
+	/**
+	 * Looks up the location of the requested state and forwards the request to the state
+	 * server responsible for the key group of the requested key.
+	 *
+	 * @param request the request to forward.
+	 * @param forceUpdate whether to bypass the cached state location.
+	 * @return future holding the response of the state server.
+	 */
+	private CompletableFuture<KvStateResponse> getState(
+			final KvStateRequest request,
+			final boolean forceUpdate) {
+
+		return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate)
+				.thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> {
+					final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
+							request.getKeyHashCode(), location.getNumKeyGroups());
+
+					final KvStateServerAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
+					if (serverAddress == null) {
+						return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
+					} else {
+						// Query server
+						final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
+						final KvStateInternalRequest internalRequest = new KvStateInternalRequest(
+								kvStateId, request.getSerializedKeyAndNamespace());
+						return kvStateClient.sendRequest(serverAddress, internalRequest);
+					}
+				}, queryExecutor);
+	}
+
+	/**
+	 * Lookup the {@link KvStateLocation} for the given job and queryable state name.
+	 *
+	 * <p>The job manager will be queried for the location only if forced or if no
+	 * cached location (pending or successfully completed) can be found. A cached
+	 * location may be stale; callers retry with {@code forceUpdate} set when the
+	 * location turns out to be outdated.
+	 *
+	 * @param jobId              JobID the state instance belongs to.
+	 * @param queryableStateName Name under which the state instance has been published.
+	 * @param forceUpdate        Flag to indicate whether to force an update via the job manager.
+	 * @return Future holding the KvStateLocation
+	 */
+	private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
+			final JobID jobId,
+			final String queryableStateName,
+			final boolean forceUpdate) {
+
+		final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
+		final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
+
+		if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) {
+			LOG.debug("Retrieving location for state={} of job={} from the cache.", queryableStateName, jobId);
+			return cachedFuture;
+		}
+
+		LOG.debug("Retrieving location for state={} of job={} from the job manager.", queryableStateName, jobId);
+
+		return proxy.getJobManagerFuture().thenComposeAsync(
+				jobManagerGateway -> {
+					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
+					final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
+							jobManagerGateway.ask(msg, LOCATION_LOOKUP_TIMEOUT)
+									.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
+
+					// cache the (possibly still pending) lookup so that subsequent requests can reuse it
+					lookupCache.put(cacheKey, locationFuture);
+					return locationFuture;
+				}, queryExecutor);
+	}
+
+	/** Shuts down the internal client used to forward requests to the state servers. */
+	@Override
+	public void shutdown() {
+		kvStateClient.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
new file mode 100644
index 0000000..bca80de
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.UnknownJobManagerException;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default implementation of the {@link KvStateClientProxy}.
+ */
+@Internal
+public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
+
+	/** Future failed with an {@link UnknownJobManagerException}, used while no leading job manager is known. */
+	private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
+			FutureUtils.getFailedFuture(new UnknownJobManagerException());
+
+	/** Number of query threads, passed on to the {@link KvStateClientProxyHandler}. */
+	private final int queryExecutorThreads;
+
+	/** Statistics collector. */
+	private final KvStateRequestStats stats;
+
+	/** Lock guarding {@link #jobManagerFuture}. */
+	private final Object leaderLock = new Object();
+
+	/** Future of the currently leading job manager, or {@link #UNKNOWN_JOB_MANAGER} if none is known. */
+	private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+
+	/**
+	 * Creates the Queryable State Client Proxy.
+	 *
+	 * <p>The proxy is instantiated using reflection by the
+	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)
+	 * QueryableStateUtils.createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)}.
+	 *
+	 * <p>The proxy needs to be started via {@link #start()} in order to bind
+	 * to the configured bind address.
+	 *
+	 * @param bindAddress the address to listen to.
+	 * @param bindPort the port to listen to.
+	 * @param numEventLoopThreads number of event loop threads.
+	 * @param numQueryThreads number of query threads.
+	 * @param stats the statistics collector.
+	 */
+	public KvStateClientProxyImpl(
+			final InetAddress bindAddress,
+			final Integer bindPort,
+			final Integer numEventLoopThreads,
+			final Integer numQueryThreads,
+			final KvStateRequestStats stats) {
+
+		super("Queryable State Proxy Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads);
+		Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+		this.queryExecutorThreads = numQueryThreads;
+		this.stats = Preconditions.checkNotNull(stats);
+	}
+
+	/** Returns the server address, as reported by {@link AbstractServerBase#getServerAddress()}. */
+	@Override
+	public KvStateServerAddress getServerAddress() {
+		return super.getServerAddress();
+	}
+
+	/** Starts the proxy server; see {@link AbstractServerBase#start()}. */
+	@Override
+	public void start() throws InterruptedException {
+		super.start();
+	}
+
+	/** Shuts down the proxy server; see {@link AbstractServerBase#shutdown()}. */
+	@Override
+	public void shutdown() {
+		super.shutdown();
+	}
+
+	/**
+	 * Sets the future of the currently leading job manager.
+	 *
+	 * @param leadingJobManager future of the new leader, or {@code null} if no leader is known, in which
+	 *                          case {@link #getJobManagerFuture()} returns a future failed with an
+	 *                          {@link UnknownJobManagerException}.
+	 */
+	@Override
+	public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
+		synchronized (leaderLock) {
+			if (leadingJobManager == null) {
+				jobManagerFuture = UNKNOWN_JOB_MANAGER;
+			} else {
+				jobManagerFuture = leadingJobManager;
+			}
+		}
+	}
+
+	/** Returns the future of the currently leading job manager (failed if none is known). */
+	@Override
+	public CompletableFuture<ActorGateway> getJobManagerFuture() {
+		synchronized (leaderLock) {
+			return jobManagerFuture;
+		}
+	}
+
+	/** Creates the {@link KvStateClientProxyHandler} that serves the requests of this proxy. */
+	@Override
+	public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() {
+		MessageSerializer<KvStateRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(
+						new KvStateRequest.KvStateRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+		return new KvStateClientProxyHandler(this, queryExecutorThreads, serializer, stats);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
new file mode 100644
index 0000000..eedc2a1
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy
+ * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server}
+ * of the Task Manager responsible for the requested state.
+ */
+@Internal
+public class KvStateInternalRequest extends MessageBody {
+
+	/** ID of the state instance to query. */
+	private final KvStateID kvStateId;
+
+	/** Serialized key and namespace to request from the state instance. */
+	private final byte[] serializedKeyAndNamespace;
+
+	/**
+	 * Creates a request for the given state instance.
+	 *
+	 * @param stateId                   ID of the state instance to query
+	 * @param serializedKeyAndNamespace serialized key and namespace to request
+	 */
+	public KvStateInternalRequest(
+			final KvStateID stateId,
+			final byte[] serializedKeyAndNamespace) {
+
+		this.kvStateId = Preconditions.checkNotNull(stateId);
+		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+	}
+
+	public KvStateID getKvStateId() {
+		return kvStateId;
+	}
+
+	public byte[] getSerializedKeyAndNamespace() {
+		return serializedKeyAndNamespace;
+	}
+
+	/**
+	 * Serializes this request as: the two longs of the {@link KvStateID} (lower part first),
+	 * the length of the serialized key and namespace, and the serialized key and namespace itself.
+	 */
+	@Override
+	public byte[] serialize() {
+
+		// KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace
+		final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length;
+
+		return ByteBuffer.allocate(size)
+				.putLong(kvStateId.getLowerPart())
+				.putLong(kvStateId.getUpperPart())
+				.putInt(serializedKeyAndNamespace.length)
+				.put(serializedKeyAndNamespace)
+				.array();
+	}
+
+	/**
+	 * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}.
+	 */
+	public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> {
+
+		@Override
+		public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
+			KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
+
+			int length = buf.readInt();
+			Preconditions.checkArgument(length >= 0,
+					"Negative length for key and namespace. " +
+							"This indicates a serialization error.");
+			// guard against allocating an arbitrarily large array for a corrupted length field
+			Preconditions.checkArgument(length <= buf.readableBytes(),
+					"Length for key and namespace exceeds the remaining message bytes. " +
+							"This indicates a serialization error.");
+
+			byte[] serializedKeyAndNamespace = new byte[length];
+			if (length > 0) {
+				buf.readBytes(serializedKeyAndNamespace);
+			}
+			return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
index eb33bce..7eb39c7 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
@@ -18,72 +18,151 @@
 
 package org.apache.flink.queryablestate.messages;
 
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
 /**
- * A {@link InternalKvState} instance request for a specific key and namespace.
+ * The request to be sent by the {@link org.apache.flink.queryablestate.client.QueryableStateClient
+ * Queryable State Client} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}
+ * requesting a given state.
  */
-public final class KvStateRequest {
+@Internal
+public class KvStateRequest extends MessageBody {
 
-	/** ID for this request. */
-	private final long requestId;
+	/** ID of the job the requested state belongs to. */
+	private final JobID jobId;
+
+	/** Name under which the requested state has been published. */
+	private final String stateName;
+
+	/** Hash code of the requested key, used by the proxy to determine the responsible key group. */
+	private final int keyHashCode;
+
+	/** Serialized key and namespace to request from the state instance. */
+	private final byte[] serializedKeyAndNamespace;
 
-	/** ID of the requested KvState instance. */
-	private final KvStateID kvStateId;
+	/**
+	 * Creates a request for a queryable state instance.
+	 *
+	 * @param jobId                     ID of the job the state belongs to
+	 * @param stateName                 name under which the state has been published
+	 * @param keyHashCode               hash code of the requested key
+	 * @param serializedKeyAndNamespace serialized key and namespace to request
+	 */
+	public KvStateRequest(
+			final JobID jobId,
+			final String stateName,
+			final int keyHashCode,
+			final byte[] serializedKeyAndNamespace) {
 
-	/** Serialized key and namespace to request from the KvState instance. */
-	private final byte[] serializedKeyAndNamespace;
+		this.jobId = Preconditions.checkNotNull(jobId);
+		this.stateName = Preconditions.checkNotNull(stateName);
+		this.keyHashCode = keyHashCode;
+		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+	}
 
-	/**
-	 * Creates a KvState instance request.
-	 *
-	 * @param requestId                 ID for this request
-	 * @param kvStateId                 ID of the requested KvState instance
-	 * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState
-	 *                                  instance
-	 */
-	public KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
-		this.requestId = requestId;
-		this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
-		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+	public JobID getJobId() {
+		return jobId;
 	}
 
-	/**
-	 * Returns the request ID.
-	 *
-	 * @return Request ID
-	 */
-	public long getRequestId() {
-		return requestId;
+	public String getStateName() {
+		return stateName;
 	}
 
-	/**
-	 * Returns the ID of the requested KvState instance.
-	 *
-	 * @return ID of the requested KvState instance
-	 */
-	public KvStateID getKvStateId() {
-		return kvStateId;
+	public int getKeyHashCode() {
+		return keyHashCode;
 	}
 
-	/**
-	 * Returns the serialized key and namespace to request from the KvState
-	 * instance.
-	 *
-	 * @return Serialized key and namespace to request from the KvState instance
-	 */
 	public byte[] getSerializedKeyAndNamespace() {
 		return serializedKeyAndNamespace;
 	}
 
+	/**
+	 * Serializes this request. The state name is encoded as UTF-8, independent of the
+	 * platform's default charset, so that the client and the proxy agree on its bytes.
+	 */
 	@Override
+	public byte[] serialize() {
+
+		byte[] serializedStateName = stateName.getBytes(StandardCharsets.UTF_8);
+
+		// JobID + sizeOf(stateName) + stateName + hashCode + sizeOf(keyAndNamespace) + keyAndNamespace
+		final int size =
+				JobID.SIZE +
+				serializedStateName.length + Integer.BYTES +
+				Integer.BYTES +
+				serializedKeyAndNamespace.length + Integer.BYTES;
+
+		return ByteBuffer.allocate(size)
+				.putLong(jobId.getLowerPart())
+				.putLong(jobId.getUpperPart())
+				.putInt(serializedStateName.length)
+				.put(serializedStateName)
+				.putInt(keyHashCode)
+				.putInt(serializedKeyAndNamespace.length)
+				.put(serializedKeyAndNamespace)
+				.array();
+	}
+
+	@Override
 	public String toString() {
 		return "KvStateRequest{" +
-				"requestId=" + requestId +
-				", kvStateId=" + kvStateId +
-				", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length +
+				"jobId=" + jobId +
+				", stateName='" + stateName + '\'' +
+				", keyHashCode=" + keyHashCode +
+				", serializedKeyAndNamespace=" + Arrays.toString(serializedKeyAndNamespace) +
 				'}';
 	}
+
+	/**
+	 * A {@link MessageDeserializer deserializer} for {@link KvStateRequest}.
+	 */
+	public static class KvStateRequestDeserializer implements MessageDeserializer<KvStateRequest> {
+
+		@Override
+		public KvStateRequest deserializeMessage(ByteBuf buf) {
+			JobID jobId = new JobID(buf.readLong(), buf.readLong());
+
+			int stateNameLength = buf.readInt();
+			Preconditions.checkArgument(stateNameLength >= 0,
+					"Negative length for state name. " +
+							"This indicates a serialization error.");
+			// guard against allocating an arbitrarily large array for a corrupted length field
+			Preconditions.checkArgument(stateNameLength <= buf.readableBytes(),
+					"Length for state name exceeds the remaining message bytes. " +
+							"This indicates a serialization error.");
+
+			String stateName = "";
+			if (stateNameLength > 0) {
+				byte[] name = new byte[stateNameLength];
+				buf.readBytes(name);
+				stateName = new String(name, StandardCharsets.UTF_8);
+			}
+
+			int keyHashCode = buf.readInt();
+
+			int keyAndNamespaceLength = buf.readInt();
+			Preconditions.checkArgument(keyAndNamespaceLength >= 0,
+					"Negative length for key and namespace. " +
+							"This indicates a serialization error.");
+			Preconditions.checkArgument(keyAndNamespaceLength <= buf.readableBytes(),
+					"Length for key and namespace exceeds the remaining message bytes. " +
+							"This indicates a serialization error.");
+
+			byte[] serializedKeyAndNamespace = new byte[keyAndNamespaceLength];
+			if (keyAndNamespaceLength > 0) {
+				buf.readBytes(serializedKeyAndNamespace);
+			}
+			return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
deleted file mode 100644
index 4015d79..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-/**
- * A failure response to a {@link KvStateRequest}.
- */
-public final class KvStateRequestFailure {
-
-	/** ID of the request responding to. */
-	private final long requestId;
-
-	/** Failure cause. Not allowed to be a user type. */
-	private final Throwable cause;
-
-	/**
-	 * Creates a failure response to a {@link KvStateRequest}.
-	 *
-	 * @param requestId ID for the request responding to
-	 * @param cause     Failure cause (not allowed to be a user type)
-	 */
-	public KvStateRequestFailure(long requestId, Throwable cause) {
-		this.requestId = requestId;
-		this.cause = cause;
-	}
-
-	/**
-	 * Returns the request ID responding to.
-	 *
-	 * @return Request ID responding to
-	 */
-	public long getRequestId() {
-		return requestId;
-	}
-
-	/**
-	 * Returns the failure cause.
-	 *
-	 * @return Failure cause
-	 */
-	public Throwable getCause() {
-		return cause;
-	}
-
-	@Override
-	public String toString() {
-		return "KvStateRequestFailure{" +
-				"requestId=" + requestId +
-				", cause=" + cause +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
deleted file mode 100644
index 6bf2397..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.messages;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * A successful response to a {@link KvStateRequest} containing the serialized
- * result for the requested key and namespace.
- */
-public final class KvStateRequestResult {
-
-	/** ID of the request responding to. */
-	private final long requestId;
-
-	/**
-	 * Serialized result for the requested key and namespace. If no result was
-	 * available for the specified key and namespace, this is <code>null</code>.
-	 */
-	private final byte[] serializedResult;
-
-	/**
-	 * Creates a successful {@link KvStateRequestResult} response.
-	 *
-	 * @param requestId        ID of the request responding to
-	 * @param serializedResult Serialized result or <code>null</code> if none
-	 */
-	public KvStateRequestResult(long requestId, byte[] serializedResult) {
-		this.requestId = requestId;
-		this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result");
-	}
-
-	/**
-	 * Returns the request ID responding to.
-	 *
-	 * @return Request ID responding to
-	 */
-	public long getRequestId() {
-		return requestId;
-	}
-
-	/**
-	 * Returns the serialized result or <code>null</code> if none available.
-	 *
-	 * @return Serialized result or <code>null</code> if none available.
-	 */
-	public byte[] getSerializedResult() {
-		return serializedResult;
-	}
-
-	@Override
-	public String toString() {
-		return "KvStateRequestResult{" +
-				"requestId=" + requestId +
-				", serializedResult.length=" + serializedResult.length +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
new file mode 100644
index 0000000..462135f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The response containing the (serialized) state sent by the {@link org.apache.flink.runtime.query.KvStateServer
+ * State Server} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}, and then forwarded
+ * by the proxy to the original {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State
+ * Client}.
+ *
+ * <p>Wire format produced by {@link #serialize()}: a 4-byte big-endian length
+ * followed by that many bytes of serialized state.
+ */
+@Internal
+public class KvStateResponse extends MessageBody {
+
+	/** The serialized state value; never {@code null}. */
+	private final byte[] content;
+
+	/**
+	 * Creates a response carrying the given serialized state.
+	 *
+	 * @param content the serialized state; must not be {@code null}
+	 * @throws NullPointerException if {@code content} is {@code null}
+	 */
+	public KvStateResponse(final byte[] content) {
+		this.content = Preconditions.checkNotNull(content);
+	}
+
+	/**
+	 * Returns the serialized state. The array is not copied, so callers must not modify it.
+	 *
+	 * @return the serialized state
+	 */
+	public byte[] getContent() {
+		return content;
+	}
+
+	/**
+	 * Serializes this response as a 4-byte length prefix followed by the content bytes.
+	 *
+	 * @return the serialized response
+	 */
+	@Override
+	public byte[] serialize() {
+		final int size = Integer.BYTES + content.length;
+		return ByteBuffer.allocate(size)
+				.putInt(content.length)
+				.put(content)
+				.array();
+	}
+
+	/**
+	 * A {@link MessageDeserializer deserializer} for {@link KvStateResponse}.
+	 */
+	public static class KvStateResponseDeserializer implements MessageDeserializer<KvStateResponse> {
+
+		/**
+		 * Reads a length-prefixed {@link KvStateResponse} from the given buffer.
+		 *
+		 * <p>The length is validated against the number of readable bytes <em>before</em>
+		 * allocating the content array, so a corrupted length cannot trigger a huge allocation.
+		 *
+		 * @param buf the buffer positioned at the start of the serialized response
+		 * @return the deserialized response
+		 * @throws IllegalArgumentException if the encoded length is negative or exceeds the readable bytes
+		 */
+		@Override
+		public KvStateResponse deserializeMessage(ByteBuf buf) {
+			final int length = buf.readInt();
+			Preconditions.checkArgument(length >= 0,
+					"Negative length for state content. " +
+							"This indicates a serialization error.");
+			Preconditions.checkArgument(length <= buf.readableBytes(),
+					"Length of state content (%s) exceeds the readable bytes (%s). " +
+							"This indicates a serialization error.", length, buf.readableBytes());
+			final byte[] content = new byte[length];
+			buf.readBytes(content);
+
+			return new KvStateResponse(content);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
new file mode 100644
index 0000000..4bf8e98
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The base class for every server in the queryable state module.
+ * It is using pure netty to send and receive messages of type {@link MessageBody}.
+ *
+ * @param <REQ> the type of request the server expects to receive.
+ * @param <RESP> the type of response the server will send.
+ */
+@Internal
+public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class);
+
+	/** Highest valid TCP/UDP port number. */
+	private static final int MAX_PORT = 65535;
+
+	/** AbstractServerBase config: low water mark. */
+	private static final int LOW_WATER_MARK = 8 * 1024;
+
+	/** AbstractServerBase config: high water mark. */
+	private static final int HIGH_WATER_MARK = 32 * 1024;
+
+	/** The name of the server, used in thread names and log/error messages. */
+	private final String serverName;
+
+	/** Netty's ServerBootstrap. */
+	private final ServerBootstrap bootstrap;
+
+	/** Query executor thread pool. */
+	private final ExecutorService queryExecutor;
+
+	/** Address of this server; {@code null} until {@link #start()} succeeds and after {@link #shutdown()}. */
+	private KvStateServerAddress serverAddress;
+
+	/** The handler used for the incoming messages; created in {@link #start()}. */
+	private AbstractServerHandler<REQ, RESP> handler;
+
+	/**
+	 * Creates the {@link AbstractServerBase}.
+	 *
+	 * <p>The server needs to be started via {@link #start()} in order to bind
+	 * to the configured bind address.
+	 *
+	 * @param serverName the name of the server
+	 * @param bindAddress address to bind to
+	 * @param bindPort port to bind to (random port if 0)
+	 * @param numEventLoopThreads number of event loop threads
+	 * @param numQueryThreads number of threads used to execute queries
+	 * @throws NullPointerException if any argument is {@code null}
+	 * @throws IllegalArgumentException if the port is outside 0-65535 or a thread count is non-positive
+	 */
+	protected AbstractServerBase(
+			final String serverName,
+			final InetAddress bindAddress,
+			final Integer bindPort,
+			final Integer numEventLoopThreads,
+			final Integer numQueryThreads) {
+
+		Preconditions.checkNotNull(bindAddress);
+		Preconditions.checkNotNull(bindPort, "Bind port");
+		Preconditions.checkNotNull(numEventLoopThreads, "Number of event loop threads");
+		Preconditions.checkNotNull(numQueryThreads, "Number of query threads");
+		Preconditions.checkArgument(bindPort >= 0 && bindPort <= MAX_PORT, "Port " + bindPort + " out of valid range (0-" + MAX_PORT + ").");
+		Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+		Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+
+		this.serverName = Preconditions.checkNotNull(serverName);
+		// Use the static helper: calling an overridable method (getServerName()) from a
+		// constructor would run subclass code before the subclass is initialized.
+		this.queryExecutor = createQueryExecutor(serverName, numQueryThreads);
+
+		final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+		final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+				.setDaemon(true)
+				.setNameFormat("Flink " + serverName + " EventLoop Thread %d")
+				.build();
+
+		final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+		bootstrap = new ServerBootstrap()
+				// Bind address and port
+				.localAddress(bindAddress, bindPort)
+				// NIO server channels
+				.group(nioGroup)
+				.channel(NioServerSocketChannel.class)
+				// AbstractServerBase channel Options
+				.option(ChannelOption.ALLOCATOR, bufferPool)
+				// Child channel options
+				.childOption(ChannelOption.ALLOCATOR, bufferPool)
+				.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
+				.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+	}
+
+	/**
+	 * Creates a thread pool for the query execution.
+	 *
+	 * @param serverName name of the server, used in the thread names.
+	 * @param numQueryThreads Number of query threads.
+	 * @return Thread pool for query execution
+	 */
+	private static ExecutorService createQueryExecutor(final String serverName, final int numQueryThreads) {
+		final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+				.setDaemon(true)
+				.setNameFormat("Flink " + serverName + " Thread %d")
+				.build();
+
+		return Executors.newFixedThreadPool(numQueryThreads, threadFactory);
+	}
+
+	/**
+	 * Returns the thread pool used for query execution.
+	 *
+	 * @return the query executor
+	 */
+	protected ExecutorService getQueryExecutor() {
+		return queryExecutor;
+	}
+
+	/**
+	 * Returns the name of this server.
+	 *
+	 * @return the server name
+	 */
+	public String getServerName() {
+		return serverName;
+	}
+
+	/**
+	 * Creates the handler for incoming messages. Called by {@link #start()}.
+	 *
+	 * @return the handler shared by all channels of this server
+	 */
+	public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
+
+	/**
+	 * Starts the server by binding to the configured bind address (blocking).
+	 *
+	 * @throws InterruptedException If interrupted during the bind operation
+	 * @throws IllegalStateException If the server has already been started
+	 */
+	public void start() throws InterruptedException {
+		Preconditions.checkState(serverAddress == null,
+				"Server " + serverName + " has already been started @ " + serverAddress + '.');
+
+		this.handler = initializeHandler();
+		bootstrap.childHandler(new ServerChannelInitializer<>(handler));
+
+		Channel channel = bootstrap.bind().sync().channel();
+		InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
+		serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+
+		LOG.info("Started server {} @ {}", serverName, serverAddress);
+	}
+
+	/**
+	 * Returns the address of this server.
+	 *
+	 * @return AbstractServerBase address
+	 * @throws IllegalStateException If server has not been started yet
+	 */
+	public KvStateServerAddress getServerAddress() {
+		Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started.");
+		return serverAddress;
+	}
+
+	/**
+	 * Shuts down the server and all related thread pools.
+	 */
+	public void shutdown() {
+		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
+
+		if (handler != null) {
+			handler.shutdown();
+		}
+
+		if (queryExecutor != null) {
+			queryExecutor.shutdown();
+		}
+
+		if (bootstrap != null) {
+			EventLoopGroup group = bootstrap.group();
+			if (group != null) {
+				group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+			}
+		}
+		serverAddress = null;
+	}
+
+	/**
+	 * Channel pipeline initializer.
+	 *
+	 * <p>The request handler is shared, whereas the other handlers are created
+	 * per channel.
+	 */
+	private static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInitializer<SocketChannel> {
+
+		/** The shared request handler. */
+		private final AbstractServerHandler<REQ, RESP> sharedRequestHandler;
+
+		/**
+		 * Creates the channel pipeline initializer with the shared request handler.
+		 *
+		 * @param sharedRequestHandler Shared request handler.
+		 */
+		ServerChannelInitializer(AbstractServerHandler<REQ, RESP> sharedRequestHandler) {
+			this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "MessageBody handler");
+		}
+
+		@Override
+		protected void initChannel(SocketChannel channel) throws Exception {
+			channel.pipeline()
+					.addLast(new ChunkedWriteHandler())
+					// Frames are prefixed by a 4-byte length field, which is stripped before the handler sees them.
+					.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+					.addLast(sharedRequestHandler);
+		}
+	}
+
+	/**
+	 * Returns whether the query executor has been shut down.
+	 *
+	 * @return {@code true} if the query executor has been shut down
+	 */
+	@VisibleForTesting
+	public boolean isExecutorShutdown() {
+		return queryExecutor.isShutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
new file mode 100644
index 0000000..b9bf671
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The base class of every handler used by an {@link AbstractServerBase}.
+ *
+ * <p>The handler is shared by all channels of a server; requests are deserialized in
+ * {@link #channelRead} and then handled by tasks submitted to the server's query executor.
+ *
+ * @param <REQ> the type of request the server expects to receive.
+ * @param <RESP> the type of response the server will send.
+ */
+@Internal
+@ChannelHandler.Sharable
+public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class);
+
+	/** The owning server of this handler. */
+	private final AbstractServerBase<REQ, RESP> server;
+
+	/** The serializer used to (de-)serialize messages. */
+	private final MessageSerializer<REQ, RESP> serializer;
+
+	/** Thread pool for query execution. */
+	protected final ExecutorService queryExecutor;
+
+	/** Exposed server statistics. */
+	private final KvStateRequestStats stats;
+
+	/**
+	 * Create the handler.
+	 *
+	 * @param server the server owning this handler; its query executor is used for request execution
+	 * @param serializer the serializer used to (de-)serialize messages
+	 * @param stats statistics collector
+	 */
+	public AbstractServerHandler(
+			final AbstractServerBase<REQ, RESP> server,
+			final MessageSerializer<REQ, RESP> serializer,
+			final KvStateRequestStats stats) {
+
+		this.server = Preconditions.checkNotNull(server);
+		this.serializer = Preconditions.checkNotNull(serializer);
+		this.queryExecutor = server.getQueryExecutor();
+		this.stats = Preconditions.checkNotNull(stats);
+	}
+
+	protected String getServerName() {
+		return server.getServerName();
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		stats.reportActiveConnection();
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		stats.reportInactiveConnection();
+	}
+
+	/**
+	 * Deserializes an incoming request and submits it for asynchronous execution. Any failure
+	 * before submission is answered with a request failure (if the request could be
+	 * deserialized) or a server failure (otherwise). The incoming buffer is always released.
+	 */
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		REQ request = null;
+		long requestId = -1L;
+
+		try {
+			final ByteBuf buf = (ByteBuf) msg;
+			final MessageType msgType = MessageSerializer.deserializeHeader(buf);
+
+			requestId = MessageSerializer.getRequestId(buf);
+
+			if (msgType == MessageType.REQUEST) {
+
+				// ------------------------------------------------------------
+				// MessageBody
+				// ------------------------------------------------------------
+				request = serializer.deserializeRequest(buf);
+				stats.reportRequest();
+
+				// Execute actual query async, because it is possibly
+				// blocking (e.g. file I/O).
+				//
+				// A submission failure (e.g. a rejected task after shutdown) is not
+				// treated as fatal: it is reported to the client as a failed request
+				// by the catch block below.
+				queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats));
+
+			} else {
+				// ------------------------------------------------------------
+				// Unexpected
+				// ------------------------------------------------------------
+
+				final String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + ".";
+				final ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg));
+
+				LOG.debug(errMsg);
+				ctx.writeAndFlush(failure);
+			}
+		} catch (Throwable t) {
+			final String stringifiedCause = ExceptionUtils.stringifyException(t);
+
+			String errMsg;
+			ByteBuf err;
+			if (request != null) {
+				errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause;
+				err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
+				stats.reportFailedRequest();
+			} else {
+				errMsg = "Failed incoming message. Caused by: " + stringifiedCause;
+				err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg));
+			}
+
+			LOG.debug(errMsg);
+			ctx.writeAndFlush(err);
+
+		} finally {
+			// IMPORTANT: We have to always recycle the incoming buffer.
+			// Otherwise we will leak memory out of Netty's buffer pool.
+			//
+			// If any operation ever holds on to the buffer, it is the
+			// responsibility of that operation to retain the buffer and
+			// release it later.
+			ReferenceCountUtil.release(msg);
+		}
+	}
+
+	/**
+	 * Answers any exception raised in the pipeline with a server failure and closes the channel.
+	 */
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause);
+		final ByteBuf err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
+
+		LOG.debug(msg);
+		ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
+	}
+
+	/**
+	 * Handles an incoming request and returns a {@link CompletableFuture} containing the corresponding response.
+	 *
+	 * <p><b>NOTE:</b> This method is called by multiple threads.
+	 *
+	 * <p>If this method throws or returns {@code null} instead of a future, the request is
+	 * answered with a request failure.
+	 *
+	 * @param requestId the id of the received request to be handled.
+	 * @param request the request to be handled.
+	 * @return A future with the response to be forwarded to the client.
+	 */
+	public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request);
+
+	/**
+	 * Shuts down any handler specific resources, e.g. thread pools etc.
+	 */
+	public abstract void shutdown();
+
+	/**
+	 * Task that executes a request via {@link AbstractServerHandler#handleRequest} (e.g. a query
+	 * against an {@link InternalKvState} instance) and writes the response, or a request failure,
+	 * back to the requesting channel.
+	 */
+	private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable {
+
+		private final AbstractServerHandler<REQ, RESP> handler;
+
+		private final ChannelHandlerContext ctx;
+
+		private final long requestId;
+
+		private final REQ request;
+
+		private final KvStateRequestStats stats;
+
+		private final long creationNanos;
+
+		AsyncRequestTask(
+				final AbstractServerHandler<REQ, RESP> handler,
+				final ChannelHandlerContext ctx,
+				final long requestId,
+				final REQ request,
+				final KvStateRequestStats stats) {
+
+			this.handler = Preconditions.checkNotNull(handler);
+			this.ctx = Preconditions.checkNotNull(ctx);
+			this.requestId = requestId;
+			this.request = Preconditions.checkNotNull(request);
+			this.stats = Preconditions.checkNotNull(stats);
+			this.creationNanos = System.nanoTime();
+		}
+
+		@Override
+		public void run() {
+
+			if (!ctx.channel().isActive()) {
+				return;
+			}
+
+			invokeHandler().whenComplete(this::respond);
+		}
+
+		/**
+		 * Calls the handler, converting a synchronous exception or a {@code null} future into an
+		 * exceptionally completed future. Without this, such failures would escape {@link #run()},
+		 * be swallowed by the executor, and leave the client without any response.
+		 */
+		private CompletableFuture<RESP> invokeHandler() {
+			Throwable failure;
+			try {
+				final CompletableFuture<RESP> future = handler.handleRequest(requestId, request);
+				if (future != null) {
+					return future;
+				}
+				failure = new BadRequestException(
+						handler.getServerName(), "NULL future returned for request with ID " + requestId + ".");
+			} catch (Throwable t) {
+				failure = t;
+			}
+			final CompletableFuture<RESP> failed = new CompletableFuture<>();
+			failed.completeExceptionally(failure);
+			return failed;
+		}
+
+		/**
+		 * Writes the response, or a request failure if the request failed or produced no response.
+		 *
+		 * @param resp the response, or {@code null} if none was produced
+		 * @param throwable the failure cause, or {@code null} on success
+		 */
+		private void respond(final RESP resp, final Throwable throwable) {
+			try {
+				if (throwable != null) {
+					throw unwrap(throwable);
+				}
+
+				if (resp == null) {
+					throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + ".");
+				}
+
+				final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp);
+
+				int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark();
+
+				ChannelFuture write;
+				if (serialResp.readableBytes() <= highWatermark) {
+					write = ctx.writeAndFlush(serialResp);
+				} else {
+					// Large responses are written in chunks of at most the high watermark.
+					write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark));
+				}
+				write.addListener(new RequestWriteListener());
+
+			} catch (BadRequestException e) {
+				// Bad requests are reported to the client as-is.
+				sendFailure(e);
+			} catch (Throwable t) {
+				final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+				sendFailure(new RuntimeException(errMsg));
+			}
+		}
+
+		/**
+		 * Reports a failed request and sends the given cause to the client as a request failure.
+		 *
+		 * @param cause the cause to send to the client
+		 */
+		private void sendFailure(final Throwable cause) {
+			try {
+				stats.reportFailedRequest();
+				final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, cause);
+				ctx.writeAndFlush(err);
+			} catch (IOException io) {
+				LOG.error("Failed to respond with the error after failed request", io);
+			}
+		}
+
+		/**
+		 * Strips a {@link CompletionException} wrapper, if it carries a cause.
+		 */
+		private static Throwable unwrap(final Throwable throwable) {
+			return throwable instanceof CompletionException && throwable.getCause() != null
+					? throwable.getCause()
+					: throwable;
+		}
+
+		@Override
+		public String toString() {
+			return "AsyncRequestTask{" +
+					"requestId=" + requestId +
+					", request=" + request +
+					'}';
+		}
+
+		/**
+		 * Callback after query result has been written.
+		 *
+		 * <p>Gathers stats and logs errors.
+		 */
+		private class RequestWriteListener implements ChannelFutureListener {
+
+			@Override
+			public void operationComplete(ChannelFuture future) throws Exception {
+				long durationNanos = System.nanoTime() - creationNanos;
+				long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
+
+				if (future.isSuccess()) {
+					LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis);
+					stats.reportSuccessfulRequest(durationMillis);
+				} else {
+					LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause());
+					stats.reportFailedRequest();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
new file mode 100644
index 0000000..3c0c484
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for exceptions thrown during querying Flink's managed state.
+ *
+ * <p>The message of every such exception is prefixed with the name of the server that raised it.
+ */
+@Internal
+public class BadRequestException extends Exception {
+
+	private static final long serialVersionUID = 3458743952407632903L;
+
+	/**
+	 * Creates the exception.
+	 *
+	 * @param serverName the name of the server reporting the bad request; must not be {@code null}
+	 * @param message the detail message
+	 * @throws NullPointerException if {@code serverName} is {@code null}
+	 */
+	public BadRequestException(String serverName, String message) {
+		super(buildMessage(serverName, message));
+	}
+
+	/** Prefixes the detail message with the server name, as in {@code "<serverName> : <message>"}. */
+	private static String buildMessage(final String serverName, final String message) {
+		final String checkedServerName = Preconditions.checkNotNull(serverName);
+		return checkedServerName + " : " + message;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
new file mode 100644
index 0000000..9c56025
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * A {@link ChunkedInput} that hands out a {@link ByteBuf} in chunks to be consumed by
+ * {@link ChunkedWriteHandler}, respecting the high and low watermarks.
+ *
+ * <p>Ownership of the wrapped buffer is transferred to this instance: it is released either
+ * by the consumer of the last chunk or, if the input is closed early, by {@link #close()}.
+ *
+ * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
+ */
+@Internal
+public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
+
+	/** The buffer to chunk. */
+	private final ByteBuf buf;
+
+	/** Size of chunks. */
+	private final int chunkSize;
+
+	/** Closed flag. */
+	private boolean isClosed;
+
+	/** End of input flag; set once the last chunk has been handed out. */
+	private boolean isEndOfInput;
+
+	/**
+	 * Creates a chunked input over the readable bytes of the given buffer.
+	 *
+	 * @param buf the buffer to chunk; ownership is transferred to this instance
+	 * @param chunkSize the maximum size of each chunk, must be positive
+	 */
+	public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
+		this.buf = Preconditions.checkNotNull(buf, "Buffer");
+		Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size");
+		this.chunkSize = chunkSize;
+	}
+
+	@Override
+	public boolean isEndOfInput() throws Exception {
+		return isClosed || isEndOfInput;
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (!isClosed) {
+			// If we did not consume the whole buffer yet, we have to release
+			// it here. Otherwise, it's the responsibility of the consumer.
+			if (!isEndOfInput) {
+				buf.release();
+			}
+
+			isClosed = true;
+		}
+	}
+
+	/**
+	 * Returns the next chunk, or {@code null} once the input is closed or exhausted.
+	 *
+	 * <p>Returning {@code null} after the last chunk prevents handing out a second,
+	 * un-retained slice of the buffer, which would lead to a double release.
+	 */
+	@Override
+	public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+		if (isClosed || isEndOfInput) {
+			return null;
+		} else if (buf.readableBytes() <= chunkSize) {
+			isEndOfInput = true;
+
+			// Don't retain as the consumer is responsible to release it
+			return buf.slice();
+		} else {
+			// Return a chunk sized slice of the buffer. The ref count is
+			// shared with the original buffer. That's why we need to retain
+			// a reference here.
+			return buf.readSlice(chunkSize).retain();
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "ChunkedByteBuf{" +
+				"buf=" + buf +
+				", chunkSize=" + chunkSize +
+				", isClosed=" + isClosed +
+				", isEndOfInput=" + isEndOfInput +
+				'}';
+	}
+}


Mime
View raw message