flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [03/14] flink git commit: [FLINK-7769][QS] Move queryable state outside the runtime.
Date Wed, 11 Oct 2017 15:46:03 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
deleted file mode 100644
index 4b73fbb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link InternalKvState} instance request for a specific key and namespace.
- */
-public final class KvStateRequest {
-
-	/** ID for this request. */
-	private final long requestId;
-
-	/** ID of the requested KvState instance. */
-	private final KvStateID kvStateId;
-
-	/** Serialized key and namespace to request from the KvState instance. */
-	private final byte[] serializedKeyAndNamespace;
-
-	/**
-	 * Creates a KvState instance request.
-	 *
-	 * @param requestId                 ID for this request
-	 * @param kvStateId                 ID of the requested KvState instance
-	 * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState
-	 *                                  instance
-	 */
-	KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
-		this.requestId = requestId;
-		this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
-		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-	}
-
-	/**
-	 * Returns the request ID.
-	 *
-	 * @return Request ID
-	 */
-	public long getRequestId() {
-		return requestId;
-	}
-
-	/**
-	 * Returns the ID of the requested KvState instance.
-	 *
-	 * @return ID of the requested KvState instance
-	 */
-	public KvStateID getKvStateId() {
-		return kvStateId;
-	}
-
-	/**
-	 * Returns the serialized key and namespace to request from the KvState
-	 * instance.
-	 *
-	 * @return Serialized key and namespace to request from the KvState instance
-	 */
-	public byte[] getSerializedKeyAndNamespace() {
-		return serializedKeyAndNamespace;
-	}
-
-	@Override
-	public String toString() {
-		return "KvStateRequest{" +
-				"requestId=" + requestId +
-				", kvStateId=" + kvStateId +
-				", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
deleted file mode 100644
index 06a3ce8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-/**
- * A failure response to a {@link KvStateRequest}.
- */
-public final class KvStateRequestFailure {
-
-	/** ID of the request responding to. */
-	private final long requestId;
-
-	/** Failure cause. Not allowed to be a user type. */
-	private final Throwable cause;
-
-	/**
-	 * Creates a failure response to a {@link KvStateRequest}.
-	 *
-	 * @param requestId ID for the request responding to
-	 * @param cause     Failure cause (not allowed to be a user type)
-	 */
-	KvStateRequestFailure(long requestId, Throwable cause) {
-		this.requestId = requestId;
-		this.cause = cause;
-	}
-
-	/**
-	 * Returns the request ID responding to.
-	 *
-	 * @return Request ID responding to
-	 */
-	public long getRequestId() {
-		return requestId;
-	}
-
-	/**
-	 * Returns the failure cause.
-	 *
-	 * @return Failure cause
-	 */
-	public Throwable getCause() {
-		return cause;
-	}
-
-	@Override
-	public String toString() {
-		return "KvStateRequestFailure{" +
-				"requestId=" + requestId +
-				", cause=" + cause +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
deleted file mode 100644
index 2bd8a36..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * A successful response to a {@link KvStateRequest} containing the serialized
- * result for the requested key and namespace.
- */
-public final class KvStateRequestResult {
-
-	/** ID of the request responding to. */
-	private final long requestId;
-
-	/**
-	 * Serialized result for the requested key and namespace. If no result was
-	 * available for the specified key and namespace, this is <code>null</code>.
-	 */
-	private final byte[] serializedResult;
-
-	/**
-	 * Creates a successful {@link KvStateRequestResult} response.
-	 *
-	 * @param requestId        ID of the request responding to
-	 * @param serializedResult Serialized result or <code>null</code> if none
-	 */
-	KvStateRequestResult(long requestId, byte[] serializedResult) {
-		this.requestId = requestId;
-		this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result");
-	}
-
-	/**
-	 * Returns the request ID responding to.
-	 *
-	 * @return Request ID responding to
-	 */
-	public long getRequestId() {
-		return requestId;
-	}
-
-	/**
-	 * Returns the serialized result or <code>null</code> if none available.
-	 *
-	 * @return Serialized result or <code>null</code> if none available.
-	 */
-	public byte[] getSerializedResult() {
-		return serializedResult;
-	}
-
-	@Override
-	public String toString() {
-		return "KvStateRequestResult{" +
-				"requestId=" + requestId +
-				", serializedResult.length=" + serializedResult.length +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
deleted file mode 100644
index 68f06e3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.netty.KvStateClient;
-import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Serialization and deserialization of messages exchanged between
- * {@link KvStateClient} and {@link KvStateServer}.
- *
- * <p>The binary messages have the following format:
- *
- * <pre>
- *                     <------ Frame ------------------------->
- *                    +----------------------------------------+
- *                    |        HEADER (8)      | PAYLOAD (VAR) |
- * +------------------+----------------------------------------+
- * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
- * +------------------+----------------------------------------+
- * </pre>
- *
- * <p>The concrete content of a message depends on the {@link KvStateRequestType}.
- */
-public final class KvStateRequestSerializer {
-
-	/** The serialization version ID. */
-	private static final int VERSION = 0x79a1b710;
-
-	/** Byte length of the header. */
-	private static final int HEADER_LENGTH = 8;
-
-	// ------------------------------------------------------------------------
-	// Serialization
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Allocates a buffer and serializes the KvState request into it.
-	 *
-	 * @param alloc                     ByteBuf allocator for the buffer to
-	 *                                  serialize message into
-	 * @param requestId                 ID for this request
-	 * @param kvStateId                 ID of the requested KvState instance
-	 * @param serializedKeyAndNamespace Serialized key and namespace to request
-	 *                                  from the KvState instance.
-	 * @return Serialized KvState request message
-	 */
-	public static ByteBuf serializeKvStateRequest(
-			ByteBufAllocator alloc,
-			long requestId,
-			KvStateID kvStateId,
-			byte[] serializedKeyAndNamespace) {
-
-		// Header + request ID + KvState ID + Serialized namespace
-		int frameLength = HEADER_LENGTH + 8 + (8 + 8) + (4 + serializedKeyAndNamespace.length);
-		ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame length
-
-		buf.writeInt(frameLength);
-
-		writeHeader(buf, KvStateRequestType.REQUEST);
-
-		buf.writeLong(requestId);
-		buf.writeLong(kvStateId.getLowerPart());
-		buf.writeLong(kvStateId.getUpperPart());
-		buf.writeInt(serializedKeyAndNamespace.length);
-		buf.writeBytes(serializedKeyAndNamespace);
-
-		return buf;
-	}
-
-	/**
-	 * Allocates a buffer and serializes the KvState request result into it.
-	 *
-	 * @param alloc             ByteBuf allocator for the buffer to serialize message into
-	 * @param requestId         ID for this request
-	 * @param serializedResult  Serialized Result
-	 * @return Serialized KvState request result message
-	 */
-	public static ByteBuf serializeKvStateRequestResult(
-			ByteBufAllocator alloc,
-			long requestId,
-			byte[] serializedResult) {
-
-		Preconditions.checkNotNull(serializedResult, "Serialized result");
-
-		// Header + request ID + serialized result
-		int frameLength = HEADER_LENGTH + 8 + 4 + serializedResult.length;
-
-		ByteBuf buf = alloc.ioBuffer(frameLength);
-
-		buf.writeInt(frameLength);
-		writeHeader(buf, KvStateRequestType.REQUEST_RESULT);
-		buf.writeLong(requestId);
-
-		buf.writeInt(serializedResult.length);
-		buf.writeBytes(serializedResult);
-
-		return buf;
-	}
-
-	/**
-	 * Allocates a buffer and serializes the KvState request failure into it.
-	 *
-	 * @param alloc ByteBuf allocator for the buffer to serialize message into
-	 * @param requestId ID of the request responding to
-	 * @param cause Failure cause
-	 * @return Serialized KvState request failure message
-	 * @throws IOException Serialization failures are forwarded
-	 */
-	public static ByteBuf serializeKvStateRequestFailure(
-			ByteBufAllocator alloc,
-			long requestId,
-			Throwable cause) throws IOException {
-
-		ByteBuf buf = alloc.ioBuffer();
-
-		// Frame length is set at the end
-		buf.writeInt(0);
-
-		writeHeader(buf, KvStateRequestType.REQUEST_FAILURE);
-
-		// Message
-		buf.writeLong(requestId);
-
-		try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
-				ObjectOutputStream out = new ObjectOutputStream(bbos)) {
-
-			out.writeObject(cause);
-		}
-
-		// Set frame length
-		int frameLength = buf.readableBytes() - 4;
-		buf.setInt(0, frameLength);
-
-		return buf;
-	}
-
-	/**
-	 * Allocates a buffer and serializes the server failure into it.
-	 *
-	 * <p>The cause must not be or contain any user types as causes.
-	 *
-	 * @param alloc ByteBuf allocator for the buffer to serialize message into
-	 * @param cause Failure cause
-	 * @return Serialized server failure message
-	 * @throws IOException Serialization failures are forwarded
-	 */
-	public static ByteBuf serializeServerFailure(ByteBufAllocator alloc, Throwable cause) throws IOException {
-		ByteBuf buf = alloc.ioBuffer();
-
-		// Frame length is set at end
-		buf.writeInt(0);
-
-		writeHeader(buf, KvStateRequestType.SERVER_FAILURE);
-
-		try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
-				ObjectOutputStream out = new ObjectOutputStream(bbos)) {
-
-			out.writeObject(cause);
-		}
-
-		// Set frame length
-		int frameLength = buf.readableBytes() - 4;
-		buf.setInt(0, frameLength);
-
-		return buf;
-	}
-
-	// ------------------------------------------------------------------------
-	// Deserialization
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Deserializes the header and returns the request type.
-	 *
-	 * @param buf Buffer to deserialize (expected to be at header position)
-	 * @return Deserialzied request type
-	 * @throws IllegalArgumentException If unexpected message version or message type
-	 */
-	public static KvStateRequestType deserializeHeader(ByteBuf buf) {
-		// Check the version
-		int version = buf.readInt();
-		if (version != VERSION) {
-			throw new IllegalArgumentException("Illegal message version " + version +
-					". Expected: " + VERSION + ".");
-		}
-
-		// Get the message type
-		int msgType = buf.readInt();
-		KvStateRequestType[] values = KvStateRequestType.values();
-		if (msgType >= 0 && msgType < values.length) {
-			return values[msgType];
-		} else {
-			throw new IllegalArgumentException("Illegal message type with index " + msgType);
-		}
-	}
-
-	/**
-	 * Deserializes the KvState request message.
-	 *
-	 * <p><strong>Important</strong>: the returned buffer is sliced from the
-	 * incoming ByteBuf stream and retained. Therefore, it needs to be recycled
-	 * by the consumer.
-	 *
-	 * @param buf Buffer to deserialize (expected to be positioned after header)
-	 * @return Deserialized KvStateRequest
-	 */
-	public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) {
-		long requestId = buf.readLong();
-		KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
-
-		// Serialized key and namespace
-		int length = buf.readInt();
-
-		if (length < 0) {
-			throw new IllegalArgumentException("Negative length for serialized key and namespace. " +
-					"This indicates a serialization error.");
-		}
-
-		// Copy the buffer in order to be able to safely recycle the ByteBuf
-		byte[] serializedKeyAndNamespace = new byte[length];
-		if (length > 0) {
-			buf.readBytes(serializedKeyAndNamespace);
-		}
-
-		return new KvStateRequest(requestId, kvStateId, serializedKeyAndNamespace);
-	}
-
-	/**
-	 * Deserializes the KvState request result.
-	 *
-	 * @param buf Buffer to deserialize (expected to be positioned after header)
-	 * @return Deserialized KvStateRequestResult
-	 */
-	public static KvStateRequestResult deserializeKvStateRequestResult(ByteBuf buf) {
-		long requestId = buf.readLong();
-
-		// Serialized KvState
-		int length = buf.readInt();
-
-		if (length < 0) {
-			throw new IllegalArgumentException("Negative length for serialized result. " +
-					"This indicates a serialization error.");
-		}
-
-		byte[] serializedValue = new byte[length];
-
-		if (length > 0) {
-			buf.readBytes(serializedValue);
-		}
-
-		return new KvStateRequestResult(requestId, serializedValue);
-	}
-
-	/**
-	 * Deserializes the KvState request failure.
-	 *
-	 * @param buf Buffer to deserialize (expected to be positioned after header)
-	 * @return Deserialized KvStateRequestFailure
-	 */
-	public static KvStateRequestFailure deserializeKvStateRequestFailure(ByteBuf buf) throws IOException, ClassNotFoundException {
-		long requestId = buf.readLong();
-
-		Throwable cause;
-		try (ByteBufInputStream bbis = new ByteBufInputStream(buf);
-				ObjectInputStream in = new ObjectInputStream(bbis)) {
-
-			cause = (Throwable) in.readObject();
-		}
-
-		return new KvStateRequestFailure(requestId, cause);
-	}
-
-	/**
-	 * Deserializes the KvState request failure.
-	 *
-	 * @param buf Buffer to deserialize (expected to be positioned after header)
-	 * @return Deserialized KvStateRequestFailure
-	 * @throws IOException            Serialization failure are forwarded
-	 * @throws ClassNotFoundException If Exception type can not be loaded
-	 */
-	public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException, ClassNotFoundException {
-		try (ByteBufInputStream bbis = new ByteBufInputStream(buf);
-				ObjectInputStream in = new ObjectInputStream(bbis)) {
-
-			return (Throwable) in.readObject();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Generic serialization utils
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Serializes the key and namespace into a {@link ByteBuffer}.
-	 *
-	 * <p>The serialized format matches the RocksDB state backend key format, i.e.
-	 * the key and namespace don't have to be deserialized for RocksDB lookups.
-	 *
-	 * @param key                 Key to serialize
-	 * @param keySerializer       Serializer for the key
-	 * @param namespace           Namespace to serialize
-	 * @param namespaceSerializer Serializer for the namespace
-	 * @param <K>                 Key type
-	 * @param <N>                 Namespace type
-	 * @return Buffer holding the serialized key and namespace
-	 * @throws IOException Serialization errors are forwarded
-	 */
-	public static <K, N> byte[] serializeKeyAndNamespace(
-			K key,
-			TypeSerializer<K> keySerializer,
-			N namespace,
-			TypeSerializer<N> namespaceSerializer) throws IOException {
-
-		DataOutputSerializer dos = new DataOutputSerializer(32);
-
-		keySerializer.serialize(key, dos);
-		dos.writeByte(42);
-		namespaceSerializer.serialize(namespace, dos);
-
-		return dos.getCopyOfBuffer();
-	}
-
-	/**
-	 * Deserializes the key and namespace into a {@link Tuple2}.
-	 *
-	 * @param serializedKeyAndNamespace Serialized key and namespace
-	 * @param keySerializer             Serializer for the key
-	 * @param namespaceSerializer       Serializer for the namespace
-	 * @param <K>                       Key type
-	 * @param <N>                       Namespace
-	 * @return Tuple2 holding deserialized key and namespace
-	 * @throws IOException              if the deserialization fails for any reason
-	 */
-	public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
-			byte[] serializedKeyAndNamespace,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer) throws IOException {
-
-		DataInputDeserializer dis = new DataInputDeserializer(
-				serializedKeyAndNamespace,
-				0,
-				serializedKeyAndNamespace.length);
-
-		try {
-			K key = keySerializer.deserialize(dis);
-			byte magicNumber = dis.readByte();
-			if (magicNumber != 42) {
-				throw new IOException("Unexpected magic number " + magicNumber + ".");
-			}
-			N namespace = namespaceSerializer.deserialize(dis);
-
-			if (dis.available() > 0) {
-				throw new IOException("Unconsumed bytes in the serialized key and namespace.");
-			}
-
-			return new Tuple2<>(key, namespace);
-		} catch (IOException e) {
-			throw new IOException("Unable to deserialize key " +
-				"and namespace. This indicates a mismatch in the key/namespace " +
-				"serializers used by the KvState instance and this access.", e);
-		}
-	}
-
-	/**
-	 * Serializes the value with the given serializer.
-	 *
-	 * @param value      Value of type T to serialize
-	 * @param serializer Serializer for T
-	 * @param <T>        Type of the value
-	 * @return Serialized value or <code>null</code> if value <code>null</code>
-	 * @throws IOException On failure during serialization
-	 */
-	public static <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
-		if (value != null) {
-			// Serialize
-			DataOutputSerializer dos = new DataOutputSerializer(32);
-			serializer.serialize(value, dos);
-			return dos.getCopyOfBuffer();
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * Deserializes the value with the given serializer.
-	 *
-	 * @param serializedValue Serialized value of type T
-	 * @param serializer      Serializer for T
-	 * @param <T>             Type of the value
-	 * @return Deserialized value or <code>null</code> if the serialized value
-	 * is <code>null</code>
-	 * @throws IOException On failure during deserialization
-	 */
-	public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
-		if (serializedValue == null) {
-			return null;
-		} else {
-			final DataInputDeserializer deser = new DataInputDeserializer(
-				serializedValue, 0, serializedValue.length);
-			final T value = serializer.deserialize(deser);
-			if (deser.available() > 0) {
-				throw new IOException(
-					"Unconsumed bytes in the deserialized value. " +
-						"This indicates a mismatch in the value serializers " +
-						"used by the KvState instance and this access.");
-			}
-			return value;
-		}
-	}
-
-	/**
-	 * Deserializes all values with the given serializer.
-	 *
-	 * @param serializedValue Serialized value of type List&lt;T&gt;
-	 * @param serializer      Serializer for T
-	 * @param <T>             Type of the value
-	 * @return Deserialized list or <code>null</code> if the serialized value
-	 * is <code>null</code>
-	 * @throws IOException On failure during deserialization
-	 */
-	public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
-		if (serializedValue != null) {
-			final DataInputDeserializer in = new DataInputDeserializer(
-				serializedValue, 0, serializedValue.length);
-
-			try {
-				final List<T> result = new ArrayList<>();
-				while (in.available() > 0) {
-					result.add(serializer.deserialize(in));
-
-					// The expected binary format has a single byte separator. We
-					// want a consistent binary format in order to not need any
-					// special casing during deserialization. A "cleaner" format
-					// would skip this extra byte, but would require a memory copy
-					// for RocksDB, which stores the data serialized in this way
-					// for lists.
-					if (in.available() > 0) {
-						in.readByte();
-					}
-				}
-
-				return result;
-			} catch (IOException e) {
-				throw new IOException(
-						"Unable to deserialize value. " +
-							"This indicates a mismatch in the value serializers " +
-							"used by the KvState instance and this access.", e);
-			}
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * Serializes all values of the Iterable with the given serializer.
-	 *
-	 * @param entries         Key-value pairs to serialize
-	 * @param keySerializer   Serializer for UK
-	 * @param valueSerializer Serializer for UV
-	 * @param <UK>            Type of the keys
-	 * @param <UV>            Type of the values
-	 * @return Serialized values or <code>null</code> if values <code>null</code> or empty
-	 * @throws IOException On failure during serialization
-	 */
-	public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
-		if (entries != null) {
-			// Serialize
-			DataOutputSerializer dos = new DataOutputSerializer(32);
-
-			for (Map.Entry<UK, UV> entry : entries) {
-				keySerializer.serialize(entry.getKey(), dos);
-
-				if (entry.getValue() == null) {
-					dos.writeBoolean(true);
-				} else {
-					dos.writeBoolean(false);
-					valueSerializer.serialize(entry.getValue(), dos);
-				}
-			}
-
-			return dos.getCopyOfBuffer();
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * Deserializes all kv pairs with the given serializer.
-	 *
-	 * @param serializedValue Serialized value of type Map&lt;UK, UV&gt;
-	 * @param keySerializer   Serializer for UK
-	 * @param valueSerializer Serializer for UV
-	 * @param <UK>            Type of the key
-	 * @param <UV>            Type of the value.
-	 * @return Deserialized map or <code>null</code> if the serialized value
-	 * is <code>null</code>
-	 * @throws IOException On failure during deserialization
-	 */
-	public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
-		if (serializedValue != null) {
-			DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
-			Map<UK, UV> result = new HashMap<>();
-			while (in.available() > 0) {
-				UK key = keySerializer.deserialize(in);
-
-				boolean isNull = in.readBoolean();
-				UV value = isNull ? null : valueSerializer.deserialize(in);
-
-				result.put(key, value);
-			}
-
-			return result;
-		} else {
-			return null;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Helper for writing the header.
-	 *
-	 * @param buf         Buffer to serialize header into
-	 * @param requestType Result type to serialize
-	 */
-	private static void writeHeader(ByteBuf buf, KvStateRequestType requestType) {
-		buf.writeInt(VERSION);
-		buf.writeInt(requestType.ordinal());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
deleted file mode 100644
index de7270a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.runtime.query.netty.KvStateServer;
-
-/**
- * Expected message types when communicating with the {@link KvStateServer}.
- */
-public enum KvStateRequestType {
-
-	/** Request a KvState instance. */
-	REQUEST,
-
-	/** Successful response to a KvStateRequest. */
-	REQUEST_RESULT,
-
-	/** Failure response to a KvStateRequest. */
-	REQUEST_FAILURE,
-
-	/** Generic server failure. */
-	SERVER_FAILURE
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
new file mode 100644
index 0000000..44ee571
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization and deserialization of the different state types and namespaces.
+ */
+public final class KvStateSerializer {
+
+	// ------------------------------------------------------------------------
+	// Generic serialization utils
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Serializes the key and namespace into a {@link ByteBuffer}.
+	 *
+	 * <p>The serialized format matches the RocksDB state backend key format, i.e.
+	 * the key and namespace don't have to be deserialized for RocksDB lookups.
+	 *
+	 * @param key                 Key to serialize
+	 * @param keySerializer       Serializer for the key
+	 * @param namespace           Namespace to serialize
+	 * @param namespaceSerializer Serializer for the namespace
+	 * @param <K>                 Key type
+	 * @param <N>                 Namespace type
+	 * @return Buffer holding the serialized key and namespace
+	 * @throws IOException Serialization errors are forwarded
+	 */
+	public static <K, N> byte[] serializeKeyAndNamespace(
+			K key,
+			TypeSerializer<K> keySerializer,
+			N namespace,
+			TypeSerializer<N> namespaceSerializer) throws IOException {
+
+		DataOutputSerializer dos = new DataOutputSerializer(32);
+
+		keySerializer.serialize(key, dos);
+		dos.writeByte(42);
+		namespaceSerializer.serialize(namespace, dos);
+
+		return dos.getCopyOfBuffer();
+	}
+
+	/**
+	 * Deserializes the key and namespace into a {@link Tuple2}.
+	 *
+	 * @param serializedKeyAndNamespace Serialized key and namespace
+	 * @param keySerializer             Serializer for the key
+	 * @param namespaceSerializer       Serializer for the namespace
+	 * @param <K>                       Key type
+	 * @param <N>                       Namespace
+	 * @return Tuple2 holding deserialized key and namespace
+	 * @throws IOException              if the deserialization fails for any reason
+	 */
+	public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
+			byte[] serializedKeyAndNamespace,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) throws IOException {
+
+		DataInputDeserializer dis = new DataInputDeserializer(
+				serializedKeyAndNamespace,
+				0,
+				serializedKeyAndNamespace.length);
+
+		try {
+			K key = keySerializer.deserialize(dis);
+			byte magicNumber = dis.readByte();
+			if (magicNumber != 42) {
+				throw new IOException("Unexpected magic number " + magicNumber + ".");
+			}
+			N namespace = namespaceSerializer.deserialize(dis);
+
+			if (dis.available() > 0) {
+				throw new IOException("Unconsumed bytes in the serialized key and namespace.");
+			}
+
+			return new Tuple2<>(key, namespace);
+		} catch (IOException e) {
+			throw new IOException("Unable to deserialize key " +
+				"and namespace. This indicates a mismatch in the key/namespace " +
+				"serializers used by the KvState instance and this access.", e);
+		}
+	}
+
+	/**
+	 * Serializes the value with the given serializer.
+	 *
+	 * @param value      Value of type T to serialize
+	 * @param serializer Serializer for T
+	 * @param <T>        Type of the value
+	 * @return Serialized value or <code>null</code> if value is <code>null</code>
+	 * @throws IOException On failure during serialization
+	 */
+	public static <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
+		if (value != null) {
+			// Serialize
+			DataOutputSerializer dos = new DataOutputSerializer(32);
+			serializer.serialize(value, dos);
+			return dos.getCopyOfBuffer();
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Deserializes the value with the given serializer.
+	 *
+	 * @param serializedValue Serialized value of type T
+	 * @param serializer      Serializer for T
+	 * @param <T>             Type of the value
+	 * @return Deserialized value or <code>null</code> if the serialized value
+	 * is <code>null</code>
+	 * @throws IOException On failure during deserialization
+	 */
+	public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
+		if (serializedValue == null) {
+			return null;
+		} else {
+			final DataInputDeserializer deser = new DataInputDeserializer(
+				serializedValue, 0, serializedValue.length);
+			final T value = serializer.deserialize(deser);
+			if (deser.available() > 0) {
+				throw new IOException(
+					"Unconsumed bytes in the deserialized value. " +
+						"This indicates a mismatch in the value serializers " +
+						"used by the KvState instance and this access.");
+			}
+			return value;
+		}
+	}
+
+	/**
+	 * Deserializes all values with the given serializer.
+	 *
+	 * @param serializedValue Serialized value of type List&lt;T&gt;
+	 * @param serializer      Serializer for T
+	 * @param <T>             Type of the value
+	 * @return Deserialized list or <code>null</code> if the serialized value
+	 * is <code>null</code>
+	 * @throws IOException On failure during deserialization
+	 */
+	public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
+		if (serializedValue != null) {
+			final DataInputDeserializer in = new DataInputDeserializer(
+				serializedValue, 0, serializedValue.length);
+
+			try {
+				final List<T> result = new ArrayList<>();
+				while (in.available() > 0) {
+					result.add(serializer.deserialize(in));
+
+					// The expected binary format has a single byte separator. We
+					// want a consistent binary format in order to not need any
+					// special casing during deserialization. A "cleaner" format
+					// would skip this extra byte, but would require a memory copy
+					// for RocksDB, which stores the data serialized in this way
+					// for lists.
+					if (in.available() > 0) {
+						in.readByte();
+					}
+				}
+
+				return result;
+			} catch (IOException e) {
+				throw new IOException(
+						"Unable to deserialize value. " +
+							"This indicates a mismatch in the value serializers " +
+							"used by the KvState instance and this access.", e);
+			}
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Serializes all key-value entries of the Iterable with the given serializers.
+	 *
+	 * @param entries         Key-value pairs to serialize
+	 * @param keySerializer   Serializer for UK
+	 * @param valueSerializer Serializer for UV
+	 * @param <UK>            Type of the keys
+	 * @param <UV>            Type of the values
+	 * @return Serialized entries or <code>null</code> if <code>entries</code> is <code>null</code>
+	 * @throws IOException On failure during serialization
+	 */
+	public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+		if (entries != null) {
+			// Serialize
+			DataOutputSerializer dos = new DataOutputSerializer(32);
+
+			for (Map.Entry<UK, UV> entry : entries) {
+				keySerializer.serialize(entry.getKey(), dos);
+
+				if (entry.getValue() == null) {
+					dos.writeBoolean(true);
+				} else {
+					dos.writeBoolean(false);
+					valueSerializer.serialize(entry.getValue(), dos);
+				}
+			}
+
+			return dos.getCopyOfBuffer();
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Deserializes all kv pairs with the given serializer.
+	 *
+	 * @param serializedValue Serialized value of type Map&lt;UK, UV&gt;
+	 * @param keySerializer   Serializer for UK
+	 * @param valueSerializer Serializer for UV
+	 * @param <UK>            Type of the key
+	 * @param <UV>            Type of the value.
+	 * @return Deserialized map or <code>null</code> if the serialized value
+	 * is <code>null</code>
+	 * @throws IOException On failure during deserialization
+	 */
+	public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+		if (serializedValue != null) {
+			DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
+
+			Map<UK, UV> result = new HashMap<>();
+			while (in.available() > 0) {
+				UK key = keySerializer.deserialize(in);
+
+				boolean isNull = in.readBoolean();
+				UV value = isNull ? null : valueSerializer.deserialize(in);
+
+				result.put(key, value);
+			}
+
+			return result;
+		} else {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
deleted file mode 100644
index 7e8de40..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains all Netty-based client/server classes used to query
- * KvState instances.
- *
- * <h2>Server and Client</h2>
- *
- * <p>Both server and client expect received binary messages to contain a frame
- * length field. Netty's {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder}
- * is used to fully receive the frame before giving it to the respective client
- * or server handler.
- *
- * <p>Connection establishment and release happens by the client. The server
- * only closes a connection if a fatal failure happens that cannot be resolved
- * otherwise.
- *
- * <p>The is a single server per task manager and a single client can be shared
- * by multiple Threads.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateServer}</li>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateServerHandler}</li>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateClient}</li>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateClientHandler}</li>
- * </ul>
- *
- * <h2>Serialization</h2>
- *
- * <p>The exchanged binary messages have the following format:
- *
- * <pre>
- *                     <------ Frame ------------------------->
- *                    +----------------------------------------+
- *                    |        HEADER (8)      | PAYLOAD (VAR) |
- * +------------------+----------------------------------------+
- * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
- * +------------------+----------------------------------------+
- * </pre>
- *
- * <p>For frame decoding, both server and client use Netty's {@link
- * io.netty.handler.codec.LengthFieldBasedFrameDecoder}. Message serialization
- * is done via static helpers in {@link org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}.
- * The serialization helpers return {@link io.netty.buffer.ByteBuf} instances,
- * which are ready to be sent to the client or server respectively as they
- * contain the frame length.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}</li>
- * </ul>
- *
- * <h2>Statistics</h2>
- *
- * <p>Both server and client keep track of request statistics via {@link
- * org.apache.flink.runtime.query.netty.KvStateRequestStats}.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateRequestStats}</li>
- * </ul>
- */
-package org.apache.flink.runtime.query.netty;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
deleted file mode 100644
index 07a4396..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains all KvState query related classes.
- *
- * <h2>TaskManager and JobManager</h2>
- *
- * <p>State backends register queryable state instances at the {@link
- * org.apache.flink.runtime.query.KvStateRegistry}.
- * There is one registry per TaskManager. Registered KvState instances are
- * reported to the JobManager, where they are aggregated at the {@link
- * org.apache.flink.runtime.query.KvStateLocationRegistry}.
- *
- * <p>Instances of {@link org.apache.flink.runtime.query.KvStateLocation} contain
- * all information needed for a client to query a KvState instance.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.KvStateRegistry}</li>
- * <li>{@link org.apache.flink.runtime.query.TaskKvStateRegistry}</li>
- * <li>{@link org.apache.flink.runtime.query.KvStateLocation}</li>
- * <li>{@link org.apache.flink.runtime.query.KvStateLocationRegistry}</li>
- * </ul>
- *
- * <h2>Client</h2>
- *
- * The {@link org.apache.flink.runtime.query.QueryableStateClient} is used
- * to query KvState instances. The client takes care of {@link
- * org.apache.flink.runtime.query.KvStateLocation} lookup and caching. Queries
- * are then dispatched via the network client.
- *
- * <h3>JobManager Communication</h3>
- *
- * <p>The JobManager is queried for {@link org.apache.flink.runtime.query.KvStateLocation}
- * instances via the {@link org.apache.flink.runtime.query.KvStateLocationLookupService}.
- * The client caches resolved locations and dispatches queries directly to the
- * TaskManager.
- *
- * <h3>TaskManager Communication</h3>
- *
- * <p>After the location has been resolved, the TaskManager is queried via the
- * {@link org.apache.flink.runtime.query.netty.KvStateClient}.
- */
-package org.apache.flink.runtime.query;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 7e1123d..97b6bcd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
@@ -90,7 +90,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
 		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
 
-		Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace(
+		Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
 				serializedKeyAndNamespace, keySerializer, namespaceSerializer);
 
 		return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1);
@@ -108,7 +108,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 
 		@SuppressWarnings("unchecked,rawtypes")
 		TypeSerializer serializer = stateDesc.getSerializer();
-		return KvStateRequestSerializer.serializeValue(result, serializer);
+		return KvStateSerializer.serializeValue(result, serializer);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index f393237..f981b9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
@@ -153,6 +153,6 @@ public class HeapMapState<K, N, UK, UV>
 		TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
 		TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
 
-		return KvStateRequestSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer);
+		return KvStateSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index fa1ae54..37d28de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -47,7 +47,7 @@ public class QueryableStateConfiguration {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Returns whether queryable state is enabled. 
+	 * Returns whether queryable state is enabled.
 	 */
 	public boolean enabled() {
 		return enabled;
@@ -70,7 +70,7 @@ public class QueryableStateConfiguration {
 
 	/**
 	 * Returns the number of threads for the thread pool that performs the actual state lookup.
-	 * These threads perform the actual state lookup. 
+	 * These threads perform the actual state lookup.
 	 */
 	public int numQueryThreads() {
 		return numQueryThreads;
@@ -90,7 +90,7 @@ public class QueryableStateConfiguration {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Gets the configuration describing the queryable state as deactivated. 
+	 * Gets the configuration describing the queryable state as deactivated.
 	 */
 	public static QueryableStateConfiguration disabled() {
 		return new QueryableStateConfiguration(false, 0, 0, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 1c30ff6..7c5c830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -39,8 +39,9 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.QueryableStateUtils;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
@@ -48,6 +49,7 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,7 +140,7 @@ public class TaskManagerServices {
 	public FileCache getFileCache() {
 		return fileCache;
 	}
-	
+
 	public TaskSlotTable getTaskSlotTable() {
 		return taskSlotTable;
 	}
@@ -214,7 +216,7 @@ public class TaskManagerServices {
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
-		
+
 		return new TaskManagerServices(
 			taskManagerLocation,
 			memoryManager,
@@ -354,7 +356,7 @@ public class TaskManagerServices {
 		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
 		KvStateRegistry kvStateRegistry = new KvStateRegistry();
-		KvStateServer kvStateServer;
+		KvStateServer kvStateServer = null;
 
 		if (taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) {
 			QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
@@ -365,15 +367,13 @@ public class TaskManagerServices {
 			int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
 					taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads();
 
-			kvStateServer = new KvStateServer(
-				taskManagerServicesConfiguration.getTaskManagerAddress(),
-				qsConfig.port(),
-				numNetworkThreads,
-				numQueryThreads,
-				kvStateRegistry,
-				new DisabledKvStateRequestStats());
-		} else {
-			kvStateServer = null;
+			kvStateServer = QueryableStateUtils.createKvStateServer(
+					taskManagerServicesConfiguration.getTaskManagerAddress(),
+					qsConfig.port(),
+					numNetworkThreads,
+					numQueryThreads,
+					kvStateRegistry,
+					new DisabledKvStateRequestStats());
 		}
 
 		// we start the network first, to make sure it can allocate its buffers first
@@ -395,7 +395,7 @@ public class TaskManagerServices {
 	 * Calculates the amount of memory used for network buffers based on the total memory to use and
 	 * the according configuration parameters.
 	 *
-	 * The following configuration parameters are involved:
+	 * <p>The following configuration parameters are involved:
 	 * <ul>
 	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
 	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
@@ -458,11 +458,11 @@ public class TaskManagerServices {
 	 * Calculates the amount of memory used for network buffers inside the current JVM instance
 	 * based on the available heap or the max heap size and the according configuration parameters.
 	 *
-	 * For containers or when started via scripts, if started with a memory limit and set to use
+	 * <p>For containers or when started via scripts, if started with a memory limit and set to use
 	 * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able
 	 * to extract the intended values from this.
 	 *
-	 * The following configuration parameters are involved:
+	 * <p>The following configuration parameters are involved:
 	 * <ul>
 	 *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
 	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
@@ -629,7 +629,7 @@ public class TaskManagerServices {
 				if (LOG.isInfoEnabled()) {
 					long totalSpaceGb = file.getTotalSpace() >> 30;
 					long usableSpaceGb = file.getUsableSpace() >> 30;
-					double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100;
+					double usablePercentage = (double) usableSpaceGb / totalSpaceGb * 100;
 					String path = file.getAbsolutePath();
 					LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
 						path, totalSpaceGb, usableSpaceGb, usablePercentage));

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 6cc7569..f1f7d39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Configuration for the task manager services such as the network environment, the memory manager,
- * the io manager and the metric registry
+ * the io manager and the metric registry.
  */
 public class TaskManagerServicesConfiguration {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServicesConfiguration.class);
@@ -106,7 +107,6 @@ public class TaskManagerServicesConfiguration {
 	//  Getter/Setter
 	// --------------------------------------------------------------------------------------------
 
-
 	public InetAddress getTaskManagerAddress() {
 		return taskManagerAddress;
 	}
@@ -291,7 +291,7 @@ public class TaskManagerServicesConfiguration {
 
 		if (!hasNewNetworkBufConf(configuration)) {
 			// map old config to new one:
-			networkBufMin = networkBufMax = ((long)numNetworkBuffers) * pageSize;
+			networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
 		} else {
 			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
 				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
@@ -439,9 +439,8 @@ public class TaskManagerServicesConfiguration {
 	static void checkConfigParameter(boolean condition, Object parameter, String name, String errorMessage)
 			throws IllegalConfigurationException {
 		if (!condition) {
-			throw new IllegalConfigurationException("Invalid configuration value for " + 
+			throw new IllegalConfigurationException("Invalid configuration value for " +
 					name + " : " + parameter + " - " + errorMessage);
 		}
 	}
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
deleted file mode 100644
index edefcf8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy;
-import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AkkaKvStateLocationLookupService}.
- */
-public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
-
-	/** The default timeout. */
-	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
-	/** Test actor system shared between the tests. */
-	private static ActorSystem testActorSystem;
-
-	@BeforeClass
-	public static void setUp() throws Exception {
-		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (testActorSystem != null) {
-			testActorSystem.shutdown();
-		}
-	}
-
-	/**
-	 * Tests responses if no leader notification has been reported or leadership
-	 * has been lost (leaderAddress = <code>null</code>).
-	 */
-	@Test
-	public void testNoJobManagerRegistered() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			null,
-			null);
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-		lookupService.start();
-
-		//
-		// No leader registered initially => fail with UnknownJobManager
-		//
-		try {
-			JobID jobId = new JobID();
-			String name = "coffee";
-
-			Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
-
-			Await.result(locationFuture, TIMEOUT);
-			fail("Did not throw expected Exception");
-		} catch (UnknownJobManager ignored) {
-			// Expected
-		}
-
-		assertEquals("Received unexpected lookup", 0, received.size());
-
-		//
-		// Leader registration => communicate with new leader
-		//
-		UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
-		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
-
-		ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
-
-		String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-		// Notify the service about a leader
-		leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
-
-		JobID jobId = new JobID();
-		String name = "tea";
-
-		// Verify that the leader response is handled
-		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
-		assertEquals(expected, location);
-
-		// Verify that the correct message was sent to the leader
-		assertEquals(1, received.size());
-
-		verifyLookupMsg(received.poll(), jobId, name);
-
-		//
-		// Leader loss => fail with UnknownJobManager
-		//
-		leaderRetrievalService.notifyListener(null, null);
-
-		try {
-			Future<KvStateLocation> locationFuture = lookupService
-					.getKvStateLookupInfo(new JobID(), "coffee");
-
-			Await.result(locationFuture, TIMEOUT);
-			fail("Did not throw expected Exception");
-		} catch (UnknownJobManager ignored) {
-			// Expected
-		}
-
-		// No new messages received
-		assertEquals(0, received.size());
-	}
-
-	/**
-	 * Tests that messages are properly decorated with the leader session ID.
-	 */
-	@Test
-	public void testLeaderSessionIdChange() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			"localhost",
-			HighAvailabilityServices.DEFAULT_LEADER_ID);
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-		lookupService.start();
-
-		// Create test actors with random leader session IDs
-		KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
-		UUID leaderSessionId1 = UUID.randomUUID();
-		ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
-		String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
-
-		KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
-		UUID leaderSessionId2 = UUID.randomUUID();
-		ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, expected2);
-		String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
-
-		JobID jobId = new JobID();
-
-		//
-		// Notify about first leader
-		//
-		leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
-
-		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
-		assertEquals(expected1, location);
-
-		assertEquals(1, received.size());
-		verifyLookupMsg(received.poll(), jobId, "rock");
-
-		//
-		// Notify about second leader
-		//
-		leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
-
-		location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
-		assertEquals(expected2, location);
-
-		assertEquals(1, received.size());
-		verifyLookupMsg(received.poll(), jobId, "roll");
-	}
-
-	/**
-	 * Tests that lookups are retried when no leader notification is available.
-	 */
-	@Test
-	public void testRetryOnUnknownJobManager() throws Exception {
-		final Queue<LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>();
-
-		LookupRetryStrategyFactory retryStrategy =
-				new LookupRetryStrategyFactory() {
-					@Override
-					public LookupRetryStrategy createRetryStrategy() {
-						return retryStrategies.poll();
-					}
-				};
-
-		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			null,
-			null);
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				retryStrategy);
-
-		lookupService.start();
-
-		//
-		// Test call to retry
-		//
-		final AtomicBoolean hasRetried = new AtomicBoolean();
-		retryStrategies.add(
-				new LookupRetryStrategy() {
-					@Override
-					public FiniteDuration getRetryDelay() {
-						return FiniteDuration.Zero();
-					}
-
-					@Override
-					public boolean tryRetry() {
-						if (hasRetried.compareAndSet(false, true)) {
-							return true;
-						}
-						return false;
-					}
-				});
-
-		Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
-
-		Await.ready(locationFuture, TIMEOUT);
-		assertTrue("Did not retry ", hasRetried.get());
-
-		//
-		// Test leader notification after retry
-		//
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
-		ActorRef testActor = LookupResponseActor.create(received, null, expected);
-		final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-		retryStrategies.add(new LookupRetryStrategy() {
-			@Override
-			public FiniteDuration getRetryDelay() {
-				return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
-			}
-
-			@Override
-			public boolean tryRetry() {
-				leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
-				return true;
-			}
-		});
-
-		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
-		assertEquals(expected, location);
-	}
-
-	@Test
-	public void testUnexpectedResponseType() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
-			"localhost",
-			HighAvailabilityServices.DEFAULT_LEADER_ID);
-		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
-
-		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
-				leaderRetrievalService,
-				testActorSystem,
-				TIMEOUT,
-				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-		lookupService.start();
-
-		// Create test actors with random leader session IDs
-		String expected = "unexpected-response-type";
-		ActorRef testActor = LookupResponseActor.create(received, null, expected);
-		String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-		leaderRetrievalService.notifyListener(testActorAddress, null);
-
-		try {
-			Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
-			fail("Did not throw expected Exception");
-		} catch (Throwable ignored) {
-			// Expected
-		}
-	}
-
-	private static final class LookupResponseActor extends FlinkUntypedActor {
-
-		/** Received lookup messages. */
-		private final Queue<LookupKvStateLocation> receivedLookups;
-
-		/** Responses on KvStateMessage.LookupKvStateLocation messages. */
-		private final Queue<Object> lookupResponses;
-
-		/** The leader session ID. */
-		private UUID leaderSessionId;
-
-		public LookupResponseActor(
-				Queue<LookupKvStateLocation> receivedLookups,
-				UUID leaderSessionId, Object... lookupResponses) {
-
-			this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
-			this.leaderSessionId = leaderSessionId;
-			this.lookupResponses = new ArrayDeque<>();
-
-			if (lookupResponses != null) {
-				for (Object resp : lookupResponses) {
-					this.lookupResponses.add(resp);
-				}
-			}
-		}
-
-		@Override
-		public void handleMessage(Object message) throws Exception {
-			if (message instanceof LookupKvStateLocation) {
-				// Add to received lookups queue
-				receivedLookups.add((LookupKvStateLocation) message);
-
-				Object msg = lookupResponses.poll();
-				if (msg != null) {
-					if (msg instanceof Throwable) {
-						sender().tell(new Status.Failure((Throwable) msg), self());
-					} else {
-						sender().tell(new Status.Success(msg), self());
-					}
-				}
-			} else if (message instanceof UUID) {
-				this.leaderSessionId = (UUID) message;
-			} else {
-				LOG.debug("Received unhandled message: {}", message);
-			}
-		}
-
-		@Override
-		protected UUID getLeaderSessionID() {
-			return leaderSessionId;
-		}
-
-		private static ActorRef create(
-				Queue<LookupKvStateLocation> receivedLookups,
-				UUID leaderSessionId,
-				Object... lookupResponses) {
-
-			return testActorSystem.actorOf(Props.create(
-					LookupResponseActor.class,
-					receivedLookups,
-					leaderSessionId,
-					lookupResponses));
-		}
-	}
-
-	private static void verifyLookupMsg(
-			LookupKvStateLocation lookUpMsg,
-			JobID expectedJobId,
-			String expectedName) {
-
-		assertNotNull(lookUpMsg);
-		assertEquals(expectedJobId, lookUpMsg.getJobId());
-		assertEquals(expectedName, lookUpMsg.getRegistrationName());
-	}
-
-}


Mime
View raw message