flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [05/10] flink git commit: [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
Date Tue, 09 Aug 2016 14:47:39 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9dea324..7b966c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -39,10 +38,13 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
@@ -89,6 +91,8 @@ public class MockEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
+	private final TaskKvStateRegistry kvStateRegistry;
+
 	private final int bufferSize;
 
 	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider,
int bufferSize) {
@@ -105,6 +109,9 @@ public class MockEnvironment implements Environment {
 		this.bufferSize = bufferSize;
 
 		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
+
+		KvStateRegistry registry = new KvStateRegistry();
+		this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId());
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record>
inputIterator) {
@@ -281,6 +288,11 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskKvStateRegistry getTaskKvStateRegistry() {
+		return kvStateRegistry;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId) {
 		throw new UnsupportedOperationException();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
new file mode 100644
index 0000000..70f0ba2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class KvStateLocationRegistryTest {
+
+	/**
+	 * Simple test registering/unregistereing state and looking it up again.
+	 */
+	@Test
+	public void testRegisterAndLookup() throws Exception {
+		String[] registrationNames = new String[] {
+				"TAsIrGnc7MULwVupNKZ0",
+				"086133IrGn0Ii2853237" };
+
+		ExecutionJobVertex[] vertices = new ExecutionJobVertex[] {
+				createJobVertex(32),
+				createJobVertex(13) };
+
+		// IDs for each key group of each vertex
+		KvStateID[][] ids = new KvStateID[vertices.length][];
+		for (int i = 0; i < ids.length; i++) {
+			ids[i] = new KvStateID[vertices[i].getParallelism()];
+			for (int j = 0; j < vertices[i].getParallelism(); j++) {
+				ids[i][j] = new KvStateID();
+			}
+		}
+
+		KvStateServerAddress server = new KvStateServerAddress(InetAddress.getLocalHost(), 12032);
+
+		// Create registry
+		Map<JobVertexID, ExecutionJobVertex> vertexMap = createVertexMap(vertices);
+		KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
+
+		// Register
+		for (int i = 0; i < vertices.length; i++) {
+			int numKeyGroups = vertices[i].getParallelism();
+			for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+				// Register
+				registry.notifyKvStateRegistered(
+						vertices[i].getJobVertexId(),
+						keyGroupIndex,
+						registrationNames[i],
+						ids[i][keyGroupIndex],
+						server);
+			}
+		}
+
+		// Verify all registrations
+		for (int i = 0; i < vertices.length; i++) {
+			KvStateLocation location = registry.getKvStateLocation(registrationNames[i]);
+			assertNotNull(location);
+
+			int parallelism = vertices[i].getParallelism();
+			for (int keyGroupIndex = 0; keyGroupIndex < parallelism; keyGroupIndex++) {
+				assertEquals(ids[i][keyGroupIndex], location.getKvStateID(keyGroupIndex));
+				assertEquals(server, location.getKvStateServerAddress(keyGroupIndex));
+			}
+		}
+
+		// Unregister
+		for (int i = 0; i < vertices.length; i++) {
+			int numKeyGroups = vertices[i].getParallelism();
+			JobVertexID jobVertexId = vertices[i].getJobVertexId();
+			for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+				registry.notifyKvStateUnregistered(jobVertexId, keyGroupIndex, registrationNames[i]);
+			}
+		}
+
+		for (int i = 0; i < registrationNames.length; i++) {
+			assertNull(registry.getKvStateLocation(registrationNames[i]));
+		}
+	}
+
+	/**
+	 * Tests that registrations with duplicate names throw an Exception.
+	 */
+	@Test
+	public void testRegisterDuplicateName() throws Exception {
+		ExecutionJobVertex[] vertices = new ExecutionJobVertex[] {
+				createJobVertex(32),
+				createJobVertex(13) };
+
+		Map<JobVertexID, ExecutionJobVertex> vertexMap = createVertexMap(vertices);
+
+		String registrationName = "duplicated-name";
+		KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
+
+		// First operator registers
+		registry.notifyKvStateRegistered(
+				vertices[0].getJobVertexId(),
+				0,
+				registrationName,
+				new KvStateID(),
+				new KvStateServerAddress(InetAddress.getLocalHost(), 12328));
+
+		try {
+			// Second operator registers same name
+			registry.notifyKvStateRegistered(
+					vertices[1].getJobVertexId(),
+					0,
+					registrationName,
+					new KvStateID(),
+					new KvStateServerAddress(InetAddress.getLocalHost(), 12032));
+
+			fail("Did not throw expected Exception after duplicated name");
+		} catch (IllegalStateException ignored) {
+			// Expected
+		}
+	}
+
+	/**
+	 * Tests exception on unregistration before registration.
+	 */
+	@Test
+	public void testUnregisterBeforeRegister() throws Exception {
+		ExecutionJobVertex vertex = createJobVertex(4);
+		Map<JobVertexID, ExecutionJobVertex> vertexMap = createVertexMap(vertex);
+
+		KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
+		try {
+			registry.notifyKvStateUnregistered(vertex.getJobVertexId(), 0, "any-name");
+			fail("Did not throw expected Exception, because of missing registration");
+		} catch (IllegalArgumentException ignored) {
+			// Expected
+		}
+	}
+
+	/**
+	 * Tests failures during unregistration.
+	 */
+	@Test
+	public void testUnregisterFailures() throws Exception {
+		String name = "IrGnc73237TAs";
+
+		ExecutionJobVertex[] vertices = new ExecutionJobVertex[] {
+				createJobVertex(32),
+				createJobVertex(13) };
+
+		Map<JobVertexID, ExecutionJobVertex> vertexMap = new HashMap<>();
+		for (ExecutionJobVertex vertex : vertices) {
+			vertexMap.put(vertex.getJobVertexId(), vertex);
+		}
+
+		KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
+
+		// First operator registers name
+		registry.notifyKvStateRegistered(
+				vertices[0].getJobVertexId(),
+				0,
+				name,
+				new KvStateID(),
+				mock(KvStateServerAddress.class));
+
+		try {
+			// Unregister not registered keyGroupIndex
+			int notRegisteredKeyGroupIndex = 2;
+
+			registry.notifyKvStateUnregistered(
+					vertices[0].getJobVertexId(),
+					notRegisteredKeyGroupIndex,
+					name);
+
+			fail("Did not throw expected Exception");
+		} catch (IllegalArgumentException expected) {
+		}
+
+		try {
+			// Wrong operator tries to unregister
+			registry.notifyKvStateUnregistered(
+					vertices[1].getJobVertexId(),
+					0,
+					name);
+
+			fail("Did not throw expected Exception");
+		} catch (IllegalArgumentException expected) {
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	private ExecutionJobVertex createJobVertex(int parallelism) {
+		JobVertexID id = new JobVertexID();
+		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+
+		when(vertex.getJobVertexId()).thenReturn(id);
+		when(vertex.getParallelism()).thenReturn(parallelism);
+
+		return vertex;
+	}
+
+	private Map<JobVertexID, ExecutionJobVertex> createVertexMap(ExecutionJobVertex...
vertices) {
+		Map<JobVertexID, ExecutionJobVertex> vertexMap = new HashMap<>();
+		for (ExecutionJobVertex vertex : vertices) {
+			vertexMap.put(vertex.getJobVertexId(), vertex);
+		}
+		return vertexMap;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
new file mode 100644
index 0000000..59ac575
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+
+public class KvStateLocationTest {
+
+	/**
+	 * Simple test registering/unregistereing state and looking it up again.
+	 */
+	@Test
+	public void testRegisterAndLookup() throws Exception {
+		JobID jobId = new JobID();
+		JobVertexID jobVertexId = new JobVertexID();
+		int numKeyGroups = 123;
+		String registrationName = "asdasdasdasd";
+
+		KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);
+
+		KvStateID[] kvStateIds = new KvStateID[numKeyGroups];
+		KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numKeyGroups];
+
+		InetAddress host = InetAddress.getLocalHost();
+
+		// Register
+		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+			kvStateIds[keyGroupIndex] = new KvStateID();
+			serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
+
+			location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
+			assertEquals(keyGroupIndex + 1, location.getNumRegisteredKeyGroups());
+		}
+
+		// Lookup
+		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+			assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
+			assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+		}
+
+		// Overwrite
+		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+			kvStateIds[keyGroupIndex] = new KvStateID();
+			serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
+
+			location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
+			assertEquals(numKeyGroups, location.getNumRegisteredKeyGroups());
+		}
+
+		// Lookup
+		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+			assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
+			assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+		}
+
+		// Unregister
+		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+			location.unregisterKvState(keyGroupIndex);
+			assertEquals(numKeyGroups - keyGroupIndex - 1, location.getNumRegisteredKeyGroups());
+		}
+
+		// Lookup
+		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
+			assertEquals(null, location.getKvStateID(keyGroupIndex));
+			assertEquals(null, location.getKvStateServerAddress(keyGroupIndex));
+		}
+
+		assertEquals(0, location.getNumRegisteredKeyGroups());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0c0d064..fc1a5df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
@@ -149,6 +150,8 @@ public class TaskAsyncCallTest {
 		when(networkEnvironment.getPartitionManager()).thenReturn(partitionManager);
 		when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
 		when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
+				.thenReturn(mock(TaskKvStateRegistry.class));
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 60bf8e7..ca7157a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -108,7 +108,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 			final NetworkEnvironment network = new NetworkEnvironment(
 				TestingUtils.defaultExecutionContext(),
 				timeout,
-				netConf);
+				netConf,
+				connectionInfo);
 			final int numberOfSlots = 1;
 
 			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fec9ef3..2f8e3db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -45,12 +44,12 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
