flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-5941] Integrate Archiver pattern into handlers
Date Thu, 02 Mar 2017 17:27:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 243ef69bf -> 7fe0eb477


http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 939f439..0076d42 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -21,12 +21,34 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtasksTimesHandlerTest {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new SubtasksTimesHandler.SubtasksTimesJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", archive.getPath());
+		compareSubtaskTimes(originalTask, originalAttempt, archive.getJson());
+	}
+
 	@Test
 	public void testGetPaths() {
 		SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
@@ -41,6 +63,10 @@ public class SubtasksTimesHandlerTest {
 		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
 		String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
 
+		compareSubtaskTimes(originalTask, originalAttempt, json);
+	}
+	
+	private static void compareSubtaskTimes(AccessExecutionJobVertex originalTask, AccessExecution originalAttempt, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
 
 		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e570e18..9d339f5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,14 +20,20 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -38,6 +44,37 @@ import static org.mockito.Mockito.when;
 public class CheckpointConfigHandlerTest {
 
 	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist();
+		GraphAndSettings graphAndSettings = createGraphAndSettings(true, true);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		when(graph.getJobID()).thenReturn(new JobID());
+		JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
+		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(1, archives.size());
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals("/jobs/" + graph.getJobID() + "/checkpoints/config", archive.getPath());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(archive.getJson());
+
+		Assert.assertEquals("exactly_once", rootNode.get("mode").asText());
+		Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+		Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+		JsonNode externalizedNode = rootNode.get("externalization");
+		Assert.assertNotNull(externalizedNode);
+		Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		Assert.assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+
+	}
+
+	@Test
 	public void testGetPaths() {
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
@@ -50,26 +87,10 @@ public class CheckpointConfigHandlerTest {
 	 */
 	@Test
 	public void testSimpleConfig() throws Exception {
-		long interval = 18231823L;
-		long timeout = 996979L;
-		long minPause = 119191919L;
-		int maxConcurrent = 12929329;
-		ExternalizedCheckpointSettings externalized = ExternalizedCheckpointSettings.none();
+		GraphAndSettings graphAndSettings = createGraphAndSettings(false, true);
 
-		JobSnapshottingSettings settings = new JobSnapshottingSettings(
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			interval,
-			timeout,
-			minPause,
-			maxConcurrent,
-			externalized,
-			null,
-			true);
-
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -78,10 +99,10 @@ public class CheckpointConfigHandlerTest {
 		JsonNode rootNode = mapper.readTree(json);
 
 		assertEquals("exactly_once", rootNode.get("mode").asText());
-		assertEquals(interval, rootNode.get("interval").asLong());
-		assertEquals(timeout, rootNode.get("timeout").asLong());
-		assertEquals(minPause, rootNode.get("min_pause").asLong());
-		assertEquals(maxConcurrent, rootNode.get("max_concurrent").asInt());
+		assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+		assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+		assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+		assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
 
 		JsonNode externalizedNode = rootNode.get("externalization");
 		assertNotNull(externalizedNode);
@@ -93,20 +114,9 @@ public class CheckpointConfigHandlerTest {
 	 */
 	@Test
 	public void testAtLeastOnce() throws Exception {
-		JobSnapshottingSettings settings = new JobSnapshottingSettings(
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			Collections.<JobVertexID>emptyList(),
-			996979L,
-			1818L,
-			1212L,
-			12,
-			ExternalizedCheckpointSettings.none(),
-			null,
-			false); // at least once
+		GraphAndSettings graphAndSettings = createGraphAndSettings(false, false);
 
-		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+		AccessExecutionGraph graph = graphAndSettings.graph;
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -122,30 +132,60 @@ public class CheckpointConfigHandlerTest {
 	 */
 	@Test
 	public void testEnabledExternalizedCheckpointSettings() throws Exception {
-		ExternalizedCheckpointSettings externalizedSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+		GraphAndSettings graphAndSettings = createGraphAndSettings(true, false);
+
+		AccessExecutionGraph graph = graphAndSettings.graph;
+		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
+
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
+		assertNotNull(externalizedNode);
+		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
+		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	}
+
+	private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
+		long interval = 18231823L;
+		long timeout = 996979L;
+		long minPause = 119191919L;
+		int maxConcurrent = 12929329;
+		ExternalizedCheckpointSettings externalizedSetting = externalized
+			? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
+			: ExternalizedCheckpointSettings.none();
 
 		JobSnapshottingSettings settings = new JobSnapshottingSettings(
 			Collections.<JobVertexID>emptyList(),
 			Collections.<JobVertexID>emptyList(),
 			Collections.<JobVertexID>emptyList(),
-			996979L,
-			1818L,
-			1212L,
-			12,
-			externalizedSettings,
+			interval,
+			timeout,
+			minPause,
+			maxConcurrent,
+			externalizedSetting,
 			null,
-			false); // at least once
+			exactlyOnce);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		return new GraphAndSettings(graph, settings, externalizedSetting);
+	}
 
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode externalizedNode = mapper.readTree(json).get("externalization");
-		assertNotNull(externalizedNode);
-		assertEquals(externalizedSettings.externalizeCheckpoints(), externalizedNode.get("enabled").asBoolean());
-		assertEquals(externalizedSettings.deleteOnCancellation(), externalizedNode.get("delete_on_cancellation").asBoolean());
+	private static class GraphAndSettings {
+		public final AccessExecutionGraph graph;
+		public final JobSnapshottingSettings snapshottingSettings;
+		public final ExternalizedCheckpointSettings externalizedSettings;
+
+		public GraphAndSettings(
+				AccessExecutionGraph graph,
+				JobSnapshottingSettings snapshottingSettings,
+				ExternalizedCheckpointSettings externalizedSettings) {
+			this.graph = graph;
+			this.snapshottingSettings = snapshottingSettings;
+			this.externalizedSettings = externalizedSettings;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index ca9b606..770b032 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -32,12 +33,19 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -50,6 +58,44 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CheckpointStatsDetailsHandlerTest {
+	
+	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+
+		CompletedCheckpointStats completedCheckpoint = createCompletedCheckpoint();
+		FailedCheckpointStats failedCheckpoint = createFailedCheckpoint();
+		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+		checkpoints.add(failedCheckpoint);
+		checkpoints.add(completedCheckpoint);
+		
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpoints()).thenReturn(checkpoints);
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+		when(graph.getJobID()).thenReturn(new JobID());
+
+		ObjectMapper mapper = new ObjectMapper();
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(2, archives.size());
+		
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + failedCheckpoint.getCheckpointId(),
+			archive1.getPath());
+		compareFailedCheckpoint(failedCheckpoint, mapper.readTree(archive1.getJson()));
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + completedCheckpoint.getCheckpointId(),
+			archive2.getPath());
+		compareCompletedCheckpoint(completedCheckpoint, mapper.readTree(archive2.getJson()));
+	}
 
 	@Test
 	public void testGetPaths() {
@@ -146,8 +192,7 @@ public class CheckpointStatsDetailsHandlerTest {
 		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
 		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
 
-		verifyTaskNode(task1, rootNode);
-		verifyTaskNode(task2, rootNode);
+		verifyTaskNodes(taskStats, rootNode);
 	}
 
 	/**
@@ -155,6 +200,32 @@ public class CheckpointStatsDetailsHandlerTest {
 	 */
 	@Test
 	public void testCheckpointDetailsRequestCompletedCheckpoint() throws Exception {
+		CompletedCheckpointStats checkpoint = createCompletedCheckpoint();
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		compareCompletedCheckpoint(checkpoint, rootNode);
+
+		verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+	}
+
+	/**
+	 * Tests a checkpoint details request for a failed checkpoint.
+	 */
+	@Test
+	public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+		FailedCheckpointStats checkpoint = createFailedCheckpoint();
+
+		JsonNode rootNode = triggerRequest(checkpoint);
+
+		compareFailedCheckpoint(checkpoint, rootNode);
+
+		verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static CompletedCheckpointStats createCompletedCheckpoint() {
 		CompletedCheckpointStats checkpoint = mock(CompletedCheckpointStats.class);
 		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
@@ -177,8 +248,10 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
 
-		JsonNode rootNode = triggerRequest(checkpoint);
+		return checkpoint;
+	}
 
+	private static void compareCompletedCheckpoint(CompletedCheckpointStats checkpoint, JsonNode rootNode) {
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
 		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
@@ -191,18 +264,11 @@ public class CheckpointStatsDetailsHandlerTest {
 		assertEquals(checkpoint.getExternalPath(), rootNode.get("external_path").asText());
 		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
 		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
-
-		verifyTaskNode(task1, rootNode);
-		verifyTaskNode(task2, rootNode);
 	}
 
-	/**
-	 * Tests a checkpoint details request for a failed checkpoint.
-	 */
-	@Test
-	public void testCheckpointDetailsRequestFailedCheckpoint() throws Exception {
+	private static FailedCheckpointStats createFailedCheckpoint() {
 		FailedCheckpointStats checkpoint = mock(FailedCheckpointStats.class);
-		when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+		when(checkpoint.getCheckpointId()).thenReturn(1818214L);
 		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
 		when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
 		when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
@@ -223,8 +289,10 @@ public class CheckpointStatsDetailsHandlerTest {
 
 		when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
 
-		JsonNode rootNode = triggerRequest(checkpoint);
+		return checkpoint;
+	}
 
+	private static void compareFailedCheckpoint(FailedCheckpointStats checkpoint, JsonNode rootNode) {
 		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
 		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
 		assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean());
@@ -237,13 +305,8 @@ public class CheckpointStatsDetailsHandlerTest {
 		assertEquals(checkpoint.getFailureMessage(), rootNode.get("failure_message").asText());
 		assertEquals(checkpoint.getNumberOfSubtasks(), rootNode.get("num_subtasks").asInt());
 		assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), rootNode.get("num_acknowledged_subtasks").asInt());
-
-		verifyTaskNode(task1, rootNode);
-		verifyTaskNode(task2, rootNode);
 	}
 
-	// ------------------------------------------------------------------------
-
 	private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
 		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
 		when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
@@ -262,16 +325,18 @@ public class CheckpointStatsDetailsHandlerTest {
 		return mapper.readTree(json);
 	}
 
-	private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
-		long duration = ThreadLocalRandom.current().nextInt(128);
-
-		JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
-		assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
-		assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
-		assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
-		assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
-		assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
-		assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+	private static void verifyTaskNodes(Collection<TaskStateStats> tasks, JsonNode parentNode) {
+		for (TaskStateStats task : tasks) {
+			long duration = ThreadLocalRandom.current().nextInt(128);
+
+			JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
+			assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong());
+			assertEquals(task.getStateSize(), taskNode.get("state_size").asLong());
+			assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong());
+			assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong());
+			assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt());
+			assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt());
+		}
 	}
 
 	private static TaskStateStats createTaskStateStats() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index ab7c7a3..1e4a255 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
@@ -34,10 +35,15 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -51,6 +57,32 @@ import static org.mockito.Mockito.when;
 public class CheckpointStatsHandlerTest {
 
 	@Test
+	public void testArchiver() throws IOException {
+		JsonArchivist archivist = new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
+		when(testCheckpointStats.graph.getJobID()).thenReturn(new JobID());
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(testCheckpointStats.graph);
+		Assert.assertEquals(3, archives.size());
+
+		ObjectMapper mapper = new ObjectMapper();
+		
+		Iterator<ArchivedJson> iterator = archives.iterator();
+		ArchivedJson archive1 = iterator.next();
+		Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath());
+		compareInProgressCheckpoint(testCheckpointStats.inProgress, mapper.readTree(archive1.getJson()));
+
+		ArchivedJson archive2 = iterator.next();
+		Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath());
+		compareCompletedSavepoint(testCheckpointStats.completedSavepoint, mapper.readTree(archive2.getJson()));
+		
+		ArchivedJson archive3 = iterator.next();
+		Assert.assertEquals("/jobs/" + testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + testCheckpointStats.failed.getCheckpointId(), archive3.getPath());
+		compareFailedCheckpoint(testCheckpointStats.failed, mapper.readTree(archive3.getJson()));
+	}
+	
+
+	@Test
 	public void testGetPaths() {
 		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
 		String[] paths = handler.getPaths();
@@ -63,6 +95,18 @@ public class CheckpointStatsHandlerTest {
 	 */
 	@Test
 	public void testCheckpointStatsRequest() throws Exception {
+		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
+
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+		String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(json);
+
+		compareCheckpointStats(testCheckpointStats, rootNode);
+	}
+
+	private static TestCheckpointStats createTestCheckpointStats() {
 		// Counts
 		CheckpointStatsCounts counts = mock(CheckpointStatsCounts.class);
 		when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
@@ -104,7 +148,7 @@ public class CheckpointStatsHandlerTest {
 		when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
 
 		CompletedCheckpointStats latestSavepoint = mock(CompletedCheckpointStats.class);
-		when(latestSavepoint.getCheckpointId()).thenReturn(1992139L);
+		when(latestSavepoint.getCheckpointId()).thenReturn(1992140L);
 		when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
 		when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
 		when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
@@ -133,7 +177,7 @@ public class CheckpointStatsHandlerTest {
 		List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
 
 		PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class);
-		when(inProgress.getCheckpointId()).thenReturn(1992139L);
+		when(inProgress.getCheckpointId()).thenReturn(1992141L);
 		when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
 		when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
@@ -189,12 +233,15 @@ public class CheckpointStatsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(json);
+		return new TestCheckpointStats(
+			graph, counts, stateSizeSummary, durationSummary, alignmentBufferedSummary, summary,
+			latestCompleted, latestSavepoint, latestFailed, latestRestored, inProgress, 
+			completedSavepoint, failed, history, snapshot
+		);
+	}
 
+	private static void compareCheckpointStats(TestCheckpointStats checkpointStats, JsonNode rootNode) {
+		CheckpointStatsCounts counts = checkpointStats.counts;
 		JsonNode countNode = rootNode.get("counts");
 		assertEquals(counts.getNumberOfRestoredCheckpoints(), countNode.get("restored").asLong());
 		assertEquals(counts.getTotalNumberOfCheckpoints(), countNode.get("total").asLong());
@@ -202,22 +249,26 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(counts.getNumberOfCompletedCheckpoints(), countNode.get("completed").asLong());
 		assertEquals(counts.getNumberOfFailedCheckpoints(), countNode.get("failed").asLong());
 
+		MinMaxAvgStats stateSizeSummary = checkpointStats.stateSizeSummary;
 		JsonNode summaryNode = rootNode.get("summary");
 		JsonNode sizeSummaryNode = summaryNode.get("state_size");
 		assertEquals(stateSizeSummary.getMinimum(), sizeSummaryNode.get("min").asLong());
 		assertEquals(stateSizeSummary.getMaximum(), sizeSummaryNode.get("max").asLong());
 		assertEquals(stateSizeSummary.getAverage(), sizeSummaryNode.get("avg").asLong());
 
+		MinMaxAvgStats durationSummary = checkpointStats.durationSummary;
 		JsonNode durationSummaryNode = summaryNode.get("end_to_end_duration");
 		assertEquals(durationSummary.getMinimum(), durationSummaryNode.get("min").asLong());
 		assertEquals(durationSummary.getMaximum(), durationSummaryNode.get("max").asLong());
 		assertEquals(durationSummary.getAverage(), durationSummaryNode.get("avg").asLong());
 
+		MinMaxAvgStats alignmentBufferedSummary = checkpointStats.alignmentBufferedSummary;
 		JsonNode alignmentBufferedNode = summaryNode.get("alignment_buffered");
 		assertEquals(alignmentBufferedSummary.getMinimum(), alignmentBufferedNode.get("min").asLong());
 		assertEquals(alignmentBufferedSummary.getMaximum(), alignmentBufferedNode.get("max").asLong());
 		assertEquals(alignmentBufferedSummary.getAverage(), alignmentBufferedNode.get("avg").asLong());
 
+		CompletedCheckpointStats latestCompleted = checkpointStats.latestCompleted;
 		JsonNode latestNode = rootNode.get("latest");
 		JsonNode latestCheckpointNode = latestNode.get("completed");
 		assertEquals(latestCompleted.getCheckpointId(), latestCheckpointNode.get("id").asLong());
@@ -228,6 +279,7 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(latestCompleted.getAlignmentBuffered(), latestCheckpointNode.get("alignment_buffered").asLong());
 		assertEquals(latestCompleted.getExternalPath(), latestCheckpointNode.get("external_path").asText());
 
+		CompletedCheckpointStats latestSavepoint = checkpointStats.latestSavepoint;
 		JsonNode latestSavepointNode = latestNode.get("savepoint");
 		assertEquals(latestSavepoint.getCheckpointId(), latestSavepointNode.get("id").asLong());
 		assertEquals(latestSavepoint.getTriggerTimestamp(), latestSavepointNode.get("trigger_timestamp").asLong());
@@ -237,6 +289,7 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(latestSavepoint.getAlignmentBuffered(), latestSavepointNode.get("alignment_buffered").asLong());
 		assertEquals(latestSavepoint.getExternalPath(), latestSavepointNode.get("external_path").asText());
 
+		FailedCheckpointStats latestFailed = checkpointStats.latestFailed;
 		JsonNode latestFailedNode = latestNode.get("failed");
 		assertEquals(latestFailed.getCheckpointId(), latestFailedNode.get("id").asLong());
 		assertEquals(latestFailed.getTriggerTimestamp(), latestFailedNode.get("trigger_timestamp").asLong());
@@ -247,6 +300,7 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(latestFailed.getFailureTimestamp(), latestFailedNode.get("failure_timestamp").asLong());
 		assertEquals(latestFailed.getFailureMessage(), latestFailedNode.get("failure_message").asText());
 
+		RestoredCheckpointStats latestRestored = checkpointStats.latestRestored;
 		JsonNode latestRestoredNode = latestNode.get("restored");
 		assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong());
 		assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong());
@@ -259,6 +313,25 @@ public class CheckpointStatsHandlerTest {
 		assertTrue(it.hasNext());
 		JsonNode inProgressNode = it.next();
 
+		PendingCheckpointStats inProgress = checkpointStats.inProgress;
+		compareInProgressCheckpoint(inProgress, inProgressNode);
+
+		assertTrue(it.hasNext());
+		JsonNode completedSavepointNode = it.next();
+
+		CompletedCheckpointStats completedSavepoint = checkpointStats.completedSavepoint;
+		compareCompletedSavepoint(completedSavepoint, completedSavepointNode);
+
+		assertTrue(it.hasNext());
+		JsonNode failedNode = it.next();
+
+		FailedCheckpointStats failed = checkpointStats.failed;
+		compareFailedCheckpoint(failed, failedNode);
+
+		assertFalse(it.hasNext());
+	}
+
+	private static void compareInProgressCheckpoint(PendingCheckpointStats inProgress, JsonNode inProgressNode) {
 		assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong());
 		assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText());
 		assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean());
@@ -269,10 +342,9 @@ public class CheckpointStatsHandlerTest {
 		assertEquals(inProgress.getAlignmentBuffered(), inProgressNode.get("alignment_buffered").asLong());
 		assertEquals(inProgress.getNumberOfSubtasks(), inProgressNode.get("num_subtasks").asInt());
 		assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), inProgressNode.get("num_acknowledged_subtasks").asInt());
+	}
 
