flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/4] flink git commit: [FLINK-8493] [flip6] Integrate queryable state with Flip-6
Date Tue, 06 Feb 2018 11:02:05 GMT
[FLINK-8493] [flip6] Integrate queryable state with Flip-6

Adapt KvStateRegistry to accept multiple KvStateRegistryListeners. Introduce
the KvStateLocationOracle to retrieve the KvStateLocation. Adapt the KvStateClientProxy
to accept multiple KvStateLocationOracles to retrieve the KvStateLocations for
different jobs. Register the KvStateRegistryListener and KvStateLocationOracle
in TaskExecutor upon establishing a connection to the JobMaster.

This closes #5339.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cef6741a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cef6741a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cef6741a

Branch: refs/heads/master
Commit: cef6741a91fcb83757a82a39b04d074b1a1311cf
Parents: 6316087
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Jan 22 18:11:04 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Feb 6 11:49:44 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java |   2 +-
 .../apache/flink/configuration/WebOptions.java  |   4 +-
 .../exceptions/UnknownJobManagerException.java  |  36 ----
 .../exceptions/UnknownLocationException.java    |  37 ++++
 .../client/proxy/KvStateClientProxyHandler.java |  62 +++---
 .../client/proxy/KvStateClientProxyImpl.java    |  44 ++--
 .../proxy/KvStateClientProxyImplTest.java       | 102 +++++++++
 .../network/KvStateServerHandlerTest.java       |  66 ++----
 .../network/KvStateServerTest.java              |  17 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  80 ++++---
 .../runtime/jobmaster/JobMasterGateway.java     |  48 +----
 .../jobmaster/KvStateLocationOracle.java        |  45 ++++
 .../jobmaster/KvStateRegistryGateway.java       |  68 ++++++
 .../flink/runtime/query/KvStateClientProxy.java |  25 ++-
 .../flink/runtime/query/KvStateRegistry.java    |  51 +++--
 .../runtime/query/KvStateRegistryGateway.java   |  63 ------
 .../runtime/taskexecutor/TaskExecutor.java      |  38 ++++
 .../TaskManagerServicesConfiguration.java       |   6 +-
 .../rpc/RpcKvStateRegistryListener.java         |   7 +-
 .../ActorGatewayKvStateLocationOracle.java      |  63 ++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  42 ++--
 .../runtime/query/KvStateRegistryTest.java      | 215 +++++++++++++++++++
 .../runtime/state/StateBackendTestBase.java     |   3 +-
 23 files changed, 794 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 16fd40d..a66b27c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -41,7 +41,7 @@ public class RestOptions {
 	 */
 	public static final ConfigOption<Integer> REST_PORT =
 		key("rest.port")
-			.defaultValue(9067)
+			.defaultValue(9065)
 			.withDescription("The port that the server listens on / the client connects to.");
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index b74f23f..7eea17d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -153,8 +153,8 @@ public class WebOptions {
 	/**
 	 * Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
 	 */
-	public static final ConfigOption<Long> TIMEOUT = ConfigOptions
-		.key("web.timeout")
+	public static final ConfigOption<Long> TIMEOUT =
+		key("web.timeout")
 		.defaultValue(10L * 1000L);
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownJobManagerException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownJobManagerException.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownJobManagerException.java
deleted file mode 100644
index 19063c2..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownJobManagerException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.exceptions;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Exception to fail Future if the Task Manager on which the
- * {@code Client Proxy} is running on, does not know the active
- * Job Manager.
- */
-@Internal
-public class UnknownJobManagerException extends Exception {
-
-	private static final long serialVersionUID = 9092442511708951209L;
-
-	public UnknownJobManagerException() {
-		super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownLocationException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownLocationException.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownLocationException.java
new file mode 100644
index 0000000..e898e92
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/exceptions/UnknownLocationException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.exceptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception to fail Future if the Task Manager on which the
+ * {@code Client Proxy} is running on, does not know the location
+ * of a requested state.
+ */
+@Internal
+public class UnknownLocationException extends FlinkException {
+
+	private static final long serialVersionUID = 9092442511708951209L;
+
+	public UnknownLocationException(String msg) {
+		super(msg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 29ee0d7..2e24431 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
 import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
+import org.apache.flink.queryablestate.exceptions.UnknownLocationException;
 import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
 import org.apache.flink.queryablestate.messages.KvStateRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
@@ -33,18 +34,17 @@ import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
 import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-import akka.dispatch.OnComplete;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,12 +53,8 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
 /**
  * This handler acts as an internal (to the Flink cluster) client that receives
  * the requests from external clients, executes them by contacting the Job Manager (if necessary) and
@@ -205,32 +201,32 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 			return cachedFuture;
 		}
 
-		LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
-
-		final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
-		lookupCache.put(cacheKey, location);
-		return proxy.getJobManagerFuture().thenComposeAsync(
-				jobManagerGateway -> {
-					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
-					jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
-							.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
-							.onComplete(new OnComplete<KvStateLocation>() {
-
-								@Override
-								public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
-									if (failure != null) {
-										if (failure instanceof FlinkJobNotFoundException) {
-											// if the jobId was wrong, remove the entry from the cache.
-											lookupCache.remove(cacheKey);
-										}
-										location.completeExceptionally(failure);
-									} else {
-										location.complete(loc);
-									}
-								}
-							}, Executors.directExecutionContext());
-					return location;
-				}, queryExecutor);
+		final KvStateLocationOracle kvStateLocationOracle = proxy.getKvStateLocationOracle(jobId);
+
+		if (kvStateLocationOracle != null) {
+			LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", jobId, queryableStateName);
+			final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
+			lookupCache.put(cacheKey, location);
+
+			kvStateLocationOracle
+				.requestKvStateLocation(jobId, queryableStateName)
+				.whenComplete(
+					(KvStateLocation kvStateLocation, Throwable throwable) -> {
+						if (throwable != null) {
+							if (ExceptionUtils.stripCompletionException(throwable) instanceof FlinkJobNotFoundException) {
+								// if the jobId was wrong, remove the entry from the cache.
+								lookupCache.remove(cacheKey);
+							}
+							location.completeExceptionally(throwable);
+						} else {
+							location.complete(kvStateLocation);
+						}
+					});
+
+			return location;
+		} else {
+			return FutureUtils.completedExceptionally(new UnknownLocationException("Could not contact the state location oracle to retrieve the state location."));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index aa5e7b6..6902cb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -19,22 +19,24 @@
 package org.apache.flink.queryablestate.client.proxy;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.queryablestate.exceptions.UnknownJobManagerException;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.queryablestate.messages.KvStateRequest;
 import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.AbstractServerBase;
 import org.apache.flink.queryablestate.network.AbstractServerHandler;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,18 +45,13 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
 
-	private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
-			FutureUtils.completedExceptionally(new UnknownJobManagerException());
-
 	/** Number of threads used to process incoming requests. */
 	private final int queryExecutorThreads;
 
 	/** Statistics collector. */
 	private final KvStateRequestStats stats;
 
-	private final Object leaderLock = new Object();
-
-	private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+	private final ConcurrentHashMap<JobID, KvStateLocationOracle> kvStateLocationOracles;
 
 	/**
 	 * Creates the Queryable State Client Proxy.
@@ -83,6 +80,8 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
 		Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
 		this.queryExecutorThreads = numQueryThreads;
 		this.stats = Preconditions.checkNotNull(stats);
+
+		this.kvStateLocationOracles = new ConcurrentHashMap<>(4);
 	}
 
 	@Override
@@ -106,20 +105,25 @@ public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, K
 	}
 
 	@Override
-	public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
-		synchronized (leaderLock) {
-			if (leadingJobManager == null) {
-				jobManagerFuture = UNKNOWN_JOB_MANAGER;
-			} else {
-				jobManagerFuture = leadingJobManager;
-			}
+	public void updateKvStateLocationOracle(JobID jobId, @Nullable KvStateLocationOracle kvStateLocationOracle) {
+		if (kvStateLocationOracle == null) {
+			kvStateLocationOracles.remove(jobId);
+		} else {
+			kvStateLocationOracles.put(jobId, kvStateLocationOracle);
 		}
 	}
 
+	@Nullable
 	@Override
-	public CompletableFuture<ActorGateway> getJobManagerFuture() {
-		synchronized (leaderLock) {
-			return jobManagerFuture;
+	public KvStateLocationOracle getKvStateLocationOracle(JobID jobId) {
+		final KvStateLocationOracle legacyKvStateLocationOracle = kvStateLocationOracles.get(HighAvailabilityServices.DEFAULT_JOB_ID);
+
+		// we give preference to the oracle registered under the default job id
+		// to make it work with the pre Flip-6 code paths
+		if (legacyKvStateLocationOracle != null) {
+			return legacyKvStateLocationOracle;
+		} else {
+			return kvStateLocationOracles.get(jobId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
new file mode 100644
index 0000000..fc8b8da
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImplTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KvStateClientProxyImpl}.
+ */
+public class KvStateClientProxyImplTest extends TestLogger {
+
+	private KvStateClientProxyImpl kvStateClientProxy;
+
+	@Before
+	public void setup() {
+		kvStateClientProxy = new KvStateClientProxyImpl(
+			InetAddress.getLoopbackAddress(),
+			Collections.singleton(0).iterator(),
+			1,
+			1,
+			new DisabledKvStateRequestStats());
+	}
+
+	/**
+	 * Tests that we can set and retrieve the {@link KvStateLocationOracle}.
+	 */
+	@Test
+	public void testKvStateLocationOracle() {
+		final JobID jobId1 = new JobID();
+		final TestingKvStateLocationOracle kvStateLocationOracle1 = new TestingKvStateLocationOracle();
+		kvStateClientProxy.updateKvStateLocationOracle(jobId1, kvStateLocationOracle1);
+		final JobID jobId2 = new JobID();
+		final TestingKvStateLocationOracle kvStateLocationOracle2 = new TestingKvStateLocationOracle();
+		kvStateClientProxy.updateKvStateLocationOracle(jobId2, kvStateLocationOracle2);
+
+		assertThat(kvStateClientProxy.getKvStateLocationOracle(new JobID()), nullValue());
+
+		assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1), equalTo(kvStateLocationOracle1));
+		assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId2), equalTo(kvStateLocationOracle2));
+
+		kvStateClientProxy.updateKvStateLocationOracle(jobId1, null);
+		assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1), nullValue());
+	}
+
+	/**
+	 * Tests that {@link KvStateLocationOracle} registered under {@link HighAvailabilityServices#DEFAULT_JOB_ID}
+	 * will be used for all requests.
+	 */
+	@Test
+	public void testPreFlip6CodePathPreference() {
+		final TestingKvStateLocationOracle kvStateLocationOracle = new TestingKvStateLocationOracle();
+		kvStateClientProxy.updateKvStateLocationOracle(HighAvailabilityServices.DEFAULT_JOB_ID, kvStateLocationOracle);
+		final JobID jobId = new JobID();
+		kvStateClientProxy.updateKvStateLocationOracle(jobId, new TestingKvStateLocationOracle());
+
+		assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId), equalTo(kvStateLocationOracle));
+	}
+
+	/**
+	 * Testing implementation of {@link KvStateLocationOracle}.
+	 */
+	private static final class TestingKvStateLocationOracle implements KvStateLocationOracle {
+
+		@Override
+		public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String registrationName) {
+			return null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 7b301ed..8b1517c 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -126,17 +126,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
-		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-				dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+		AbstractKeyedStateBackend<Integer> backend = createKeyedStateBackend(registry, numKeyGroups, abstractBackend, dummyEnv);
 
 		final TestRegistryListener registryListener = new TestRegistryListener();
-		registry.registerListener(registryListener);
+		registry.registerListener(dummyEnv.getJobID(), registryListener);
 
 		// Update the KvState and request it
 		int expectedValue = 712828289;
@@ -250,17 +243,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
-		KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-				dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+		KeyedStateBackend<Integer> backend = createKeyedStateBackend(registry, numKeyGroups, abstractBackend, dummyEnv);
 
 		final TestRegistryListener registryListener = new TestRegistryListener();
-		registry.registerListener(registryListener);
+		registry.registerListener(dummyEnv.getJobID(), registryListener);
 
 		// Register state
 		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
@@ -403,17 +389,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
-		KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-				dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+		KeyedStateBackend<Integer> backend = createKeyedStateBackend(registry, numKeyGroups, abstractBackend, dummyEnv);
 
 		final TestRegistryListener registryListener = new TestRegistryListener();
-		registry.registerListener(registryListener);
+		registry.registerListener(dummyEnv.getJobID(), registryListener);
 
 		// Register state
 		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
@@ -545,17 +524,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
-		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-				dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+		AbstractKeyedStateBackend<Integer> backend = createKeyedStateBackend(registry, numKeyGroups, abstractBackend, dummyEnv);
 
 		final TestRegistryListener registryListener = new TestRegistryListener();
-		registry.registerListener(registryListener);
+		registry.registerListener(dummyEnv.getJobID(), registryListener);
 
 		// Register state
 		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
@@ -639,17 +611,10 @@ public class KvStateServerHandlerTest extends TestLogger {
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
-		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-				dummyEnv,
-				new JobID(),
-				"test_op",
-				IntSerializer.INSTANCE,
-				numKeyGroups,
-				new KeyGroupRange(0, 0),
-				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+		AbstractKeyedStateBackend<Integer> backend = createKeyedStateBackend(registry, numKeyGroups, abstractBackend, dummyEnv);
 
 		final TestRegistryListener registryListener = new TestRegistryListener();
-		registry.registerListener(registryListener);
+		registry.registerListener(dummyEnv.getJobID(), registryListener);
 
 		// Register state
 		ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
@@ -755,4 +720,15 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		}
 	}