-
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.util.SerializedValue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -69,7 +68,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -603,6 +601,8 @@ public class TaskTest {
 		when(network.getPartitionManager()).thenReturn(partitionManager);
 		when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
+				.thenReturn(mock(TaskKvStateRegistry.class));
 		
 		return createTask(invokable, libCache, network);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 5237c62..f9698a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -134,11 +135,15 @@ public class InterruptSensitiveRestoreTest {
 	}
 	
 	private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
+		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
+				.thenReturn(mock(TaskKvStateRegistry.class));
+
 		return new Task(
 				tdd,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
+				networkEnvironment,
 				mock(BroadcastVariableManager.class),
 				mock(ActorGateway.class),
 				mock(ActorGateway.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7084208..05b8e8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
@@ -91,6 +93,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
+	private final TaskKvStateRegistry kvStateRegistry;
+
 	private final int bufferSize;
 
 	private final ExecutionConfig executionConfig;
@@ -110,6 +114,9 @@ public class StreamMockEnvironment implements Environment {
 
 		this.executionConfig = executionConfig;
 		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
+
+		KvStateRegistry registry = new KvStateRegistry();
+		this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId());
 	}
 
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
@@ -294,6 +301,11 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskKvStateRegistry getTaskKvStateRegistry() {
+		return kvStateRegistry;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId) {
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c9b8e7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e9d583c..bcd8a5f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -19,11 +19,9 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import akka.actor.ActorRef;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -41,6 +39,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -49,10 +49,8 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -64,14 +62,13 @@ import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
@@ -134,6 +131,8 @@ public class StreamTaskTest {
 		when(network.getPartitionManager()).thenReturn(partitionManager);
 		when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
+				.thenReturn(mock(TaskKvStateRegistry.class));
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),


Mime
View raw message