flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/9] flink git commit: [FLINK-3160] [web-dashboard] Aggregate operator statistics by TaskManager
Date Mon, 15 Feb 2016 12:56:57 GMT
[FLINK-3160] [web-dashboard] Aggregate operator statistics by TaskManager

Adds a new per-job tab displaying subtask statistics aggregated by TaskManager.

This closes #1564.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73bc35f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73bc35f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73bc35f1

Branch: refs/heads/master
Commit: 73bc35f14c547c14ddf82660160e554404e54b8c
Parents: babf84c
Author: Greg Hogan <code@greghogan.com>
Authored: Thu Feb 11 15:32:34 2016 -0500
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Feb 15 13:55:00 2016 +0100

----------------------------------------------------------------------
 docs/internals/monitoring_rest_api.md           |  41 +++++
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   2 +
 .../webmonitor/handlers/JarDeleteHandler.java   |   2 +-
 .../webmonitor/handlers/JarRunHandler.java      |   2 +-
 .../handlers/JobVertexDetailsHandler.java       |   2 +-
 .../handlers/JobVertexTaskManagersHandler.java  | 175 +++++++++++++++++++
 .../app/partials/jobs/completed-jobs.jade       |   2 +-
 .../web-dashboard/app/partials/jobs/job.jade    |   2 +-
 .../app/partials/jobs/job.plan.jade             |   5 +-
 .../jobs/job.plan.node-list.overview.jade       |  54 ------
 .../jobs/job.plan.node-list.subtasks.jade       |  54 ++++++
 .../jobs/job.plan.node-list.taskmanagers.jade   |  54 ++++++
 .../partials/jobs/job.plan.node.subtasks.jade   |   2 +-
 .../jobs/job.plan.node.taskmanagers.jade        |  59 +++++++
 .../app/partials/jobs/running-jobs.jade         |   2 +-
 .../web-dashboard/app/partials/overview.jade    |   4 +-
 .../web-dashboard/app/scripts/index.coffee      |  13 +-
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   |  21 ++-
 .../app/scripts/modules/jobs/jobs.svc.coffee    |  14 ++
 .../scripts/modules/submit/submit.ctrl.coffee   |   2 +-
 flink-runtime-web/web-dashboard/web/js/index.js |  53 +++++-
 .../web/partials/jobs/completed-jobs.html       |   2 +-
 .../web-dashboard/web/partials/jobs/job.html    |   2 +-
 .../web/partials/jobs/job.plan.html             |   3 +-
 .../jobs/job.plan.node-list.subtasks.html       |  60 +++++++
 .../jobs/job.plan.node-list.taskmanagers.html   |  60 +++++++
 .../partials/jobs/job.plan.node.subtasks.html   |   2 +-
 .../jobs/job.plan.node.taskmanagers.html        |  55 ++++++
 .../web/partials/jobs/running-jobs.html         |   2 +-
 .../web-dashboard/web/partials/overview.html    |   4 +-
 30 files changed, 669 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/docs/internals/monitoring_rest_api.md
----------------------------------------------------------------------
diff --git a/docs/internals/monitoring_rest_api.md b/docs/internals/monitoring_rest_api.md
index 70952f5..f53d46f 100644
--- a/docs/internals/monitoring_rest_api.md
+++ b/docs/internals/monitoring_rest_api.md
@@ -67,7 +67,9 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
   - `/jobs/<jobid>/config`
   - `/jobs/<jobid>/exceptions`
   - `/jobs/<jobid>/accumulators`
+  - `/jobs/<jobid>/vertices/<vertexid>`
   - `/jobs/<jobid>/vertices/<vertexid>/subtasktimes`
+  - `/jobs/<jobid>/vertices/<vertexid>/taskmanagers`
   - `/jobs/<jobid>/vertices/<vertexid>/accumulators`
   - `/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators`
   - `/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>`
@@ -364,6 +366,45 @@ Sample Result:
 }
 ~~~
 
