flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [10/14] flink git commit: [FLINK-7770][QS] Hide the queryable state behind a proxy.
Date Wed, 11 Oct 2017 15:46:10 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
deleted file mode 100644
index a2850b3..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.client.KvStateClient;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link KvStateClient}.
- */
-public class KvStateClientTest {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class);
-
-	// Thread pool for client bootstrap (shared between tests)
-	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
-
-	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (NIO_GROUP != null) {
-			NIO_GROUP.shutdownGracefully();
-		}
-	}
-
-	/**
-	 * Tests simple queries, of which half succeed and half fail.
-	 */
-	@Test
-	public void testSimpleRequests() throws Exception {
-		Deadline deadline = TEST_TIMEOUT.fromNow();
-		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-		KvStateClient client = null;
-		Channel serverChannel = null;
-
-		try {
-			client = new KvStateClient(1, stats);
-
-			// Random result
-			final byte[] expected = new byte[1024];
-			ThreadLocalRandom.current().nextBytes(expected);
-
-			final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
-			final AtomicReference<Channel> channel = new AtomicReference<>();
-
-			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
-				@Override
-				public void channelActive(ChannelHandlerContext ctx) throws Exception {
-					channel.set(ctx.channel());
-				}
-
-				@Override
-				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-					received.add((ByteBuf) msg);
-				}
-			});
-
-			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
-			List<Future<byte[]>> futures = new ArrayList<>();
-
-			int numQueries = 1024;
-
-			for (int i = 0; i < numQueries; i++) {
-				futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
-			}
-
-			// Respond to messages
-			Exception testException = new RuntimeException("Expected test Exception");
-
-			for (int i = 0; i < numQueries; i++) {
-				ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-				assertNotNull("Receive timed out", buf);
-
-				Channel ch = channel.get();
-				assertNotNull("Channel not active", ch);
-
-				assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
-				KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
-
-				buf.release();
-
-				if (i % 2 == 0) {
-					ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
-							serverChannel.alloc(),
-							request.getRequestId(),
-							expected);
-
-					ch.writeAndFlush(response);
-				} else {
-					ByteBuf response = MessageSerializer.serializeKvStateRequestFailure(
-							serverChannel.alloc(),
-							request.getRequestId(),
-							testException);
-
-					ch.writeAndFlush(response);
-				}
-			}
-
-			for (int i = 0; i < numQueries; i++) {
-				if (i % 2 == 0) {
-					byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft());
-					assertArrayEquals(expected, serializedResult);
-				} else {
-					try {
-						Await.result(futures.get(i), deadline.timeLeft());
-						fail("Did not throw expected Exception");
-					} catch (RuntimeException ignored) {
-						// Expected
-					}
-				}
-			}
-
-			assertEquals(numQueries, stats.getNumRequests());
-			int expectedRequests = numQueries / 2;
-
-			// Counts can take some time to propagate
-			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests ||
-					stats.getNumFailed() != expectedRequests)) {
-				Thread.sleep(100);
-			}
-
-			assertEquals(expectedRequests, stats.getNumSuccessful());
-			assertEquals(expectedRequests, stats.getNumFailed());
-		} finally {
-			if (client != null) {
-				client.shutDown();
-			}
-
-			if (serverChannel != null) {
-				serverChannel.close();
-			}
-
-			assertEquals("Channel leak", 0, stats.getNumConnections());
-		}
-	}
-
-	/**
-	 * Tests that a request to an unavailable host is failed with ConnectException.
-	 */
-	@Test
-	public void testRequestUnavailableHost() throws Exception {
-		Deadline deadline = TEST_TIMEOUT.fromNow();
-		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-		KvStateClient client = null;
-
-		try {
-			client = new KvStateClient(1, stats);
-
-			int availablePort = NetUtils.getAvailablePort();
-
-			KvStateServerAddress serverAddress = new KvStateServerAddress(
-					InetAddress.getLocalHost(),
-					availablePort);
-
-			Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
-			try {
-				Await.result(future, deadline.timeLeft());
-				fail("Did not throw expected ConnectException");
-			} catch (ConnectException ignored) {
-				// Expected
-			}
-		} finally {
-			if (client != null) {
-				client.shutDown();
-			}
-
-			assertEquals("Channel leak", 0, stats.getNumConnections());
-		}
-	}
-
-	/**
-	 * Multiple threads concurrently fire queries.
-	 */
-	@Test
-	public void testConcurrentQueries() throws Exception {
-		Deadline deadline = TEST_TIMEOUT.fromNow();
-		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-		ExecutorService executor = null;
-		KvStateClient client = null;
-		Channel serverChannel = null;
-
-		final byte[] serializedResult = new byte[1024];
-		ThreadLocalRandom.current().nextBytes(serializedResult);
-
-		try {
-			int numQueryTasks = 4;
-			final int numQueriesPerTask = 1024;
-
-			executor = Executors.newFixedThreadPool(numQueryTasks);
-
-			client = new KvStateClient(1, stats);
-
-			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
-				@Override
-				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-					ByteBuf buf = (ByteBuf) msg;
-					assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
-					KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
-
-					buf.release();
-
-					ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
-							ctx.alloc(),
-							request.getRequestId(),
-							serializedResult);
-
-					ctx.channel().writeAndFlush(response);
-				}
-			});
-
-			final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
-			final KvStateClient finalClient = client;
-			Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() {
-				@Override
-				public List<Future<byte[]>> call() throws Exception {
-					List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask);
-
-					for (int i = 0; i < numQueriesPerTask; i++) {
-						results.add(finalClient.getKvState(
-								serverAddress,
-								new KvStateID(),
-								new byte[0]));
-					}
-
-					return results;
-				}
-			};
-
-			// Submit query tasks
-			List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>();
-			for (int i = 0; i < numQueryTasks; i++) {
-				futures.add(executor.submit(queryTask));
-			}
-
-			// Verify results
-			for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) {
-				List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-				for (Future<byte[]> result : results) {
-					byte[] actual = Await.result(result, deadline.timeLeft());
-					assertArrayEquals(serializedResult, actual);
-				}
-			}
-
-			int totalQueries = numQueryTasks * numQueriesPerTask;
-
-			// Counts can take some time to propagate
-			while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) {
-				Thread.sleep(100);
-			}
-
-			assertEquals(totalQueries, stats.getNumRequests());
-			assertEquals(totalQueries, stats.getNumSuccessful());
-		} finally {
-			if (executor != null) {
-				executor.shutdown();
-			}
-
-			if (serverChannel != null) {
-				serverChannel.close();
-			}
-
-			if (client != null) {
-				client.shutDown();
-			}
-
-			assertEquals("Channel leak", 0, stats.getNumConnections());
-		}
-	}
-
-	/**
-	 * Tests that a server failure closes the connection and removes it from
-	 * the established connections.
-	 */
-	@Test
-	public void testFailureClosesChannel() throws Exception {
-		Deadline deadline = TEST_TIMEOUT.fromNow();
-		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-		KvStateClient client = null;
-		Channel serverChannel = null;
-
-		try {
-			client = new KvStateClient(1, stats);
-
-			final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
-			final AtomicReference<Channel> channel = new AtomicReference<>();
-
-			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
-				@Override
-				public void channelActive(ChannelHandlerContext ctx) throws Exception {
-					channel.set(ctx.channel());
-				}
-
-				@Override
-				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-					received.add((ByteBuf) msg);
-				}
-			});
-
-			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
-			// Requests
-			List<Future<byte[]>> futures = new ArrayList<>();
-			futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
-			futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
-
-			ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			assertNotNull("Receive timed out", buf);
-			buf.release();
-
-			buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-			assertNotNull("Receive timed out", buf);
-			buf.release();
-
-			assertEquals(1, stats.getNumConnections());
-
-			Channel ch = channel.get();
-			assertNotNull("Channel not active", ch);
-
-			// Respond with failure
-			ch.writeAndFlush(MessageSerializer.serializeServerFailure(
-					serverChannel.alloc(),
-					new RuntimeException("Expected test server failure")));
-
-			try {
-				Await.result(futures.remove(0), deadline.timeLeft());
-				fail("Did not throw expected server failure");
-			} catch (RuntimeException ignored) {
-				// Expected
-			}
-
-			try {
-				Await.result(futures.remove(0), deadline.timeLeft());
-				fail("Did not throw expected server failure");
-			} catch (RuntimeException ignored) {
-				// Expected
-			}
-
-			assertEquals(0, stats.getNumConnections());
-
-			// Counts can take some time to propagate
-			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
-					stats.getNumFailed() != 2)) {
-				Thread.sleep(100);
-			}
-
-			assertEquals(2, stats.getNumRequests());
-			assertEquals(0, stats.getNumSuccessful());
-			assertEquals(2, stats.getNumFailed());
-		} finally {
-			if (client != null) {
-				client.shutDown();
-			}
-
-			if (serverChannel != null) {
-				serverChannel.close();
-			}
-
-			assertEquals("Channel leak", 0, stats.getNumConnections());
-		}
-	}
-
-	/**
-	 * Tests that closing the channel on the server side closes the connection and removes it
-	 * from the established connections.
-	 */
-	@Test
-	public void testServerClosesChannel() throws Exception {
-		Deadline deadline = TEST_TIMEOUT.fromNow();
-		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-		KvStateClient client = null;
-		Channel serverChannel = null;
-
-		try {
-			client = new KvStateClient(1, stats);
-
-			final AtomicBoolean received = new AtomicBoolean();
-			final AtomicReference<Channel> channel = new AtomicReference<>();
-
-			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
-				@Override
-				public void channelActive(ChannelHandlerContext ctx) throws Exception {
-					channel.set(ctx.channel());
-				}
-
-				@Override
-				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-					received.set(true);
-				}
-			});
-
-			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
-
-			// Requests
-			Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
-			while (!received.get() && deadline.hasTimeLeft()) {
-				Thread.sleep(50);
-			}
-			assertTrue("Receive timed out", received.get());
-
-			assertEquals(1, stats.getNumConnections());
-
-			channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-			try {
-				Await.result(future, deadline.timeLeft());
-				fail("Did not throw expected server failure");
-			} catch (ClosedChannelException ignored) {
-				// Expected
-			}
-
-			assertEquals(0, stats.getNumConnections());
-
-			// Counts can take some time to propagate
-			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
-					stats.getNumFailed() != 1)) {
-				Thread.sleep(100);
-			}
-
-			assertEquals(1, stats.getNumRequests());
-			assertEquals(0, stats.getNumSuccessful());
-			assertEquals(1, stats.getNumFailed());
-		} finally {
-			if (client != null) {
-				client.shutDown();
-			}
-
-			if (serverChannel != null) {
-				serverChannel.close();
-			}
-
-			assertEquals("Channel leak", 0, stats.getNumConnections());
-		}
-	}
-
-	/**
-	 * Tests multiple clients querying multiple servers until 100k queries have
-	 * been processed. At this point, the client is shut down and it is verified
-	 * that all ongoing requests are failed.
-	 */
-	@Test
-	public void testClientServerIntegration() throws Exception {
-		// Config
-		final int numServers = 2;
-		final int numServerEventLoopThreads = 2;
-		final int numServerQueryThreads = 2;
-
-		final int numClientEventLoopThreads = 4;
-		final int numClientsTasks = 8;
-
-		final int batchSize = 16;
-
-		final int numKeyGroups = 1;
-
-		AbstractStateBackend abstractBackend = new MemoryStateBackend();
-		KvStateRegistry dummyRegistry = new KvStateRegistry();
-		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-		dummyEnv.setKvStateRegistry(dummyRegistry);
-
-		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-				dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
-
-		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
-
-		KvStateClient client = null;
-		ExecutorService clientTaskExecutor = null;
-		final KvStateServer[] server = new KvStateServer[numServers];
-
-		try {
-			client = new KvStateClient(numClientEventLoopThreads, clientStats);
-			clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
-
-			// Create state
-			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-			desc.setQueryable("any");
-
-			// Create servers
-			KvStateRegistry[] registry = new KvStateRegistry[numServers];
-			AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
-			final KvStateID[] ids = new KvStateID[numServers];
-
-			for (int i = 0; i < numServers; i++) {
-				registry[i] = new KvStateRegistry();
-				serverStats[i] = new AtomicKvStateRequestStats();
-				server[i] = new KvStateServerImpl(
-						InetAddress.getLocalHost(),
-						0,
-						numServerEventLoopThreads,
-						numServerQueryThreads,
-						registry[i],
-						serverStats[i]);
-
-				server[i].start();
-
-				backend.setCurrentKey(1010 + i);
-
-				// Value per server
-				ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE,
-						desc);
-
-				state.update(201 + i);
-
-				// we know it must be a KvState but this is not exposed to the user via State
-				InternalKvState<?> kvState = (InternalKvState<?>) state;
-
-				// Register KvState (one state instance for all servers)
-				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
-			}
-
-			final KvStateClient finalClient = client;
-			Callable<Void> queryTask = new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					while (true) {
-						if (Thread.interrupted()) {
-							throw new InterruptedException();
-						}
-
-						// Random server permutation
-						List<Integer> random = new ArrayList<>();
-						for (int j = 0; j < batchSize; j++) {
-							random.add(j);
-						}
-						Collections.shuffle(random);
-
-						// Dispatch queries
-						List<Future<byte[]>> futures = new ArrayList<>(batchSize);
-
-						for (int j = 0; j < batchSize; j++) {
-							int targetServer = random.get(j) % numServers;
-
-							byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
-									1010 + targetServer,
-									IntSerializer.INSTANCE,
-									VoidNamespace.INSTANCE,
-									VoidNamespaceSerializer.INSTANCE);
-
-							futures.add(finalClient.getKvState(
-									server[targetServer].getAddress(),
-									ids[targetServer],
-									serializedKeyAndNamespace));
-						}
-
-						// Verify results
-						for (int j = 0; j < batchSize; j++) {
-							int targetServer = random.get(j) % numServers;
-
-							Future<byte[]> future = futures.get(j);
-							byte[] buf = Await.result(future, timeout);
-							int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
-							assertEquals(201 + targetServer, value);
-						}
-					}
-				}
-			};
-
-			// Submit tasks
-			List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>();
-			for (int i = 0; i < numClientsTasks; i++) {
-				taskFutures.add(clientTaskExecutor.submit(queryTask));
-			}
-
-			long numRequests;
-			while ((numRequests = clientStats.getNumRequests()) < 100_000) {
-				Thread.sleep(100);
-				LOG.info("Number of requests {}/100_000", numRequests);
-			}
-
-			// Shut down
-			client.shutDown();
-
-			for (java.util.concurrent.Future<Void> future : taskFutures) {
-				try {
-					future.get();
-					fail("Did not throw expected Exception after shut down");
-				} catch (ExecutionException t) {
-					if (t.getCause() instanceof ClosedChannelException ||
-							t.getCause() instanceof IllegalStateException) {
-						// Expected
-					} else {
-						t.printStackTrace();
-						fail("Failed with unexpected Exception type: " + t.getClass().getName());
-					}
-				}
-			}
-
-			assertEquals("Connection leak (client)", 0, clientStats.getNumConnections());
-			for (int i = 0; i < numServers; i++) {
-				boolean success = false;
-				int numRetries = 0;
-				while (!success) {
-					try {
-						assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections());
-						success = true;
-					} catch (Throwable t) {
-						if (numRetries < 10) {
-							LOG.info("Retrying connection leak check (server)");
-							Thread.sleep((numRetries + 1) * 50);
-							numRetries++;
-						} else {
-							throw t;
-						}
-					}
-				}
-			}
-		} finally {
-			if (client != null) {
-				client.shutDown();
-			}
-
-			for (int i = 0; i < numServers; i++) {
-				if (server[i] != null) {
-					server[i].shutDown();
-				}
-			}
-
-			if (clientTaskExecutor != null) {
-				clientTaskExecutor.shutdown();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException {
-		ServerBootstrap bootstrap = new ServerBootstrap()
-				// Bind address and port
-				.localAddress(InetAddress.getLocalHost(), 0)
-				// NIO server channels
-				.group(NIO_GROUP)
-				.channel(NioServerSocketChannel.class)
-				// See initializer for pipeline details
-				.childHandler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					protected void initChannel(SocketChannel ch) throws Exception {
-						ch.pipeline()
-								.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-								.addLast(handlers);
-					}
-				});
-
-		return bootstrap.bind().sync().channel();
-	}
-
-	private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
-		InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress();
-
-		return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
deleted file mode 100644
index f28ca68..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link KvStateSerializer}.
- */
-@RunWith(Parameterized.class)
-public class KvStateRequestSerializerTest {
-
-	private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
-
-	@Parameterized.Parameters
-	public static Collection<Boolean> parameters() {
-		return Arrays.asList(false, true);
-	}
-
-	@Parameterized.Parameter
-	public boolean async;
-
-	/**
-	 * Tests KvState request serialization.
-	 */
-	@Test
-	public void testKvStateRequestSerialization() throws Exception {
-		long requestId = Integer.MAX_VALUE + 1337L;
-		KvStateID kvStateId = new KvStateID();
-		byte[] serializedKeyAndNamespace = randomByteArray(1024);
-
-		ByteBuf buf = MessageSerializer.serializeKvStateRequest(
-				alloc,
-				requestId,
-				kvStateId,
-				serializedKeyAndNamespace);
-
-		int frameLength = buf.readInt();
-		assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
-		KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
-		assertEquals(buf.readerIndex(), frameLength + 4);
-
-		assertEquals(requestId, request.getRequestId());
-		assertEquals(kvStateId, request.getKvStateId());
-		assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
-	}
-
-	/**
-	 * Tests KvState request serialization with zero-length serialized key and namespace.
-	 */
-	@Test
-	public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
-		byte[] serializedKeyAndNamespace = new byte[0];
-
-		ByteBuf buf = MessageSerializer.serializeKvStateRequest(
-				alloc,
-				1823,
-				new KvStateID(),
-				serializedKeyAndNamespace);
-
-		int frameLength = buf.readInt();
-		assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
-		KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
-		assertEquals(buf.readerIndex(), frameLength + 4);
-
-		assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
-	}
-
-	/**
-	 * Tests that we don't try to be smart about <code>null</code> key and namespace.
-	 * They should be treated explicitly.
-	 */
-	@Test(expected = NullPointerException.class)
-	public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
-		new KvStateRequest(0, new KvStateID(), null);
-	}
-
-	/**
-	 * Tests KvState request result serialization.
-	 */
-	@Test
-	public void testKvStateRequestResultSerialization() throws Exception {
-		long requestId = Integer.MAX_VALUE + 72727278L;
-		byte[] serializedResult = randomByteArray(1024);
-
-		ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-				alloc,
-				requestId,
-				serializedResult);
-
-		int frameLength = buf.readInt();
-		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
-		assertEquals(buf.readerIndex(), frameLength + 4);
-
-		assertEquals(requestId, request.getRequestId());
-
-		assertArrayEquals(serializedResult, request.getSerializedResult());
-	}
-
-	/**
-	 * Tests KvState request result serialization with zero-length serialized result.
-	 */
-	@Test
-	public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception {
-		byte[] serializedResult = new byte[0];
-
-		ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-				alloc,
-				72727278,
-				serializedResult);
-
-		int frameLength = buf.readInt();
-
-		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
-		assertEquals(buf.readerIndex(), frameLength + 4);
-
-		assertArrayEquals(serializedResult, request.getSerializedResult());
-	}
-
-	/**
-	 * Tests that we don't try to be smart about <code>null</code> results.
-	 * They should be treated explicitly.
-	 */
-	@Test(expected = NullPointerException.class)
-	public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
-		new KvStateRequestResult(0, null);
-	}
-
-	/**
-	 * Tests KvState request failure serialization.
-	 */
-	@Test
-	public void testKvStateRequestFailureSerialization() throws Exception {
-		long requestId = Integer.MAX_VALUE + 1111222L;
-		IllegalStateException cause = new IllegalStateException("Expected test");
-
-		ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure(
-				alloc,
-				requestId,
-				cause);
-
-		int frameLength = buf.readInt();
-		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestFailure request = MessageSerializer.deserializeKvStateRequestFailure(buf);
-		assertEquals(buf.readerIndex(), frameLength + 4);
-
-		assertEquals(requestId, request.getRequestId());
-		assertEquals(cause.getClass(), request.getCause().getClass());
-		assertEquals(cause.getMessage(), request.getCause().getMessage());
-	}
-
-	/**
-	 * Tests KvState server failure serialization.
-	 */
-	@Test
-	public void testServerFailureSerialization() throws Exception {
-		IllegalStateException cause = new IllegalStateException("Expected test");
-
-		ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
-
-		int frameLength = buf.readInt();
-		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
-		Throwable request = MessageSerializer.deserializeServerFailure(buf);
-		assertEquals(buf.readerIndex(), frameLength + 4);
-
-		assertEquals(cause.getClass(), request.getClass());
-		assertEquals(cause.getMessage(), request.getMessage());
-	}
-
-	private byte[] randomByteArray(int capacity) {
-		byte[] bytes = new byte[capacity];
-		ThreadLocalRandom.current().nextBytes(bytes);
-		return bytes;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index c37c822..944349ee 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -24,20 +24,22 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.ChunkedByteBuf;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
 import org.apache.flink.queryablestate.server.KvStateServerHandler;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -57,10 +59,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -76,16 +79,28 @@ import static org.mockito.Mockito.when;
  */
 public class KvStateServerHandlerTest extends TestLogger {
 
-	/** Shared Thread pool for query execution. */
-	private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor();
-
-	private static final int READ_TIMEOUT_MILLIS = 10000;
+	private static KvStateServerImpl testServer;
+
+	private static final long READ_TIMEOUT_MILLIS = 10000L;
+
+	@BeforeClass
+	public static void setup() {
+		try {
+			testServer = new KvStateServerImpl(
+					InetAddress.getLocalHost(),
+					0,
+					1,
+					1,
+					new KvStateRegistry(),
+					new DisabledKvStateRequestStats());
+		} catch (UnknownHostException e) {
+			e.printStackTrace();
+		}
+	}
 
 	@AfterClass
 	public static void tearDown() throws Exception {
-		if (TEST_THREAD_POOL != null) {
-			TEST_THREAD_POOL.shutdown();
-		}
+		testServer.shutdown();
 	}
 
 	/**
@@ -96,7 +111,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Register state
@@ -141,40 +159,40 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				requestId,
-				registryListener.kvStateId,
-				serializedKeyAndNamespace);
+		KvStateInternalRequest request = new KvStateInternalRequest(
+				registryListener.kvStateId, serializedKeyAndNamespace);
+
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+		long deserRequestId = MessageSerializer.getRequestId(buf);
+		KvStateResponse response = serializer.deserializeResponse(buf);
 
-		assertEquals(requestId, response.getRequestId());
+		assertEquals(requestId, deserRequestId);
 
-		int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+		int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE);
 		assertEquals(expectedValue, actualValue);
 
 		assertEquals(stats.toString(), 1, stats.getNumRequests());
 
 		// Wait for async successful request report
 		long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
-		while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
-			Thread.sleep(10);
+		while (stats.getNumSuccessful() != 1L && System.nanoTime() <= deadline) {
+			Thread.sleep(10L);
 		}
 
-		assertEquals(stats.toString(), 1, stats.getNumSuccessful());
+		assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
 	}
 
 	/**
-	 * Tests the failure response with {@link UnknownKvStateID} as cause on
+	 * Tests the failure response with {@link UnknownKvStateIdException} as cause on
 	 * queries for unregistered KvStateIDs.
 	 */
 	@Test
@@ -182,36 +200,38 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		long requestId = Integer.MAX_VALUE + 182828L;
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				requestId,
-				new KvStateID(),
-				new byte[0]);
+
+		KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
 
 		assertEquals(requestId, response.getRequestId());
 
-		assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateID);
+		assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateIdException);
 
