flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [01/11] flink git commit: [FLINK-4410] [runtime, runtime-web] Remove old checkpoint stats tracker code
Date Tue, 10 Jan 2017 08:49:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master f79944aa8 -> 536e4b352


[FLINK-4410] [runtime, runtime-web] Remove old checkpoint stats tracker code


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1fee3bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1fee3bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1fee3bb

Branch: refs/heads/master
Commit: c1fee3bb1b6549a7610c3675807490fbf97f6976
Parents: f79944a
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Dec 23 20:31:29 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue Jan 10 09:48:51 2017 +0100

----------------------------------------------------------------------
 .../handlers/JobCheckpointsHandler.java         |  99 ----
 .../handlers/JobVertexCheckpointsHandler.java   |  73 ---
 .../handlers/JobCheckpointsHandlerTest.java     | 204 --------
 .../JobVertexCheckpointsHandlerTest.java        | 141 ------
 .../jobs/job.plan.node.checkpoints.job.jade     |  98 ----
 .../job.plan.node.checkpoints.operator.jade     |  64 ---
 .../ArchivedCheckpointStatsTracker.java         |  53 ---
 .../checkpoint/stats/CheckpointStats.java       | 131 ------
 .../stats/CheckpointStatsTracker.java           |  58 ---
 .../stats/DisabledCheckpointStatsTracker.java   |  45 --
 .../checkpoint/stats/JobCheckpointStats.java    | 114 -----
 .../stats/OperatorCheckpointStats.java          | 115 -----
 .../stats/SimpleCheckpointStatsTracker.java     | 468 -------------------
 .../CheckpointCoordinatorFailureTest.java       |   4 +-
 .../DisabledCheckpointStatsTrackerTest.java     |  34 --
 .../stats/SimpleCheckpointStatsTrackerTest.java | 381 ---------------
 16 files changed, 1 insertion(+), 2081 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