+**`/jobs/<jobid>/vertices/<vertexid>/taskmanagers`**
+
+TaskManager statistics for one specific vertex. This is an aggregation of subtask statistics returned by `/jobs/<jobid>/vertices/<vertexid>`.
+
+Sample Result:
+
+~~~
+{
+  "id": "fe20bcc29b87cdc76589ca42114c2499",
+  "name": "Reduce (SUM(1), at main(WordCount.java:72)",
+  "now": 1454348282653,
+  "taskmanagers": [ {
+    "host": "ip-10-0-43-227:35413",
+    "status": "FINISHED",
+    "start-time": 1454347870991,
+    "end-time": 1454347872111,
+    "duration": 1120,
+    "metrics": {
+      "read-bytes": 32503056, "write-bytes": 9637041, "read-records": 2906087, "write-records": 849467
+    },
+    "status-counts": {
+      "CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
+    }
+  },{
+    "host": "ip-10-0-43-227:41486",
+    "status": "FINISHED",
+    "start-time": 1454347871001,
+    "end-time": 1454347872395,
+    "duration": 1394,
+    "metrics": {
+      "read-bytes": 32389499, "write-bytes": 9608829, "read-records": 2895999, "write-records": 846948
+    },
+    "status-counts": {
+      "CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 0, "FINISHED": 18, "CANCELING": 0, "CANCELED": 0, "FAILED": 0
+    }
+  } ]
+}
+~~~
+
 **`/jobs/<jobid>/vertices/<vertexid>/accumulators`**
 
 The aggregated user-defined accumulators, for a specific vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 08ed2f9..0b5de1f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