-		assertEquals(1, stats.getNumRequests());
-		assertEquals(1, stats.getNumFailed());
+		assertEquals(1L, stats.getNumRequests());
+		assertEquals(1L, stats.getNumFailed());
 	}
 
 	/**
-	 * Tests the failure response with {@link UnknownKeyOrNamespace} as cause
+	 * Tests the failure response with {@link UnknownKeyOrNamespaceException} as cause
 	 * on queries for non-existing keys.
 	 */
 	@Test
@@ -219,7 +239,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		int numKeyGroups = 1;
@@ -254,40 +277,39 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				requestId,
-				registryListener.kvStateId,
-				serializedKeyAndNamespace);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
 
 		assertEquals(requestId, response.getRequestId());
 
-		assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace);
+		assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespaceException);
 
-		assertEquals(1, stats.getNumRequests());
-		assertEquals(1, stats.getNumFailed());
+		assertEquals(1L, stats.getNumRequests());
+		assertEquals(1L, stats.getNumFailed());
 	}
 
 	/**
-	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])}
-	 * call.
+	 * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call.
 	 */
 	@Test
 	public void testFailureOnGetSerializedValue() throws Exception {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Failing KvState
@@ -302,38 +324,37 @@ public class KvStateServerHandlerTest extends TestLogger {
 				"vanilla",
 				kvState);
 
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				282872,
-				kvStateId,
-				new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]);
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
 
 		assertTrue(response.getCause().getMessage().contains("Expected test Exception"));
 