deleted file mode 100644
index 404a14e..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import scala.Option;
-
-import java.io.StringWriter;
-import java.util.Map;
-
-/**
- * Request handler that returns checkpoint stats for a job.
- */
-public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler {
-
-	public JobCheckpointsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
-	}
-
-	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-
-		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-
-		gen.writeStartObject();
-
-		if (tracker != null) {
-			Option<JobCheckpointStats> stats = tracker.getJobStats();
-
-			if (stats.isDefined()) {
-				JobCheckpointStats jobStats = stats.get();
-
-				// Total number of checkpoints
-				gen.writeNumberField("count", jobStats.getCount());
-
-				// Optional external path
-				if (jobStats.getExternalPath() != null) {
-					gen.writeStringField("external-path", jobStats.getExternalPath());
-				}
-
-				// Duration
-				gen.writeFieldName("duration");
-				gen.writeStartObject();
-				gen.writeNumberField("min", jobStats.getMinDuration());
-				gen.writeNumberField("max", jobStats.getMaxDuration());
-				gen.writeNumberField("avg", jobStats.getAverageDuration());
-				gen.writeEndObject();
-
-				// State size
-				gen.writeFieldName("size");
-				gen.writeStartObject();
-				gen.writeNumberField("min", jobStats.getMinStateSize());
-				gen.writeNumberField("max", jobStats.getMaxStateSize());
-				gen.writeNumberField("avg", jobStats.getAverageStateSize());
-				gen.writeEndObject();
-
-				// Recent history
-				gen.writeArrayFieldStart("history");
-				for (CheckpointStats checkpoint : jobStats.getRecentHistory()) {
-					gen.writeStartObject();
-					gen.writeNumberField("id", checkpoint.getCheckpointId());
-					gen.writeNumberField("timestamp", checkpoint.getTriggerTimestamp());
-					gen.writeNumberField("duration", checkpoint.getDuration());
-					gen.writeNumberField("size", checkpoint.getStateSize());
-					gen.writeEndObject();
-				}
-				gen.writeEndArray();
-			}
-		}
-
-		gen.writeEndObject();
-		gen.close();
-
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
deleted file mode 100644
index 8a68ffa..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import scala.Option;
-
-import java.io.StringWriter;
-import java.util.Map;
-
-/**
- * Request handler that returns checkpoint stats for a single job vertex.
- */
-public class JobVertexCheckpointsHandler extends AbstractJobVertexRequestHandler {
-
-	public JobVertexCheckpointsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
-	}
-
-	@Override
-	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-		gen.writeStartObject();
-
-		Option<OperatorCheckpointStats> statsOption = jobVertex.getCheckpointStats();
-
-		if (statsOption.isDefined()) {
-			OperatorCheckpointStats stats = statsOption.get();
-
-			gen.writeNumberField("id", stats.getCheckpointId());
-			gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
-			gen.writeNumberField("duration", stats.getDuration());
-			gen.writeNumberField("size", stats.getStateSize());
-			gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
-
-			gen.writeArrayFieldStart("subtasks");
-			for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
-				gen.writeStartObject();
-				gen.writeNumberField("subtask", i);
-				gen.writeNumberField("duration", stats.getSubTaskDuration(i));
-				gen.writeNumberField("size", stats.getSubTaskStateSize(i));
-				gen.writeEndObject();
-			}
-			gen.writeEndArray();
-		}
-
-		gen.writeEndObject();
-		gen.close();
-
-		return writer.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
deleted file mode 100644
index dfbb9cf..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-
-import org.junit.Test;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class JobCheckpointsHandlerTest {
-
-	@Test
-	public void testNoCoordinator() throws Exception {
-		JobCheckpointsHandler handler = new JobCheckpointsHandler(
-				mock(ExecutionGraphHolder.class));
-
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-
-		// No coordinator
-		when(graph.getCheckpointStatsTracker()).thenReturn(null);
-
-		String response = handler.handleRequest(graph, Collections.<String, String>emptyMap());
-
-		// Expecting empty response
-		assertEquals("{}", response);
-	}
-
-	@Test
-	public void testNoStats() throws Exception {
-		JobCheckpointsHandler handler = new JobCheckpointsHandler(
-				mock(ExecutionGraphHolder.class));
-
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-
-		// No stats
-		when(tracker.getJobStats()).thenReturn(Option.<JobCheckpointStats>empty());
-
-		String response = handler.handleRequest(graph, Collections.<String, String>emptyMap());
-
-		// Expecting empty response
-		assertEquals("{}", response);
-	}
-
-	@Test
-	public void testStats() throws Exception {
-		JobCheckpointsHandler handler = new JobCheckpointsHandler(
-				mock(ExecutionGraphHolder.class));
-
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-
-		final List<CheckpointStats> history = new ArrayList<>();
-		history.add(new CheckpointStats(0, 1, 1, 124));
-		history.add(new CheckpointStats(1, 5, 177, 0));
-		history.add(new CheckpointStats(2, 6, 8282, 2));
-		history.add(new CheckpointStats(3, 6812, 2800, 1024));
-
-		JobCheckpointStats stats = new JobCheckpointStats() {
-			@Override
-			public List<CheckpointStats> getRecentHistory() {
-				return history;
-			}
-
-			@Override
-			public long getCount() {
-				return 4;
-			}
-
-			@Override
-			public String getExternalPath() {
-				return null;
-			}
-
-			@Override
-			public long getMinDuration() {
-				return 1;
-			}
-
-			@Override
-			public long getMaxDuration() {
-				return 8282;
-			}
-
-			@Override
-			public long getAverageDuration() {
-				return 2815;
-			}
-
-			@Override
-			public long getMinStateSize() {
-				return 0;
-			}
-
-			@Override
-			public long getMaxStateSize() {
-				return 1024;
-			}
-
-			@Override
-			public long getAverageStateSize() {
-				return 287;
-			}
-		};
-
-		when(tracker.getJobStats()).thenReturn(Option.apply(stats));
-
-		// Request stats
-		String response = handler.handleRequest(graph, Collections.<String, String>emptyMap());
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(response);
-
-		// Count
-		int count = rootNode.get("count").asInt();
-		assertEquals(stats.getCount(), count);
-
-		// Duration
-		JsonNode durationNode = rootNode.get("duration");
-		assertNotNull(durationNode);
-
-		long minDuration = durationNode.get("min").asLong();
-		long maxDuration = durationNode.get("max").asLong();
-		long avgDuration = durationNode.get("avg").asLong();
-
-		assertEquals(stats.getMinDuration(), minDuration);
-		assertEquals(stats.getMaxDuration(), maxDuration);
-		assertEquals(stats.getAverageDuration(), avgDuration);
-
-		// State size
-		JsonNode sizeNode = rootNode.get("size");
-		assertNotNull(sizeNode);
-
-		long minSize = sizeNode.get("min").asLong();
-		long maxSize = sizeNode.get("max").asLong();
-		long avgSize = sizeNode.get("avg").asLong();
-
-		assertEquals(stats.getMinStateSize(), minSize);
-		assertEquals(stats.getMaxStateSize(), maxSize);
-		assertEquals(stats.getAverageStateSize(), avgSize);
-
-		JsonNode historyNode = rootNode.get("history");
-		assertNotNull(historyNode);
-		assertTrue(historyNode.isArray());
-
-		Iterator<JsonNode> it = historyNode.elements();
-
-		for (int i = 0; i < history.size(); i++) {
-			CheckpointStats s = history.get(i);
-
-			JsonNode node = it.next();
-
-			long checkpointId = node.get("id").asLong();
-			long timestamp = node.get("timestamp").asLong();
-			long duration = node.get("duration").asLong();
-			long size = node.get("size").asLong();
-
-			assertEquals(s.getCheckpointId(), checkpointId);
-			assertEquals(s.getTriggerTimestamp(), timestamp);
-			assertEquals(s.getDuration(), duration);
-			assertEquals(s.getStateSize(), size);
-		}
-
-		assertFalse(it.hasNext());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
deleted file mode 100644
index 18aae35..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-
-import org.junit.Test;
-import scala.Option;
-
-import java.util.Collections;
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class JobVertexCheckpointsHandlerTest {
-
-	@Test
-	public void testNoCoordinator() throws Exception {
-		JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
-				mock(ExecutionGraphHolder.class));
-
-		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-		when(vertex.getCheckpointStats())
-			.thenReturn(Option.<OperatorCheckpointStats>empty());
-
-		String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
-
-		// Expecting empty response
-		assertEquals("{}", response);
-	}
-
-	@Test
-	public void testNoStats() throws Exception {
-		JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
-				mock(ExecutionGraphHolder.class));
-
-		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-
-		// No stats
-		when(vertex.getCheckpointStats())
-				.thenReturn(Option.<OperatorCheckpointStats>empty());
-
-		String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
-
-		// Expecting empty response
-		assertEquals("{}", response);
-	}
-
-	@Test
-	public void testStats() throws Exception {
-		JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
-				mock(ExecutionGraphHolder.class));
-
-		JobVertexID vertexId = new JobVertexID();
-
-		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-
-		when(vertex.getJobVertexId()).thenReturn(vertexId);
-
-		long[][] subTaskStats = new long[][] {
-				new long[] { 1, 10 },
-				new long[] { 2, 9 },
-				new long[] { 3, 8 },
-				new long[] { 4, 7 },
-				new long[] { 5, 6 },
-				new long[] { 6, 5 },
-				new long[] { 7, 4 },
-				new long[] { 8, 3 },
-				new long[] { 9, 2 },
-				new long[] { 10, 1 } };
-
-		// Stats
-		OperatorCheckpointStats stats = new OperatorCheckpointStats(
-				3, 6812, 2800, 1024, subTaskStats);
-
-		when(vertex.getCheckpointStats())
-				.thenReturn(Option.apply(stats));
-
-		// Request stats
-		String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(response);
-
-		// Operator stats
-		long checkpointId = rootNode.get("id").asLong();
-		long timestamp = rootNode.get("timestamp").asLong();
-		long duration = rootNode.get("duration").asLong();
-		long size = rootNode.get("size").asLong();
-		long parallelism = rootNode.get("parallelism").asLong();
-
-		assertEquals(stats.getCheckpointId(), checkpointId);
-		assertEquals(stats.getTriggerTimestamp(), timestamp);
-		assertEquals(stats.getDuration(), duration);
-		assertEquals(stats.getStateSize(), size);
-		assertEquals(subTaskStats.length, parallelism);
-
-		// Sub task stats
-		JsonNode subTasksNode = rootNode.get("subtasks");
-		assertNotNull(subTasksNode);
-		assertTrue(subTasksNode.isArray());
-
-		Iterator<JsonNode> it = subTasksNode.elements();
-
-		for (int i = 0; i < subTaskStats.length; i++) {
-			JsonNode node = it.next();
-
-			assertEquals(i, node.get("subtask").asInt());
-			assertEquals(subTaskStats[i][0], node.get("duration").asLong());
-			assertEquals(subTaskStats[i][1], node.get("size").asLong());
-		}
-
-		assertFalse(it.hasNext());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
deleted file mode 100644
index 6d3b6b3..0000000
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
+++ /dev/null
@@ -1,98 +0,0 @@
-//
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-
-div(ng-if="!jobCheckpointStats")
-  p
-    em No checkpoints
-
-table(ng-if="jobCheckpointStats").table.table-hover.table-inner
-  tbody
-    tr
-      td
-        strong Count
-      td(colspan=3)
-        span {{ jobCheckpointStats['count'] }}
-
-    tr
-      td
-        strong Duration
-      td
-        p
-          strong Minimum:
-          span  {{ jobCheckpointStats['duration']['min'] | humanizeDuration }}
-      td
-        p
-          strong Maximum:
-          span  {{ jobCheckpointStats['duration']['max'] | humanizeDuration }}
-
-      td
-        p
-          strong Average:
-          span  {{ jobCheckpointStats['duration']['avg'] | humanizeDuration }}
-
-    tr
-      td
-          strong State Size
-      td
-          p
-            strong Minimum:
-            span  {{ jobCheckpointStats['size']['min'] | humanizeBytes }}
-      td
-          p
-            strong Maximum:
-            span  {{ jobCheckpointStats['size']['max'] | humanizeBytes }}
-      td
-          p
-            strong Average:
-            span  {{ jobCheckpointStats['size']['avg'] | humanizeBytes }}
-
-    tr(ng-if="jobCheckpointStats['external-path']")
-      td(colspan=4)
-        strong Latest Checkpoint Path:
-        =" "
-        | {{ jobCheckpointStats['external-path'] }}
-
-div(ng-if="!showHistory && jobCheckpointStats && jobCheckpointStats['history'].length > 0")
-  a.btn.btn-default(ng-click="toggleHistory()")
-    | <strong>Show history</strong> ({{ jobCheckpointStats['history'].length }})
-    = ' '
-    i.fa.fa-chevron-down
-
-div(ng-if="showHistory && jobCheckpointStats && jobCheckpointStats['history'].length > 0")
-  a.btn.btn-default(ng-click="toggleHistory()")
-    | Hide history ({{ jobCheckpointStats['history'].length }})
-    = ' '
-    i.fa.fa-chevron-up
-
-  table.table.table-hover.table-inner
-    thead
-      tr
-        td
-          strong ID
-        td
-          strong Trigger Time
-        td
-          strong Duration
-        td
-          strong State Size
-
-    tbody(ng-if="jobCheckpointStats['history'] && jobCheckpointStats['history'].length > 0" ng-repeat="historic in jobCheckpointStats['history']")
-      tr
-        td {{ historic['id'] }}
-        td {{ historic['timestamp'] | amDateFormat:'H:mm:ss' }}
-        td {{ historic['duration'] | humanizeDuration }}
-        td {{ historic['size'] | humanizeBytes }}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.operator.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.operator.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.operator.jade
deleted file mode 100644
index 7adc631..0000000
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.operator.jade
+++ /dev/null
@@ -1,64 +0,0 @@
-//
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-
-div(ng-if="!operatorCheckpointStats")
-  p
-    em No checkpoints
-
-div(ng-if="operatorCheckpointStats")
-  table.table.table-hover.table-clickable.table-activable.table-inner
-    thead
-      tr
-        th ID
-        th Trigger Timestamp
-        th Duration
-        th State Size
-
-    tbody
-      tr
-        td(width="22%") {{ operatorCheckpointStats['id'] }}
-        td(width="22%") {{ operatorCheckpointStats['timestamp'] | amDateFormat:'H:mm:ss' }}
-        td(width="22%") {{ operatorCheckpointStats['duration'] | humanizeDuration }}
-        td(width="22%") {{ operatorCheckpointStats['size'] | humanizeBytes }}
-
-  div(ng-if="!nodeUnfolded && subtasksCheckpointStats && subtasksCheckpointStats.length > 0")
-    a.btn.btn-default(ng-click="toggleFold()")
-      | Show subtasks
-      = ' '
-      i.fa.fa-chevron-down
-
-    a.btn.btn-default.pull-right(ng-click="deactivateNode(); $event.stopPropagation()" title="Fold")
-      i.fa.fa-chevron-up
-
-  div(ng-if="nodeUnfolded && subtasksCheckpointStats && subtasksCheckpointStats.length > 0")
-    a.btn.btn-default(ng-click="toggleFold()")
-      | Hide subtasks
-      = ' '
-      i.fa.fa-chevron-up
-
-    table.table.table-hover.table-clickable.table-activable.table-inner
-      thead
-        tr
-          th Subtask
-          th Duration
-          th Type
-
-      tbody(ng-repeat="subtask in subtasksCheckpointStats")
-        tr
-          td {{ subtask['subtask'] + 1 }}
-          td {{ subtask['duration'] | humanizeDuration }}
-          td {{ subtask['size'] | humanizeBytes }}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
deleted file mode 100644
index 92df7d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class ArchivedCheckpointStatsTracker implements CheckpointStatsTracker, Serializable {
-	private static final long serialVersionUID = 1469003563086353555L;
-
-	private final Option<JobCheckpointStats> jobStats;
-	private final Map<JobVertexID, OperatorCheckpointStats> operatorStats;
-
-	public ArchivedCheckpointStatsTracker(Option<JobCheckpointStats> jobStats, Map<JobVertexID, OperatorCheckpointStats> operatorStats) {
-		this.jobStats = jobStats;
-		this.operatorStats = operatorStats;
-	}
-
-	@Override
-	public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
-	}
-
-	@Override
-	public Option<JobCheckpointStats> getJobStats() {
-		return jobStats;
-	}
-
-	@Override
-	public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
-		return Option.apply(operatorStats.get(operatorId));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
deleted file mode 100644
index 64f17d4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import java.io.Serializable;
-
-/**
- * Statistics for a specific checkpoint.
- */
-public class CheckpointStats implements Serializable {
-
-	/** ID of the checkpoint. */
-	private final long checkpointId;
-
-	/** Timestamp when the checkpoint was triggered. */
-	private final long triggerTimestamp;
-
-	/** Duration of the checkpoint in milliseconds. */
-	private final long duration;
-
-	/** State size in bytes. */
-	private final long stateSize;
-
-	/**
-	 * Creates a checkpoint statistic.
-	 *
-	 * @param checkpointId     Checkpoint ID
-	 * @param triggerTimestamp Timestamp when the checkpoint was triggered
-	 * @param duration         Duration (in milliseconds)
-	 * @param stateSize        State size (in bytes)
-	 */
-	public CheckpointStats(
-			long checkpointId,
-			long triggerTimestamp,
-			long duration,
-			long stateSize) {
-
-		this.checkpointId = checkpointId;
-		this.triggerTimestamp = triggerTimestamp;
-		this.duration = duration;
-		this.stateSize = stateSize;
-	}
-
-	/**
-	 * Returns the ID of the checkpoint.
-	 *
-	 * @return ID of the checkpoint.
-	 */
-	public long getCheckpointId() {
-		return checkpointId;
-	}
-
-	/**
-	 * Returns the timestamp when the checkpoint was triggered.
-	 *
-	 * @return Timestamp when the checkpoint was triggered.
-	 */
-	public long getTriggerTimestamp() {
-		return triggerTimestamp;
-	}
-
-	/**
-	 * Returns the duration in milliseconds.
-	 *
-	 * @return Duration in milliseconds.
-	 */
-	public long getDuration() {
-		return duration;
-	}
-
-	/**
-	 * Returns the state size in bytes.
-	 *
-	 * @return The state size in bytes.
-	 */
-	public long getStateSize() {
-		return stateSize;
-	}
-
-	@Override
-	public String toString() {
-		return "CheckpointStats{" +
-				"checkpointId=" + checkpointId +
-				", triggerTimestamp=" + triggerTimestamp +
-				", duration=" + duration +
-				", stateSize=" + stateSize +
-				'}';
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		CheckpointStats that = (CheckpointStats) o;
-
-		return checkpointId == that.checkpointId &&
-				triggerTimestamp == that.triggerTimestamp &&
-				duration == that.duration &&
-				stateSize == that.stateSize;
-	}
-
-	@Override
-	public int hashCode() {
-		int result = (int) (checkpointId ^ (checkpointId >>> 32));
-		result = 31 * result + (int) (triggerTimestamp ^ (triggerTimestamp >>> 32));
-		result = 31 * result + (int) (duration ^ (duration >>> 32));
-		result = 31 * result + (int) (stateSize ^ (stateSize >>> 32));
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStatsTracker.java
deleted file mode 100644
index 1277eaa..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStatsTracker.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
-
-/**
- * A tracker for checkpoint statistics.
- *
- * <p>You can disable statistics by setting {@link ConfigConstants#JOB_MANAGER_WEB_CHECKPOINTS_DISABLE}.
- */
-public interface CheckpointStatsTracker {
-
-	/**
-	 * Callback on a completed checkpoint.
-	 *
-	 * @param checkpoint The completed checkpoint.
-	 */
-	void onCompletedCheckpoint(CompletedCheckpoint checkpoint);
-
-	/**
-	 * Returns a snapshot of the checkpoint statistics for a job.
-	 *
-	 * @return Checkpoints stats for the job.
-	 */
-	Option<JobCheckpointStats> getJobStats();
-
-	/**
-	 * Returns a snapshot of the checkpoint statistics for an operator.
-	 *
-	 * @param operatorId The operator ID to gather the stats for.
-	 *
-	 * @return Checkpoint stats for the operator.
-	 *
-	 * @throws IllegalArgumentException If unknown operator ID.
-	 */
-	Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTracker.java
deleted file mode 100644
index 4ba9278..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTracker.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
-
-/**
- * A tracker for checkpoint statistics when they are disabled.
- */
-public class DisabledCheckpointStatsTracker implements CheckpointStatsTracker {
-
-	@Override
-	public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
-		// Nothing to do
-	}
-
-	@Override
-	public Option<JobCheckpointStats> getJobStats() {
-		return Option.empty();
-	}
-
-	@Override
-	public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
-		return Option.empty();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
deleted file mode 100644
index e156c8e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Snapshot of checkpoint statistics for a job.
- */
-public interface JobCheckpointStats extends Serializable {
-
-	// ------------------------------------------------------------------------
-	// General stats
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns a list of recently completed checkpoints stats.
-	 *
-	 * <p>The history size is configurable via {@link ConfigConstants#JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE}.
-	 *
-	 * @return List of recently completed checkpoints stats.
-	 */
-	List<CheckpointStats> getRecentHistory();
-
-	/**
-	 * Returns the total number of completed checkpoints.
-	 *
-	 * @return Total number of completed checkpoints.
-	 */
-	long getCount();
-
-	/**
-	 * Returns the most recent external path of a checkpoint.
-	 *
-	 * @return External checkpoint path or <code>null</code> if none available.
-	 */
-	String getExternalPath();
-
-	// ------------------------------------------------------------------------
-	// Duration
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the minimum checkpoint duration ever seen over all completed
-	 * checkpoints.
-	 *
-	 * @return Minimum checkpoint duration over all completed checkpoints.
-	 */
-	long getMinDuration();
-
-	/**
-	 * Returns the maximum checkpoint duration ever seen over all completed
-	 * checkpoints.
-	 *
-	 * @return Maximum checkpoint duration over all completed checkpoints.
-	 */
-	long getMaxDuration();
-
-	/**
-	 * Returns the average checkpoint duration ever seen over all completed
-	 * checkpoints.
-	 *
-	 * @return Average checkpoint duration over all completed checkpoints.
-	 */
-	long getAverageDuration();
-
-	// ------------------------------------------------------------------------
-	// State size
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the minimum checkpoint state size ever seen over all completed
-	 * checkpoints.
-	 *
-	 * @return Minimum checkpoint state size over all completed checkpoints.
-	 */
-	long getMinStateSize();
-
-	/**
-	 * Returns the maximum checkpoint state size ever seen over all completed
-	 * checkpoints.
-	 *
-	 * @return Maximum checkpoint state size over all completed checkpoints.
-	 */
-	long getMaxStateSize();
-
-	/**
-	 * Average the minimum checkpoint state size ever seen over all completed
-	 * checkpoints.
-	 *
-	 * @return Average checkpoint state size over all completed checkpoints.
-	 */
-	long getAverageStateSize();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
deleted file mode 100644
index 6c2d497..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import java.util.Arrays;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Statistics for a specific checkpoint per operator.
- */
-public class OperatorCheckpointStats extends CheckpointStats {
-
-	private static final long serialVersionUID = -1594736655739376140L;
-
-	/** Duration in milliseconds and state sizes in bytes per sub task. */
-	private final long[][] subTaskStats;
-
-	/**
-	 * Creates a checkpoint statistic for an operator.
-	 *
-	 * @param checkpointId     Checkpoint ID this statistic belongs to
-	 * @param triggerTimestamp Timestamp when the corresponding checkpoint was triggered
-	 * @param duration         Duration (in milliseconds) to complete this statistic
-	 * @param stateSize        State size (in bytes)
-	 * @param subTaskStats     Stats per subtask ([i][0] and [i][1] encode the duration and state
-	 *                         size for sub task i respectively).
-	 */
-	public OperatorCheckpointStats(
-			long checkpointId,
-			long triggerTimestamp,
-			long duration,
-			long stateSize,
-			long[][] subTaskStats) {
-
-		super(checkpointId, triggerTimestamp, duration, stateSize);
-
-		this.subTaskStats = checkNotNull(subTaskStats);
-	}
-
-	/**
-	 * Returns the number of sub tasks.
-	 *
-	 * @return Number of sub tasks.
-	 */
-	public int getNumberOfSubTasks() {
-		return subTaskStats.length;
-	}
-
-	/**
-	 * Returns the duration of a specific sub task.
-	 *
-	 * @return Duration of the sub task.
-	 */
-	public long getSubTaskDuration(int index) {
-		return subTaskStats[index][0];
-	}
-
-	/**
-	 * Returns the state size of a specific sub task.
-	 *
-	 * @return The state size in bytes.
-	 */
-	public long getSubTaskStateSize(int index) {
-		return subTaskStats[index][1];
-	}
-
-	@Override
-	public String toString() {
-		return "OperatorCheckpointStats{" +
-				"checkpointId=" + getCheckpointId() +
-				", subTaskStats=" + Arrays.deepToString(subTaskStats) +
-				'}';
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		OperatorCheckpointStats that = (OperatorCheckpointStats) o;
-
-		return getCheckpointId() == that.getCheckpointId() &&
-				getTriggerTimestamp() == that.getTriggerTimestamp() &&
-				Arrays.deepEquals(subTaskStats, that.subTaskStats);
-	}
-
-	@Override
-	public int hashCode() {
-		int result = (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
-		result = 31 * result + (int) (getTriggerTimestamp() ^ (getTriggerTimestamp() >>> 32));
-		result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length >>> 32));
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
deleted file mode 100644
index 39fbad5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * A simple checkpoint stats tracker.
- */
-public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
-
-	/** Lock guarding access to the stats */
-	private final Object statsLock = new Object();
-
-	/** The maximum number of recent checkpoint stats to remember. */
-	private final int historySize;
-
-	/** A bounded list of detailed stats. */
-	private final ArrayList<CheckpointStats> history = new ArrayList<>();
-
-	/**
-	 * Expected parallelism of tasks, which acknowledge checkpoints. Used for
-	 * per sub task state size computation.
-	 */
-	private final Map<JobVertexID, Integer> taskParallelism;
-
-	/**
-	 * Stats per operator. The long[parallelism][2], where [i][0] holds the
-	 * duration, [i][1] the state size for sub task i of the operator.
-	 */
-	private Map<JobVertexID, long[][]> subTaskStats;
-
-	/**
-	 * Last computed job-specific statistic. Cleared on every completed
-	 * checkpoint. And computed only on call to {@link #getJobStats()}.
-	 */
-	private JobCheckpointStats lastJobStats;
-
-	/**
-	 * A map caching computed operator-specific statistics. Cleared on every
-	 * completed checkpoint. And computed only on call to {@link #getOperatorStats(JobVertexID)}.
-	 */
-	private Map<JobVertexID, OperatorCheckpointStats> operatorStatsCache = new HashMap<>();
-
-	/**
-	 * The total number of completed checkpoints. This does not always
-	 * equal the last completed ID, because some checkpoints may have been
-	 * cancelled after incrementing the ID counter.
-	 */
-	private long overallCount;
-
-	/** The minimum checkpoint completion duration (over all checkpoints). */
-	private long overallMinDuration = Long.MAX_VALUE;
-
-	/** The maximum checkpoint completion duration (over all checkpoints). */
-	private long overallMaxDuration = Long.MIN_VALUE;
-
-	/**
-	 * The total checkpoint completion duration of all completed checkpoints.
-	 * Used for computing the average duration.
-	 */
-	private long overallTotalDuration;
-
-	/** The minimum checkpoint state size (over all checkpoints). */
-	private long overallMinStateSize = Long.MAX_VALUE;
-
-	/** The maximum checkpoint state size (over all checkpoints). */
-	private long overallMaxStateSize = Long.MIN_VALUE;
-
-	/**
-	 * The total checkpoint state size (over all checkpoints). Used for
-	 * computing the overall average state size.
-	 */
-	private long overallTotalStateSize;
-
-	/**
-	 * The latest completed checkpoint (highest ID) or <code>null</code>.
-	 */
-	private CompletedCheckpoint latestCompletedCheckpoint;
-
-	public SimpleCheckpointStatsTracker(
-			int historySize,
-			List<ExecutionJobVertex> tasksToWaitFor,
-			MetricGroup metrics) {
-
-		checkArgument(historySize >= 0);
-		this.historySize = historySize;
-
-		// We know upfront which tasks will ack the checkpoints
-		if (tasksToWaitFor != null && !tasksToWaitFor.isEmpty()) {
-			taskParallelism = new HashMap<>(tasksToWaitFor.size());
-
-			for (ExecutionJobVertex vertex : tasksToWaitFor) {
-				taskParallelism.put(vertex.getJobVertexId(), vertex.getParallelism());
-			}
-		} else {
-			taskParallelism = Collections.emptyMap();
-		}
-
-		metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge());
-		metrics.gauge("lastCheckpointDuration", new CheckpointDurationGauge());
-		metrics.gauge("lastCheckpointExternalPath", new CheckpointExternalPathGauge());
-	}
-
-	@Override
-	public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
-		// Sanity check
-		if (taskParallelism.isEmpty()) {
-			return;
-		}
-
-		synchronized (statsLock) {
-			long overallStateSize;
-			try {
-				overallStateSize = checkpoint.getStateSize();
-			} catch (Exception ex) {
-				throw new RuntimeException(ex);
-			}
-
-			// Operator stats
-			Map<JobVertexID, long[][]> statsForSubTasks = new HashMap<>();
-
-			for (Map.Entry<JobVertexID, TaskState> taskStateEntry: checkpoint.getTaskStates().entrySet()) {
-				JobVertexID jobVertexID = taskStateEntry.getKey();
-				TaskState taskState = taskStateEntry.getValue();
-
-				int parallelism = taskParallelism.get(jobVertexID);
-				long[][] statsPerSubtask = new long[parallelism][2];
-
-				statsForSubTasks.put(jobVertexID, statsPerSubtask);
-
-				for (int i = 0; i < Math.min(taskState.getParallelism(), parallelism); i++) {
-					SubtaskState subtaskState = taskState.getState(i);
-
-					if (subtaskState != null) {
-						statsPerSubtask[i][0] = subtaskState.getDuration();
-						statsPerSubtask[i][1] = subtaskState.getStateSize();
-					}
-				}
-			}
-
-			// It is possible that completed checkpoints are added out of
-			// order. Make sure that in this case the last completed
-			// checkpoint is not updated.
-			boolean isInOrder = latestCompletedCheckpoint != null &&
-					checkpoint.getCheckpointID() > latestCompletedCheckpoint.getCheckpointID();
-
-			// Clear this in each case
-			lastJobStats = null;
-
-			if (overallCount == 0 || isInOrder) {
-				latestCompletedCheckpoint = checkpoint;
-
-				// Clear cached stats
-				operatorStatsCache.clear();
-
-				// Update the stats per sub task
-				subTaskStats = statsForSubTasks;
-			}
-
-			long checkpointId = checkpoint.getCheckpointID();
-			long checkpointTriggerTimestamp = checkpoint.getTimestamp();
-			long checkpointDuration = checkpoint.getDuration();
-
-			overallCount++;
-
-			// Duration stats
-			if (checkpointDuration > overallMaxDuration) {
-				overallMaxDuration = checkpointDuration;
-			}
-
-			if (checkpointDuration < overallMinDuration) {
-				overallMinDuration = checkpointDuration;
-			}
-
-			overallTotalDuration += checkpointDuration;
-
-			// State size stats
-			if (overallStateSize < overallMinStateSize) {
-				overallMinStateSize = overallStateSize;
-			}
-
-			if (overallStateSize > overallMaxStateSize) {
-				overallMaxStateSize = overallStateSize;
-			}
-
-			this.overallTotalStateSize += overallStateSize;
-
-			// Recent history
-			if (historySize > 0) {
-				CheckpointStats stats = new CheckpointStats(
-						checkpointId,
-						checkpointTriggerTimestamp,
-						checkpointDuration,
-						overallStateSize);
-
-				if (isInOrder) {
-					if (history.size() == historySize) {
-						history.remove(0);
-					}
-
-					history.add(stats);
-				}
-				else {
-					final int size = history.size();
-
-					// Only remove it if it the new checkpoint is not too old
-					if (size == historySize) {
-						if (checkpointId > history.get(0).getCheckpointId()) {
-							history.remove(0);
-						}
-					}
-
-					int pos = 0;
-
-					// Find position
-					for (int i = 0; i < size; i++) {
-						pos = i;
-
-						if (checkpointId < history.get(i).getCheckpointId()) {
-							break;
-						}
-					}
-
-					history.add(pos, stats);
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Option<JobCheckpointStats> getJobStats() {
-		synchronized (statsLock) {
-			if (lastJobStats != null) {
-				return Option.apply(lastJobStats);
-			}
-			else if (latestCompletedCheckpoint != null) {
-				long overallAverageDuration = overallCount == 0
-						? 0
-						: overallTotalDuration / overallCount;
-
-				long overallAverageStateSize = overallCount == 0
-						? 0
-						: overallTotalStateSize / overallCount;
-
-				lastJobStats = new JobCheckpointStatsSnapshot(
-						// Need to clone in order to have a consistent snapshot.
-						// We can safely update it afterwards.
-						(List<CheckpointStats>) history.clone(),
-						latestCompletedCheckpoint.getExternalPath(),
-						overallCount,
-						overallMinDuration,
-						overallMaxDuration,
-						overallAverageDuration,
-						overallMinStateSize,
-						overallMaxStateSize,
-						overallAverageStateSize);
-
-				return Option.apply(lastJobStats);
-			}
-			else {
-				return Option.empty();
-			}
-		}
-	}
-
-	@Override
-	public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
-		synchronized (statsLock) {
-			OperatorCheckpointStats stats = operatorStatsCache.get(operatorId);
-
-			if (stats != null) {
-				return Option.apply(stats);
-			}
-			else if (latestCompletedCheckpoint != null && subTaskStats != null) {
-				long[][] subTaskStats = this.subTaskStats.get(operatorId);
-
-				if (subTaskStats == null) {
-					return Option.empty();
-				}
-				else {
-					long maxDuration = Long.MIN_VALUE;
-					long stateSize = 0;
-
-					for (long[] subTaskStat : subTaskStats) {
-						if (subTaskStat[0] > maxDuration) {
-							maxDuration = subTaskStat[0];
-						}
-
-						stateSize += subTaskStat[1];
-					}
-
-					stats = new OperatorCheckpointStats(
-							latestCompletedCheckpoint.getCheckpointID(),
-							latestCompletedCheckpoint.getTimestamp(),
-							maxDuration,
-							stateSize,
-							subTaskStats);
-
-					// Remember this and don't recompute if requested again
-					operatorStatsCache.put(operatorId, stats);
-
-					return Option.apply(stats);
-				}
-			}
-			else {
-				return Option.empty();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A snapshot of checkpoint stats.
-	 */
-	private static class JobCheckpointStatsSnapshot implements JobCheckpointStats {
-
-		private static final long serialVersionUID = 7558212015099742418L;
-
-		// General
-		private final List<CheckpointStats> recentHistory;
-		private final long count;
-		private final String externalPath;
-
-		// Duration
-		private final long minDuration;
-		private final long maxDuration;
-		private final long averageDuration;
-
-		// State size
-		private final long minStateSize;
-		private final long maxStateSize;
-		private final long averageStateSize;
-
-		public JobCheckpointStatsSnapshot(
-				List<CheckpointStats> recentHistory,
-				String externalPath,
-				long count,
-				long minDuration,
-				long maxDuration,
-				long averageDuration,
-				long minStateSize,
-				long maxStateSize,
-				long averageStateSize) {
-
-			this.recentHistory = recentHistory;
-			this.count = count;
-			this.externalPath = externalPath;
-
-			this.minDuration = minDuration;
-			this.maxDuration = maxDuration;
-			this.averageDuration = averageDuration;
-
-			this.minStateSize = minStateSize;
-			this.maxStateSize = maxStateSize;
-			this.averageStateSize = averageStateSize;
-		}
-
-		@Override
-		public List<CheckpointStats> getRecentHistory() {
-			return recentHistory;
-		}
-
-		@Override
-		public long getCount() {
-			return count;
-		}
-
-		@Override
-		public String getExternalPath() {
-			return externalPath;
-		}
-
-		@Override
-		public long getMinDuration() {
-			return minDuration;
-		}
-
-		@Override
-		public long getMaxDuration() {
-			return maxDuration;
-		}
-
-		@Override
-		public long getAverageDuration() {
-			return averageDuration;
-		}
-
-		@Override
-		public long getMinStateSize() {
-			return minStateSize;
-		}
-
-		@Override
-		public long getMaxStateSize() {
-			return maxStateSize;
-		}
-
-		@Override
-		public long getAverageStateSize() {
-			return averageStateSize;
-		}
-	}
-
-	private class CheckpointSizeGauge implements Gauge<Long> {
-		@Override
-		public Long getValue() {
-			try {
-				return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize();
-			} catch (Exception ex) {
-				throw new RuntimeException(ex);
-			}
-		}
-	}
-
-	private class CheckpointDurationGauge implements Gauge<Long> {
-		@Override
-		public Long getValue() {
-			return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getDuration();
-		}
-	}
-
-	private class CheckpointExternalPathGauge implements Gauge<String> {
-
-		@Override
-		public String getValue() {
-			CompletedCheckpoint checkpoint = latestCompletedCheckpoint;
-			if (checkpoint != null && checkpoint.getExternalPath() != null) {
-				return checkpoint.getExternalPath();
-			} else {
-				return "n/a";
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 26db012..d3a440a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -73,7 +72,6 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new FailingCompletedCheckpointStore(),
 			null,
-			new DisabledCheckpointStatsTracker(),
 			Executors.directExecutor());
 
 		coord.triggerCheckpoint(triggerTimestamp, false);
@@ -84,7 +82,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
 		assertFalse(pendingCheckpoint.isDiscarded());
 
-		final long checkpointId =coord.getPendingCheckpoints().keySet().iterator().next();
+		final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
 
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, triggerTimestamp);
 		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData);

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTrackerTest.java
deleted file mode 100644
index 50aca83..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/DisabledCheckpointStatsTrackerTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-
-public class DisabledCheckpointStatsTrackerTest {
-	
-	@Test
-	public void testDisabled() throws Exception {
-		CheckpointStatsTracker tracker = new DisabledCheckpointStatsTracker();
-		assertFalse(tracker.getJobStats().isDefined());
-		assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c1fee3bb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
deleted file mode 100644
index 50a59a5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.stats;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class SimpleCheckpointStatsTrackerTest {
-
-	private static final Random RAND = new Random();
-
-	@Test
-	public void testNoCompletedCheckpointYet() throws Exception {
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
-				0, Collections.<ExecutionJobVertex>emptyList(), new UnregisteredMetricsGroup());
-
-		assertFalse(tracker.getJobStats().isDefined());
-		assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
-	}
-
-	@Test
-	public void testRandomStats() throws Exception {
-		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
-		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
-
-		for (int i = 0; i < checkpoints.length; i++) {
-			CompletedCheckpoint checkpoint = checkpoints[i];
-
-			tracker.onCompletedCheckpoint(checkpoint);
-
-			verifyJobStats(tracker, 10, Arrays.copyOfRange(checkpoints, 0, i + 1));
-			verifySubtaskStats(tracker, tasksToWaitFor, checkpoint);
-		}
-	}
-
-	@Test
-	public void testIllegalOperatorId() throws Exception {
-		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
-		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
-
-		for (CompletedCheckpoint checkpoint : checkpoints) {
-			tracker.onCompletedCheckpoint(checkpoint);
-		}
-
-		assertTrue(tracker.getJobStats().isDefined());
-
-		assertTrue(tracker.getOperatorStats(new JobVertexID()).isEmpty());
-	}
-
-	@Test
-	public void testCompletedCheckpointReordering() throws Exception {
-		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
-		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
-
-		// First the second checkpoint notifies
-		tracker.onCompletedCheckpoint(checkpoints[1]);
-		verifyJobStats(tracker, 10, new CompletedCheckpoint[] { checkpoints[1] });
-		verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[1]);
-
-		// Then the first one
-		tracker.onCompletedCheckpoint(checkpoints[0]);
-		verifyJobStats(tracker, 10, checkpoints);
-
-		// This should not alter the results for the subtasks
-		verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[1]);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
-		CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
-		List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
-		CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
-
-		tracker.onCompletedCheckpoint(checkpoints[0]);
-
-		Set<JobVertexID> jobVerticesID = checkpoints[0].getTaskStates().keySet();
-
-		Iterator<JobVertexID> jobVertexIDIterator = jobVerticesID.iterator();
-
-		JobVertexID operatorId = null;
-
-		if (jobVertexIDIterator.hasNext()) {
-			operatorId = jobVertexIDIterator.next();
-		}
-
-		assertNotNull(operatorId);
-
-		assertNotNull(tracker.getOperatorStats(operatorId));
-
-		// Get the cache
-		Field f = tracker.getClass().getDeclaredField("operatorStatsCache");
-		f.setAccessible(true);
-		Map<JobVertexID, OperatorCheckpointStats> cache =
-				(Map<JobVertexID, OperatorCheckpointStats>) f.get(tracker);
-
-		// Cache contains result
-		assertTrue(cache.containsKey(operatorId));
-
-		// Add new checkpoint
-		tracker.onCompletedCheckpoint(checkpoints[1]);
-
-		assertTrue(cache.isEmpty());
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static void verifyJobStats(
-			CheckpointStatsTracker tracker,
-			int historySize,
-			CompletedCheckpoint[] checkpoints) throws Exception {
-
-		assertTrue(tracker.getJobStats().isDefined());
-		JobCheckpointStats jobStats = tracker.getJobStats().get();
-
-		// History
-		List<CheckpointStats> history = jobStats.getRecentHistory();
-
-		if (historySize > checkpoints.length) {
-			assertEquals(checkpoints.length, history.size());
-		}
-		else {
-			assertEquals(historySize, history.size());
-		}
-
-		// Recently completed checkpoint stats
-		assertTrue(checkpoints.length >= history.size());
-
-		for (int i = 0; i < history.size(); i++) {
-			CheckpointStats actualStats = history.get(history.size() - i - 1);
-
-			CompletedCheckpoint checkpoint = checkpoints[checkpoints.length - 1 - i];
-
-			long stateSize = checkpoint.getStateSize();
-
-			CheckpointStats expectedStats = new CheckpointStats(
-					checkpoint.getCheckpointID(),
-					checkpoint.getTimestamp(),
-					checkpoint.getDuration(),
-					stateSize);
-
-			assertEquals(expectedStats, actualStats);
-		}
-
-		// Stats
-		long minDuration = Long.MAX_VALUE;
-		long maxDuration = Long.MIN_VALUE;
-		long totalDuration = 0;
-
-		long minStateSize = Long.MAX_VALUE;
-		long maxStateSize = Long.MIN_VALUE;
-		long totalStateSize = 0;
-
-		long count = 0;
-
-		// Compute the expected stats
-		for (CompletedCheckpoint checkpoint : checkpoints) {
-			count++;
-
-			if (checkpoint.getDuration() < minDuration) {
-				minDuration = checkpoint.getDuration();
-			}
-
-			if (checkpoint.getDuration() > maxDuration) {
-				maxDuration = checkpoint.getDuration();
-			}
-
-			totalDuration += checkpoint.getDuration();
-
-			long stateSize = checkpoint.getStateSize();
-
-			// State size
-			if (stateSize < minStateSize) {
-				minStateSize = stateSize;
-			}
-
-			if (stateSize > maxStateSize) {
-				maxStateSize = stateSize;
-			}
-
-			totalStateSize += stateSize;
-		}
-
-		// Verify
-		assertEquals(count, jobStats.getCount());
-		assertEquals(minDuration, jobStats.getMinDuration());
-		assertEquals(maxDuration, jobStats.getMaxDuration());
-		assertEquals(totalDuration / count, jobStats.getAverageDuration());
-		assertEquals(minStateSize, jobStats.getMinStateSize());
-		assertEquals(maxStateSize, jobStats.getMaxStateSize());
-		assertEquals(totalStateSize / count, jobStats.getAverageStateSize());
-	}
-
-	private static void verifySubtaskStats(
-			CheckpointStatsTracker tracker,
-			List<ExecutionJobVertex> tasksToWaitFor,
-			CompletedCheckpoint checkpoint) {
-
-		for (ExecutionJobVertex vertex : tasksToWaitFor) {
-			JobVertexID operatorId = vertex.getJobVertexId();
-			int parallelism = vertex.getParallelism();
-			TaskState taskState = checkpoint.getTaskState(operatorId);
-
-			assertNotNull(taskState);
-
-			OperatorCheckpointStats actualStats = tracker.getOperatorStats(operatorId).get();
-
-			long operatorDuration = Long.MIN_VALUE;
-			long operatorStateSize = 0;
-
-			long[][] expectedSubTaskStats = new long[parallelism][2];
-
-			for (int i = 0; i < parallelism; i++) {
-				SubtaskState subtaskState = taskState.getState(i);
-
-				expectedSubTaskStats[i][0] = subtaskState.getDuration();
-				expectedSubTaskStats[i][1] = subtaskState.getStateSize();
-			}
-
-			OperatorCheckpointStats expectedStats = new OperatorCheckpointStats(
-					checkpoint.getCheckpointID(),
-					checkpoint.getTimestamp(),
-					operatorDuration, // we want the max duration of all subtasks
-					operatorStateSize,
-					expectedSubTaskStats);
-
-			assertEquals(expectedStats, actualStats);
-		}
-	}
-
-	private static CompletedCheckpoint[] generateRandomCheckpoints(
-			int numCheckpoints) throws Exception {
-
-		// Config
-		JobID jobId = new JobID();
-		int minNumOperators = 4;
-		int maxNumOperators = 32;
-		int minOperatorParallelism = 4;
-		int maxOperatorParallelism = 16;
-		int maxParallelism = 32;
-
-		// Use yuge numbers here in order to test that summing up state sizes
-		// does not overflow. This was a bug in the initial version, because
-		// the individual state sizes (longs) were summed up in an int.
-		long minStateSize = Integer.MAX_VALUE;
-		long maxStateSize = Long.MAX_VALUE;
-		CompletedCheckpoint[] checkpoints = new CompletedCheckpoint[numCheckpoints];
-
-		int numOperators = RAND.nextInt(maxNumOperators - minNumOperators + 1) + minNumOperators;
-
-		// Setup
-		JobVertexID[] operatorIds = new JobVertexID[numOperators];
-		int[] operatorParallelism = new int[numOperators];
-
-		for (int i = 0; i < numOperators; i++) {
-			operatorIds[i] = new JobVertexID();
-			operatorParallelism[i] = RAND.nextInt(maxOperatorParallelism - minOperatorParallelism + 1) + minOperatorParallelism;
-		}
-
-		// Generate checkpoints
-		for (int i = 0; i < numCheckpoints; i++) {
-			long triggerTimestamp = System.currentTimeMillis();
-			int maxDuration = RAND.nextInt(128 + 1);
-
-			Map<JobVertexID, TaskState> taskGroupStates = new HashMap<>();
-
-			// The maximum random duration is used as time to completion
-			int completionDuration = 0;
-
-			// Generate states for same set of operators
-			for (int operatorIndex = 0; operatorIndex < numOperators; operatorIndex++) {
-				JobVertexID operatorId = operatorIds[operatorIndex];
-				int parallelism = operatorParallelism[operatorIndex];
-
-				TaskState taskState = new TaskState(operatorId, parallelism, maxParallelism, 1);
-
-				taskGroupStates.put(operatorId, taskState);
-
-				for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
-					int duration = RAND.nextInt(maxDuration + 1);
-
-					if (duration > completionDuration) {
-						completionDuration = duration;
-					}
-
-					final long proxySize = minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize)));
-					StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize);
-
-					SubtaskState subtaskState = new SubtaskState(
-							new ChainedStateHandle<>(Collections.singletonList(proxy)), null, null, null, null, duration);
-
-					taskState.putState(subtaskIndex, subtaskState);
-				}
-			}
-
-			// Add some random delay
-			final long completionTimestamp = triggerTimestamp + completionDuration + RAND.nextInt(10);
-
-			checkpoints[i] = new CompletedCheckpoint(jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates, CheckpointProperties.forStandardCheckpoint(), null);
-		}
-
-		return checkpoints;
-	}
-
-	private List<ExecutionJobVertex> createTasksToWaitFor(CompletedCheckpoint checkpoint) {
-
-		List<ExecutionJobVertex> jobVertices = new ArrayList<>(checkpoint.getTaskStates().size());
-
-		for (Map.Entry<JobVertexID, TaskState> entry : checkpoint.getTaskStates().entrySet()) {
-			JobVertexID operatorId = entry.getKey();
-			int parallelism = entry.getValue().getParallelism();
-			ExecutionJobVertex v = mock(ExecutionJobVertex.class);
-			when(v.getJobVertexId()).thenReturn(operatorId);
-			when(v.getParallelism()).thenReturn(parallelism);
-
-			jobVertices.add(v);
-		}
-
-		return jobVertices;
-	}
-
-	private static class StateHandleProxy extends FileStateHandle {
-
-		private static final long serialVersionUID = 35356735683568L;
-
-		public StateHandleProxy(Path filePath, long size) {
-			super(filePath, size);
-		}
-
-		@Override
-		public void discardState() {}
-	}
-}


Mime
View raw message