flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [06/10] flink git commit: [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
Date Tue, 09 Aug 2016 14:47:40 GMT
[FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState

[streaming-java]

- Adds a KvStateRegistry per TaskManager at which created KvState instances are
  registered/unregistered.

- Registered KvState instances are reported to the JobManager, which can be
  queried for KvStateLocation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63c9b8e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63c9b8e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63c9b8e7

Branch: refs/heads/master
Commit: 63c9b8e70877fb8940b44c7870f3e75101349b15
Parents: a909adb
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon May 30 14:03:35 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue Aug 9 16:42:05 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBStateBackendConfigTest.java    |   6 +
 .../flink/runtime/execution/Environment.java    |   9 +
 .../runtime/executiongraph/ExecutionGraph.java  |  10 +
 .../runtime/io/network/NetworkEnvironment.java  | 125 +++++++-
 .../flink/runtime/query/KvStateLocation.java    | 239 +++++++++++++++
 .../runtime/query/KvStateLocationRegistry.java  | 161 ++++++++++
 .../flink/runtime/query/KvStateMessage.java     | 290 +++++++++++++++++++
 .../flink/runtime/query/KvStateRegistry.java    | 157 ++++++++++
 .../runtime/query/KvStateRegistryListener.java  |  62 ++++
 .../runtime/query/TaskKvStateRegistry.java      |  93 ++++++
 .../runtime/taskmanager/RuntimeEnvironment.java |  10 +
 .../apache/flink/runtime/taskmanager/Task.java  |   5 +
 .../flink/runtime/jobmanager/JobManager.scala   |  76 ++++-
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../io/network/NetworkEnvironmentTest.java      |   7 +-
 .../runtime/jobmanager/JobManagerTest.java      | 258 ++++++++++++++++-
 .../operators/testutils/DummyEnvironment.java   |  15 +
 .../testutils/DummyEnvironment.java.orig        | 185 ++++++++++++
 .../operators/testutils/MockEnvironment.java    |  14 +-
 .../query/KvStateLocationRegistryTest.java      | 231 +++++++++++++++
 .../runtime/query/KvStateLocationTest.java      |  92 ++++++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +
 ...askManagerComponentsStartupShutdownTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  10 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |   7 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  12 +
 .../streaming/runtime/tasks/StreamTaskTest.java |  15 +-
 27 files changed, 2062 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 0878b8c..657c57e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -311,6 +312,11 @@ public class RocksDBStateBackendConfigTest {
 		when(env.getJobID()).thenReturn(new JobID());
 		when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
 		when(env.getIOManager()).thenReturn(ioMan);
+
+		TaskInfo taskInfo = mock(TaskInfo.class);
+		when(env.getTaskInfo()).thenReturn(taskInfo);
+
+		when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
 		return env;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 5ad5fe2..2f158fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
@@ -148,6 +150,13 @@ public interface Environment {
 	AccumulatorRegistry getAccumulatorRegistry();
 
 	/**
+	 * Returns the registry for {@link KvState} instances.
+	 *
+	 * @return KvState registry
+	 */
+	TaskKvStateRegistry getTaskKvStateRegistry();
+
+	/**
 	 * Confirms that the invokable has successfully completed all steps it needed to
 	 * to for the checkpoint with the give checkpoint-ID. This method does not include
 	 * any state in the checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1a0301d..e6ae6ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -224,6 +225,9 @@ public class ExecutionGraph {
 	/** The execution context which is used to execute futures. */
 	private ExecutionContext executionContext;
 
+	/** Registered KvState instances reported by the TaskManagers. */
+	private transient KvStateLocationRegistry kvStateLocationRegistry;
+
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
@@ -304,6 +308,8 @@ public class ExecutionGraph {
 		this.restartStrategy = restartStrategy;
 
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
+
+		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -445,6 +451,10 @@ public class ExecutionGraph {
 		return savepointCoordinator;
 	}
 
+	public KvStateLocationRegistry getKvStateLocationRegistry() {
+		return kvStateLocationRegistry;
+	}
+
 	public RestartStrategy getRestartStrategy() {
 		return restartStrategy;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 30d2e38..283d804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -22,6 +22,7 @@ import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -35,11 +36,21 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.FailTask;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -84,6 +95,12 @@ public class NetworkEnvironment {
 
 	private PartitionStateChecker partitionStateChecker;
 
+	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
+	private KvStateServer kvStateServer;
+
+	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
+	private KvStateRegistry kvStateRegistry;
+
 	private boolean isShutdown;
 
 	/**
@@ -92,17 +109,21 @@ public class NetworkEnvironment {
 	 */
 	private final ExecutionContext executionContext;
 
+	private final InstanceConnectionInfo connectionInfo;
+
 	/**
 	 * Initializes all network I/O components.
 	 */
 	public NetworkEnvironment(
-		ExecutionContext executionContext,
-		FiniteDuration jobManagerTimeout,
-		NetworkEnvironmentConfiguration config) throws IOException {
+			ExecutionContext executionContext,
+			FiniteDuration jobManagerTimeout,
+			NetworkEnvironmentConfiguration config,
+			InstanceConnectionInfo connectionInfo) throws IOException {
 
 		this.executionContext = executionContext;
 		this.configuration = checkNotNull(config);
 		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
+		this.connectionInfo = checkNotNull(connectionInfo);
 
 		// create the network buffers - this is the operation most likely to fail upon
 		// mis-configuration, so we do this first
@@ -151,6 +172,10 @@ public class NetworkEnvironment {
 		return configuration.partitionRequestInitialAndMaxBackoff();
 	}
 
+	public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
+		return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Association / Disassociation with JobManager / TaskManager
 	// --------------------------------------------------------------------------------------------
@@ -183,7 +208,9 @@ public class NetworkEnvironment {
 			if (this.partitionConsumableNotifier == null &&
 				this.partitionManager == null &&
 				this.taskEventDispatcher == null &&
-				this.connectionManager == null)
+				this.connectionManager == null &&
+				this.kvStateRegistry == null &&
+				this.kvStateServer == null)
 			{
 				// good, not currently associated. start the individual components
 
@@ -211,6 +238,29 @@ public class NetworkEnvironment {
 				catch (Throwable t) {
 					throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
 				}
+
+				try {
+					kvStateRegistry = new KvStateRegistry();
+
+					kvStateServer = new KvStateServer(
+							connectionInfo.address(),
+							0,
+							1,
+							10,
+							kvStateRegistry,
+							new AtomicKvStateRequestStats());
+
+					kvStateServer.start();
+
+					KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
+							jobManagerGateway,
+							kvStateServer.getAddress());
+
+					kvStateRegistry.registerListener(listener);
+				} catch (Throwable t) {
+					throw new IOException("Failed to instantiate KvState management components: "
+							+ t.getMessage(), t);
+				}
 			}
 			else {
 				throw new IllegalStateException(
@@ -227,6 +277,19 @@ public class NetworkEnvironment {
 
 			LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");
 
+			// Shut down KvStateRegistry
+			kvStateRegistry = null;
+
+			// Shut down KvStateServer
+			if (kvStateServer != null) {
+				try {
+					kvStateServer.shutDown();
+				} catch (Throwable t) {
+					throw new IOException("Cannot shutdown KvStateNettyServer", t);
+				}
+				kvStateServer = null;
+			}
+
 			// terminate all network connections
 			if (connectionManager != null) {
 				try {
@@ -511,4 +574,58 @@ public class NetworkEnvironment {
 			jobManager.tell(msg, taskManager);
 		}
 	}
+
+	/**
+	 * Simple {@link KvStateRegistry} listener, which forwards registrations to
+	 * the JobManager.
+	 */
+	private static class JobManagerKvStateRegistryListener implements KvStateRegistryListener {
+
+		private ActorGateway jobManager;
+
+		private KvStateServerAddress kvStateServerAddress;
+
+		public JobManagerKvStateRegistryListener(
+				ActorGateway jobManager,
+				KvStateServerAddress kvStateServerAddress) {
+
+			this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager");
+			this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
+		}
+
+		@Override
+		public void notifyKvStateRegistered(
+				JobID jobId,
+				JobVertexID jobVertexId,
+				int keyGroupIndex,
+				String registrationName,
+				KvStateID kvStateId) {
+
+			Object msg = new KvStateMessage.NotifyKvStateRegistered(
+					jobId,
+					jobVertexId,
+					keyGroupIndex,
+					registrationName,
+					kvStateId,
+					kvStateServerAddress);
+
+			jobManager.tell(msg);
+		}
+
+		@Override
+		public void notifyKvStateUnregistered(
+				JobID jobId,
+				JobVertexID jobVertexId,
+				int keyGroupIndex,
+				String registrationName) {
+
+			Object msg = new KvStateMessage.NotifyKvStateUnregistered(
+					jobId,
+					jobVertexId,
+					keyGroupIndex,
+					registrationName);
+
+			jobManager.tell(msg);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
new file mode 100644
index 0000000..9be22c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Location information for all key groups of a {@link KvState} instance.
+ *
+ * <p>This is populated by the {@link KvStateLocationRegistry} and used by the
+ * {@link QueryableStateClient} to target queries.
+ */
+public class KvStateLocation implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** JobID the KvState instances belong to. */
+	private final JobID jobId;
+
+	/** JobVertexID the KvState instances belong to. */
+	private final JobVertexID jobVertexId;
+
+	/** Number of key groups of the operator the KvState instances belong to. */
+	private final int numKeyGroups;
+
+	/** Name under which the KvState instances have been registered. */
+	private final String registrationName;
+
+	/** IDs for each KvState instance where array index corresponds to key group index. */
+	private final KvStateID[] kvStateIds;
+
+	/**
+	 * Server address for each KvState instance where array index corresponds to
+	 * key group index.
+	 */
+	private final KvStateServerAddress[] kvStateAddresses;
+
+	/** Current number of registered key groups. */
+	private int numRegisteredKeyGroups;
+
+	/**
+	 * Creates the location information.
+	 *
+	 * @param jobId            JobID the KvState instances belong to
+	 * @param jobVertexId      JobVertexID the KvState instances belong to
+	 * @param numKeyGroups     Number of key groups of the operator
+	 * @param registrationName Name under which the KvState instances have been registered
+	 */
+	public KvStateLocation(JobID jobId, JobVertexID jobVertexId, int numKeyGroups, String registrationName) {
+		this.jobId = Preconditions.checkNotNull(jobId, "JobID");
+		this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
+		Preconditions.checkArgument(numKeyGroups >= 0, "Negative number of key groups");
+		this.numKeyGroups = numKeyGroups;
+		this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
+		this.kvStateIds = new KvStateID[numKeyGroups];
+		this.kvStateAddresses = new KvStateServerAddress[numKeyGroups];
+	}
+
+	/**
+	 * Returns the JobID the KvState instances belong to.
+	 *
+	 * @return JobID the KvState instances belong to
+	 */
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	/**
+	 * Returns the JobVertexID the KvState instances belong to.
+	 *
+	 * @return JobVertexID the KvState instances belong to
+	 */
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	/**
+	 * Returns the number of key groups of the operator the KvState instances belong to.
+	 *
+	 * @return Number of key groups of the operator the KvState instances belong to
+	 */
+	public int getNumKeyGroups() {
+		return numKeyGroups;
+	}
+
+	/**
+	 * Returns the name under which the KvState instances have been registered.
+	 *
+	 * @return Name under which the KvState instances have been registered.
+	 */
+	public String getRegistrationName() {
+		return registrationName;
+	}
+
+	/**
+	 * Returns the current number of registered key groups.
+	 *
+	 * @return Number of registered key groups.
+	 */
+	public int getNumRegisteredKeyGroups() {
+		return numRegisteredKeyGroups;
+	}
+
+	/**
+	 * Returns the registered KvStateID for the key group index or
+	 * <code>null</code> if none is registered yet.
+	 *
+	 * @param keyGroupIndex Key group index to get ID for.
+	 * @return KvStateID for the key group index or <code>null</code> if none
+	 * is registered yet
+	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+	 */
+	public KvStateID getKvStateID(int keyGroupIndex) {
+		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+			throw new IndexOutOfBoundsException("Key group index");
+		}
+
+		return kvStateIds[keyGroupIndex];
+	}
+
+	/**
+	 * Returns the registered KvStateServerAddress for the key group index or
+	 * <code>null</code> if none is registered yet.
+	 *
+	 * @param keyGroupIndex Key group index to get server address for.
+	 * @return KvStateServerAddress for the key group index or <code>null</code>
+	 * if none is registered yet
+	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+	 */
+	public KvStateServerAddress getKvStateServerAddress(int keyGroupIndex) {
+		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+			throw new IndexOutOfBoundsException("Key group index");
+		}
+
+		return kvStateAddresses[keyGroupIndex];
+	}
+
+	/**
+	 * Registers a KvState instance for the given key group index.
+	 *
+	 * @param keyGroupIndex  Key group index to register
+	 * @param kvStateId      ID of the KvState instance at the key group index.
+	 * @param kvStateAddress Server address of the KvState instance at the key group index.
+	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+	 */
+	void registerKvState(int keyGroupIndex, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
+		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+			throw new IndexOutOfBoundsException("Key group index");
+		}
+
+		if (kvStateIds[keyGroupIndex] == null && kvStateAddresses[keyGroupIndex] == null) {
+			numRegisteredKeyGroups++;
+		}
+
+		kvStateIds[keyGroupIndex] = kvStateId;
+		kvStateAddresses[keyGroupIndex] = kvStateAddress;
+	}
+
+	/**
+	 * Unregisters the KvState instance for the given key group index.
+	 *
+	 * @param keyGroupIndex Key group index to unregister.
+	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+	 * @throws IllegalArgumentException If no location information registered for key group index.
+	 */
+	void unregisterKvState(int keyGroupIndex) {
+		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+			throw new IndexOutOfBoundsException("Key group index");
+		}
+
+		if (kvStateIds[keyGroupIndex] == null || kvStateAddresses[keyGroupIndex] == null) {
+			throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
+		}
+
+		numRegisteredKeyGroups--;
+
+		kvStateIds[keyGroupIndex] = null;
+		kvStateAddresses[keyGroupIndex] = null;
+	}
+
+	@Override
+	public String toString() {
+		return "KvStateLocation{" +
+				"jobId=" + jobId +
+				", jobVertexId=" + jobVertexId +
+				", parallelism=" + numKeyGroups +
+				", kvStateIds=" + Arrays.toString(kvStateIds) +
+				", kvStateAddresses=" + Arrays.toString(kvStateAddresses) +
+				'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) { return true; }
+		if (o == null || getClass() != o.getClass()) { return false; }
+
+		KvStateLocation that = (KvStateLocation) o;
+
+		if (numKeyGroups != that.numKeyGroups) { return false; }
+		if (!jobId.equals(that.jobId)) { return false; }
+		if (!jobVertexId.equals(that.jobVertexId)) { return false; }
+		if (!registrationName.equals(that.registrationName)) { return false; }
+		if (!Arrays.equals(kvStateIds, that.kvStateIds)) { return false; }
+		return Arrays.equals(kvStateAddresses, that.kvStateAddresses);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = jobId.hashCode();
+		result = 31 * result + jobVertexId.hashCode();
+		result = 31 * result + numKeyGroups;
+		result = 31 * result + registrationName.hashCode();
+		result = 31 * result + Arrays.hashCode(kvStateIds);
+		result = 31 * result + Arrays.hashCode(kvStateAddresses);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
new file mode 100644
index 0000000..5b76598
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple registry, which maps {@link KvState} registration notifications to
+ * {@link KvStateLocation} instances.
+ */
+public class KvStateLocationRegistry {
+
+	/** JobID this registry belongs to. */
+	private final JobID jobId;
+
+	/** Job vertices for determining parallelism per key. */
+	private final Map<JobVertexID, ExecutionJobVertex> jobVertices;
+
+	/**
+	 * Location info keyed by registration name. The name needs to be unique
+	 * per JobID, i.e. two operators cannot register KvState with the same
+	 * name.
+	 */
+	private final Map<String, KvStateLocation> lookupTable = new HashMap<>();
+
+	/**
+	 * Creates the registry for the job.
+	 *
+	 * @param jobId       JobID this registry belongs to.
+	 * @param jobVertices Job vertices map of all vertices of this job.
+	 */
+	public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
+		this.jobId = Preconditions.checkNotNull(jobId, "JobID");
+		this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
+	}
+
+	/**
+	 * Returns the {@link KvStateLocation} for the registered KvState instance
+	 * or <code>null</code> if no location information is available.
+	 *
+	 * @param registrationName Name under which the KvState instance is registered.
+	 * @return Location information or <code>null</code>.
+	 */
+	public KvStateLocation getKvStateLocation(String registrationName) {
+		return lookupTable.get(registrationName);
+	}
+
+	/**
+	 * Notifies the registry about a registered KvState instance.
+	 *
+	 * @param jobVertexId JobVertexID the KvState instance belongs to
+	 * @param keyGroupIndex Key group index the KvState instance belongs to
+	 * @param registrationName Name under which the KvState has been registered
+	 * @param kvStateId ID of the registered KvState instance
+	 * @param kvStateServerAddress Server address where to find the KvState instance
+	 *
+	 * @throws IllegalArgumentException If JobVertexID does not belong to job
+	 * @throws IllegalStateException If state has been registered with same
+	 * name by another operator.
+	 * @throws IndexOutOfBoundsException If key group index is out of bounds.
+	 */
+	public void notifyKvStateRegistered(
+			JobVertexID jobVertexId,
+			int keyGroupIndex,
+			String registrationName,
+			KvStateID kvStateId,
+			KvStateServerAddress kvStateServerAddress) {
+
+		KvStateLocation location = lookupTable.get(registrationName);
+
+		if (location == null) {
+			// First registration for this operator, create the location info
+			ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
+
+			if (vertex != null) {
+				int parallelism = vertex.getParallelism();
+				location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
+				lookupTable.put(registrationName, location);
+			} else {
+				throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
+			}
+		}
+
+		// Duplicated name if vertex IDs don't match
+		if (!location.getJobVertexId().equals(jobVertexId)) {
+			IllegalStateException duplicate = new IllegalStateException(
+					"Registration name clash. KvState with name '" + registrationName +
+							"' has already been registered by another operator (" +
+							location.getJobVertexId() + ").");
+
+			ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
+			if (vertex != null) {
+				vertex.fail(new SuppressRestartsException(duplicate));
+			}
+
+			throw duplicate;
+		}
+
+		location.registerKvState(keyGroupIndex, kvStateId, kvStateServerAddress);
+	}
+
+	/**
+	 * Notifies the registry about an unregistered KvState instance.
+	 *
+	 * @param jobVertexId JobVertexID the KvState instance belongs to
+	 * @param keyGroupIndex Key group index the KvState instance belongs to
+	 * @param registrationName Name under which the KvState has been registered
+	 * @throws IllegalArgumentException If another operator registered the state instance
+	 * @throws IllegalArgumentException If the registration name is not known
+	 */
+	public void notifyKvStateUnregistered(
+			JobVertexID jobVertexId,
+			int keyGroupIndex,
+			String registrationName) {
+
+		KvStateLocation location = lookupTable.get(registrationName);
+
+		if (location != null) {
+			// Duplicate name if vertex IDs don't match
+			if (!location.getJobVertexId().equals(jobVertexId)) {
+				throw new IllegalArgumentException("Another operator (" +
+						location.getJobVertexId() + ") registered the KvState " +
+						"under '" + registrationName + "'.");
+			}
+
+			location.unregisterKvState(keyGroupIndex);
+
+			if (location.getNumRegisteredKeyGroups() == 0) {
+				lookupTable.remove(registrationName);
+			}
+		} else {
+			throw new IllegalArgumentException("Unknown registration name '" +
+					registrationName + "'. " + "Probably registration/unregistration race.");
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
new file mode 100644
index 0000000..5e3c38e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Actor messages for {@link KvState} lookup and registration.
+ */
+public interface KvStateMessage extends Serializable {
+
+	// ------------------------------------------------------------------------
+	// Lookup
+	// ------------------------------------------------------------------------
+
+	class LookupKvStateLocation implements KvStateMessage {
+
+		private static final long serialVersionUID = 1L;
+
+		/** JobID the KvState instance belongs to. */
+		private final JobID jobId;
+
+		/** Name under which the KvState has been registered. */
+		private final String registrationName;
+
+		/**
+		 * Requests a {@link KvStateLocation} for the specified JobID and
+		 * {@link KvState} registration name.
+		 *
+		 * @param jobId            JobID the KvState instance belongs to
+		 * @param registrationName Name under which the KvState has been registered
+		 */
+		public LookupKvStateLocation(JobID jobId, String registrationName) {
+			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
+			this.registrationName = Preconditions.checkNotNull(registrationName, "Name");
+		}
+
+		/**
+		 * Returns the JobID the KvState instance belongs to.
+		 *
+		 * @return JobID the KvState instance belongs to
+		 */
+		public JobID getJobId() {
+			return jobId;
+		}
+
+		/**
+		 * Returns the name under which the KvState has been registered.
+		 *
+		 * @return Name under which the KvState has been registered
+		 */
+		public String getRegistrationName() {
+			return registrationName;
+		}
+
+		@Override
+		public String toString() {
+			return "LookupKvStateLocation{" +
+					"jobId=" + jobId +
+					", registrationName='" + registrationName + '\'' +
+					'}';
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Registration
+	// ------------------------------------------------------------------------
+
+	class NotifyKvStateRegistered implements KvStateMessage {
+
+		private static final long serialVersionUID = 1L;
+
+		/** JobID the KvState instance belongs to. */
+		private final JobID jobId;
+
+		/** JobVertexID the KvState instance belongs to. */
+		private final JobVertexID jobVertexId;
+
+		/** Key group index the KvState instance belongs to. */
+		private final int keyGroupIndex;
+
+		/** Name under which the KvState has been registered. */
+		private final String registrationName;
+
+		/** ID of the registered KvState instance. */
+		private final KvStateID kvStateId;
+
+		/** Server address where to find the KvState instance. */
+		private final KvStateServerAddress kvStateServerAddress;
+
+		/**
+		 * Notifies the JobManager about a registered {@link KvState} instance.
+		 *
+		 * @param jobId                JobID the KvState instance belongs to
+		 * @param jobVertexId          JobVertexID the KvState instance belongs to
+		 * @param keyGroupIndex        Key group index the KvState instance belongs to
+		 * @param registrationName     Name under which the KvState has been registered
+		 * @param kvStateId            ID of the registered KvState instance
+		 * @param kvStateServerAddress Server address where to find the KvState instance
+		 */
+		public NotifyKvStateRegistered(
+				JobID jobId,
+				JobVertexID jobVertexId,
+				int keyGroupIndex,
+				String registrationName,
+				KvStateID kvStateId,
+				KvStateServerAddress kvStateServerAddress) {
+
+			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
+			this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
+			Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
+			this.keyGroupIndex = keyGroupIndex;
+			this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
+			this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
+			this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
+		}
+
+		/**
+		 * Returns the JobID the KvState instance belongs to.
+		 *
+		 * @return JobID the KvState instance belongs to
+		 */
+		public JobID getJobId() {
+			return jobId;
+		}
+
+		/**
+		 * Returns the JobVertexID the KvState instance belongs to.
+		 *
+		 * @return JobVertexID the KvState instance belongs to
+		 */
+		public JobVertexID getJobVertexId() {
+			return jobVertexId;
+		}
+
+		/**
+		 * Returns the key group index the KvState instance belongs to.
+		 *
+		 * @return Key group index the KvState instance belongs to
+		 */
+		public int getKeyGroupIndex() {
+			return keyGroupIndex;
+		}
+
+		/**
+		 * Returns the name under which the KvState has been registered.
+		 *
+		 * @return Name under which the KvState has been registered
+		 */
+		public String getRegistrationName() {
+			return registrationName;
+		}
+
+		/**
+		 * Returns the ID of the registered KvState instance.
+		 *
+		 * @return ID of the registered KvState instance
+		 */
+		public KvStateID getKvStateId() {
+			return kvStateId;
+		}
+
+		/**
+		 * Returns the server address where to find the KvState instance.
+		 *
+		 * @return Server address where to find the KvState instance
+		 */
+		public KvStateServerAddress getKvStateServerAddress() {
+			return kvStateServerAddress;
+		}
+
+		@Override
+		public String toString() {
+			return "NotifyKvStateRegistered{" +
+					"jobId=" + jobId +
+					", jobVertexId=" + jobVertexId +
+					", keyGroupIndex=" + keyGroupIndex +
+					", registrationName='" + registrationName + '\'' +
+					", kvStateId=" + kvStateId +
+					", kvStateServerAddress=" + kvStateServerAddress +
+					'}';
+		}
+	}
+
+	class NotifyKvStateUnregistered implements KvStateMessage {
+
+		private static final long serialVersionUID = 1L;
+
+		/** JobID the KvState instance belongs to. */
+		private final JobID jobId;
+
+		/** JobVertexID the KvState instance belongs to. */
+		private final JobVertexID jobVertexId;
+
+		/** Key group index the KvState instance belongs to. */
+		private final int keyGroupIndex;
+
+		/** Name under which the KvState has been registered. */
+		private final String registrationName;
+
+		/**
+		 * Notifies the JobManager about an unregistered {@link KvState} instance.
+		 *
+		 * @param jobId                JobID the KvState instance belongs to
+		 * @param jobVertexId          JobVertexID the KvState instance belongs to
+		 * @param keyGroupIndex        Key group index the KvState instance belongs to
+		 * @param registrationName     Name under which the KvState has been registered
+		 */
+		public NotifyKvStateUnregistered(
+				JobID jobId,
+				JobVertexID jobVertexId,
+				int keyGroupIndex,
+				String registrationName) {
+
+			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
+			this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
+			Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
+			this.keyGroupIndex = keyGroupIndex;
+			this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
+		}
+
+		/**
+		 * Returns the JobID the KvState instance belongs to.
+		 *
+		 * @return JobID the KvState instance belongs to
+		 */
+		public JobID getJobId() {
+			return jobId;
+		}
+
+		/**
+		 * Returns the JobVertexID the KvState instance belongs to.
+		 *
+		 * @return JobVertexID the KvState instance belongs to
+		 */
+		public JobVertexID getJobVertexId() {
+			return jobVertexId;
+		}
+
+		/**
+		 * Returns the key group index the KvState instance belongs to.
+		 *
+		 * @return Key group index the KvState instance belongs to
+		 */
+		public int getKeyGroupIndex() {
+			return keyGroupIndex;
+		}
+
+		/**
+		 * Returns the name under which the KvState has been registered.
+		 *
+		 * @return Name under which the KvState has been registered
+		 */
+		public String getRegistrationName() {
+			return registrationName;
+		}
+
+		@Override
+		public String toString() {
+			return "NotifyKvStateUnregistered{" +
+					"jobId=" + jobId +
+					", jobVertexId=" + jobVertexId +
+					", keyGroupIndex=" + keyGroupIndex +
+					", registrationName='" + registrationName + '\'' +
+					'}';
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
new file mode 100644
index 0000000..e09b868
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.taskmanager.Task;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A registry for {@link KvState} instances per task manager.
+ *
+ * <p>This is currently only used for KvState queries: KvState instances, which
+ * are marked as queryable in their state descriptor are registered here and
+ * can be queried by the {@link KvStateServer}.
+ *
+ * <p>KvState is registered when it is created/restored and unregistered when
+ * the owning operator stops running.
+ */
+public class KvStateRegistry {
+
+	/** All registered KvState instances. */
+	private final ConcurrentHashMap<KvStateID, KvState<?, ?, ?, ?, ?>> registeredKvStates =
+			new ConcurrentHashMap<>();
+
+	/** Registry listener to be notified on registration/unregistration. */
+	private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+
+	/**
+	 * Registers a listener with the registry.
+	 *
+	 * @param listener The registry listener.
+	 * @throws IllegalStateException If there is a registered listener
+	 */
+	public void registerListener(KvStateRegistryListener listener) {
+		if (!this.listener.compareAndSet(null, listener)) {
+			throw new IllegalStateException("Listener already registered.");
+		}
+	}
+
+	/**
+	 * Registers the KvState instance identified by the given 4-tuple of JobID,
+	 * JobVertexID, key group index, and registration name.
+	 *
+	 * @param kvStateId KvStateID to identify the KvState instance
+	 * @param kvState   KvState instance to register
+	 * @throws IllegalStateException If there is a KvState instance registered
+	 *                               with the same ID.
+	 */
+
+	/**
+	 * Registers the KvState instance and returns the assigned ID.
+	 *
+	 * @param jobId            JobId the KvState instance belongs to
+	 * @param jobVertexId      JobVertexID the KvState instance belongs to
+	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param registrationName Name under which the KvState is registered
+	 * @param kvState          KvState instance to be registered
+	 * @return Assigned KvStateID
+	 */
+	public KvStateID registerKvState(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			int keyGroupIndex,
+			String registrationName,
+			KvState<?, ?, ?, ?, ?> kvState) {
+
+		KvStateID kvStateId = new KvStateID();
+
+		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
+			KvStateRegistryListener listener = this.listener.get();
+			if (listener != null) {
+				listener.notifyKvStateRegistered(
+						jobId,
+						jobVertexId,
+						keyGroupIndex,
+						registrationName,
+						kvStateId);
+			}
+
+			return kvStateId;
+		} else {
+			throw new IllegalStateException(kvStateId + " is already registered.");
+		}
+	}
+
+	/**
+	 * Unregisters the KvState instance identified by the given KvStateID.
+	 *
+	 * @param jobId     JobId the KvState instance belongs to
+	 * @param kvStateId KvStateID to identify the KvState instance
+	 */
+	public void unregisterKvState(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			int keyGroupIndex,
+			String registrationName,
+			KvStateID kvStateId) {
+
+		if (registeredKvStates.remove(kvStateId) != null) {
+			KvStateRegistryListener listener = this.listener.get();
+			if (listener != null) {
+				listener.notifyKvStateUnregistered(
+						jobId,
+						jobVertexId,
+						keyGroupIndex,
+						registrationName);
+			}
+		}
+	}
+
+	/**
+	 * Returns the KvState instance identified by the given KvStateID or
+	 * <code>null</code> if none is registered.
+	 *
+	 * @param kvStateId KvStateID to identify the KvState instance
+	 * @return KvState instance identified by the KvStateID or <code>null</code>
+	 */
+	public KvState<?, ?, ?, ?, ?> getKvState(KvStateID kvStateId) {
+		return registeredKvStates.get(kvStateId);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a {@link TaskKvStateRegistry} facade for the {@link Task}
+	 * identified by the given JobID and JobVertexID instance.
+	 *
+	 * @param jobId JobID of the task
+	 * @param jobVertexId JobVertexID of the task
+	 * @return A {@link TaskKvStateRegistry} facade for the task
+	 */
+	public TaskKvStateRegistry createTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
+		return new TaskKvStateRegistry(this, jobId, jobVertexId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
new file mode 100644
index 0000000..760adf1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * A listener for a {@link KvStateRegistry}.
+ *
+ * <p>The registry calls these methods when KvState instances are registered
+ * and unregistered.
+ */
+public interface KvStateRegistryListener {
+
+	/**
+	 * Notifies the listener about a registered KvState instance.
+	 *
+	 * @param jobId            Job ID the KvState instance belongs to
+	 * @param jobVertexId      JobVertexID the KvState instance belongs to
+	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param registrationName Name under which the KvState is registered
+	 * @param kvStateId        ID of the KvState instance
+	 */
+	void notifyKvStateRegistered(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			int keyGroupIndex,
+			String registrationName,
+			KvStateID kvStateId);
+
+	/**
+	 * Notifies the listener about an unregistered KvState instance.
+	 *
+	 * @param jobId            Job ID the KvState instance belongs to
+	 * @param jobVertexId      JobVertexID the KvState instance belongs to
+	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param registrationName Name under which the KvState is registered
+	 */
+	void notifyKvStateUnregistered(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			int keyGroupIndex,
+			String registrationName);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
new file mode 100644
index 0000000..15f0160
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A helper for KvState registrations of a single task.
+ */
+public class TaskKvStateRegistry {
+
+	/** KvStateRegistry for KvState instance registrations. */
+	private final KvStateRegistry registry;
+
+	/** JobID of the task. */
+	private final JobID jobId;
+
+	/** JobVertexID of the task. */
+	private final JobVertexID jobVertexId;
+
+	/** List of all registered KvState instances of this task. */
+	private final List<KvStateInfo> registeredKvStates = new ArrayList<>();
+
+	TaskKvStateRegistry(KvStateRegistry registry, JobID jobId, JobVertexID jobVertexId) {
+		this.registry = Preconditions.checkNotNull(registry, "KvStateRegistry");
+		this.jobId = Preconditions.checkNotNull(jobId, "JobID");
+		this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
+	}
+
+	/**
+	 * Registers the KvState instance at the KvStateRegistry.
+	 *
+	 * @param keyGroupIndex    KeyGroupIndex the KvState instance belongs to
+	 * @param registrationName The registration name (not necessarily the same
+	 *                         as the KvState name defined in the state
+	 *                         descriptor used to create the KvState instance)
+	 * @param kvState          The KvState instance to register
+	 */
+	public void registerKvState(int keyGroupIndex, String registrationName, KvState<?, ?, ?, ?, ?> kvState) {
+		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupIndex, registrationName, kvState);
+		registeredKvStates.add(new KvStateInfo(keyGroupIndex, registrationName, kvStateId));
+	}
+
+	/**
+	 * Unregisters all registered KvState instances from the KvStateRegistry.
+	 */
+	public void unregisterAll() {
+		for (KvStateInfo kvState : registeredKvStates) {
+			registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupIndex, kvState.registrationName, kvState.kvStateId);
+		}
+	}
+
+	/**
+	 * 3-tuple holding registered KvState meta data.
+	 */
+	private static class KvStateInfo {
+
+		private final int keyGroupIndex;
+
+		private final String registrationName;
+
+		private final KvStateID kvStateId;
+
+		public KvStateInfo(int keyGroupIndex, String registrationName, KvStateID kvStateId) {
+			this.keyGroupIndex = keyGroupIndex;
+			this.registrationName = registrationName;
+			this.kvStateId = kvStateId;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6fdf6f9..6958784 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
 
@@ -75,6 +76,8 @@ public class RuntimeEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
+	private final TaskKvStateRegistry kvStateRegistry;
+
 	private final TaskManagerRuntimeInfo taskManagerInfo;
 	private final TaskMetricGroup metrics;
 
@@ -95,6 +98,7 @@ public class RuntimeEnvironment implements Environment {
 			IOManager ioManager,
 			BroadcastVariableManager bcVarManager,
 			AccumulatorRegistry accumulatorRegistry,
+			TaskKvStateRegistry kvStateRegistry,
 			InputSplitProvider splitProvider,
 			Map<String, Future<Path>> distCacheEntries,
 			ResultPartitionWriter[] writers,
@@ -116,6 +120,7 @@ public class RuntimeEnvironment implements Environment {
 		this.ioManager = checkNotNull(ioManager);
 		this.bcVarManager = checkNotNull(bcVarManager);
 		this.accumulatorRegistry = checkNotNull(accumulatorRegistry);
+		this.kvStateRegistry = checkNotNull(kvStateRegistry);
 		this.splitProvider = checkNotNull(splitProvider);
 		this.distCacheEntries = checkNotNull(distCacheEntries);
 		this.writers = checkNotNull(writers);
@@ -199,6 +204,11 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskKvStateRegistry getTaskKvStateRegistry() {
+		return kvStateRegistry;
+	}
+
+	@Override
 	public InputSplitProvider getInputSplitProvider() {
 		return splitProvider;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dbc0b62..c98d512 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
@@ -519,10 +520,14 @@ public class Task implements Runnable {
 			TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager,
 					jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
 
+			TaskKvStateRegistry kvStateRegistry = network
+					.createKvStateTaskRegistry(jobId, getJobVertexId());
+
 			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
 					executionConfig, taskInfo, jobConfiguration, taskConfiguration,
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,
+					kvStateRegistry,
 					splitProvider, distributedCacheEntries,
 					writers, inputGates, jobManager, taskManagerConfig, metrics, this);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 84d38c1..01af3c1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,7 +25,7 @@ import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 import javax.management.ObjectName
 
-import akka.actor.Status.Failure
+import akka.actor.Status.{Success, Failure}
 import akka.actor._
 import akka.pattern.ask
 
@@ -74,6 +74,8 @@ import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
+import org.apache.flink.runtime.query.{UnknownKvStateLocation, KvStateMessage}
+import org.apache.flink.runtime.query.KvStateMessage.{NotifyKvStateUnregistered, LookupKvStateLocation, NotifyKvStateRegistered}
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -678,6 +680,9 @@ class JobManager(
     case checkpointMessage : AbstractCheckpointMessage =>
       handleCheckpointMessage(checkpointMessage)
 
+    case kvStateMsg : KvStateMessage =>
+      handleKvStateMessage(kvStateMsg)
+
     case TriggerSavepoint(jobId) =>
       currentJobs.get(jobId) match {
         case Some((graph, _)) =>
@@ -1435,6 +1440,75 @@ class JobManager(
       case _ => unhandled(actorMessage)
     }
   }
+
+  /**
+    * Handle all [[KvStateMessage]] instances for KvState location lookups and
+    * registration/unregistration notifications (failures of the latter are only logged).
+    *
+    * @param actorMsg The KvState actor message.
+    */
+  private def handleKvStateMessage(actorMsg: KvStateMessage): Unit = {
+    actorMsg match {
+      // Client KvStateLocation lookup
+      case msg: LookupKvStateLocation =>
+        currentJobs.get(msg.getJobId) match {
+          case Some((graph, _)) =>
+            try {
+              val registry = graph.getKvStateLocationRegistry
+              val location = registry.getKvStateLocation(msg.getRegistrationName)
+              if (location == null) {
+                sender() ! Failure(new UnknownKvStateLocation(msg.getRegistrationName))
+              } else {
+                sender() ! Success(location)
+              }
+            } catch {
+              case t: Throwable =>
+                sender() ! Failure(t)
+            }
+
+          case None =>
+            sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
+        }
+
+      // TaskManager KvState registration
+      case msg: NotifyKvStateRegistered =>
+        currentJobs.get(msg.getJobId) match {
+          case Some((graph, _)) =>
+            try {
+              graph.getKvStateLocationRegistry.notifyKvStateRegistered(
+                msg.getJobVertexId,
+                msg.getKeyGroupIndex,
+                msg.getRegistrationName,
+                msg.getKvStateId,
+                msg.getKvStateServerAddress)
+            } catch {
+              case t: Throwable =>
+                log.error(s"Failed to notify KvStateRegistry about registration $msg.", t)
+            }
+
+          case None => log.error(s"Received $msg for unavailable job.")
+        }
+
+      // TaskManager KvState unregistration
+      case msg: NotifyKvStateUnregistered =>
+        currentJobs.get(msg.getJobId) match {
+          case Some((graph, _)) =>
+            try {
+              graph.getKvStateLocationRegistry.notifyKvStateUnregistered(
+                msg.getJobVertexId,
+                msg.getKeyGroupIndex,
+                msg.getRegistrationName)
+            } catch {
+              case t: Throwable =>
+                log.error(s"Failed to notify KvStateRegistry about unregistration $msg.", t)
+            }
+
+          case None => log.error(s"Received $msg for unavailable job.")
+        }
+
+      case _ => unhandled(actorMsg)
+    }
+  }
   
   /**
    * Handle unmatched messages with an exception.

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 226fa75..7c4b867 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1809,7 +1809,11 @@ object TaskManager {
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
 
     // we start the network first, to make sure it can allocate its buffers first
-    val network = new NetworkEnvironment(executionContext, taskManagerConfig.timeout, netConfig)
+    val network = new NetworkEnvironment(
+      executionContext,
+      taskManagerConfig.timeout,
+      netConfig,
+      connectionInfo)
 
     // computing the amount of memory to use depends on how much memory is available
     // it strictly needs to happen AFTER the network stack has been initialized

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index fca3ceb..938e661 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -85,7 +86,8 @@ public class NetworkEnvironmentTest {
 			NetworkEnvironment env = new NetworkEnvironment(
 				TestingUtils.defaultExecutionContext(),
 				new FiniteDuration(30, TimeUnit.SECONDS),
-				config);
+				config,
+				new InstanceConnectionInfo(InetAddress.getLocalHost(), port));
 
 			assertFalse(env.isShutdown());
 			assertFalse(env.isAssociated());
@@ -178,7 +180,8 @@ public class NetworkEnvironmentTest {
 		NetworkEnvironment env = new NetworkEnvironment(
 				TestingUtils.defaultExecutionContext(),
 				new FiniteDuration(30, TimeUnit.SECONDS),
-				config);
+				config,
+				new InstanceConnectionInfo(InetAddress.getLocalHost(), 12232));
 
 		// Associate the environment with the mock actors
 		env.associateWithTaskManagerAndJobManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 5c25003..f925d62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,57 +18,83 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-
 import com.typesafe.config.Config;
-
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
+import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
 
 import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
+import static org.apache.flink.runtime.messages.JobManagerMessages.JobSubmitSuccess;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -141,13 +167,13 @@ public class JobManagerTest {
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
 							testActorGateway);
-						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+						expectMsgClass(JobSubmitSuccess.class);
 
 						jobManagerGateway.tell(
 							new WaitForAllVerticesToBeRunningOrFinished(jid),
 							testActorGateway);
 
-						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+						expectMsgClass(AllVerticesRunning.class);
 
 						// This is the mock execution ID of the task requesting the state of the partition
 						final ExecutionAttemptID receiver = new ExecutionAttemptID();
@@ -267,17 +293,17 @@ public class JobManagerTest {
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
 							testActorGateway);
-						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+						expectMsgClass(JobSubmitSuccess.class);
 
 						jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
-						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+						expectMsgClass(AllVerticesRunning.class);
 
 						jobManagerGateway.tell(new StopJob(jid), testActorGateway);
 
 						// - The test ----------------------------------------------------------------------
 						expectMsgClass(StoppingSuccess.class);
 
-						expectMsgClass(JobManagerMessages.JobResultSuccess.class);
+						expectMsgClass(JobResultSuccess.class);
 					} finally {
 						if (cluster != null) {
 							cluster.shutdown();
@@ -319,10 +345,10 @@ public class JobManagerTest {
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
 							testActorGateway);
-						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+						expectMsgClass(JobSubmitSuccess.class);
 
 						jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
-						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+						expectMsgClass(AllVerticesRunning.class);
 
 						jobManagerGateway.tell(new StopJob(jid), testActorGateway);
 
@@ -342,4 +368,206 @@ public class JobManagerTest {
 		}};
 	}
 
+	/**
+	 * Tests that the JobManager handles {@link org.apache.flink.runtime.query.KvStateMessage}
+	 * lookup, registration, and unregistration, and fails the job on a duplicate registration name.
+	 */
+	@Test
+	public void testKvStateMessages() throws Exception {
+		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
+
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
+
+		UUID leaderSessionId = null;
+		ActorGateway jobManager = new AkkaActorGateway(
+				JobManager.startJobManagerActors(
+						config,
+						system,
+						TestingJobManager.class,
+						MemoryArchivist.class)._1(),
+				leaderSessionId);
+
+		LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(
+				AkkaUtils.getAkkaURL(system, jobManager.actor()));
+
+		Configuration tmConfig = new Configuration();
+		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+
+		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
+				tmConfig,
+				ResourceID.generate(),
+				system,
+				"localhost",
+				scala.Option.<String>empty(),
+				scala.Option.apply(leaderRetrievalService),
+				true,
+				TestingTaskManager.class);
+
+		Future<Object> registrationFuture = jobManager
+				.ask(new NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft());
+
+		Await.ready(registrationFuture, deadline.timeLeft());
+
+		//
+		// Location lookup
+		//
+		LookupKvStateLocation lookupNonExistingJob = new LookupKvStateLocation(
+				new JobID(),
+				"any-name");
+
+		Future<KvStateLocation> lookupFuture = jobManager
+				.ask(lookupNonExistingJob, deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
+
+		try {
+			Await.result(lookupFuture, deadline.timeLeft());
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+			// Expected
+		}
+
+		JobGraph jobGraph = new JobGraph("croissant");
+		JobVertex jobVertex1 = new JobVertex("cappuccino");
+		jobVertex1.setParallelism(4);
+		jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+
+		JobVertex jobVertex2 = new JobVertex("americano");
+		jobVertex2.setParallelism(4);
+		jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+
+		jobGraph.addVertex(jobVertex1);
+		jobGraph.addVertex(jobVertex2);
+
+		Future<JobSubmitSuccess> submitFuture = jobManager
+				.ask(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED), deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<JobSubmitSuccess>apply(JobSubmitSuccess.class));
+
+		Await.result(submitFuture, deadline.timeLeft());
+
+		Object lookupUnknownRegistrationName = new LookupKvStateLocation(
+				jobGraph.getJobID(),
+				"unknown");
+
+		lookupFuture = jobManager
+				.ask(lookupUnknownRegistrationName, deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
+
+		try {
+			Await.result(lookupFuture, deadline.timeLeft());
+			fail("Did not throw expected Exception");
+		} catch (UnknownKvStateLocation ignored) {
+			// Expected
+		}
+
+		//
+		// Registration
+		//
+		NotifyKvStateRegistered registerNonExistingJob = new NotifyKvStateRegistered(
+				new JobID(),
+				new JobVertexID(),
+				0,
+				"any-name",
+				new KvStateID(),
+				new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
+
+		jobManager.tell(registerNonExistingJob);
+
+		LookupKvStateLocation lookupAfterRegistration = new LookupKvStateLocation(
+				registerNonExistingJob.getJobId(),
+				registerNonExistingJob.getRegistrationName());
+
+		lookupFuture = jobManager
+				.ask(lookupAfterRegistration, deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
+
+		try {
+			Await.result(lookupFuture, deadline.timeLeft());
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+			// Expected
+		}
+
+		NotifyKvStateRegistered registerForExistingJob = new NotifyKvStateRegistered(
+				jobGraph.getJobID(),
+				jobVertex1.getID(),
+				0,
+				"register-me",
+				new KvStateID(),
+				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
+
+		jobManager.tell(registerForExistingJob);
+
+		lookupAfterRegistration = new LookupKvStateLocation(
+				registerForExistingJob.getJobId(),
+				registerForExistingJob.getRegistrationName());
+
+		lookupFuture = jobManager
+				.ask(lookupAfterRegistration, deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
+
+		KvStateLocation location = Await.result(lookupFuture, deadline.timeLeft());
+		assertNotNull(location);
+
+		assertEquals(jobGraph.getJobID(), location.getJobId());
+		assertEquals(jobVertex1.getID(), location.getJobVertexId());
+		assertEquals(jobVertex1.getParallelism(), location.getNumKeyGroups());
+		assertEquals(1, location.getNumRegisteredKeyGroups());
+		int keyGroupIndex = registerForExistingJob.getKeyGroupIndex();
+		assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupIndex));
+		assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupIndex));
+
+		//
+		// Unregistration
+		//
+		NotifyKvStateUnregistered unregister = new NotifyKvStateUnregistered(
+				registerForExistingJob.getJobId(),
+				registerForExistingJob.getJobVertexId(),
+				registerForExistingJob.getKeyGroupIndex(),
+				registerForExistingJob.getRegistrationName());
+
+		jobManager.tell(unregister);
+
+		lookupFuture = jobManager
+				.ask(lookupAfterRegistration, deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
+
+		try {
+			Await.result(lookupFuture, deadline.timeLeft());
+			fail("Did not throw expected Exception");
+		} catch (UnknownKvStateLocation ignored) {
+			// Expected
+		}
+
+		//
+		// Duplicate registration fails task
+		//
+		NotifyKvStateRegistered register = new NotifyKvStateRegistered(
+				jobGraph.getJobID(),
+				jobVertex1.getID(),
+				0,
+				"duplicate-me",
+				new KvStateID(),
+				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
+
+		NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
+				jobGraph.getJobID(),
+				jobVertex2.getID(), // <--- different operator, but...
+				0,
+				"duplicate-me", // ...same name
+				new KvStateID(),
+				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
+
+		Future<TestingJobManagerMessages.JobStatusIs> failedFuture = jobManager
+				.ask(new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft())
+				.mapTo(ClassTag$.MODULE$.<JobStatusIs>apply(JobStatusIs.class));
+
+		jobManager.tell(register);
+		jobManager.tell(duplicate);
+
+		// Wait for failure
+		JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
+		assertEquals(JobStatus.FAILED, jobStatus.state());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 5af34fb..87540bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
@@ -48,9 +50,17 @@ public class DummyEnvironment implements Environment {
 	private final ExecutionAttemptID executionId = new ExecutionAttemptID();
 	private final ExecutionConfig executionConfig = new ExecutionConfig();
 	private final TaskInfo taskInfo;
+	private final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+	private final TaskKvStateRegistry taskKvStateRegistry;
 
 	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
 		this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0);
+
+		this.taskKvStateRegistry = kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+	}
+
+	public KvStateRegistry getKvStateRegistry() {
+		return kvStateRegistry;
 	}
 
 	@Override
@@ -134,6 +144,11 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskKvStateRegistry getTaskKvStateRegistry() {
+		return taskKvStateRegistry;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId) {}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
new file mode 100644
index 0000000..393ee4c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class DummyEnvironment implements Environment {
+
+	private final JobID jobId = new JobID();
+	private final JobVertexID jobVertexId = new JobVertexID();
+	private final ExecutionAttemptID executionId = new ExecutionAttemptID();
+	private final ExecutionConfig executionConfig = new ExecutionConfig();
+<<<<<<< 9a73dbc71b83080b7deccc62b8b6ffa9f102e847
+	private final TaskInfo taskInfo;
+=======
+	private final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+	private final TaskKvStateRegistry taskKvStateRegistry;
+>>>>>>> [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
+
+	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
+		this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0);
+
+		this.taskKvStateRegistry = kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+	}
+
+	public KvStateRegistry getKvStateRegistry() {
+		return kvStateRegistry;
+	}
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobId;
+	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	@Override
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
+
+	@Override
+	public Configuration getTaskConfiguration() {
+		return new Configuration();
+	}
+
+	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return null;
+	}
+
+	@Override
+	public TaskMetricGroup getMetricGroup() {
+		return new UnregisteredTaskMetricsGroup();
+	}
+
+	@Override
+	public Configuration getJobConfiguration() {
+		return new Configuration();
+	}
+
+	@Override
+	public TaskInfo getTaskInfo() {
+		return taskInfo;
+	}
+
+	@Override
+	public InputSplitProvider getInputSplitProvider() {
+		return null;
+	}
+
+	@Override
+	public IOManager getIOManager() {
+		return null;
+	}
+
+	@Override
+	public MemoryManager getMemoryManager() {
+		return null;
+	}
+
+	@Override
+	public ClassLoader getUserClassLoader() {
+		return getClass().getClassLoader();
+	}
+
+	@Override
+	public Map<String, Future<Path>> getDistributedCacheEntries() {
+		return Collections.emptyMap();
+	}
+
+	@Override
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return null;
+	}
+
+	@Override
+	public AccumulatorRegistry getAccumulatorRegistry() {
+		return null;
+	}
+
+	@Override
+<<<<<<< 9a73dbc71b83080b7deccc62b8b6ffa9f102e847
+	public void acknowledgeCheckpoint(long checkpointId) {}
+=======
+	public TaskKvStateRegistry getTaskKvStateRegistry() {
+		return taskKvStateRegistry;
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(long checkpointId) {
+	}
+>>>>>>> [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
+
+	@Override
+	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
+
+	@Override
+	public ResultPartitionWriter getWriter(int index) {
+		return null;
+	}
+
+	@Override
+	public ResultPartitionWriter[] getAllWriters() {
+		return null;
+	}
+
+	@Override
+	public InputGate getInputGate(int index) {
+		return null;
+	}
+
+	@Override
+	public InputGate[] getAllInputGates() {
+		return null;
+	}
+
+}


Mime
View raw message