-		assertEquals(1, stats.getNumRequests());
-		assertEquals(1, stats.getNumFailed());
+		assertEquals(1L, stats.getNumRequests());
+		assertEquals(1L, stats.getNumFailed());
 	}
 
 	/**
-	 * Tests that the channel is closed if an Exception reaches the channel
-	 * handler.
+	 * Tests that the channel is closed if an Exception reaches the channel handler.
 	 */
 	@Test
 	public void testCloseChannelOnExceptionCaught() throws Exception {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(handler);
 
 		channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
@@ -352,19 +373,28 @@ public class KvStateServerHandlerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the failure response on a rejected execution, because the query
-	 * executor has been closed.
+	 * Tests the failure response on a rejected execution, because the query executor has been closed.
 	 */
 	@Test
 	public void testQueryExecutorShutDown() throws Exception {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		ExecutorService closedExecutor = Executors.newSingleThreadExecutor();
-		closedExecutor.shutdown();
-		assertTrue(closedExecutor.isShutdown());
+		KvStateServerImpl localTestServer = new KvStateServerImpl(
+				InetAddress.getLocalHost(),
+				0,
+				1,
+				1,
+				new KvStateRegistry(),
+				new DisabledKvStateRequestStats());
+
+		localTestServer.shutdown();
+		assertTrue(localTestServer.isExecutorShutdown());
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(localTestServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		int numKeyGroups = 1;
@@ -391,26 +421,25 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				282872,
-				registryListener.kvStateId,
-				new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
 
 		assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
 
-		assertEquals(1, stats.getNumRequests());
-		assertEquals(1, stats.getNumFailed());
+		assertEquals(1L, stats.getNumRequests());
+		assertEquals(1L, stats.getNumFailed());
+
+		localTestServer.shutdown();
 	}
 
 	/**
@@ -421,7 +450,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Write the request and wait for the response
@@ -438,13 +470,11 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
 		Throwable response = MessageSerializer.deserializeServerFailure(buf);
 
-		assertEquals(0, stats.getNumRequests());
-		assertEquals(0, stats.getNumFailed());
+		assertEquals(0L, stats.getNumRequests());
+		assertEquals(0L, stats.getNumFailed());
 
-		unexpectedMessage = MessageSerializer.serializeKvStateRequestResult(
-				channel.alloc(),
-				192,
-				new byte[0]);
+		KvStateResponse stateResponse = new KvStateResponse(new byte[0]);
+		unexpectedMessage = MessageSerializer.serializeResponse(channel.alloc(), 192L, stateResponse);
 
 		channel.writeInbound(unexpectedMessage);
 
@@ -457,8 +487,8 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException);
 
-		assertEquals(0, stats.getNumRequests());
-		assertEquals(0, stats.getNumFailed());
+		assertEquals(0L, stats.getNumRequests());
+		assertEquals(0L, stats.getNumFailed());
 	}
 
 	/**
@@ -469,30 +499,30 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				282872,
-				new KvStateID(),
-				new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
-		assertEquals(1, request.refCnt());
+		assertEquals(1L, serRequest.refCnt());
 
 		// Write regular request
-		channel.writeInbound(request);
-		assertEquals("Buffer not recycled", 0, request.refCnt());
+		channel.writeInbound(serRequest);
+		assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
 
 		// Write unexpected msg
 		ByteBuf unexpected = channel.alloc().buffer(8);
 		unexpected.writeInt(4);
 		unexpected.writeInt(4);
 
-		assertEquals(1, unexpected.refCnt());
+		assertEquals(1L, unexpected.refCnt());
 
 		channel.writeInbound(unexpected);
-		assertEquals("Buffer not recycled", 0, unexpected.refCnt());
+		assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
 	}
 
 	/**
@@ -503,7 +533,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		int numKeyGroups = 1;
@@ -550,45 +583,40 @@ public class KvStateServerHandlerTest extends TestLogger {
 				StringSerializer.INSTANCE);
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				182828,
-				registryListener.kvStateId,
-				wrongKeyAndNamespace);
+
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf);
-		assertEquals(182828, response.getRequestId());
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		assertEquals(182828L, response.getRequestId());
 		assertTrue(response.getCause().getMessage().contains("IOException"));
 
 		// Repeat with wrong namespace only
-		request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				182829,
-				registryListener.kvStateId,
-				wrongNamespace);
+		request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
+		serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		buf = (ByteBuf) readInboundBlocking(channel);
 		buf.skipBytes(4); // skip frame length
 
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
-		response = MessageSerializer.deserializeKvStateRequestFailure(buf);
-		assertEquals(182829, response.getRequestId());
+		response = MessageSerializer.deserializeRequestFailure(buf);
+		assertEquals(182829L, response.getRequestId());
 		assertTrue(response.getCause().getMessage().contains("IOException"));
 
-		assertEquals(2, stats.getNumRequests());
-		assertEquals(2, stats.getNumFailed());
+		assertEquals(2L, stats.getNumRequests());
+		assertEquals(2L, stats.getNumFailed());
 	}
 
 	/**
@@ -599,7 +627,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		KvStateRegistry registry = new KvStateRegistry();
 		KvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		int numKeyGroups = 1;
@@ -650,14 +681,11 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		ByteBuf request = MessageSerializer.serializeKvStateRequest(
-				channel.alloc(),
-				requestId,
-				registryListener.kvStateId,
-				serializedKeyAndNamespace);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
 		// Write the request and wait for the response
-		channel.writeInbound(request);
+		channel.writeInbound(serRequest);
 
 		Object msg = readInboundBlocking(channel);
 		assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
@@ -669,9 +697,9 @@ public class KvStateServerHandlerTest extends TestLogger {
 	 * Queries the embedded channel for data.
 	 */
 	private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
-		final int sleepMillis = 50;
+		final long sleepMillis = 50L;
 
-		int sleptMillis = 0;
+		long sleptMillis = 0L;
 
 		Object msg = null;
 		while (sleptMillis < READ_TIMEOUT_MILLIS &&

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 9332e68..b7f489a 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -22,14 +22,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.messages.MessageType;
 import org.apache.flink.queryablestate.server.KvStateServerImpl;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
@@ -66,7 +66,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests for {@link KvStateServer}.
+ * Tests for {@link KvStateServerImpl}.
  */
 public class KvStateServerTest {
 
@@ -87,7 +87,7 @@ public class KvStateServerTest {
 	 */
 	@Test
 	public void testSimpleRequest() throws Exception {
-		KvStateServer server = null;
+		KvStateServerImpl server = null;
 		Bootstrap bootstrap = null;
 		try {
 			KvStateRegistry registry = new KvStateRegistry();
@@ -96,7 +96,7 @@ public class KvStateServerTest {
 			server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
 			server.start();
 
-			KvStateServerAddress serverAddress = server.getAddress();
+			KvStateServerAddress serverAddress = server.getServerAddress();
 			int numKeyGroups = 1;
 			AbstractStateBackend abstractBackend = new MemoryStateBackend();
 			DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
@@ -155,25 +155,29 @@ public class KvStateServerTest {
 			long requestId = Integer.MAX_VALUE + 182828L;
 
 			assertTrue(registryListener.registrationName.equals("vanilla"));
-			ByteBuf request = MessageSerializer.serializeKvStateRequest(
-					channel.alloc(),
-					requestId,
+
+			final KvStateInternalRequest request = new KvStateInternalRequest(
 					registryListener.kvStateId,
 					serializedKeyAndNamespace);
 
-			channel.writeAndFlush(request);
+			ByteBuf serializeRequest = MessageSerializer.serializeRequest(
+					channel.alloc(),
+					requestId,
+					request);
+
+			channel.writeAndFlush(serializeRequest);
 
 			ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
 			assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
-			KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf);
+			assertEquals(requestId, MessageSerializer.getRequestId(buf));
+			KvStateResponse response = server.getSerializer().deserializeResponse(buf);
 
-			assertEquals(requestId, response.getRequestId());
-			int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
+			int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE);
 			assertEquals(expectedValue, actualValue);
 		} finally {
 			if (server != null) {
-				server.shutDown();
+				server.shutdown();
 			}
 
 			if (bootstrap != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
new file mode 100644
index 0000000..32a0c9b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.runtime.query.KvStateID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MessageSerializer}. Every round-trip test also checks that the reader index ends at the frame length plus the 4-byte length prefix.
+ */
+@RunWith(Parameterized.class)
+public class MessageSerializerTest {
+
+	private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+	@Parameterized.Parameters
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(false, true);
+	}
+
+	// NOTE(review): no test in this class reads this flag, so every test simply runs once per parameter value.
+	@Parameterized.Parameter
+	public boolean async;
+
+	/**
+	 * Tests that a request with a random 1024-byte key and namespace survives a serialization round trip.
+	 */
+	@Test
+	public void testRequestSerialization() throws Exception {
+		long requestId = Integer.MAX_VALUE + 1337L;
+		KvStateID kvStateId = new KvStateID();
+		byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+		final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+		assertEquals(requestId, MessageSerializer.getRequestId(buf));
+		KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf);
+
+		assertEquals(frameLength + 4, buf.readerIndex());
+
+		assertEquals(kvStateId, requestDeser.getKvStateId());
+		assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+	}
+
+	/**
+	 * Tests request serialization with zero-length serialized key and namespace.
+	 */
+	@Test
+	public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+		long requestId = Integer.MAX_VALUE + 1337L;
+		KvStateID kvStateId = new KvStateID();
+		byte[] serializedKeyAndNamespace = new byte[0];
+
+		final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+		assertEquals(requestId, MessageSerializer.getRequestId(buf));
+		KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf);
+
+		assertEquals(frameLength + 4, buf.readerIndex());
+
+		assertEquals(kvStateId, requestDeser.getKvStateId());
+		assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+	}
+
+	/**
+	 * Tests that we don't try to be smart about <code>null</code> key and namespace.
+	 * They should be treated explicitly.
+	 */
+	@Test(expected = NullPointerException.class)
+	public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+		new KvStateInternalRequest(new KvStateID(), null);
+	}
+
+	/**
+	 * Tests that a response with a random 1024-byte result survives a serialization round trip.
+	 */
+	@Test
+	public void testResponseSerialization() throws Exception {
+		long requestId = Integer.MAX_VALUE + 72727278L;
+		byte[] serializedResult = randomByteArray(1024);
+
+		final KvStateResponse response = new KvStateResponse(serializedResult);
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+		assertEquals(requestId, MessageSerializer.getRequestId(buf));
+		KvStateResponse responseDeser = serializer.deserializeResponse(buf);
+
+		assertEquals(frameLength + 4, buf.readerIndex());
+
+		assertArrayEquals(serializedResult, responseDeser.getContent());
+	}
+
+	/**
+	 * Tests response serialization with zero-length serialized result.
+	 */
+	@Test
+	public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception {
+		long requestId = 72727278L;
+		byte[] serializedResult = new byte[0];
+
+		final KvStateResponse response = new KvStateResponse(serializedResult);
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+		assertEquals(requestId, MessageSerializer.getRequestId(buf));
+		KvStateResponse responseDeser = serializer.deserializeResponse(buf);
+		assertEquals(frameLength + 4, buf.readerIndex());
+
+		assertArrayEquals(serializedResult, responseDeser.getContent());
+	}
+
+	/**
+	 * Tests that we don't try to be smart about <code>null</code> results.
+	 * They should be treated explicitly.
+	 */
+	@Test(expected = NullPointerException.class)
+	public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+		new KvStateResponse((byte[]) null);
+	}
+
+	/**
+	 * Tests that a request failure keeps its request ID and the cause's class and message across a round trip.
+	 */
+	@Test
+	public void testKvStateRequestFailureSerialization() throws Exception {
+		long requestId = Integer.MAX_VALUE + 1111222L;
+		IllegalStateException cause = new IllegalStateException("Expected test");
+
+		ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf);
+		assertEquals(frameLength + 4, buf.readerIndex());
+
+		assertEquals(requestId, requestFailure.getRequestId());
+		assertEquals(cause.getClass(), requestFailure.getCause().getClass());
+		assertEquals(cause.getMessage(), requestFailure.getCause().getMessage());
+	}
+
+	/**
+	 * Tests that a server failure keeps the cause's class and message across a round trip.
+	 */
+	@Test
+	public void testServerFailureSerialization() throws Exception {
+		IllegalStateException cause = new IllegalStateException("Expected test");
+
+		ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+		Throwable failure = MessageSerializer.deserializeServerFailure(buf);
+		assertEquals(frameLength + 4, buf.readerIndex());
+
+		assertEquals(cause.getClass(), failure.getClass());
+		assertEquals(cause.getMessage(), failure.getMessage());
+	}
+
+	private byte[] randomByteArray(int capacity) {
+		byte[] bytes = new byte[capacity];
+		ThreadLocalRandom.current().nextBytes(bytes);
+		return bytes;
+	}
+}


Mime
View raw message