@@ -213,6 +214,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 			.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 6599fbd..6e6c520 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -53,7 +53,7 @@ public class JarDeleteHandler implements RequestHandler {
 				success = success || f.delete();
 			}
 			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 			gen.writeStartObject();
 			if (!success) {
 				// this seems to always fail on Windows.

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 482725f..d7b8e72 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -61,7 +61,7 @@ public class JarRunHandler extends JarActionHandler {
 			}
 
 			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 			gen.writeStartObject();
 			gen.writeStringField("jobid", graph.f0.getJobID().toString());
 			gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 4190cff..d4e885e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -62,7 +62,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 			final ExecutionState status = vertex.getExecutionState();
 			
 			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
-			String locationString = location == null ? "(unassigned)" : location.getHostname();
+			String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
 
 			long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
 			if (startTime == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
new file mode 100644
index 0000000..befc0bf
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A request handler that provides the details of a job vertex, including id, name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
+ */
+public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler {
+
+	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder) {
+		super(executionGraphHolder);
+	}
+
+	@Override
+	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		// Build a map that groups tasks by TaskManager
+		Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
+
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
+
+			List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+
+			if (vertices == null) {
+				vertices = new ArrayList<ExecutionVertex>();
+				taskManagerVertices.put(taskManager, vertices);
+			}
+
+			vertices.add(vertex);
+		}
+
+		// Build JSON response
+		final long now = System.currentTimeMillis();
+
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+		gen.writeStartObject();
+
+		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+		gen.writeStringField("name", jobVertex.getJobVertex().getName());
+		gen.writeNumberField("now", now);
+
+		gen.writeArrayFieldStart("taskmanagers");
+		for (Entry<String, List<ExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+			String host = entry.getKey();
+			List<ExecutionVertex> taskVertices = entry.getValue();
+
+			int[] tasksPerState = new int[ExecutionState.values().length];
+
+			long startTime = Long.MAX_VALUE;
+			long endTime = 0;
+			boolean allFinished = true;
+
+			LongCounter tmReadBytes = new LongCounter();
+			LongCounter tmWriteBytes = new LongCounter();
+			LongCounter tmReadRecords = new LongCounter();
+			LongCounter tmWriteRecords = new LongCounter();
+
+			for (ExecutionVertex vertex : taskVertices) {
+				final ExecutionState state = vertex.getExecutionState();
+				tasksPerState[state.ordinal()]++;
+
+				// take the earliest start time
+				long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+				if (started > 0) {
+					startTime = Math.min(startTime, started);
+				}
+
+				allFinished &= state.isTerminal();
+				endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+
+				Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+
+				if (metrics != null) {
+					LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+					tmReadBytes.merge(readBytes);
+
+					LongCounter writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+					tmWriteBytes.merge(writeBytes);
+
+					LongCounter readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+					tmReadRecords.merge(readRecords);
+
+					LongCounter writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+					tmWriteRecords.merge(writeRecords);
+				}
+			}
+
+			long duration;
+			if (startTime < Long.MAX_VALUE) {
+				if (allFinished) {
+					duration = endTime - startTime;
+				}
+				else {
+					endTime = -1L;
+					duration = now - startTime;
+				}
+			}
+			else {
+				startTime = -1L;
+				endTime = -1L;
+				duration = -1L;
+			}
+
+			ExecutionState jobVertexState =
+				ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+
+			gen.writeStartObject();
+
+			gen.writeStringField("host", host);
+			gen.writeStringField("status", jobVertexState.name());
+
+			gen.writeNumberField("start-time", startTime);
+			gen.writeNumberField("end-time", endTime);
+			gen.writeNumberField("duration", duration);
+
+			gen.writeObjectFieldStart("metrics");
+			gen.writeNumberField("read-bytes", tmReadBytes.getLocalValuePrimitive());
+			gen.writeNumberField("write-bytes", tmWriteBytes.getLocalValuePrimitive());
+			gen.writeNumberField("read-records", tmReadRecords.getLocalValuePrimitive());
+			gen.writeNumberField("write-records", tmWriteRecords.getLocalValuePrimitive());
+			gen.writeEndObject();
+
+			gen.writeObjectFieldStart("status-counts");
+			for (ExecutionState state : ExecutionState.values()) {
+				gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
+			}
+			gen.writeEndObject();
+
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/completed-jobs.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/completed-jobs.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/completed-jobs.jade
index 06bc55f..dbeb433 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/completed-jobs.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/completed-jobs.jade
@@ -35,7 +35,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
         th Status
 
     tbody
-      tr(ng-repeat="job in jobs|orderBy:\"'end-time'\":true" ui-sref="single-job.plan.overview({ jobid: job.jid })")
+      tr(ng-repeat="job in jobs|orderBy:\"'end-time'\":true" ui-sref="single-job.plan.subtasks({ jobid: job.jid })")
         td {{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
         td {{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
         td(title="{{job.duration | humanizeDuration:false}}") {{job.duration | humanizeDuration:true}}

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
index ffc8962..5b541ae 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
@@ -46,7 +46,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
   ul.nav.nav-tabs
     li(ui-sref-active='active')
-      a(ui-sref=".plan.overview") Plan
+      a(ui-sref=".plan.subtasks") Plan
 
     //- li(ui-sref-active='active' ng-if="job['end-time'] > -1")
     li(ui-sref-active='active')

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index 46eeec6..d0576e7 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -22,7 +22,10 @@
   nav.navbar.navbar-default.navbar-secondary-additional
     ul.nav.nav-tabs
       li(ui-sref-active='active')
-        a(ui-sref=".overview({nodeid: nodeid})") Overview
+        a(ui-sref=".subtasks({nodeid: nodeid})") Subtasks
+
+      li(ui-sref-active='active')
+        a(ui-sref=".taskmanagers({nodeid: nodeid})") TaskManagers
 
       li(ui-sref-active='active')
         a(ui-sref=".accumulators({nodeid: nodeid})") Accumulators

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
deleted file mode 100644
index ef9257d..0000000
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
+++ /dev/null
@@ -1,54 +0,0 @@
-//
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
- 
-      http://www.apache.org/licenses/LICENSE-2.0
- 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-
-table.table.table-body-hover.table-clickable.table-activable
-  thead
-    tr
-      th Start Time
-      th End Time
-      th Duration
-      th Name
-      th Bytes received
-      th Records received
-      th Bytes sent
-      th Records sent
-      th Tasks
-      th Status
-
-  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
-    tr(ng-if="v.type == 'regular'")
-      td
-        span(ng-if="v['start-time'] > -1") {{ v['start-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
-      td
-        span(ng-if="v['end-time'] > -1") {{ v['end-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
-      td
-        span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}")
{{v.duration | humanizeDuration:true}}
-
-      td.td-long {{ v.name | humanizeText }}
-      td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] | humanizeBytes
}}
-      td {{ v.metrics['read-records'] | number }}
-      td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes
}}
-      td {{ v.metrics['write-records'] | number }}
-      td
-        .label-group
-          bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{v.tasks[status]}}
-
-      td 
-        bs-label(status="{{v.status}}") {{v.status}}
-    tr(ng-if="nodeid && v.id == nodeid")
-      td(colspan="10")
-        div(ng-include=" 'partials/jobs/job.plan.node.subtasks.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
new file mode 100644
index 0000000..ef9257d
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
@@ -0,0 +1,54 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-body-hover.table-clickable.table-activable
+  thead
+    tr
+      th Start Time
+      th End Time
+      th Duration
+      th Name
+      th Bytes received
+      th Records received
+      th Bytes sent
+      th Records sent
+      th Tasks
+      th Status
+
+  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
+    tr(ng-if="v.type == 'regular'")
+      td
+        span(ng-if="v['start-time'] > -1") {{ v['start-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
+      td
+        span(ng-if="v['end-time'] > -1") {{ v['end-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
+      td
+        span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}")
{{v.duration | humanizeDuration:true}}
+
+      td.td-long {{ v.name | humanizeText }}
+      td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] | humanizeBytes
}}
+      td {{ v.metrics['read-records'] | number }}
+      td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes
}}
+      td {{ v.metrics['write-records'] | number }}
+      td
+        .label-group
+          bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{v.tasks[status]}}
+
+      td 
+        bs-label(status="{{v.status}}") {{v.status}}
+    tr(ng-if="nodeid && v.id == nodeid")
+      td(colspan="10")
+        div(ng-include=" 'partials/jobs/job.plan.node.subtasks.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
new file mode 100644
index 0000000..2811461
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
@@ -0,0 +1,54 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-body-hover.table-clickable.table-activable
+  thead
+    tr
+      th Start Time
+      th End Time
+      th Duration
+      th Name
+      th Bytes received
+      th Records received
+      th Bytes sent
+      th Records sent
+      th Tasks
+      th Status
+
+  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
+    tr(ng-if="v.type == 'regular'")
+      td
+        span(ng-if="v['start-time'] > -1") {{ v['start-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
+      td
+        span(ng-if="v['end-time'] > -1") {{ v['end-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
+      td
+        span(ng-if="v.duration > -1" title="{{v.duration | humanizeDuration:false}}")
{{v.duration | humanizeDuration:true}}
+
+      td.td-long {{ v.name | humanizeText }}
+      td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] | humanizeBytes
}}
+      td {{ v.metrics['read-records'] | number }}
+      td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes
}}
+      td {{ v.metrics['write-records'] | number }}
+      td
+        .label-group
+          bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{v.tasks[status]}}
+
+      td
+        bs-label(status="{{v.status}}") {{v.status}}
+    tr(ng-if="nodeid && v.id == nodeid")
+      td(colspan="10")
+        div(ng-include=" 'partials/jobs/job.plan.node.taskmanagers.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
index 259b364..78b0999 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
@@ -34,7 +34,7 @@ table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
       th Status
 
   tbody
-    tr(ng-repeat="subtask in subtasks")
+    tr(ng-repeat="subtask in subtasks | orderBy:'host'")
       td
         span(ng-if="subtask['start-time'] > -1") {{ subtask['start-time'] | amDateFormat:'YYYY-MM-DD,
H:mm:ss' }}
       td

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
new file mode 100644
index 0000000..75a2031
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.taskmanagers.jade
@@ -0,0 +1,59 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="taskmanagers")
+  thead
+    tr
+      th Start Time
+      th End Time
+      th Duration
+      th Bytes received
+      th Records received
+      th Bytes sent
+      th Records sent
+      th Host
+      th Tasks
+      th Status
+
+  tbody(ng-repeat="tm in taskmanagers | orderBy:'host'")
+    tr
+      td
+        span(ng-if="tm['start-time'] > -1") {{ tm['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
+      td
+        span(ng-if="tm['end-time'] > -1") {{ tm['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
+      td
+        span(ng-if="tm.duration > -1" title="{{tm.duration | humanizeDuration:false}}") {{tm.duration | humanizeDuration:true}}
+
+      td
+        span(ng-if="tm.metrics['read-bytes'] > -1" title="{{tm.metrics['read-bytes']}} bytes")
+          | {{ tm.metrics['read-bytes'] | humanizeBytes}}
+      td
+        span(ng-if="tm.metrics['read-records'] > -1") {{ tm.metrics['read-records'] | number }}
+      td
+        span(ng-if="tm.metrics['write-bytes'] > -1" title="{{tm.metrics['write-bytes']}} bytes")
+          | {{ tm.metrics['write-bytes'] | humanizeBytes}}
+      td
+        span(ng-if="tm.metrics['write-records'] > -1") {{ tm.metrics['write-records'] | number }}
+
+      td {{ tm.host }}
+
+      td
+        .label-group
+          bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{tm['status-counts'][status]}}
+
+      td
+        bs-label(status="{{tm.status}}") {{tm.status}}

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/jobs/running-jobs.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/running-jobs.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/running-jobs.jade
index 9eaa3b8..fbec30b 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/running-jobs.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/running-jobs.jade
@@ -35,7 +35,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
         th Status
 
     tbody
-      tr(ng-repeat="job in jobs|orderBy:\"'start-time'\"" ui-sref="single-job.plan.overview({ jobid: job.jid })")
+      tr(ng-repeat="job in jobs|orderBy:\"'start-time'\"" ui-sref="single-job.plan.subtasks({ jobid: job.jid })")
         td {{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
         td {{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
         td(title="{{job.duration | humanizeDuration:false}}") {{job.duration | humanizeDuration:true}}

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/partials/overview.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/overview.jade b/flink-runtime-web/web-dashboard/app/partials/overview.jade
index eeb3be9..9e72b03 100644
--- a/flink-runtime-web/web-dashboard/app/partials/overview.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/overview.jade
@@ -101,7 +101,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
             th Status
 
         tbody
-          tr(ng-repeat="job in runningJobs|orderBy:\"'start-time'\"" ui-sref="single-job.plan.overview({ jobid: job.jid })")
+          tr(ng-repeat="job in runningJobs|orderBy:\"'start-time'\"" ui-sref="single-job.plan.subtasks({ jobid: job.jid })")
             td {{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
             td {{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
             td(title="{{job.duration | humanizeDuration:false}}") {{job.duration | humanizeDuration:true}}
@@ -129,7 +129,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
             th Status
 
         tbody
-          tr(ng-repeat="job in finishedJobs|orderBy:\"'end-time'\":true" ui-sref="single-job.plan.overview({ jobid: job.jid })")
+          tr(ng-repeat="job in finishedJobs|orderBy:\"'end-time'\":true" ui-sref="single-job.plan.subtasks({ jobid: job.jid })")
             td {{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
             td {{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
             td(title="{{job.duration | humanizeDuration:false}}") {{job.duration | humanizeDuration:true}}

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index 07e5265..120e2d6 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -92,12 +92,19 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
         templateUrl: "partials/jobs/job.plan.html"
         controller: 'JobPlanController'
 
-  .state "single-job.plan.overview",
+  .state "single-job.plan.subtasks",
     url: ""
     views:
       'node-details':
-        templateUrl: "partials/jobs/job.plan.node-list.overview.html"
-        controller: 'JobPlanOverviewController'
+        templateUrl: "partials/jobs/job.plan.node-list.subtasks.html"
+        controller: 'JobPlanSubtasksController'
+
+  .state "single-job.plan.taskmanagers",
+    url: "/taskmanagers"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.taskmanagers.html"
+        controller: 'JobPlanTaskManagersController'
 
   .state "single-job.plan.accumulators",
     url: "/accumulators"

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index 37ce217..f0ce892 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -123,21 +123,36 @@ angular.module('flinkApp')
 
 # --------------------------------------
 
-.controller 'JobPlanOverviewController', ($scope, JobsService) ->
-  console.log 'JobPlanOverviewController'
+.controller 'JobPlanSubtasksController', ($scope, JobsService) ->
+  console.log 'JobPlanSubtasksController'
 
   if $scope.nodeid and (!$scope.vertex or !$scope.vertex.st)
     JobsService.getSubtasks($scope.nodeid).then (data) ->
       $scope.subtasks = data
 
   $scope.$on 'reload', (event) ->
-    console.log 'JobPlanOverviewController'
+    console.log 'JobPlanSubtasksController'
     if $scope.nodeid
       JobsService.getSubtasks($scope.nodeid).then (data) ->
         $scope.subtasks = data
 
 # --------------------------------------
 
+.controller 'JobPlanTaskManagersController', ($scope, JobsService) ->
+  console.log 'JobPlanTaskManagersController'
+
+  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.st)
+    JobsService.getTaskManagers($scope.nodeid).then (data) ->
+      $scope.taskmanagers = data
+
+  $scope.$on 'reload', (event) ->
+    console.log 'JobPlanTaskManagersController'
+    if $scope.nodeid
+      JobsService.getTaskManagers($scope.nodeid).then (data) ->
+        $scope.taskmanagers = data
+
+# --------------------------------------
+
 .controller 'JobPlanAccumulatorsController', ($scope, JobsService) ->
   console.log 'JobPlanAccumulatorsController'
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
index 8756496..65ae5cb 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
@@ -180,6 +180,20 @@ angular.module('flinkApp')
 
     deferred.promise
 
+  @getTaskManagers = (vertexid) ->
+    deferred = $q.defer()
+
+    deferreds.job.promise.then (data) =>
+      # vertex = @seekVertex(vertexid)
+
+      $http.get flinkConfig.jobServer + "jobs/" + currentJob.jid + "/vertices/" + vertexid + "/taskmanagers"
+      .success (data) ->
+        taskmanagers = data.taskmanagers
+
+        deferred.resolve(taskmanagers)
+
+    deferred.promise
+
   @getAccumulators = (vertexid) ->
     deferred = $q.defer()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73bc35f1/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
index eb3f71a..be495e4 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -107,7 +107,7 @@ angular.module('flinkApp')
           $scope.state['submit-button'] = "Submit"
           $scope.error = data.error
           if data.jobid?
-            $state.go("single-job.plan.overview", {jobid: data.jobid})
+            $state.go("single-job.plan.subtasks", {jobid: data.jobid})
 
   # job plan display related stuff
   $scope.nodeid = null


Mime
View raw message