-		assertTrue(it.hasNext());
-		JsonNode completedSavepointNode = it.next();
-
+	private static void compareCompletedSavepoint(CompletedCheckpointStats completedSavepoint, JsonNode completedSavepointNode) {
 		assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong());
 		assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText());
 		assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean());
@@ -286,10 +358,9 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(completedSavepoint.getExternalPath(), completedSavepointNode.get("external_path").asText());
 		assertEquals(completedSavepoint.isDiscarded(), completedSavepointNode.get("discarded").asBoolean());
+	}
 
-		assertTrue(it.hasNext());
-		JsonNode failedNode = it.next();
-
+	private static void compareFailedCheckpoint(FailedCheckpointStats failed, JsonNode failedNode) {
 		assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong());
 		assertEquals(failed.getStatus().toString(), failedNode.get("status").asText());
 		assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean());
@@ -303,7 +374,56 @@ public class CheckpointStatsHandlerTest {
 
 		assertEquals(failed.getFailureTimestamp(), failedNode.get("failure_timestamp").asLong());
 		assertEquals(failed.getFailureMessage(), failedNode.get("failure_message").asText());
-
-		assertFalse(it.hasNext());
+	}
+	
+	private static class TestCheckpointStats {
+		public final AccessExecutionGraph graph;
+		public final CheckpointStatsCounts counts;
+		public final MinMaxAvgStats stateSizeSummary;
+		public final MinMaxAvgStats durationSummary;
+		public final MinMaxAvgStats alignmentBufferedSummary;
+		public final CompletedCheckpointStatsSummary summary;
+		public final CompletedCheckpointStats latestCompleted;
+		public final CompletedCheckpointStats latestSavepoint;
+		public final FailedCheckpointStats latestFailed;
+		public final RestoredCheckpointStats latestRestored;
+		public final PendingCheckpointStats inProgress;
+		public final CompletedCheckpointStats completedSavepoint;
+		public final FailedCheckpointStats failed;
+		public final CheckpointStatsHistory history;
+		public final CheckpointStatsSnapshot snapshot;
+
+		public TestCheckpointStats(
+				AccessExecutionGraph graph,
+				CheckpointStatsCounts counts,
+				MinMaxAvgStats stateSizeSummary,
+				MinMaxAvgStats durationSummary,
+				MinMaxAvgStats alignmentBufferedSummary,
+				CompletedCheckpointStatsSummary summary,
+				CompletedCheckpointStats latestCompleted,
+				CompletedCheckpointStats latestSavepoint,
+				FailedCheckpointStats latestFailed,
+				RestoredCheckpointStats latestRestored,
+				PendingCheckpointStats inProgress,
+				CompletedCheckpointStats completedSavepoint,
+				FailedCheckpointStats failed,
+				CheckpointStatsHistory history,
+				CheckpointStatsSnapshot snapshot) {
+			this.graph = graph;
+			this.counts = counts;
+			this.stateSizeSummary = stateSizeSummary;
+			this.durationSummary = durationSummary;
+			this.alignmentBufferedSummary = alignmentBufferedSummary;
+			this.summary = summary;
+			this.latestCompleted = latestCompleted;
+			this.latestSavepoint = latestSavepoint;
+			this.latestFailed = latestFailed;
+			this.latestRestored = latestRestored;
+			this.inProgress = inProgress;
+			this.completedSavepoint = completedSavepoint;
+			this.failed = failed;
+			this.history = history;
+			this.snapshot = snapshot;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 26433fa..bbab621 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -31,9 +32,13 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -55,6 +60,42 @@ import static org.mockito.Mockito.when;
 public class CheckpointStatsSubtaskDetailsHandlerTest {
 
 	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
+		ObjectMapper mapper = new ObjectMapper();
+
+		PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class);
+		when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+		when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+		when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration
+
+		TaskStateStats task = createTaskStateStats(1237);
+		when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
+
+		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
+		when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
+		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
+		when(snapshot.getHistory()).thenReturn(history);
+
+		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+		when(graph.getJobID()).thenReturn(new JobID());
+		
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+		Assert.assertEquals(1, archives.size());
+		
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals(
+			"/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
+			archive.getPath());
+		JsonNode rootNode = mapper.readTree(archive.getJson());
+		assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong());
+		assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText());
+
+		verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp());
+	}
+
+	@Test
 	public void testGetPaths() {
 		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
index 0340d87..ed339ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -105,7 +105,7 @@ public class ArchivedJobGenerationUtils {
 		originalAttempt = new ArchivedExecutionBuilder()
 			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
 			.setParallelSubtaskIndex(1)
-			.setAttemptNumber(3)
+			.setAttemptNumber(0)
 			.setAssignedResourceLocation(location)
 			.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
 			.setState(ExecutionState.FINISHED)

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
new file mode 100644
index 0000000..22e011c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A simple container for a handler's JSON response and the REST URLs for which the response would've been returned.
+ * 
+ * These are created by {@link JsonArchivist}s, and used by the {@link MemoryArchivist} to create a directory structure
+ * resembling the REST API.
+ */
+public class ArchivedJson {
+	private final String path;
+	private final String json;
+	
+	public ArchivedJson(String path, String json) {
+		this.path = Preconditions.checkNotNull(path);
+		this.json = Preconditions.checkNotNull(json);
+	}
+
+	public String getPath() {
+		return path;
+	}
+
+	public String getJson() {
+		return json;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
new file mode 100644
index 0000000..a87cc47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for all classes that want to participate in the archiving of job-related json responses.
+ * 
+ * Note that all JsonArchivists that are to be used for the history server must be added
+ * to {@link WebRuntimeMonitor#getArchivers()}.
+ */
+public interface JsonArchivist {
+
+	/**
+	 * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their respective REST URL
+	 * for a given job.
+	 *
+	 * The collection should contain one entry for every response that could be generated for the given
+	 * job, for example one entry for each task. The REST URLs should be unique and must not contain placeholders.
+	 *
+	 * @param graph AccessExecutionGraph for which the responses should be generated
+	 *
+	 * @return Collection containing an ArchivedJson for every response that could be generated for the given job
+	 * @throws IOException thrown if the JSON generation fails
+	 */
+	Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;
+}


Mime
View raw message