+
+	private AbstractKeyedStateBackend<Integer> createKeyedStateBackend(KvStateRegistry registry, int numKeyGroups, AbstractStateBackend abstractBackend, DummyEnvironment dummyEnv) throws java.io.IOException {
+		return abstractBackend.createKeyedStateBackend(
+			dummyEnv,
+			dummyEnv.getJobID(),
+			"test_op",
+			IntSerializer.INSTANCE,
+			numKeyGroups,
+			new KeyGroupRange(0, 0),
+			registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index debd190..8af9cf5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -108,19 +108,20 @@ public class KvStateServerTest {
 			AbstractStateBackend abstractBackend = new MemoryStateBackend();
 			DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 			dummyEnv.setKvStateRegistry(registry);
+			final JobID jobId = new JobID();
 			AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-					dummyEnv,
-					new JobID(),
-					"test_op",
-					IntSerializer.INSTANCE,
-					numKeyGroups,
-					new KeyGroupRange(0, 0),
-					registry.createTaskRegistry(new JobID(), new JobVertexID()));
+				dummyEnv,
+				jobId,
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				registry.createTaskRegistry(jobId, new JobVertexID()));
 
 			final KvStateServerHandlerTest.TestRegistryListener registryListener =
 					new KvStateServerHandlerTest.TestRegistryListener();
 
-			registry.registerListener(registryListener);
+			registry.registerListener(jobId, registryListener);
 
 			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 			desc.setQueryable("vanilla");

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6d22e75..8d47655 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -616,56 +616,86 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName) {
-		if (log.isDebugEnabled()) {
-			log.debug("Lookup key-value state for job {} with registration " +
+	public CompletableFuture<KvStateLocation> requestKvStateLocation(final JobID jobId, final String registrationName) {
+		// sanity check for the correct JobID
+		if (jobGraph.getJobID().equals(jobId)) {
+			if (log.isDebugEnabled()) {
+				log.debug("Lookup key-value state for job {} with registration " +
 					"name {}.", jobGraph.getJobID(), registrationName);
-		}
+			}
 
-		final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
-		final KvStateLocation location = registry.getKvStateLocation(registrationName);
-		if (location != null) {
-			return CompletableFuture.completedFuture(location);
+			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+			final KvStateLocation location = registry.getKvStateLocation(registrationName);
+			if (location != null) {
+				return CompletableFuture.completedFuture(location);
+			} else {
+				return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
+			}
 		} else {
-			return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
+			if (log.isDebugEnabled()) {
+				log.debug("Request of key-value state location for unknown job {} received.", jobId);
+			}
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 		}
 	}
 
 	@Override
-	public void notifyKvStateRegistered(
+	public CompletableFuture<Acknowledge> notifyKvStateRegistered(
+			final JobID jobId,
 			final JobVertexID jobVertexId,
 			final KeyGroupRange keyGroupRange,
 			final String registrationName,
 			final KvStateID kvStateId,
 			final InetSocketAddress kvStateServerAddress) {
-		if (log.isDebugEnabled()) {
-			log.debug("Key value state registered for job {} under name {}.",
+		if (jobGraph.getJobID().equals(jobId)) {
+			if (log.isDebugEnabled()) {
+				log.debug("Key value state registered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
-		}
+			}
 
-		try {
-			executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+			try {
+				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
 					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
-		} catch (Exception e) {
-			log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			} catch (Exception e) {
+				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+				return FutureUtils.completedExceptionally(e);
+			}
+		} else {
+			if (log.isDebugEnabled()) {
+				log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
+			}
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 		}
 	}
 
 	@Override
-	public void notifyKvStateUnregistered(
+	public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
+			JobID jobId,
 			JobVertexID jobVertexId,
 			KeyGroupRange keyGroupRange,
 			String registrationName) {
-		if (log.isDebugEnabled()) {
-			log.debug("Key value state unregistered for job {} under name {}.",
+		if (jobGraph.getJobID().equals(jobId)) {
+			if (log.isDebugEnabled()) {
+				log.debug("Key value state unregistered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
-		}
+			}
 
-		try {
-			executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+			try {
+				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
 					jobVertexId, keyGroupRange, registrationName);
-		} catch (Exception e) {
-			log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			} catch (Exception e) {
+				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+				return FutureUtils.completedExceptionally(e);
+			}
+		} else {
+			if (log.isDebugEnabled()) {
+				log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
+			}
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b811531..9b9f3e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -38,26 +37,27 @@ import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
-import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link JobMaster} rpc gateway interface.
  */
-public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId>, RestfulGateway {
+public interface JobMasterGateway extends
+	CheckpointCoordinatorGateway,
+	FencedRpcGateway<JobMasterId>,
+	RestfulGateway,
+	KvStateLocationOracle,
+	KvStateRegistryGateway {
 
 	/**
 	 * Cancels the currently executed job.
@@ -147,42 +147,6 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 		final Exception cause);
 
 	/**
-	 * Requests a {@link KvStateLocation} for the specified {@link InternalKvState} registration name.
-	 *
-	 * @param registrationName Name under which the KvState has been registered.
-	 * @return Future of the requested {@link InternalKvState} location
-	 */
-	CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName);
-
-	/**
-	 * Notifies that queryable state has been registered.
-	 *
-	 * @param jobVertexId          JobVertexID the KvState instance belongs to.
-	 * @param keyGroupRange        Key group range the KvState instance belongs to.
-	 * @param registrationName     Name under which the KvState has been registered.
-	 * @param kvStateId            ID of the registered KvState instance.
-	 * @param kvStateServerAddress Server address where to find the KvState instance.
-	 */
-	void notifyKvStateRegistered(
-			final JobVertexID jobVertexId,
-			final KeyGroupRange keyGroupRange,
-			final String registrationName,
-			final KvStateID kvStateId,
-			final InetSocketAddress kvStateServerAddress);
-
-	/**
-	 * Notifies that queryable state has been unregistered.
-	 *
-	 * @param jobVertexId      JobVertexID the KvState instance belongs to.
-	 * @param keyGroupRange    Key group index the KvState instance belongs to.
-	 * @param registrationName Name under which the KvState has been registered.
-	 */
-	void notifyKvStateUnregistered(
-			JobVertexID jobVertexId,
-			KeyGroupRange keyGroupRange,
-			String registrationName);
-
-	/**
 	 * Request the classloading props of this job.
 	 */
 	CompletableFuture<ClassloadingProps> requestClassloadingProps();

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateLocationOracle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateLocationOracle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateLocationOracle.java
new file mode 100644
index 0000000..31e7a91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateLocationOracle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Oracle for {@link KvStateLocation} in the cluster. In order to answer {@link InternalKvState}
+ * location requests, the {@link TaskExecutor TaskExecutors} have to register and unregister their respective
+ * key-value states at the oracle.
+ */
+public interface KvStateLocationOracle {
+
+	/**
+	 * Requests a {@link KvStateLocation} for the specified {@link InternalKvState} registration name.
+	 *
+	 * @param jobId identifying the job for which to request the {@link KvStateLocation}
+	 * @param registrationName Name under which the KvState has been registered.
+	 * @return Future of the requested {@link InternalKvState} location
+	 */
+	CompletableFuture<KvStateLocation> requestKvStateLocation(
+		final JobID jobId,
+		final String registrationName);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
new file mode 100644
index 0000000..4e70df5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway to report key-value state registrations and deregistrations.
+ */
+public interface KvStateRegistryGateway {
+
+	/**
+	 * Notifies that queryable state has been registered.
+	 *
+	 * @param jobId	identifying the job for which to register a key value state
+	 * @param jobVertexId JobVertexID the KvState instance belongs to.
+	 * @param keyGroupRange Key group range the KvState instance belongs to.
+	 * @param registrationName Name under which the KvState has been registered.
+	 * @param kvStateId ID of the registered KvState instance.
+	 * @param kvStateServerAddress Server address where to find the KvState instance.
+	 * @return Future acknowledge if the key-value state has been registered
+	 */
+	CompletableFuture<Acknowledge> notifyKvStateRegistered(
+		final JobID jobId,
+		final JobVertexID jobVertexId,
+		final KeyGroupRange keyGroupRange,
+		final String registrationName,
+		final KvStateID kvStateId,
+		final InetSocketAddress kvStateServerAddress);
+
+	/**
+	 * Notifies that queryable state has been unregistered.
+	 *
+	 * @param jobId	identifying the job for which to unregister a key value state
+	 * @param jobVertexId JobVertexID the KvState instance belongs to.
+	 * @param keyGroupRange Key group range the KvState instance belongs to.
+	 * @param registrationName Name under which the KvState has been registered.
+	 * @return Future acknowledge if the key-value state has been unregistered
+	 */
+	CompletableFuture<Acknowledge> notifyKvStateUnregistered(
+		final JobID jobId,
+		final JobVertexID jobVertexId,
+		final KeyGroupRange keyGroupRange,
+		final String registrationName);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
index d605952..da3d5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.query;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
 
-import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
 
 /**
  * An interface for the Queryable State Client Proxy running on each Task Manager in the cluster.
@@ -47,19 +48,25 @@ public interface KvStateClientProxy extends KvStateServer {
 	 * <p>This is useful in settings where high-availability is enabled and
 	 * a failed Job Manager is replaced by a new one.
 	 *
-	 * <p><b>IMPORTANT: </b> this method may be called by a different thread than the {@link #getJobManagerFuture()}.
+	 * <p><b>IMPORTANT: </b> this method may be called by a different thread than
+	 * the {@link #getKvStateLocationOracle(JobID)}.
 	 *
-	 * @param leadingJobManager the currently leading job manager.
+	 * @param jobId identifying the job for which to update the key-value state location oracle
+	 * @param kvStateLocationOracle the key-value state location oracle for the given {@link JobID},
+	 *                                 or null if there is no oracle anymore
 	 * */
-	void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception;
+	void updateKvStateLocationOracle(
+		JobID jobId,
+		@Nullable KvStateLocationOracle kvStateLocationOracle);
 
 	/**
-	 * Retrieves a future containing the currently leading Job Manager.
+	 * Retrieves the key-value state location oracle currently registered for the given job.
 	 *
 	 * <p><b>IMPORTANT: </b> this method may be called by a different thread than the
-	 * {@link #updateJobManager(CompletableFuture)}.
+	 * {@link #updateKvStateLocationOracle(JobID, KvStateLocationOracle)}.
 	 *
-	 * @return A {@link CompletableFuture} containing the currently active Job Manager.
+	 * @param jobId identifying the job for which to request the key-value state location oracle
+	 * @return The key-value state location oracle for the given {@link JobID} or null if none.
 	 */
-	CompletableFuture<ActorGateway> getJobManagerFuture();
+	@Nullable KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index ed1f92e..2c55463 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A registry for {@link InternalKvState} instances per task manager.
@@ -44,26 +44,31 @@ public class KvStateRegistry {
 	private final ConcurrentHashMap<KvStateID, InternalKvState<?>> registeredKvStates =
 			new ConcurrentHashMap<>();
 
-	/** Registry listener to be notified on registration/unregistration. */
-	private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
+	/** Registry listeners to be notified on registration/unregistration. */
+	private final ConcurrentHashMap<JobID, KvStateRegistryListener> listeners = new ConcurrentHashMap<>(4);
 
 	/**
 	 * Registers a listener with the registry.
 	 *
+	 * @param jobId identifying the job for which to register a {@link KvStateRegistryListener}
 	 * @param listener The registry listener.
 	 * @throws IllegalStateException If there is a registered listener
 	 */
-	public void registerListener(KvStateRegistryListener listener) {
-		if (!listenerRef.compareAndSet(null, listener)) {
-			throw new IllegalStateException("Listener already registered.");
+	public void registerListener(JobID jobId, KvStateRegistryListener listener) {
+		final KvStateRegistryListener previousValue = listeners.putIfAbsent(jobId, listener);
+
+		if (previousValue != null) {
+			throw new IllegalStateException("Listener already registered under " + jobId + '.');
 		}
 	}
 
 	/**
 	 * Unregisters the listener with the registry.
+	 *
+	 * @param jobId for which to unregister the {@link KvStateRegistryListener}
 	 */
-	public void unregisterListener() {
-		listenerRef.set(null);
+	public void unregisterListener(JobID jobId) {
+		listeners.remove(jobId);
 	}
 
 	/**
@@ -86,14 +91,15 @@ public class KvStateRegistry {
 		KvStateID kvStateId = new KvStateID();
 
 		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
-			final KvStateRegistryListener listener = listenerRef.get();
+			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
+
 			if (listener != null) {
 				listener.notifyKvStateRegistered(
-						jobId,
-						jobVertexId,
-						keyGroupRange,
-						registrationName,
-						kvStateId);
+					jobId,
+					jobVertexId,
+					keyGroupRange,
+					registrationName,
+					kvStateId);
 			}
 
 			return kvStateId;
@@ -118,7 +124,7 @@ public class KvStateRegistry {
 			KvStateID kvStateId) {
 
 		if (registeredKvStates.remove(kvStateId) != null) {
-			final KvStateRegistryListener listener = listenerRef.get();
+			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 			if (listener != null) {
 				listener.notifyKvStateUnregistered(
 						jobId,
@@ -154,4 +160,19 @@ public class KvStateRegistry {
 		return new TaskKvStateRegistry(this, jobId, jobVertexId);
 	}
 
+	// ------------------------------------------------------------------------
+	// Internal methods
+	// ------------------------------------------------------------------------
+
+	private KvStateRegistryListener getKvStateRegistryListener(JobID jobId) {
+		// first check whether we are running the pre-Flip-6 code which registers
+		// a single listener under HighAvailabilityServices.DEFAULT_JOB_ID
+		KvStateRegistryListener listener = listeners.get(HighAvailabilityServices.DEFAULT_JOB_ID);
+
+		if (listener == null) {
+			listener = listeners.get(jobId);
+		}
+		return listener;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
deleted file mode 100644
index 4b9834a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.state.KeyGroupRange;
-
-import java.net.InetSocketAddress;
-
-/**
- * A gateway to listen for {@code KvState} registrations.
- */
-public interface KvStateRegistryGateway extends RpcGateway {
-	/**
-	 * Notifies the listener about a registered KvState instance.
-	 *
-	 * @param jobId            Job ID the KvState instance belongs to
-	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupRange    Key group range the KvState instance belongs to
-	 * @param registrationName Name under which the KvState is registered
-	 * @param kvStateId        ID of the KvState instance
-	 */
-	void notifyKvStateRegistered(
-		JobID jobId,
-		JobVertexID jobVertexId,
-		KeyGroupRange keyGroupRange,
-		String registrationName,
-		KvStateID kvStateId,
-		InetSocketAddress kvStateServerAddress);
-
-	/**
-	 * Notifies the listener about an unregistered KvState instance.
-	 *
-	 * @param jobId            Job ID the KvState instance belongs to
-	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupRange    Key group range the KvState instance belongs to
-	 * @param registrationName Name under which the KvState is registered
-	 */
-	void notifyKvStateUnregistered(
-		JobID jobId,
-		JobVertexID jobVertexId,
-		KeyGroupRange keyGroupRange,
-		String registrationName);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9df2e88..6534c11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -59,6 +59,9 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -78,6 +81,7 @@ import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
@@ -1015,6 +1019,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
 
+		registerQueryableState(jobID, jobMasterGateway);
+
 		return new JobManagerConnection(
 			jobID,
 			resourceID,
@@ -1029,12 +1035,44 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
 		checkNotNull(jobManagerConnection);
+
+		final KvStateRegistry kvStateRegistry = networkEnvironment.getKvStateRegistry();
+
+		if (kvStateRegistry != null) {
+			kvStateRegistry.unregisterListener(jobManagerConnection.getJobID());
+		}
+
+		final KvStateClientProxy kvStateClientProxy = networkEnvironment.getKvStateProxy();
+
+		if (kvStateClientProxy != null) {
+			kvStateClientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobID(), null);
+		}
+
 		JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
 		jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		jobManagerConnection.getLibraryCacheManager().shutdown();
 		jobManagerConnection.getBlobService().close();
 	}
 
+	private void registerQueryableState(JobID jobId, JobMasterGateway jobMasterGateway) {
+		final KvStateServer kvStateServer = networkEnvironment.getKvStateServer();
+		final KvStateRegistry kvStateRegistry = networkEnvironment.getKvStateRegistry();
+
+		if (kvStateServer != null && kvStateRegistry != null) {
+			kvStateRegistry.registerListener(
+				jobId,
+				new RpcKvStateRegistryListener(
+					jobMasterGateway,
+					kvStateServer.getServerAddress()));
+		}
+
+		final KvStateClientProxy kvStateProxy = networkEnvironment.getKvStateProxy();
+
+		if (kvStateProxy != null) {
+			kvStateProxy.updateKvStateLocationOracle(jobId, jobMasterGateway);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Internal task methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 61e83d7..c86d7c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -413,11 +413,9 @@ public class TaskManagerServicesConfiguration {
 	private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
 
 		final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
-				config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
-						QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+				config.getString(QueryableStateOptions.PROXY_PORT_RANGE));
 		final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
-				config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
-						QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
+				config.getString(QueryableStateOptions.SERVER_PORT_RANGE));
 
 		final int numProxyServerNetworkThreads = config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS);
 		final int numProxyServerQueryThreads = config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS);

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
index 6312d08..95eb2de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
@@ -21,13 +21,16 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.KvStateRegistryGateway;
+import org.apache.flink.runtime.jobmaster.KvStateRegistryGateway;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.Preconditions;
 
 import java.net.InetSocketAddress;
 
+/**
+ * {@link KvStateRegistryListener} implementation for the new RPC service.
+ */
 public class RpcKvStateRegistryListener implements KvStateRegistryListener {
 
 	private final KvStateRegistryGateway kvStateRegistryGateway;
@@ -54,7 +57,6 @@ public class RpcKvStateRegistryListener implements KvStateRegistryListener {
 			registrationName,
 			kvStateId,
 			kvStateServerAddress);
-
 	}
 
 	@Override
@@ -69,6 +71,5 @@ public class RpcKvStateRegistryListener implements KvStateRegistryListener {
 			jobVertexId,
 			keyGroupRange,
 			registrationName);
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java
new file mode 100644
index 0000000..d874438
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link KvStateLocationOracle} implementation for {@link ActorGateway}.
+ */
+public class ActorGatewayKvStateLocationOracle implements KvStateLocationOracle {
+
+	private final ActorGateway jobManagerActorGateway;
+
+	private final FiniteDuration timeout;
+
+	public ActorGatewayKvStateLocationOracle(
+			ActorGateway jobManagerActorGateway,
+			Time timeout) {
+		this.jobManagerActorGateway = Preconditions.checkNotNull(jobManagerActorGateway);
+
+		Preconditions.checkNotNull(timeout);
+		this.timeout = FiniteDuration.apply(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String registrationName) {
+		final KvStateMessage.LookupKvStateLocation lookupKvStateLocation = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
+
+		return FutureUtils.toJava(
+			jobManagerActorGateway
+				.ask(lookupKvStateLocation, timeout)
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e6e584e..93d432d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -944,11 +944,22 @@ class TaskManager(
       val kvStateRegistry = network.getKvStateRegistry()
 
       kvStateRegistry.registerListener(
+        HighAvailabilityServices.DEFAULT_JOB_ID,
         new ActorGatewayKvStateRegistryListener(
           jobManagerGateway,
           kvStateServer.getServerAddress))
     }
 
+    val proxy = network.getKvStateProxy
+
+    if (proxy != null) {
+      proxy.updateKvStateLocationOracle(
+        HighAvailabilityServices.DEFAULT_JOB_ID,
+        new ActorGatewayKvStateLocationOracle(
+          jobManagerGateway,
+          config.getTimeout()))
+    }
+
     // start a blob service, if a blob server is specified
     val jmHost = jobManager.path.address.host.getOrElse("localhost")
     val address = new InetSocketAddress(jmHost, blobPort)
@@ -1050,7 +1061,14 @@ class TaskManager(
     connectionUtils = None
 
     if (network.getKvStateRegistry != null) {
-      network.getKvStateRegistry.unregisterListener()
+      network.getKvStateRegistry.unregisterListener(HighAvailabilityServices.DEFAULT_JOB_ID)
+    }
+
+    val proxy = network.getKvStateProxy
+
+    if (proxy != null) {
+      // clear the key-value state location oracle
+      proxy.updateKvStateLocationOracle(HighAvailabilityServices.DEFAULT_JOB_ID, null)
     }
     
     // failsafe shutdown of the metrics registry
@@ -1437,28 +1455,6 @@ class TaskManager(
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
-    val proxy = network.getKvStateProxy
-    if (proxy != null) {
-
-      val askTimeoutString = config.getConfiguration.getString(AkkaOptions.ASK_TIMEOUT)
-
-      val timeout = Duration(askTimeoutString)
-
-      if (!timeout.isFinite) {
-        throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key +
-          " is not a finite timeout ('" + askTimeoutString + "')")
-      }
-
-      if (leaderAddress != null) {
-        val actorGwFuture: Future[ActorGateway] =
-          AkkaUtils.getActorRefFuture(
-            leaderAddress, context.system, timeout.asInstanceOf[FiniteDuration]
-          ).map(actor => new AkkaActorGateway(actor, leaderSessionID))(context.system.dispatcher)
-
-        proxy.updateJobManager(FutureUtils.toJava(actorGwFuture))
-      }
-    }
-
     self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
new file mode 100644
index 0000000..43aa1d1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KvStateRegistry}.
+ */
+public class KvStateRegistryTest extends TestLogger {
+
+	/**
+	 * Tests that each {@link KvStateRegistryListener} only receives the notifications which
+	 * are destined for it.
+	 */
+	@Test
+	public void testKvStateRegistryListenerNotification() {
+		final JobID jobId1 = new JobID();
+		final JobID jobId2 = new JobID();
+
+		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+		final ArrayDeque<JobID> registeredNotifications1 = new ArrayDeque<>(2);
+		final ArrayDeque<JobID> deregisteredNotifications1 = new ArrayDeque<>(2);
+		final TestingKvStateRegistryListener listener1 = new TestingKvStateRegistryListener(
+			registeredNotifications1,
+			deregisteredNotifications1);
+
+		final ArrayDeque<JobID> registeredNotifications2 = new ArrayDeque<>(2);
+		final ArrayDeque<JobID> deregisteredNotifications2 = new ArrayDeque<>(2);
+		final TestingKvStateRegistryListener listener2 = new TestingKvStateRegistryListener(
+			registeredNotifications2,
+			deregisteredNotifications2);
+
+		kvStateRegistry.registerListener(jobId1, listener1);
+		kvStateRegistry.registerListener(jobId2, listener2);
+
+		final JobVertexID jobVertexId = new JobVertexID();
+		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+		final String registrationName = "foobar";
+		final KvStateID kvStateID = kvStateRegistry.registerKvState(
+			jobId1,
+			jobVertexId,
+			keyGroupRange,
+			registrationName,
+			new DummyKvState<>());
+
+		assertThat(registeredNotifications1.poll(), equalTo(jobId1));
+		assertThat(registeredNotifications2.isEmpty(), is(true));
+
+		final JobVertexID jobVertexId2 = new JobVertexID();
+		final KeyGroupRange keyGroupRange2 = new KeyGroupRange(0, 1);
+		final String registrationName2 = "barfoo";
+		final KvStateID kvStateID2 = kvStateRegistry.registerKvState(
+			jobId2,
+			jobVertexId2,
+			keyGroupRange2,
+			registrationName2,
+			new DummyKvState<>());
+
+		assertThat(registeredNotifications2.poll(), equalTo(jobId2));
+		assertThat(registeredNotifications1.isEmpty(), is(true));
+
+		kvStateRegistry.unregisterKvState(
+			jobId1,
+			jobVertexId,
+			keyGroupRange,
+			registrationName,
+			kvStateID);
+
+		assertThat(deregisteredNotifications1.poll(), equalTo(jobId1));
+		assertThat(deregisteredNotifications2.isEmpty(), is(true));
+
+		kvStateRegistry.unregisterKvState(
+			jobId2,
+			jobVertexId2,
+			keyGroupRange2,
+			registrationName2,
+			kvStateID2);
+
+		assertThat(deregisteredNotifications2.poll(), equalTo(jobId2));
+		assertThat(deregisteredNotifications1.isEmpty(), is(true));
+	}
+
+	/**
+	 * Tests that a {@link KvStateRegistryListener} registered under {@link HighAvailabilityServices#DEFAULT_JOB_ID}
+	 * is preferred over a job-specific listener and receives the notifications instead of it.
+	 */
+	@Test
+	public void testPreFlip6CodePathPreference() {
+		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+		final ArrayDeque<JobID> stateRegistrationNotifications = new ArrayDeque<>(2);
+		final ArrayDeque<JobID> stateDeregistrationNotifications = new ArrayDeque<>(2);
+		final TestingKvStateRegistryListener testingListener = new TestingKvStateRegistryListener(
+			stateRegistrationNotifications,
+			stateDeregistrationNotifications);
+
+		final ArrayDeque<JobID> anotherQueue = new ArrayDeque<>(2);
+		final TestingKvStateRegistryListener anotherListener = new TestingKvStateRegistryListener(
+			anotherQueue,
+			anotherQueue);
+
+		final JobID jobId = new JobID();
+
+		kvStateRegistry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, testingListener);
+		kvStateRegistry.registerListener(jobId, anotherListener);
+
+		final JobVertexID jobVertexId = new JobVertexID();
+
+		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+		final String registrationName = "registrationName";
+		final KvStateID kvStateID = kvStateRegistry.registerKvState(
+			jobId,
+			jobVertexId,
+			keyGroupRange,
+			registrationName,
+			new DummyKvState());
+
+		assertThat(stateRegistrationNotifications.poll(), equalTo(jobId));
+		// the other listener should not have received any notifications
+		assertThat(anotherQueue.isEmpty(), is(true));
+
+		kvStateRegistry.unregisterKvState(
+			jobId,
+			jobVertexId,
+			keyGroupRange,
+			registrationName,
+			kvStateID);
+
+		assertThat(stateDeregistrationNotifications.poll(), equalTo(jobId));
+		// the other listener should not have received any notifications
+		assertThat(anotherQueue.isEmpty(), is(true));
+	}
+
+	/**
+	 * Testing implementation of {@link KvStateRegistryListener}.
+	 */
+	private static final class TestingKvStateRegistryListener implements KvStateRegistryListener {
+
+		private final Queue<JobID> stateRegisteredNotifications;
+		private final Queue<JobID> stateDeregisteredNotifications;
+
+		private TestingKvStateRegistryListener(
+				Queue<JobID> stateRegisteredNotifications,
+				Queue<JobID> stateDeregisteredNotifications) {
+			this.stateRegisteredNotifications = stateRegisteredNotifications;
+			this.stateDeregisteredNotifications = stateDeregisteredNotifications;
+		}
+
+		@Override
+		public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+			stateRegisteredNotifications.offer(jobId);
+		}
+
+		@Override
+		public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
+			stateDeregisteredNotifications.offer(jobId);
+		}
+	}
+
+	/**
+	 * Testing implementation of {@link InternalKvState}.
+	 *
+	 * @param <T> type of the state
+	 */
+	private static final class DummyKvState<T> implements InternalKvState<T> {
+
+		@Override
+		public void setCurrentNamespace(Object namespace) {
+			// noop
+		}
+
+		@Override
+		public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+			return serializedKeyAndNamespace;
+		}
+
+		@Override
+		public void clear() {
+			// noop
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef6741a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7475f91..ad69ae8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -54,6 +54,7 @@ import org.apache.flink.queryablestate.client.state.serialization.KvStateSeriali
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
@@ -3032,7 +3033,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
 
 		KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
-		registry.registerListener(listener);
+		registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, listener);
 
 		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
 				"test",


Mime
View raw message