flink-commits mailing list archives

From trohrm...@apache.org
Subject [03/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
new file mode 100644
index 0000000..3783b84
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtasksTimesHandler.
+ */
+public class SubtasksTimesHandlerTest extends TestLogger {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", archive.getPath());
+		compareSubtaskTimes(originalTask, originalAttempt, archive.getJson());
+	}
+
+	@Test
+	public void testGetPaths() {
+		SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+		String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
+
+		compareSubtaskTimes(originalTask, originalAttempt, json);
+	}
+
+	private static void compareSubtaskTimes(AccessExecutionJobVertex originalTask, AccessExecution originalAttempt, String json) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+		Assert.assertTrue(result.get("now").asLong() > 0L);
+
+		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+		JsonNode subtask = subtasks.get(0);
+		Assert.assertEquals(0, subtask.get("subtask").asInt());
+		Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), subtask.get("host").asText());
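+		// The reported duration is measured from the SCHEDULED timestamp to the timestamp of the attempt's current state.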
+		Assert.assertEquals(originalAttempt.getStateTimestamp(originalAttempt.getState()) - originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), subtask.get("duration").asLong());
+
+		JsonNode timestamps = subtask.get("timestamps");
+
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CREATED), timestamps.get(ExecutionState.CREATED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), timestamps.get(ExecutionState.SCHEDULED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING), timestamps.get(ExecutionState.DEPLOYING.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.RUNNING), timestamps.get(ExecutionState.RUNNING.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FINISHED), timestamps.get(ExecutionState.FINISHED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELING), timestamps.get(ExecutionState.CANCELING.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELED), timestamps.get(ExecutionState.CANCELED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FAILED), timestamps.get(ExecutionState.FAILED.name()).asLong());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
new file mode 100644
index 0000000..b65dcb6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the TaskManagerLogHandler.
+ */
+public class TaskManagerLogHandlerTest {
+	@Test
+	public void testGetPaths() {
+		TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
+			mock(GatewayRetriever.class),
+			Executors.directExecutor(),
+			CompletableFuture.completedFuture("/jm/address"),
+			TestingUtils.TIMEOUT(),
+			TaskManagerLogHandler.FileMode.LOG,
+			new Configuration(),
+			new VoidBlobStore());
+		String[] pathsLog = handlerLog.getPaths();
+		Assert.assertEquals(1, pathsLog.length);
+		Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]);
+
+		TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
+			mock(GatewayRetriever.class),
+			Executors.directExecutor(),
+			CompletableFuture.completedFuture("/jm/address"),
+			TestingUtils.TIMEOUT(),
+			TaskManagerLogHandler.FileMode.STDOUT,
+			new Configuration(),
+			new VoidBlobStore());
+		String[] pathsOut = handlerOut.getPaths();
+		Assert.assertEquals(1, pathsOut.length);
+		Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]);
+	}
+
+	@Test
+	public void testLogFetchingFailure() throws Exception {
+		// ========= setup TaskManager =================================================================================
+		InstanceID tmID = new InstanceID();
+		ResourceID tmRID = new ResourceID(tmID.toString());
+		TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
+		when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getId()).thenReturn(tmID);
+		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+		CompletableFuture<BlobKey> future = new CompletableFuture<>();
+		future.completeExceptionally(new IOException("failure"));
+		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
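+		// The log request fails, which the handler is expected to surface as an error message to the client.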
+
+		// ========= setup JobManager ==================================================================================
+
+		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
+		when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
+		when(jobManagerGateway.getHostname()).thenReturn("localhost");
+		when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn(
+			CompletableFuture.completedFuture(Optional.of(taskManager)));
+
+		GatewayRetriever<JobManagerGateway> retriever = mock(GatewayRetriever.class);
+		when(retriever.getNow())
+			.thenReturn(Optional.of(jobManagerGateway));
+
+		TaskManagerLogHandler handler = new TaskManagerLogHandler(
+			retriever,
+			Executors.directExecutor(),
+			CompletableFuture.completedFuture("/jm/address"),
+			TestingUtils.TIMEOUT(),
+			TaskManagerLogHandler.FileMode.LOG,
+			new Configuration(),
+			new VoidBlobStore());
+
+		final AtomicReference<String> exception = new AtomicReference<>();
+
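+		// Capture whatever the handler writes to the channel, so the error message can be asserted below.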
+		ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+		when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+				exception.set(new String(data.array(), ConfigConstants.DEFAULT_CHARSET));
+				return null;
+			}
+		});
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
+		Routed routed = mock(Routed.class);
+		when(routed.pathParams()).thenReturn(pathParams);
+		when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
+
+		handler.respondAsLeader(ctx, routed, jobManagerGateway);
+
+		Assert.assertEquals("Fetching TaskManager log failed.", exception.get());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
new file mode 100644
index 0000000..2f8afd1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the TaskManagersHandler.
+ */
+public class TaskManagersHandlerTest {
+	@Test
+	public void testGetPaths() {
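+		// A zero timeout and a null third argument suffice here, since only the registered paths are queried.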
+		TaskManagersHandler handler = new TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(2, paths.length);
+		List<String> pathsList = Lists.newArrayList(paths);
+		Assert.assertTrue(pathsList.contains("/taskmanagers"));
+		Assert.assertTrue(pathsList.contains("/taskmanagers/:taskmanagerid"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
new file mode 100644
index 0000000..e2289f0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+
+/**
+ * Simple back pressured task test.
+ */
+public class BackPressureStatsTrackerITCase extends TestLogger {
+
+	private static NetworkBufferPool networkBufferPool;
+	private static ActorSystem testActorSystem;
+
+	/** Shared as static variable with the test task. */
+	private static BufferPool testBufferPool;
+
+	@BeforeClass
+	public static void setup() {
+		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+		networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(testActorSystem);
+		networkBufferPool.destroy();
+	}
+
+	/**
+	 * Tests a simple artificially back pressured task. A task is considered
+	 * back pressured when its sampled stack traces show it blocked in a
+	 * buffer request.
+	 */
+	@Test
+	public void testBackPressuredProducer() throws Exception {
+		new JavaTestKit(testActorSystem) {{
+			final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);
+
+			// The JobGraph
+			final JobGraph jobGraph = new JobGraph();
+			final int parallelism = 4;
+
+			final JobVertex task = new JobVertex("Task");
+			task.setInvokableClass(BackPressuredTask.class);
+			task.setParallelism(parallelism);
+
+			jobGraph.addVertex(task);
+
+			final Configuration config = new Configuration();
+
+			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				config,
+				TestingUtils.defaultExecutor());
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			//
+			// 1) Consume all buffers at first (no buffers for the test task)
+			//
+			testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
+			final List<Buffer> buffers = new ArrayList<>();
+			while (true) {
+				Buffer buffer = testBufferPool.requestBuffer();
+				if (buffer != null) {
+					buffers.add(buffer);
+				} else {
+					break;
+				}
+			}
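+			// The pool is now drained; the test tasks will block in requestBufferBlocking() and thus appear back pressured.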
+
+			try {
+				jobManager = TestingUtils.createJobManager(
+					testActorSystem,
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
+					config,
+					highAvailabilityServices);
+
+				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+
+				taskManager = TestingUtils.createTaskManager(
+					testActorSystem,
+					highAvailabilityServices,
+					config,
+					true,
+					true);
+
+				final ActorGateway jm = jobManager;
+
+				new Within(deadline) {
+					@Override
+					protected void run() {
+						try {
+							ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+							// Submit the job and wait until it is running
+							JobClient.submitJobDetached(
+									new AkkaJobManagerGateway(jm),
+									config,
+									jobGraph,
+									Time.milliseconds(deadline.toMillis()),
+									ClassLoader.getSystemClassLoader());
+
+							jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+							expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
+
+							// Get the ExecutionGraph
+							jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
+
+							ExecutionGraphFound executionGraphResponse =
+									expectMsgClass(ExecutionGraphFound.class);
+
+							ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
+							ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
+
+							StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+									testActorSystem.dispatcher(), 60000);
+
+							// Verify back pressure (clean up interval can be ignored)
+							BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
+								coordinator,
+								100 * 1000,
+								20,
+								Time.milliseconds(10L));
+
+							int numAttempts = 10;
+
+							int nextSampleId = 0;
+
+							// Verify that all tasks are back pressured. This
+							// can fail if a task has not yet blocked in the
+							// buffer request, hence the retry loop.
+							for (int attempt = 0; attempt < numAttempts; attempt++) {
+								try {
+									OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+									Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
+									Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
+									Assert.assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
+
+									for (int i = 0; i < parallelism; i++) {
+										Assert.assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
+									}
+
+									nextSampleId = stats.getSampleId() + 1;
+
+									break;
+								} catch (Throwable t) {
+									if (attempt == numAttempts - 1) {
+										throw t;
+									} else {
+										Thread.sleep(500);
+									}
+								}
+							}
+
+							//
+							// 2) Release all buffers and let the tasks grab one
+							//
+							for (Buffer buf : buffers) {
+								buf.recycle();
+							}
+
+							// Wait for all buffers to be available. The tasks
+							// grab them and then immediately release them.
+							while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
+								Thread.sleep(100);
+							}
+
+							// Verify that no task is back pressured any more.
+							for (int attempt = 0; attempt < numAttempts; attempt++) {
+								try {
+									OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+									Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
+									Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
+
+									// Verify that no task is back pressured
+									for (int i = 0; i < parallelism; i++) {
+										Assert.assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
+									}
+
+									break;
+								} catch (Throwable t) {
+									if (attempt == numAttempts - 1) {
+										throw t;
+									} else {
+										Thread.sleep(500);
+									}
+								}
+							}
+
+							// Register for the job removal notification
+							jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
+
+							// Cancel job
+							jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+							// Response to removal notification
+							expectMsgEquals(true);
+
+							//
+							// 3) Trigger stats for archived job
+							//
+							statsTracker.invalidateOperatorStatsCache();
+							Assert.assertFalse("Unexpected trigger", statsTracker.triggerStackTraceSample(vertex));
+
+						} catch (Exception e) {
+							e.printStackTrace();
+							Assert.fail(e.getMessage());
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(jobManager);
+				TestingUtils.stopActor(taskManager);
+
+				highAvailabilityServices.closeAndCleanupAllData();
+
+				for (Buffer buf : buffers) {
+					buf.recycle();
+				}
+
+				testBufferPool.lazyDestroy();
+			}
+		}};
+	}
+
+	/**
+	 * Triggers a new stats sample.
+	 */
+	private OperatorBackPressureStats triggerStatsSample(
+			BackPressureStatsTracker statsTracker,
+			ExecutionJobVertex vertex) throws InterruptedException {
+
+		statsTracker.invalidateOperatorStatsCache();
+		Assert.assertTrue("Failed to trigger", statsTracker.triggerStackTraceSample(vertex));
+
+		// Sleep for at least the full sample duration (20 samples * 10 ms delay between samples)
+		Thread.sleep(20 * 10);
+
+		Optional<OperatorBackPressureStats> stats;
+
+		// Get the stats
+		while (!(stats = statsTracker.getOperatorBackPressureStats(vertex)).isPresent()) {
+			Thread.sleep(10);
+		}
+
+		return stats.get();
+	}
+
+	/**
+	 * A back pressured producer sharing a {@link BufferPool} with the
+	 * test driver.
+	 */
+	public static class BackPressuredTask extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			while (true) {
+				Buffer buffer = testBufferPool.requestBufferBlocking();
+				// Got a buffer, yay!
+				buffer.recycle();
+
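+				// Block forever; the latch is never counted down, so the task idles without requesting further buffers.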
+				new CountDownLatch(1).await();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
new file mode 100644
index 0000000..02f954a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the BackPressureStatsTracker.
+ */
+public class BackPressureStatsTrackerTest extends TestLogger {
+
+	/** Tests simple statistics with fake stack traces. */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTriggerStackTraceSample() throws Exception {
+		CompletableFuture<StackTraceSample> sampleFuture = new CompletableFuture<>();
+
+		StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class);
+		Mockito.when(sampleCoordinator.triggerStackTraceSample(
+				Matchers.any(ExecutionVertex[].class),
+				Matchers.anyInt(),
+				Matchers.any(Time.class),
+				Matchers.anyInt())).thenReturn(sampleFuture);
+
+		ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
+		Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
+
+		// Same Thread execution context
+		Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() {
+
+			@Override
+			public void execute(Runnable runnable) {
+				runnable.run();
+			}
+		});
+
+		ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+		ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class);
+		Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
+		Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+		Mockito.when(jobVertex.getGraph()).thenReturn(graph);
+		Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
+
+		taskVertices[0] = mockExecutionVertex(jobVertex, 0);
+		taskVertices[1] = mockExecutionVertex(jobVertex, 1);
+		taskVertices[2] = mockExecutionVertex(jobVertex, 2);
+		taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+
+		int numSamples = 100;
+		Time delayBetweenSamples = Time.milliseconds(100L);
+
+		BackPressureStatsTracker tracker = new BackPressureStatsTracker(
+				sampleCoordinator, 9999, numSamples, delayBetweenSamples);
+
+		// Trigger
+		Assert.assertTrue("Failed to trigger", tracker.triggerStackTraceSample(jobVertex));
+
+		Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+				Matchers.eq(taskVertices),
+				Matchers.eq(numSamples),
+				Matchers.eq(delayBetweenSamples),
+				Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+		// Trigger again for pending request, should not fire
+		Assert.assertFalse("Unexpected trigger", tracker.triggerStackTraceSample(jobVertex));
+
+		Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
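+		// Verify again that the coordinator was invoked exactly once, i.e. the pending trigger did not fire a second sample.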
+		Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+				Matchers.eq(taskVertices),
+				Matchers.eq(numSamples),
+				Matchers.eq(delayBetweenSamples),
+				Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+		Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+		// Complete the future
+		Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>();
+		for (ExecutionVertex vertex : taskVertices) {
+			List<StackTraceElement[]> taskTraces = new ArrayList<>();
+
+			for (int i = 0; i < taskVertices.length; i++) {
+				// Traces until sub task index are back pressured
+				taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex()));
+			}
+
+			traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
+		}
+
+		int sampleId = 1231;
+		int endTime = 841;
+
+		StackTraceSample sample = new StackTraceSample(
+				sampleId,
+				0,
+				endTime,
+				traces);
+
+		// Succeed the promise
+		sampleFuture.complete(sample);
+
+		Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+		OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get();
+
+		// Verify the stats
+		Assert.assertEquals(sampleId, stats.getSampleId());
+		Assert.assertEquals(endTime, stats.getEndTimestamp());
+		Assert.assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
+
+		for (int i = 0; i < taskVertices.length; i++) {
+			double ratio = stats.getBackPressureRatio(i);
+			// Traces until sub task index are back pressured
+			Assert.assertEquals((i + 1) / ((double) 4), ratio, 0.0);
+		}
+	}
+
+	private StackTraceElement[] createStackTrace(boolean isBackPressure) {
+		if (isBackPressure) {
+			return new StackTraceElement[] { new StackTraceElement(
+					BackPressureStatsTracker.EXPECTED_CLASS_NAME,
+					BackPressureStatsTracker.EXPECTED_METHOD_NAME,
+					"LocalBufferPool.java",
+					133) };
+		} else {
+			return Thread.currentThread().getStackTrace();
+		}
+	}
+
+	private ExecutionVertex mockExecutionVertex(
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex) {
+
+		Execution exec = Mockito.mock(Execution.class);
+		Mockito.when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+		JobVertexID id = jobVertex.getJobVertexId();
+
+		ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
+		Mockito.when(vertex.getJobvertexId()).thenReturn(id);
+		Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+		Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+
+		return vertex;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
new file mode 100644
index 0000000..8fa302a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+
+/**
+ * Simple stack trace sampling test.
+ */
+public class StackTraceSampleCoordinatorITCase extends TestLogger {
+
+	private static ActorSystem testActorSystem;
+
+	@BeforeClass
+	public static void setup() {
+		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(testActorSystem);
+	}
+
+	/**
+	 * Tests that a task that is cleared while sampling is answered with a
+	 * partial success response.
+	 */
+	@Test
+	public void testTaskClearedWhileSampling() throws Exception {
+		new JavaTestKit(testActorSystem) {{
+			final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);
+
+			// The JobGraph
+			final JobGraph jobGraph = new JobGraph();
+			final int parallelism = 1;
+
+			final JobVertex task = new JobVertex("Task");
+			task.setInvokableClass(BlockingNoOpInvokable.class);
+			task.setParallelism(parallelism);
+
+			jobGraph.addVertex(task);
+
+			final Configuration config = new Configuration();
+
+			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+				config,
+				TestingUtils.defaultExecutor());
+
+			ActorGateway jobManager = null;
+			ActorGateway taskManager = null;
+
+			try {
+				jobManager = TestingUtils.createJobManager(
+					testActorSystem,
+					TestingUtils.defaultExecutor(),
+					TestingUtils.defaultExecutor(),
+					config,
+					highAvailabilityServices);
+
+				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+
+				taskManager = TestingUtils.createTaskManager(
+					testActorSystem,
+					highAvailabilityServices,
+					config,
+					true,
+					true);
+
+				final ActorGateway jm = jobManager;
+
+				new Within(deadline) {
+					@Override
+					protected void run() {
+						try {
+							ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+							int maxAttempts = 10;
+							int sleepTime = 100;
+
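+							// Retry with an exponentially growing wait time, since cancelling too early may yield no samples at all.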
+							for (int i = 0; i < maxAttempts; i++, sleepTime *= 2) {
+								// Submit the job and wait until it is running
+								JobClient.submitJobDetached(
+										new AkkaJobManagerGateway(jm),
+										config,
+										jobGraph,
+										Time.milliseconds(deadline.toMillis()),
+										ClassLoader.getSystemClassLoader());
+
+								jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+								expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
+
+								// Get the ExecutionGraph
+								jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
+								ExecutionGraphFound executionGraphResponse =
+										expectMsgClass(ExecutionGraphFound.class);
+								ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
+								ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
+
+								StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+										testActorSystem.dispatcher(), 60000);
+
+								CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample(
+									vertex.getTaskVertices(),
+									// Do this often so we have a good
+									// chance of removing the job during
+									// sampling.
+									21474700 * 100,
+									Time.milliseconds(10L),
+									0);
+
+								// Wait before cancelling so that some samples
+								// are actually taken.
+								Thread.sleep(sleepTime);
+
+								// Cancel job
+								Future<?> removeFuture = jm.ask(
+										new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+										remaining());
+
+								jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+								try {
+									// Throws Exception on failure
+									sampleFuture.get(remaining().toMillis(), TimeUnit.MILLISECONDS);
+
+									// OK, we are done. Got the expected
+									// partial result.
+									break;
+								} catch (Throwable t) {
+									// We were too fast in cancelling the job.
+									// Fall through and retry.
+								} finally {
+									Await.ready(removeFuture, remaining());
+								}
+							}
+						} catch (Exception e) {
+							e.printStackTrace();
+							Assert.fail(e.getMessage());
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(jobManager);
+				TestingUtils.stopActor(taskManager);
+
+				highAvailabilityServices.closeAndCleanupAllData();
+			}
+		}};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
new file mode 100644
index 0000000..786b0ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for the {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSampleCoordinatorTest extends TestLogger {
+
+	private static ActorSystem system;
+
+	private StackTraceSampleCoordinator coord;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (system != null) {
+			system.shutdown();
+		}
+	}
+
+	@Before
+	public void init() throws Exception {
+		this.coord = new StackTraceSampleCoordinator(system.dispatcher(), 60000);
+	}
+
+	/** Tests simple triggering and collection of stack trace samples. */
+	@Test
+	public void testTriggerStackTraceSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
+		};
+
+		int numSamples = 1;
+		Time delayBetweenSamples = Time.milliseconds(100L);
+		int maxStackTraceDepth = 0;
+
+		CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+		// Verify messages have been sent
+		for (ExecutionVertex vertex : vertices) {
+			ExecutionAttemptID expectedExecutionId = vertex
+					.getCurrentExecutionAttempt().getAttemptId();
+
+			TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
+					0,
+					expectedExecutionId,
+					numSamples,
+					delayBetweenSamples,
+					maxStackTraceDepth);
+
+			Mockito.verify(vertex.getCurrentExecutionAttempt())
+				.requestStackTraceSample(Matchers.eq(0), Matchers.eq(numSamples), Matchers.eq(delayBetweenSamples), Matchers.eq(maxStackTraceDepth), Matchers.any(Time.class));
+		}
+
+		Assert.assertFalse(sampleFuture.isDone());
+
+		StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
+		List<StackTraceElement[]> traces = new ArrayList<>();
+		traces.add(stackTraceSample);
+		traces.add(stackTraceSample);
+		traces.add(stackTraceSample);
+
+		// Collect stack traces
+		for (int i = 0; i < vertices.length; i++) {
+			ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
+			coord.collectStackTraces(0, executionId, traces);
+
+			if (i == vertices.length - 1) {
+				Assert.assertTrue(sampleFuture.isDone());
+			} else {
+				Assert.assertFalse(sampleFuture.isDone());
+			}
+		}
+
+		// Verify completed stack trace sample
+		StackTraceSample sample = sampleFuture.get();
+
+		Assert.assertEquals(0, sample.getSampleId());
+		Assert.assertTrue(sample.getEndTime() >= sample.getStartTime());
+
+		Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
+
+		for (ExecutionVertex vertex : vertices) {
+			ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);
+
+			Assert.assertNotNull("Task not found", sampleTraces);
+			Assert.assertEquals(traces, sampleTraces);
+		}
+
+		// Verify no more pending sample
+		Assert.assertEquals(0, coord.getNumberOfPendingSamples());
+
+		// Verify no error on late collect
+		coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
+	}
+
+	/** Tests that triggering for non-running tasks fails the future. */
+	@Test
+	public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
+		};
+
+		CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+			vertices,
+			1,
+			Time.milliseconds(100L),
+			0);
+
+		Assert.assertTrue(sampleFuture.isDone());
+
+		try {
+			sampleFuture.get();
+			Assert.fail("Expected exception.");
+		} catch (ExecutionException e) {
+			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
+		}
+	}
+
+	/** Tests that triggering for reset tasks fails the future. */
+	@Test(timeout = 1000L)
+	public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				// Fails to send the message to the execution (happens when execution is reset)
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
+		};
+
+		CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+			vertices,
+			1,
+			Time.milliseconds(100L),
+			0);
+
+		try {
+			sampleFuture.get();
+			Assert.fail("Expected exception.");
+		} catch (ExecutionException e) {
+			Assert.assertTrue(e.getCause() instanceof RuntimeException);
+		}
+	}
+
+	/** Tests that samples time out if they don't finish in time. */
+	@Test(timeout = 1000L)
+	public void testTriggerStackTraceSampleTimeout() throws Exception {
+		int timeout = 100;
+
+		coord = new StackTraceSampleCoordinator(system.dispatcher(), timeout);
+
+		final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+		try {
+
+			ExecutionVertex[] vertices = new ExecutionVertex[]{
+				mockExecutionVertexWithTimeout(
+					new ExecutionAttemptID(),
+					ExecutionState.RUNNING,
+					scheduledExecutorService,
+					timeout)
+			};
+
+			CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0);
+
+			// Wait for the timeout
+			Thread.sleep(timeout * 2);
+
+			boolean success = false;
+			for (int i = 0; i < 10; i++) {
+				if (sampleFuture.isDone()) {
+					success = true;
+					break;
+				}
+
+				Thread.sleep(timeout);
+			}
+
+			Assert.assertTrue("Sample did not time out", success);
+
+			try {
+				sampleFuture.get();
+				Assert.fail("Expected exception.");
+			} catch (ExecutionException e) {
+				Assert.assertTrue(e.getCause().getCause().getMessage().contains("Timeout"));
+			}
+
+			// Collect after the timeout (should be ignored)
+			ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+			coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+		} finally {
+			scheduledExecutorService.shutdownNow();
+		}
+	}
+
+	/** Tests that collecting an unknown sample is ignored. */
+	@Test
+	public void testCollectStackTraceForUnknownSample() throws Exception {
+		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests cancelling of a pending sample. */
+	@Test
+	public void testCancelStackTraceSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0);
+
+		Assert.assertFalse(sampleFuture.isDone());
+
+		// Cancel
+		coord.cancelStackTraceSample(0, null);
+
+		// Verify completed
+		Assert.assertTrue(sampleFuture.isDone());
+
+		// Verify no more pending samples
+		Assert.assertEquals(0, coord.getNumberOfPendingSamples());
+	}
+
+	/** Tests that collecting for a cancelled sample throws no Exception. */
+	@Test
+	public void testCollectStackTraceForCanceledSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0);
+
+		Assert.assertFalse(sampleFuture.isDone());
+
+		coord.cancelStackTraceSample(0, null);
+
+		Assert.assertTrue(sampleFuture.isDone());
+
+		// Verify no error on late collect
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that collecting for a discarded pending sample throws no Exception. */
+	@Test
+	public void testCollectForDiscardedPendingSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0);
+
+		Assert.assertFalse(sampleFuture.isDone());
+
+		coord.cancelStackTraceSample(0, null);
+
+		Assert.assertTrue(sampleFuture.isDone());
+
+		// Verify no error on late collect
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that collecting for an unknown task fails. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testCollectStackTraceForUnknownTask() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);
+
+		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that shut down fails all pending samples and future sample triggers. */
+	@Test
+	public void testShutDown() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		List<CompletableFuture<StackTraceSample>> sampleFutures = new ArrayList<>();
+
+		// Trigger
+		sampleFutures.add(coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0));
+
+		sampleFutures.add(coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0));
+
+		for (CompletableFuture<StackTraceSample> future : sampleFutures) {
+			Assert.assertFalse(future.isDone());
+		}
+
+		// Shut down
+		coord.shutDown();
+
+		// Verify all completed
+		for (CompletableFuture<StackTraceSample> future : sampleFutures) {
+			Assert.assertTrue(future.isDone());
+		}
+
+		// Verify new trigger returns failed future
+		CompletableFuture<StackTraceSample> future = coord.triggerStackTraceSample(
+				vertices, 1, Time.milliseconds(100L), 0);
+
+		Assert.assertTrue(future.isDone());
+
+		try {
+			future.get();
+			Assert.fail("Expected exception.");
+		} catch (ExecutionException e) {
+			// expected: the coordinator has been shut down
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
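+	/**
+	 * Creates an ExecutionVertex mock whose stack trace sample request either
+	 * completes successfully or fails with a send error, depending on sendSuccess.
+	 */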
+	private ExecutionVertex mockExecutionVertex(
+			ExecutionAttemptID executionId,
+			ExecutionState state,
+			boolean sendSuccess) {
+
+		Execution exec = Mockito.mock(Execution.class);
+		CompletableFuture<StackTraceSampleResponse> failedFuture = new CompletableFuture<>();
+		failedFuture.completeExceptionally(new Exception("Send failed."));
+
+		Mockito.when(exec.getAttemptId()).thenReturn(executionId);
+		Mockito.when(exec.getState()).thenReturn(state);
+		Mockito.when(exec.requestStackTraceSample(Matchers.anyInt(), Matchers.anyInt(), Matchers.any(Time.class), Matchers.anyInt(), Matchers.any(Time.class)))
+			.thenReturn(
+				sendSuccess ?
+					CompletableFuture.completedFuture(Mockito.mock(StackTraceSampleResponse.class)) :
+					failedFuture);
+
+		ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
+		Mockito.when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
+		Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+		return vertex;
+	}
+
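+	/**
+	 * Creates an ExecutionVertex mock whose stack trace sample request stays
+	 * pending until a scheduled task completes it exceptionally with a
+	 * TimeoutException after the given timeout (in milliseconds).
+	 */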
+	private ExecutionVertex mockExecutionVertexWithTimeout(
+		ExecutionAttemptID executionId,
+		ExecutionState state,
+		ScheduledExecutorService scheduledExecutorService,
+		int timeout) {
+
+		final CompletableFuture<StackTraceSampleResponse> future = new CompletableFuture<>();
+
+		Execution exec = Mockito.mock(Execution.class);
+		Mockito.when(exec.getAttemptId()).thenReturn(executionId);
+		Mockito.when(exec.getState()).thenReturn(state);
+		Mockito.when(exec.requestStackTraceSample(Matchers.anyInt(), Matchers.anyInt(), Matchers.any(Time.class), Matchers.anyInt(), Matchers.any(Time.class)))
+			.thenReturn(future);
+
+		scheduledExecutorService.schedule(
+			() -> future.completeExceptionally(new TimeoutException("Timeout")),
+			timeout,
+			TimeUnit.MILLISECONDS);
+
+		ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
+		Mockito.when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
+		Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+		return vertex;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
new file mode 100644
index 0000000..db91f58
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointConfigHandler.
+ */
+public class CheckpointConfigHandlerTest {
+
+	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist();
+		GraphAndSettings graphAndSettings = createGraphAndSettings(true, true);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		when(graph.getJobID()).thenReturn(new JobID());
+		JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
+		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(1, archives.size());
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(archive.getJson());
+
+		Assert.assertEquals("exactly_once", rootNode.get("mode").asText());
+		Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+		Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+		JsonNode externalizedNode = rootNode.get("externalization");
+		Assert.assertNotNull(externalizedNode);
+		Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	}
+
+	@Test
+	public void testGetPaths() {
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]);
+	}
+
+	/**
+	 * Tests a simple config.
+	 */
+	@Test
+	public void testSimpleConfig() throws Exception {
+		GraphAndSettings graphAndSettings = createGraphAndSettings(false, true);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		assertEquals("exactly_once", rootNode.get("mode").asText());
+		assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+		assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+		JsonNode externalizedNode = rootNode.get("externalization");
+		assertNotNull(externalizedNode);
+		assertEquals(false, externalizedNode.get("enabled").asBoolean());
+	}
+
+	/**
+	 * Tests that the isExactlyOnce flag is respected.
+	 */
+	@Test
+	public void testAtLeastOnce() throws Exception {
+		GraphAndSettings graphAndSettings = createGraphAndSettings(false, false);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		assertEquals("at_least_once", rootNode.get("mode").asText());
+	}
+
+	/**
+	 * Tests that the externalized checkpoint settings are forwarded.
+	 */
+	@Test
+	public void testEnabledExternalizedCheckpointSettings() throws Exception {
+		GraphAndSettings graphAndSettings = createGraphAndSettings(true, false);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
+		assertNotNull(externalizedNode);
+		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	}
+
+	private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
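+		// arbitrary, distinctive values so the JSON assertions cannot pass by accident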
+		long interval = 18231823L;
+		long timeout = 996979L;
+		long minPause = 119191919L;
+		int maxConcurrent = 12929329;
+		ExternalizedCheckpointSettings externalizedSetting = externalized
+			? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
+			: ExternalizedCheckpointSettings.none();
+
+		JobCheckpointingSettings settings = new JobCheckpointingSettings(
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			interval,
+			timeout,
+			minPause,
+			maxConcurrent,
+			externalizedSetting,
+			null,
+			exactlyOnce);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getJobCheckpointingSettings()).thenReturn(settings);
+
+		return new GraphAndSettings(graph, settings, externalizedSetting);
+	}
+
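+	/** Simple holder for a mocked graph together with its checkpointing settings. */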
+	private static class GraphAndSettings {
+		public final AccessExecutionGraph graph;
+		public final JobCheckpointingSettings snapshottingSettings;
+		public final ExternalizedCheckpointSettings externalizedSettings;
+
+		public GraphAndSettings(
+				AccessExecutionGraph graph,
+				JobCheckpointingSettings snapshottingSettings,
+				ExternalizedCheckpointSettings externalizedSettings) {
+			this.graph = graph;
+			this.snapshottingSettings = snapshottingSettings;
+			this.externalizedSettings = externalizedSettings;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
new file mode 100644
index 0000000..04b1c55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointStatsCache.
+ */
+public class CheckpointStatsCacheTest {
+
+	@Test
+	public void testZeroSizeCache() throws Exception {
+		AbstractCheckpointStats checkpoint = createCheckpoint(0, CheckpointStatsStatus.COMPLETED);
+
+		CheckpointStatsCache cache = new CheckpointStatsCache(0);
+		cache.tryAdd(checkpoint);
+		assertNull(cache.tryGet(0L));
+	}
+
+	@Test
+	public void testCacheAddAndGet() throws Exception {
+		AbstractCheckpointStats chk0 = createCheckpoint(0, CheckpointStatsStatus.COMPLETED);
+		AbstractCheckpointStats chk1 = createCheckpoint(1, CheckpointStatsStatus.COMPLETED);
+		AbstractCheckpointStats chk2 = createCheckpoint(2, CheckpointStatsStatus.IN_PROGRESS);
+
+		CheckpointStatsCache cache = new CheckpointStatsCache(1);
+		cache.tryAdd(chk0);
+		assertEquals(chk0, cache.tryGet(0));
+
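+		// the cache holds at most one entry, so adding chk1 evicts chk0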
+		cache.tryAdd(chk1);
+		assertNull(cache.tryGet(0));
+		assertEquals(chk1, cache.tryGet(1));
+
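+		// in-progress checkpoints are not cached, so chk2 is not added and chk1 survives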
+		cache.tryAdd(chk2);
+		assertNull(cache.tryGet(2));
+		assertNull(cache.tryGet(0));
+		assertEquals(chk1, cache.tryGet(1));
+	}
+
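+	/** Creates a checkpoint stats mock with the given ID and status. */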
+	private AbstractCheckpointStats createCheckpoint(long id, CheckpointStatsStatus status) {
+		AbstractCheckpointStats checkpoint = mock(AbstractCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(id);
+		when(checkpoint.getStatus()).thenReturn(status);
+		return checkpoint;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
new file mode 100644
index 0000000..e614608
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointStatsDetailsHandler.
+ */
+public class CheckpointStatsDetailsHandlerTest {
+
+	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+
+		CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint();
+		FailedCheckpointStats failedCheckpoint = createFailedCheckpoint();
+		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+		checkpoints.add(failedCheckpoint);
+		checkpoints.add(completedCheckpoint);
+
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpoints()).thenReturn(checkpoints);
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+		when(graph.getJobID()).thenReturn(new JobID());
+
+		ObjectMapper mapper = new ObjectMapper();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(2, archives.size());
+
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(),
+			archive1.getPath());
+		compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson()));
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(),
+			archive2.getPath());
+		compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson()));
+	}
+
+	@Test
+	public void testGetPaths() {
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		String[] paths = handler.getPaths();
+		Assert.assertEquals(1, paths.length);
+		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
+	}
+
+	/**
+	 * Tests a request with an illegal checkpoint ID parameter.
+	 */
+	@Test
+	public void testIllegalCheckpointId() throws Exception {
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		Map<String, String> params = new HashMap<>();
+		params.put("checkpointid", "illegal checkpoint");
+		String json = handler.handleRequest(graph, params).get();
+
+		assertEquals("{}", json);
+	}
+
+	/**
+	 * Tests a request with a missing checkpoint ID parameter.
+	 */
+	@Test
+	public void testNoCheckpointIdParam() throws Exception {
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
+
+		assertEquals("{}", json);
+	}
+
+	/**
+	 * Tests the lookup of a non-existing checkpoint in the history.
+	 */
+	@Test
+	public void testCheckpointNotFound() throws Exception {
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpointById(anyLong())).thenReturn(null); // not found
+
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		Map<String, String> params = new HashMap<>();
+		params.put("checkpointid", "123");
+		String json = handler.handleRequest(graph, params).get();
+
+		assertEquals("{}", json);
+		verify(history, times(1)).getCheckpointById(anyLong());
+	}
+
+	/**
+	 * Tests a checkpoint details request for an in progress checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestInProgressCheckpoint() throws Exception {
+		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+		when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
+		when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+		when(checkpoint.getStateSize()).thenReturn(111939272822L);
+		when(checkpoint.getEndToEndDuration()).thenReturn(121191L);
+		when(checkpoint.getAlignmentBuffered()).thenReturn(1L);
+		when(checkpoint.getNumberOfSubtasks()).thenReturn(501);
+		when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+		List<TaskStateStats> taskStats = new ArrayList<>();
+		TaskStateStats task1 = createTaskStateStats();
+		TaskStateStats task2 = createTaskStateStats();
+		taskStats.add(task1);
+		taskStats.add(task2);
+
+		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+		assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+		assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+
+		verifyTaskNodes(taskStats, rootNode);
+	}
+
+	/**
+	 * Tests a checkpoint details request for a completed checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception {
+		CompletedCheckpointStats checkpoint = createCompletedCheckpoint();
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		compareCompletedCheckpoint(checkpoint, rootNode);
+
+		verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+	}
+
+	/**
+	 * Tests a checkpoint details request for a failed checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+		FailedCheckpointStats checkpoint = createFailedCheckpoint();
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		compareFailedCheckpoint(checkpoint, rootNode);
+
+		verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static CompletedCheckpointStats createCompletedCheckpoint() {
+		CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+		when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+		when(checkpoint.getStateSize()).thenReturn(925281L);
+		when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+		when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+		when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+		when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+		when(checkpoint.isDiscarded()).thenReturn(true);
+		when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path");
+
+		List<TaskStateStats> taskStats = new ArrayList<>();
+		TaskStateStats task1 = createTaskStateStats();
+		TaskStateStats task2 = createTaskStateStats();
+		taskStats.add(task1);
+		taskStats.add(task2);
+
+		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+		return checkpoint;
+	}
+
+	private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) {
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+		assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+		assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+		assertEquals(checkpoint.isDiscarded(), rootNode.get("discarded").asBoolean());
+		assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText());
+		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+	}
+
+	private static FailedCheckpointStats createFailedCheckpoint() {
+		FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1818214L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+		when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+		when(checkpoint.getStateSize()).thenReturn(925281L);
+		when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+		when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+		when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+		when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+		when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L);
+		when(checkpoint.getFailureMessage()).thenReturn("failure-message");
+
+		List<TaskStateStats> taskStats = new ArrayList<>();
+		TaskStateStats task1 = createTaskStateStats();
+		TaskStateStats task2 = createTaskStateStats();
+		taskStats.add(task1);
+		taskStats.add(task2);
+
+		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+		return checkpoint;
+	}
+
+	private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) {
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
+		assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong());
+		assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong());
+		assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong());
+		assertEquals(checkpoint.getEndToEndDuration(), rootNode.get("end_to_end_duration").asLong());
+		assertEquals(checkpoint.getAlignmentBuffered(), rootNode.get("alignment_buffered").asLong());
+		assertEquals(checkpoint.getFailureTimestamp(), rootNode.get("failure_timestamp").asLong());
+		assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText());
+		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
+		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
+	}
+
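+	/**
+	 * Wires the given checkpoint into a mocked history/snapshot/graph, issues a
+	 * details request for it and returns the parsed JSON response.
+	 */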
+	private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		Map<String, String> params = new HashMap<>();
+		params.put("checkpointid", "123");
+		String json = handler.handleRequest(graph, params).get();
+
+		ObjectMapper mapper = new ObjectMapper();
+		return mapper.readTree(json);
+	}
+
+	private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) {
+		for (TaskStateStats task : tasks) {
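+			// the mocked getEndToEndDuration(long) returns the same value for any
+			// argument, so the random offset only varies the call, not the result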
+			long duration = ThreadLocalRandom.current().nextInt(128);
+
+			JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
+			assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+			assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+			assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
+			assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+			assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+			assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+		}
+	}
+
+	private static TaskStateStats createTaskStateStats() {
+		ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+		TaskStateStats task = mock(TaskStateStats.class);
+		when(task.getJobVertexId()).thenReturn(new JobVertexID());
+		when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
+		when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+		when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+		return task;
+	}
+}

