flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [11/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:21 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
deleted file mode 100644
index e34631e..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the JobCancellationWithSavepointHandler.
- */
-public class JobCancellationWithSavepointHandlersTest extends TestLogger {
-
-	private static final Executor executor = Executors.directExecutor();
-
-	@Test
-	public void testGetPaths() {
-		JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor);
-
-		JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler();
-		String[] triggerPaths = triggerHandler.getPaths();
-		Assert.assertEquals(2, triggerPaths.length);
-		List<String> triggerPathsList = Arrays.asList(triggerPaths);
-		Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint"));
-		Assert.assertTrue(triggerPathsList.contains("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory"));
-
-		JobCancellationWithSavepointHandlers.InProgressHandler progressHandler = handler.getInProgressHandler();
-		String[] progressPaths = progressHandler.getPaths();
-		Assert.assertEquals(1, progressPaths.length);
-		Assert.assertEquals("/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId", progressPaths[0]);
-	}
-
-	/**
-	 * Tests that the cancellation ask timeout respects the checkpoint timeout.
-	 * Otherwise, AskTimeoutExceptions are bound to happen for large state.
-	 */
-	@Test
-	public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
-		long timeout = 128288238L;
-		JobID jobId = new JobID();
-		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
-		when(coord.getCheckpointTimeout()).thenReturn(timeout);
-
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
-		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
-
-		Map<String, String> params = new HashMap<>();
-		params.put("jobid", jobId.toString());
-		params.put("targetDirectory", "placeholder");
-
-		JobManagerGateway jobManager = mock(JobManagerGateway.class);
-		when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
-
-		handler.handleRequest(params, Collections.emptyMap(), jobManager);
-
-		verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class));
-	}
-
-	/**
-	 * Tests that the savepoint directory configuration is respected.
-	 */
-	@Test
-	public void testSavepointDirectoryConfiguration() throws Exception {
-		long timeout = 128288238L;
-		JobID jobId = new JobID();
-		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
-		when(coord.getCheckpointTimeout()).thenReturn(timeout);
-
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
-		JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler();
-
-		Map<String, String> params = new HashMap<>();
-		params.put("jobid", jobId.toString());
-
-		JobManagerGateway jobManager = mock(JobManagerGateway.class);
-		when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
-
-		// 1. Use targetDirectory path param
-		params.put("targetDirectory", "custom-directory");
-		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
-
-		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
-
-		// 2. Use default
-		params.remove("targetDirectory");
-
-		handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
-
-		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class));
-
-		// 3. Throw Exception
-		handlers = new JobCancellationWithSavepointHandlers(holder, executor, null);
-		handler = handlers.getTriggerHandler();
-
-		try {
-			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-			fail("Did not throw expected test Exception");
-		} catch (Exception e) {
-			IllegalStateException cause = (IllegalStateException) e.getCause();
-			assertEquals(true, cause.getMessage().contains(CoreOptions.SAVEPOINT_DIRECTORY.key()));
-		}
-	}
-
-	/**
-	 * Tests triggering a new request and monitoring it.
-	 */
-	@Test
-	public void testTriggerNewRequest() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
-
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
-		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
-		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
-
-		Map<String, String> params = new HashMap<>();
-		params.put("jobid", jobId.toString());
-		params.put("targetDirectory", "custom-directory");
-
-		JobManagerGateway jobManager = mock(JobManagerGateway.class);
-
-		// Successful
-		CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>();
-		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
-
-		// Trigger
-		FullHttpResponse response = trigger.handleRequest(params, Collections.emptyMap(), jobManager).get();
-
-		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
-
-		String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
-
-		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-		assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
-
-		String json = response.content().toString(Charset.forName("UTF-8"));
-		JsonNode root = new ObjectMapper().readTree(json);
-
-		assertEquals("accepted", root.get("status").asText());
-		assertEquals("1", root.get("request-id").asText());
-		assertEquals(location, root.get("location").asText());
-
-		// Trigger again
-		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-		assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION));
-
-		json = response.content().toString(Charset.forName("UTF-8"));
-		root = new ObjectMapper().readTree(json);
-
-		assertEquals("accepted", root.get("status").asText());
-		assertEquals("1", root.get("request-id").asText());
-		assertEquals(location, root.get("location").asText());
-
-		// Only single actual request
-		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
-
-		// Query progress
-		params.put("requestId", "1");
-
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-		json = response.content().toString(Charset.forName("UTF-8"));
-		root = new ObjectMapper().readTree(json);
-
-		assertEquals("in-progress", root.get("status").asText());
-		assertEquals("1", root.get("request-id").asText());
-
-		// Complete
-		successfulCancelWithSavepoint.complete("_path-savepoint_");
-
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-
-		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-		json = response.content().toString(Charset.forName("UTF-8"));
-
-		root = new ObjectMapper().readTree(json);
-
-		assertEquals("success", root.get("status").asText());
-		assertEquals("1", root.get("request-id").asText());
-		assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
-
-		// Query again, keep recent history
-
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-
-		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-		json = response.content().toString(Charset.forName("UTF-8"));
-
-		root = new ObjectMapper().readTree(json);
-
-		assertEquals("success", root.get("status").asText());
-		assertEquals("1", root.get("request-id").asText());
-		assertEquals("_path-savepoint_", root.get("savepoint-path").asText());
-
-		// Query for unknown request
-		params.put("requestId", "9929");
-
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-		assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-		json = response.content().toString(Charset.forName("UTF-8"));
-
-		root = new ObjectMapper().readTree(json);
-
-		assertEquals("failed", root.get("status").asText());
-		assertEquals("9929", root.get("request-id").asText());
-		assertEquals("Unknown job/request ID", root.get("cause").asText());
-	}
-
-	/**
-	 * Tests response when a request fails.
-	 */
-	@Test
-	public void testFailedCancellation() throws Exception {
-		JobID jobId = new JobID();
-		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
-		ExecutionGraph graph = mock(ExecutionGraph.class);
-		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
-		when(graph.getCheckpointCoordinator()).thenReturn(coord);
-
-		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
-		JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler();
-		JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler();
-
-		Map<String, String> params = new HashMap<>();
-		params.put("jobid", jobId.toString());
-		params.put("targetDirectory", "custom-directory");
-
-		JobManagerGateway jobManager = mock(JobManagerGateway.class);
-
-		// Successful
-		CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception"));
-		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
-
-		// Trigger
-		trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
-		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
-
-		// Query progress
-		params.put("requestId", "1");
-
-		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
-		assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
-		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
-		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
-
-		String json = response.content().toString(Charset.forName("UTF-8"));
-		JsonNode root = new ObjectMapper().readTree(json);
-
-		assertEquals("failed", root.get("status").asText());
-		assertEquals("1", root.get("request-id").asText());
-		assertEquals("Test Exception", root.get("cause").asText());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
deleted file mode 100644
index 1c08ae8..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobConfigHandler.
- */
-public class JobConfigHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath());
-		compareJobConfig(originalJob, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/config", paths[0]);
-	}
-
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		String answer = JobConfigHandler.createJobConfigJson(originalJob);
-		compareJobConfig(originalJob, answer);
-	}
-
-	private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException {
-		JsonNode job = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
-
-		Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());
-		Assert.assertEquals(originalJob.getJobName(), job.get("name").asText());
-
-		ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig();
-		JsonNode config = job.get("execution-config");
-
-		Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText());
-		Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText());
-		Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt());
-		Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean());
-
-		Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters();
-		JsonNode userConfig = config.get("user-config");
-
-		for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) {
-			Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
deleted file mode 100644
index ee0498e..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobDetailsHandler.
- */
-public class JobDetailsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(2, archives.size());
-
-		Iterator<ArchivedJson> iterator = archives.iterator();
-		ArchivedJson archive1 = iterator.next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath());
-		compareJobDetails(originalJob, archive1.getJson());
-
-		ArchivedJson archive2 = iterator.next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath());
-		compareJobDetails(originalJob, archive2.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(2, paths.length);
-		List<String> pathsList = Lists.newArrayList(paths);
-		Assert.assertTrue(pathsList.contains("/jobs/:jobid"));
-		Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices"));
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		String json = JobDetailsHandler.createJobDetailsJson(originalJob, null);
-
-		compareJobDetails(originalJob, json);
-	}
-
-	private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText());
-		Assert.assertEquals(originalJob.getJobName(), result.get("name").asText());
-		Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean());
-		Assert.assertEquals(originalJob.getState().name(), result.get("state").asText());
-
-		Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong());
-		Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong());
-		Assert.assertEquals(
-			originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED),
-			result.get("duration").asLong()
-		);
-
-		JsonNode timestamps = result.get("timestamps");
-		for (JobStatus status : JobStatus.values()) {
-			Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong());
-		}
-
-		ArrayNode tasks = (ArrayNode) result.get("vertices");
-		int x = 0;
-		for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) {
-			JsonNode task = tasks.get(x);
-
-			Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText());
-			Assert.assertEquals(expectedTask.getName(), task.get("name").asText());
-			Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt());
-			Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText());
-
-			Assert.assertEquals(3, task.get("start-time").asLong());
-			Assert.assertEquals(5, task.get("end-time").asLong());
-			Assert.assertEquals(2, task.get("duration").asLong());
-
-			JsonNode subtasksPerState = task.get("tasks");
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt());
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt());
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt());
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt());
-			Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt());
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt());
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELED.name()).asInt());
-			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt());
-
-			long expectedNumBytesIn = 0;
-			long expectedNumBytesOut = 0;
-			long expectedNumRecordsIn = 0;
-			long expectedNumRecordsOut = 0;
-
-			for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) {
-				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-				expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-				expectedNumBytesOut += ioMetrics.getNumBytesOut();
-				expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
-				expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
-			}
-
-			JsonNode metrics = task.get("metrics");
-
-			Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
-			Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
-			Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
-			Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
-
-			x++;
-		}
-		Assert.assertEquals(1, tasks.size());
-
-		JsonNode statusCounts = result.get("status-counts");
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
-		Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
-
-		Assert.assertEquals(ArchivedJobGenerationUtils.MAPPER.readTree(originalJob.getJsonPlan()), result.get("plan"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
deleted file mode 100644
index 6e0f918..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-import org.apache.flink.util.ExceptionUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobExceptionsHandler.
- */
-public class JobExceptionsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath());
-		compareExceptions(originalJob, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
-
-		compareExceptions(originalJob, json);
-	}
-
-	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
-		Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
-
-		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
-
-		int x = 0;
-		for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) {
-			if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
-				JsonNode exception = exceptions.get(x);
-
-				Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
-				Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
-				Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
-
-				TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
-				String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort();
-				Assert.assertEquals(expectedLocationString, exception.get("location").asText());
-			}
-			x++;
-		}
-		Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
deleted file mode 100644
index 94fd5a8..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for the JobManagerConfigHandler.
- */
-public class JobManagerConfigHandlerTest {
-	@Test
-	public void testGetPaths() {
-		JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobmanager/config", paths[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
deleted file mode 100644
index 4a934ec..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobPlanHandler.
- */
-public class JobPlanHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath());
-		Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
deleted file mode 100644
index 8c05c83..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Tests for the JobStoppingHandler.
- */
-public class JobStoppingHandlerTest extends TestLogger {
-	@Test
-	public void testGetPaths() {
-		JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(2, paths.length);
-		List<String> pathsList = Lists.newArrayList(paths);
-		Assert.assertTrue(pathsList.contains("/jobs/:jobid/stop"));
-		Assert.assertTrue(pathsList.contains("/jobs/:jobid/yarn-stop"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
deleted file mode 100644
index 5af9aa6..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobVertexAccumulatorsHandler.
- */
-public class JobVertexAccumulatorsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath());
-		compareAccumulators(originalTask, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
-
-		compareAccumulators(originalTask, json);
-	}
-
-	private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
-
-		ArrayNode accs = (ArrayNode) result.get("user-accumulators");
-		StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified();
-
-		ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
deleted file mode 100644
index 0d15e08..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import scala.Option;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for back pressure handler responses.
- */
-public class JobVertexBackPressureHandlerTest {
-	@Test
-	public void testGetPaths() {
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
-	}
-
-	/** Tests the response when no stats are available. */
-	@Test
-	public void testResponseNoStatsAvailable() throws Exception {
-		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
-
-		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
-				.thenReturn(Option.<OperatorBackPressureStats>empty());
-
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
-				mock(ExecutionGraphHolder.class),
-				Executors.directExecutor(),
-				statsTracker,
-				9999);
-
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(response);
-
-		// Single element
-		assertEquals(1, rootNode.size());
-
-		// Status
-		JsonNode status = rootNode.get("status");
-		assertNotNull(status);
-		assertEquals("deprecated", status.textValue());
-
-		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
-	}
-
-	/** Tests the response when stats are available. */
-	@Test
-	public void testResponseStatsAvailable() throws Exception {
-		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
-
-		OperatorBackPressureStats stats = new OperatorBackPressureStats(
-				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
-
-		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
-				.thenReturn(Option.apply(stats));
-
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
-				mock(ExecutionGraphHolder.class),
-				Executors.directExecutor(),
-				statsTracker,
-				9999);
-
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(response);
-
-		// Single element
-		assertEquals(4, rootNode.size());
-
-		// Status
-		JsonNode status = rootNode.get("status");
-		assertNotNull(status);
-		assertEquals("ok", status.textValue());
-
-		// Back pressure level
-		JsonNode backPressureLevel = rootNode.get("backpressure-level");
-		assertNotNull(backPressureLevel);
-		assertEquals("high", backPressureLevel.textValue());
-
-		// End time stamp
-		JsonNode endTimeStamp = rootNode.get("end-timestamp");
-		assertNotNull(endTimeStamp);
-		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
-
-		// Subtasks
-		JsonNode subTasks = rootNode.get("subtasks");
-		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
-		for (int i = 0; i < subTasks.size(); i++) {
-			JsonNode subTask = subTasks.get(i);
-
-			JsonNode index = subTask.get("subtask");
-			assertEquals(i, index.intValue());
-
-			JsonNode level = subTask.get("backpressure-level");
-			assertEquals(JobVertexBackPressureHandler
-					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
-
-			JsonNode ratio = subTask.get("ratio");
-			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
-		}
-
-		// Verify not triggered
-		verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
-	}
-
-	/** Tests that after the refresh interval another sample is triggered. */
-	@Test
-	public void testResponsePassedRefreshInterval() throws Exception {
-		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
-
-		OperatorBackPressureStats stats = new OperatorBackPressureStats(
-				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
-
-		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
-				.thenReturn(Option.apply(stats));
-
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
-				mock(ExecutionGraphHolder.class),
-				Executors.directExecutor(),
-				statsTracker,
-				0); // <----- refresh interval should fire immediately
-
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
-
-		ObjectMapper mapper = new ObjectMapper();
-		JsonNode rootNode = mapper.readTree(response);
-
-		// Single element
-		assertEquals(4, rootNode.size());
-
-		// Status
-		JsonNode status = rootNode.get("status");
-		assertNotNull(status);
-		// Interval passed, hence deprecated
-		assertEquals("deprecated", status.textValue());
-
-		// Back pressure level
-		JsonNode backPressureLevel = rootNode.get("backpressure-level");
-		assertNotNull(backPressureLevel);
-		assertEquals("high", backPressureLevel.textValue());
-
-		// End time stamp
-		JsonNode endTimeStamp = rootNode.get("end-timestamp");
-		assertNotNull(endTimeStamp);
-		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
-
-		// Subtasks
-		JsonNode subTasks = rootNode.get("subtasks");
-		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
-		for (int i = 0; i < subTasks.size(); i++) {
-			JsonNode subTask = subTasks.get(i);
-
-			JsonNode index = subTask.get("subtask");
-			assertEquals(i, index.intValue());
-
-			JsonNode level = subTask.get("backpressure-level");
-			assertEquals(JobVertexBackPressureHandler
-					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
-
-			JsonNode ratio = subTask.get("ratio");
-			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
-		}
-
-		// Verify triggered
-		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
deleted file mode 100644
index 1b8d9aa..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobVertexDetailsHandler.
- */
-public class JobVertexDetailsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath());
-		compareVertexDetails(originalTask, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		String json = JobVertexDetailsHandler.createVertexDetailsJson(
-			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
-
-		compareVertexDetails(originalTask, json);
-	}
-
-	private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
-		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
-		Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
-		Assert.assertTrue(result.get("now").asLong() > 0);
-
-		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
-
-		Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
-		for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
-			AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
-			JsonNode subtask = subtasks.get(x);
-
-			Assert.assertEquals(x, subtask.get("subtask").asInt());
-			Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText());
-			Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
-
-			TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
-			String expectedLocationString = location.getHostname() + ":" + location.dataPort();
-			Assert.assertEquals(expectedLocationString, subtask.get("host").asText());
-			long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
-			Assert.assertEquals(start, subtask.get("start-time").asLong());
-			long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
-			Assert.assertEquals(end, subtask.get("end-time").asLong());
-			Assert.assertEquals(end - start, subtask.get("duration").asLong());
-
-			ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics"));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
deleted file mode 100644
index badb952..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the JobVertexTaskManagersHandler.
- */
-public class JobVertexTaskManagersHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath());
-		compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
-		String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
-			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
-
-		compareVertexTaskManagers(originalTask, originalSubtask, json);
-	}
-
-	private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
-		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
-		Assert.assertTrue(result.get("now").asLong() > 0);
-
-		ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
-
-		JsonNode taskManager = taskmanagers.get(0);
-
-		TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation();
-		String expectedLocationString = location.getHostname() + ':' + location.dataPort();
-		Assert.assertEquals(expectedLocationString, taskManager.get("host").asText());
-		Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText());
-
-		Assert.assertEquals(3, taskManager.get("start-time").asLong());
-		Assert.assertEquals(5, taskManager.get("end-time").asLong());
-		Assert.assertEquals(2, taskManager.get("duration").asLong());
-
-		JsonNode statusCounts = taskManager.get("status-counts");
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
-		Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
-		Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
-
-		long expectedNumBytesIn = 0;
-		long expectedNumBytesOut = 0;
-		long expectedNumRecordsIn = 0;
-		long expectedNumRecordsOut = 0;
-
-		for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) {
-			IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-			expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-			expectedNumBytesOut += ioMetrics.getNumBytesOut();
-			expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
-			expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
-		}
-
-		JsonNode metrics = taskManager.get("metrics");
-
-		Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
-		Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
-		Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
-		Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
deleted file mode 100644
index a80bac9..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtaskCurrentAttemptDetailsHandler.
- */
-public class SubtaskCurrentAttemptDetailsHandlerTest {
-	@Test
-	public void testGetPaths() {
-		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
deleted file mode 100644
index 6773fd4..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
- */
-public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals(
-			"/jobs/" + originalJob.getJobID() +
-			"/vertices/" + originalTask.getJobVertexId() +
-			"/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
-			"/attempts/" + originalAttempt.getAttemptNumber() +
-			"/accumulators",
-			archive.getPath());
-		compareAttemptAccumulators(originalAttempt, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
-		String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
-
-		compareAttemptAccumulators(originalAttempt, json);
-	}
-
-	private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
-		Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
-		Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText());
-
-		ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
deleted file mode 100644
index 7777d2d..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtaskExecutionAttemptDetailsHandler.
- */
-public class SubtaskExecutionAttemptDetailsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(2, archives.size());
-
-		Iterator<ArchivedJson> iterator = archives.iterator();
-		ArchivedJson archive1 = iterator.next();
-		Assert.assertEquals(
-			"/jobs/" + originalJob.getJobID() +
-				"/vertices/" + originalTask.getJobVertexId() +
-				"/subtasks/" + originalAttempt.getParallelSubtaskIndex(),
-			archive1.getPath());
-		compareAttemptDetails(originalAttempt, archive1.getJson());
-
-		ArchivedJson archive2 = iterator.next();
-		Assert.assertEquals(
-			"/jobs/" + originalJob.getJobID() +
-				"/vertices/" + originalTask.getJobVertexId() +
-				"/subtasks/" + originalAttempt.getParallelSubtaskIndex() +
-				"/attempts/" + originalAttempt.getAttemptNumber(),
-			archive2.getPath());
-		compareAttemptDetails(originalAttempt, archive2.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(),  null);
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
-		String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
-			originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null);
-
-		compareAttemptDetails(originalAttempt, json);
-	}
-
-	private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
-		Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText());
-		Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
-		Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText());
-		long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
-		Assert.assertEquals(start, result.get("start-time").asLong());
-		long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
-		Assert.assertEquals(end, result.get("end-time").asLong());
-		Assert.assertEquals(end - start, result.get("duration").asLong());
-
-		ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
deleted file mode 100644
index 7b400da..0000000
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtasksAllAccumulatorsHandler.
- */
-public class SubtasksAllAccumulatorsHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() +
-			"/subtasks/accumulators", archive.getPath());
-		compareSubtaskAccumulators(originalTask, archive.getJson());
-	}
-
-	@Test
-	public void testGetPaths() {
-		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
-		String[] paths = handler.getPaths();
-		Assert.assertEquals(1, paths.length);
-		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
-		String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
-		compareSubtaskAccumulators(originalTask, json);
-	}
-
-	private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
-		Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
-
-		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
-
-		Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
-		for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
-			JsonNode subtask = subtasks.get(x);
-			AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
-
-			Assert.assertEquals(x, subtask.get("subtask").asInt());
-			Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
-			Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(), subtask.get("host").asText());
-
-			ArchivedJobGenerationUtils.compareStringifiedAccumulators(
-				expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
-				(ArrayNode) subtask.get("user-accumulators"));
-		}
-	}
-}


Mime
View raw message