flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [09/14] flink git commit: [FLINK-7770][QS] Hide the queryable state behind a proxy.
Date Wed, 11 Oct 2017 15:46:09 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
deleted file mode 100644
index 5d4a861..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
-import org.apache.flink.queryablestate.client.KvStateClient;
-import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
-import org.apache.flink.queryablestate.client.QueryableStateClient;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.heap.HeapValueState;
-import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.MathUtils;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link QueryableStateClient}.
- */
-public class QueryableStateClientTest {
-
-	private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
-	private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (testActorSystem != null) {
-			testActorSystem.shutdown();
-		}
-	}
-
-	/**
-	 * All failures should lead to a retry with a forced location lookup.
-	 *
-	 * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation,
-	 * ConnectException are checked explicitly as these indicate out-of-sync
-	 * KvStateLocation.
-	 */
-	@Test
-	public void testForceLookupOnOutdatedLocation() throws Exception {
-		KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
-		KvStateClient networkClient = mock(KvStateClient.class);
-
-		QueryableStateClient client = new QueryableStateClient(
-				lookupService,
-				networkClient,
-				testActorSystem.dispatcher());
-
-		try {
-			JobID jobId = new JobID();
-			int numKeyGroups = 4;
-
-			//
-			// UnknownKvStateLocation
-			//
-			String query1 = "lucky";
-
-			Future<KvStateLocation> unknownKvStateLocation = Futures.failed(
-					new UnknownKvStateLocation(query1));
-
-			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1)))
-					.thenReturn(unknownKvStateLocation);
-
-			Future<Integer> result = client.getKvState(
-					jobId,
-					query1,
-					0,
-					BasicTypeInfo.INT_TYPE_INFO,
-					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
-
-			try {
-				Await.result(result, timeout);
-				fail("Did not throw expected UnknownKvStateLocation exception");
-			} catch (UnknownKvStateLocation ignored) {
-				// Expected
-			}
-
-			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
-
-			//
-			// UnknownKvStateKeyGroupLocation
-			//
-			String query2 = "unlucky";
-
-			Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful(
-					new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2));
-
-			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2)))
-					.thenReturn(unknownKeyGroupLocation);
-
-			result = client.getKvState(
-					jobId,
-					query2,
-					0,
-					BasicTypeInfo.INT_TYPE_INFO,
-					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
-
-			try {
-				Await.result(result, timeout);
-				fail("Did not throw expected UnknownKvStateKeyGroupLocation exception");
-			} catch (UnknownKvStateKeyGroupLocation ignored) {
-				// Expected
-			}
-
-			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
-
-			//
-			// UnknownKvStateID
-			//
-			String query3 = "water";
-			KvStateID kvStateId = new KvStateID();
-			Future<byte[]> unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId));
-
-			KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
-			KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
-			for (int i = 0; i < numKeyGroups; i++) {
-				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
-			}
-
-			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
-					.thenReturn(Futures.successful(location));
-
-			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
-					.thenReturn(unknownKvStateId);
-
-			result = client.getKvState(
-					jobId,
-					query3,
-					0,
-					BasicTypeInfo.INT_TYPE_INFO,
-					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
-
-			try {
-				Await.result(result, timeout);
-				fail("Did not throw expected UnknownKvStateID exception");
-			} catch (UnknownKvStateID ignored) {
-				// Expected
-			}
-
-			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
-
-			//
-			// ConnectException
-			//
-			String query4 = "space";
-			Future<byte[]> connectException = Futures.failed(new ConnectException());
-			kvStateId = new KvStateID();
-
-			serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
-			location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
-			for (int i = 0; i < numKeyGroups; i++) {
-				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
-			}
-
-			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
-					.thenReturn(Futures.successful(location));
-
-			when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
-					.thenReturn(connectException);
-
-			result = client.getKvState(
-					jobId,
-					query4,
-					0,
-					BasicTypeInfo.INT_TYPE_INFO,
-					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
-
-			try {
-				Await.result(result, timeout);
-				fail("Did not throw expected ConnectException exception");
-			} catch (ConnectException ignored) {
-				// Expected
-			}
-
-			verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
-
-			//
-			// Other Exceptions don't lead to a retry no retry
-			//
-			String query5 = "universe";
-			Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception"));
-			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5)))
-					.thenReturn(exception);
-
-			client.getKvState(
-					jobId,
-					query5,
-					0,
-					BasicTypeInfo.INT_TYPE_INFO,
-					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
-
-			verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
-		} finally {
-			client.shutDown();
-		}
-	}
-
-	/**
-	 * Tests queries against multiple servers.
-	 *
-	 * <p>The servers are populated with different keys and the client queries
-	 * all available keys from all servers.
-	 */
-	@Test
-	public void testIntegrationWithKvStateServer() throws Exception {
-		// Config
-		int numServers = 2;
-		int numKeys = 1024;
-		int numKeyGroups = 1;
-
-		JobID jobId = new JobID();
-		JobVertexID jobVertexId = new JobVertexID();
-
-		KvStateServer[] servers = new KvStateServer[numServers];
-		AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
-
-		QueryableStateClient client = null;
-		KvStateClient networkClient = null;
-		AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();
-
-		MemoryStateBackend backend = new MemoryStateBackend();
-		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-
-		AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
-
-		try {
-			KvStateRegistry[] registries = new KvStateRegistry[numServers];
-			KvStateID[] kvStateIds = new KvStateID[numServers];
-			List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>();
-
-			// Start the servers
-			for (int i = 0; i < numServers; i++) {
-				registries[i] = new KvStateRegistry();
-				serverStats[i] = new AtomicKvStateRequestStats();
-				servers[i] = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
-				servers[i].start();
-				ValueStateDescriptor<Integer> descriptor =
-						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-
-				RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
-						descriptor.getType(),
-						descriptor.getName(),
-						VoidNamespaceSerializer.INSTANCE,
-						IntSerializer.INSTANCE);
-
-				// Register state
-				HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
-						descriptor,
-						new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
-						IntSerializer.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				kvStates.add(kvState);
-
-				kvStateIds[i] = registries[i].registerKvState(
-						jobId,
-						new JobVertexID(),
-						new KeyGroupRange(i, i),
-						"choco",
-						kvState);
-			}
-
-			int[] expectedRequests = new int[numServers];
-
-			for (int key = 0; key < numKeys; key++) {
-				int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers;
-				expectedRequests[targetKeyGroupIndex]++;
-
-				HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex);
-
-				keyedStateBackend.setCurrentKey(key);
-				kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
-				kvState.update(1337 + key);
-			}
-
-			// Location lookup service
-			KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
-			for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
-				location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
-			}
-
-			KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
-			when(lookupService.getKvStateLookupInfo(eq(jobId), eq("choco")))
-					.thenReturn(Futures.successful(location));
-
-			// The client
-			networkClient = new KvStateClient(1, networkClientStats);
-
-			client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());
-
-			// Send all queries
-			List<Future<Integer>> futures = new ArrayList<>(numKeys);
-			for (int key = 0; key < numKeys; key++) {
-				ValueStateDescriptor<Integer> descriptor =
-						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-				futures.add(client.getKvState(
-						jobId,
-						"choco",
-						key,
-						BasicTypeInfo.INT_TYPE_INFO,
-						descriptor));
-			}
-
-			// Verify results
-			Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher());
-			Iterable<Integer> results = Await.result(future, timeout);
-
-			int index = 0;
-			for (int buffer : results) {
-				assertEquals(1337 + index, buffer);
-				index++;
-			}
-
-			// Verify requests
-			for (int i = 0; i < numServers; i++) {
-				int numRetries = 10;
-				for (int retry = 0; retry < numRetries; retry++) {
-					try {
-						assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests());
-						assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
-						assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed());
-						break;
-					} catch (Throwable t) {
-						// Retry
-						if (retry == numRetries - 1) {
-							throw t;
-						} else {
-							Thread.sleep(100);
-						}
-					}
-				}
-			}
-		} finally {
-			if (client != null) {
-				client.shutDown();
-			}
-
-			if (networkClient != null) {
-				networkClient.shutDown();
-			}
-
-			for (KvStateServer server : servers) {
-				if (server != null) {
-					server.shutDown();
-				}
-			}
-		}
-	}
-
-	/**
-	 * Tests that the QueryableState client correctly caches location lookups
-	 * keyed by both job and name. This test is mainly due to a previous bug due
-	 * to which cache entries were by name only. This is a problem, because the
-	 * same client can be used to query multiple jobs.
-	 */
-	@Test
-	public void testLookupMultipleJobIds() throws Exception {
-		String name = "unique-per-job";
-
-		// Exact contents don't matter here
-		KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
-		location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
-
-		JobID jobId1 = new JobID();
-		JobID jobId2 = new JobID();
-
-		KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
-
-		when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString()))
-				.thenReturn(Futures.successful(location));
-
-		KvStateClient networkClient = mock(KvStateClient.class);
-		when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class)))
-				.thenReturn(Futures.successful(new byte[0]));
-
-		QueryableStateClient client = new QueryableStateClient(
-				lookupService,
-				networkClient,
-				testActorSystem.dispatcher());
-
-		ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
-
-		// Query ies with same name, but different job IDs should lead to a
-		// single lookup per query and job ID.
-		client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
-		client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
-
-		verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
-		verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index b982c8e..50ef543 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -445,4 +445,20 @@ public class FutureUtils {
 
 		return result;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Future Completed with an exception.
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a {@link CompletableFuture} that has failed with the exception
+	 * provided as argument.
+	 * @param throwable the exception to fail the future with.
+	 * @return The failed future.
+	 */
+	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
+		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
+		failedAttempt.completeExceptionally(throwable);
+		return failedAttempt;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 53503ce..d6c5d75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -29,14 +29,16 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +67,9 @@ public class NetworkEnvironment {
 	/** Server for {@link InternalKvState} requests. */
 	private final KvStateServer kvStateServer;
 
+	/** Proxy for the queryable state client. */
+	private final KvStateClientProxy kvStateProxy;
+
 	/** Registry for {@link InternalKvState} instances. */
 	private final KvStateRegistry kvStateRegistry;
 
@@ -76,6 +81,7 @@ public class NetworkEnvironment {
 
 	/** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */
 	private final int networkBuffersPerChannel;
+
 	/** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */
 	private final int extraNetworkBuffersPerGate;
 
@@ -88,6 +94,7 @@ public class NetworkEnvironment {
 			TaskEventDispatcher taskEventDispatcher,
 			KvStateRegistry kvStateRegistry,
 			KvStateServer kvStateServer,
+			KvStateClientProxy kvStateClientProxy,
 			IOMode defaultIOMode,
 			int partitionRequestInitialBackoff,
 			int partitionRequestMaxBackoff,
@@ -101,6 +108,7 @@ public class NetworkEnvironment {
 		this.kvStateRegistry = checkNotNull(kvStateRegistry);
 
 		this.kvStateServer = kvStateServer;
+		this.kvStateProxy = kvStateClientProxy;
 
 		this.defaultIOMode = defaultIOMode;
 
@@ -152,6 +160,10 @@ public class NetworkEnvironment {
 		return kvStateServer;
 	}
 
+	public KvStateClientProxy getKvStateProxy() {
+		return kvStateProxy;
+	}
+
 	public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
 		return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
 	}
@@ -291,17 +303,25 @@ public class NetworkEnvironment {
 			try {
 				LOG.debug("Starting network connection manager");
 				connectionManager.start(resultPartitionManager, taskEventDispatcher);
-			}
-			catch (IOException t) {
+			} catch (IOException t) {
 				throw new IOException("Failed to instantiate network connection manager.", t);
 			}
 
 			if (kvStateServer != null) {
 				try {
-					LOG.debug("Starting the KvState server.");
 					kvStateServer.start();
+					LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress());
+				} catch (InterruptedException ie) {
+					throw new IOException("Failed to start the Queryable State Data Server.", ie);
+				}
+			}
+
+			if (kvStateProxy != null) {
+				try {
+					kvStateProxy.start();
+					LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress());
 				} catch (InterruptedException ie) {
-					throw new IOException("Failed to start the KvState server.", ie);
+					throw new IOException("Failed to start the Queryable State Client Proxy.", ie);
 				}
 			}
 		}
@@ -318,11 +338,21 @@ public class NetworkEnvironment {
 
 			LOG.info("Shutting down the network environment and its components.");
 
+			if (kvStateProxy != null) {
+				try {
+					LOG.debug("Shutting down Queryable State Client Proxy.");
+					kvStateProxy.shutdown();
+				} catch (Throwable t) {
+					LOG.warn("Cannot shut down Queryable State Client Proxy.", t);
+				}
+			}
+
 			if (kvStateServer != null) {
 				try {
-					kvStateServer.shutDown();
+					LOG.debug("Shutting down Queryable State Data Server.");
+					kvStateServer.shutdown();
 				} catch (Throwable t) {
-					LOG.warn("Cannot shut down KvState server.", t);
+					LOG.warn("Cannot shut down Queryable State Data Server.", t);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
new file mode 100644
index 0000000..d605952
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An interface for the Queryable State Client Proxy running on each Task Manager in the cluster.
+ *
+ * <p>This proxy is where the Queryable State Client (potentially running outside your Flink
+ * cluster) connects to, and its responsibility is to forward the client's requests to the rest
+ * of the entities participating in fetching the requested state, and running within the cluster.
+ *
+ * <p>These are:
+ * <ol>
+ *     <li> the {@link org.apache.flink.runtime.jobmanager.JobManager Job Manager},
+ *     which is responsible for sending the
+ *     {@link org.apache.flink.runtime.taskmanager.TaskManager Task Manager} storing
+ *     the requested state, and </li>
+ *     <li> the Task Manager having the state itself.</li>
+ * </ol>
+ */
+public interface KvStateClientProxy extends KvStateServer {
+
+	/**
+	 * Updates the active {@link org.apache.flink.runtime.jobmanager.JobManager Job Manager}
+	 * in case of change.
+	 *
+	 * <p>This is useful in settings where high-availability is enabled and
+	 * a failed Job Manager is replaced by a new one.
+	 *
+	 * <p><b>IMPORTANT: </b> this method may be called by a different thread than the {@link #getJobManagerFuture()}.
+	 *
+	 * @param leadingJobManager the currently leading job manager.
+	 * */
+	void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception;
+
+	/**
+	 * Retrieves a future containing the currently leading Job Manager.
+	 *
+	 * <p><b>IMPORTANT: </b> this method may be called by a different thread than the
+	 * {@link #updateJobManager(CompletableFuture)}.
+	 *
+	 * @return A {@link CompletableFuture} containing the currently active Job Manager.
+	 */
+	CompletableFuture<ActorGateway> getJobManagerFuture();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 8a213bb..03e8238 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
  * Location information for all key groups of a {@link InternalKvState} instance.
  *
  * <p>This is populated by the {@link KvStateLocationRegistry} and used by the
- * Queryable State Client to target queries.
+ * queryable state to target queries.
  */
 public class KvStateLocation implements Serializable {
 
@@ -183,10 +183,6 @@ public class KvStateLocation implements Serializable {
 		}
 	}
 
-	public static long getSerialVersionUID() {
-		return serialVersionUID;
-	}
-
 	/**
 	 * Registers a KvState instance for the given key group index.
 	 *
@@ -194,7 +190,7 @@ public class KvStateLocation implements Serializable {
 	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
 	 * @throws IllegalArgumentException  If no location information registered for a key group index in the range.
 	 */
-	public void unregisterKvState(KeyGroupRange keyGroupRange) {
+	void unregisterKvState(KeyGroupRange keyGroupRange) {
 		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
index 9b14c49..81727fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -20,24 +20,21 @@ package org.apache.flink.runtime.query;
 
 /**
  * An interface for the Queryable State Server running on each Task Manager in the cluster.
- * This server is responsible for serving requests coming from the Queryable State Client and
- * requesting <b>locally</b> stored state.
+ * This server is responsible for serving requests coming from the {@link KvStateClientProxy
+ * Queryable State Proxy} and requesting <b>locally</b> stored state.
  */
 public interface KvStateServer {
 
 	/**
-	 * Returns the address of this server.
-	 *
-	 * @return Server address
+	 * Returns the {@link KvStateServerAddress address} the server is listening to.
+	 * @return Server address.
 	 */
-	KvStateServerAddress getAddress();
+	KvStateServerAddress getServerAddress();
 
 
-	/** Starts the proxy. */
+	/** Starts the server. */
 	void start() throws InterruptedException;
 
-	/**
-	 * Shuts down the server and all related thread pools.
-	 */
-	void shutDown();
+	/** Shuts down the server and all related thread pools. */
+	void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index 852d394..8f66734 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -36,8 +36,56 @@ public final class QueryableStateUtils {
 	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateUtils.class);
 
 	/**
+	 * Initializes the {@link KvStateClientProxy client proxy} responsible for
+	 * receiving requests from the external (to the cluster) client and forwarding them internally.
+	 *
+	 * @param address the address to bind to.
+	 * @param port the port to listen to.
+	 * @param eventLoopThreads the number of threads to be used to process incoming requests.
+	 * @param queryThreads the number of threads to be used to send the actual state.
+	 * @param stats statistics to be gathered about the incoming requests.
+	 * @return the {@link KvStateClientProxy client proxy}.
+	 */
+	public static KvStateClientProxy createKvStateClientProxy(
+			final InetAddress address,
+			final int port,
+			final int eventLoopThreads,
+			final int queryThreads,
+			final KvStateRequestStats stats) {
+
+		Preconditions.checkNotNull(address, "address");
+		Preconditions.checkNotNull(stats, "stats");
+
+		Preconditions.checkArgument(eventLoopThreads >= 1);
+		Preconditions.checkArgument(queryThreads >= 1);
+
+		try {
+			String classname = "org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl";
+			Class<? extends KvStateClientProxy> clazz = Class.forName(classname).asSubclass(KvStateClientProxy.class);
+			Constructor<? extends KvStateClientProxy> constructor = clazz.getConstructor(
+					InetAddress.class,
+					Integer.class,
+					Integer.class,
+					Integer.class,
+					KvStateRequestStats.class);
+			return constructor.newInstance(address, port, eventLoopThreads, queryThreads, stats);
+		} catch (ClassNotFoundException e) {
+			LOG.warn("Could not load Queryable State Client Proxy. " +
+					"Probable reason: flink-queryable-state is not in the classpath");
+			LOG.debug("Caught exception", e);
+			return null;
+		} catch (InvocationTargetException e) {
+			LOG.error("Queryable State Client Proxy could not be created: ", e.getTargetException());
+			return null;
+		} catch (Throwable t) {
+			LOG.error("Failed to instantiate the Queryable State Client Proxy.", t);
+			return null;
+		}
+	}
+
+	/**
 	 * Initializes the {@link KvStateServer server} responsible for sending the
-	 * requested internal state to the Queryable State Client.
+	 * requested internal state to the {@link KvStateClientProxy client proxy}.
 	 *
 	 * @param address the address to bind to.
 	 * @param port the port to listen to.
@@ -74,12 +122,12 @@ public final class QueryableStateUtils {
 					KvStateRequestStats.class);
 			return constructor.newInstance(address, port, eventLoopThreads, queryThreads, kvStateRegistry, stats);
 		} catch (ClassNotFoundException e) {
-			LOG.info("Could not load Queryable State Server. " +
+			LOG.warn("Could not load Queryable State Server. " +
 					"Probable reason: flink-queryable-state is not in the classpath");
 			LOG.debug("Caught exception", e);
 			return null;
 		} catch (InvocationTargetException e) {
-			LOG.error("Queryable State Server could not be created", e.getTargetException());
+			LOG.error("Queryable State Server could not be created: ", e.getTargetException());
 			return null;
 		} catch (Throwable t) {
 			LOG.error("Failed to instantiate the Queryable State Server.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
index 9781e23..19caf92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.query.netty;
 
-import org.apache.flink.runtime.query.KvStateServer;
-
 /**
- * Simple statistics for {@link KvStateServer} monitoring.
+ * Simple statistics for
+ * {@link org.apache.flink.runtime.query.KvStateServer} and
+ * {@link org.apache.flink.runtime.query.KvStateClientProxy} monitoring.
  */
 public interface KvStateRequestStats {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 37d28de..fed5fc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -82,6 +82,7 @@ public class QueryableStateConfiguration {
 	public String toString() {
 		return "QueryableStateConfiguration {" +
 				"enabled=" + enabled +
+				", port=" + port +
 				", numServerThreads=" + numServerThreads +
 				", numQueryThreads=" + numQueryThreads +
 				'}';

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7c5c830..cbf0d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,10 +38,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
@@ -66,7 +67,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 public class TaskManagerServices {
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
 
-	/** TaskManager services */
+	/** TaskManager services. */
 	private final TaskManagerLocation taskManagerLocation;
 	private final MemoryManager memoryManager;
 	private final IOManager ioManager;
@@ -356,6 +357,7 @@ public class TaskManagerServices {
 		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
 		KvStateRegistry kvStateRegistry = new KvStateRegistry();
+		KvStateClientProxy kvClientProxy = null;
 		KvStateServer kvStateServer = null;
 
 		if (taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) {
@@ -367,11 +369,18 @@ public class TaskManagerServices {
 			int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
 					taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads();
 
-			kvStateServer = QueryableStateUtils.createKvStateServer(
+			kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
 					taskManagerServicesConfiguration.getTaskManagerAddress(),
 					qsConfig.port(),
 					numNetworkThreads,
 					numQueryThreads,
+					new DisabledKvStateRequestStats());
+
+			kvStateServer = QueryableStateUtils.createKvStateServer(
+					taskManagerServicesConfiguration.getTaskManagerAddress(),
+					0,
+					numNetworkThreads,
+					numQueryThreads,
 					kvStateRegistry,
 					new DisabledKvStateRequestStats());
 		}
@@ -384,6 +393,7 @@ public class TaskManagerServices {
 			taskEventDispatcher,
 			kvStateRegistry,
 			kvStateServer,
+			kvClientProxy,
 			networkEnvironmentConfiguration.ioMode(),
 			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
 			networkEnvironmentConfiguration.partitionRequestMaxBackoff(),

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index f1f7d39..e6643b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -189,8 +189,8 @@ public class TaskManagerServicesConfiguration {
 			remoteAddress,
 			slots);
 
-		final QueryableStateConfiguration queryableStateConfig = localCommunication ?
-				QueryableStateConfiguration.disabled() :
+		// @Ufuk todo why was it like this before ???
+		final QueryableStateConfiguration queryableStateConfig =
 				parseQueryableStateConfiguration(configuration);
 
 		// extract memory settings

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 558388c..c370725 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.blob.{BlobClient, BlobService, BlobCacheService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.Executors
+import org.apache.flink.runtime.concurrent.{Executors, FutureUtils}
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInf
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker
@@ -951,7 +951,7 @@ class TaskManager(
       kvStateRegistry.registerListener(
         new ActorGatewayKvStateRegistryListener(
           jobManagerGateway,
-          kvStateServer.getAddress))
+          kvStateServer.getServerAddress))
     }
 
     // start a blob service, if a blob server is specified
@@ -1423,6 +1423,28 @@ class TaskManager(
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
+    val proxy = network.getKvStateProxy
+    if (proxy != null) {
+
+      val askTimeoutString = config.getConfiguration.getString(AkkaOptions.ASK_TIMEOUT)
+
+      val timeout = Duration(askTimeoutString)
+
+      if (!timeout.isFinite) {
+        throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key +
+          " is not a finite timeout ('" + askTimeoutString + "')")
+      }
+
+      if (leaderAddress != null) {
+        val actorGwFuture: Future[ActorGateway] =
+          AkkaUtils.getActorRefFuture(
+            leaderAddress, context.system, timeout.asInstanceOf[FiniteDuration]
+          ).map(actor => new AkkaActorGateway(actor, leaderSessionID))(context.system.dispatcher)
+
+        proxy.updateJobManager(FutureUtils.toJava(actorGwFuture))
+      }
+    }
+
     self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 826ae3f..ef2d5c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -69,6 +69,7 @@ public class NetworkEnvironmentTest {
 			new TaskEventDispatcher(),
 			new KvStateRegistry(),
 			null,
+			null,
 			IOManager.IOMode.SYNC,
 			0,
 			0,

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 052699a..6dabcd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -145,6 +145,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				new TaskEventDispatcher(),
 				new KvStateRegistry(),
 				null,
+				null,
 				netConf.ioMode(),
 				netConf.partitionRequestInitialBackoff(),
 				netConf.partitionRequestMaxBackoff(),


Mime
View raw message