flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [3/5] flink git commit: [FLINK-4733] Port TaskManager metrics
Date Mon, 31 Oct 2016 15:08:31 GMT
[FLINK-4733] Port TaskManager metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf4f3644
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf4f3644
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf4f3644

Branch: refs/heads/master
Commit: cf4f3644c0eda42c4c872daa175d095117788086
Parents: 1f4f6f9
Author: zentol <chesnay@apache.org>
Authored: Mon Oct 31 14:13:01 2016 +0100
Committer: zentol <chesnay@apache.org>
Committed: Mon Oct 31 15:12:03 2016 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  4 +-
 .../handlers/TaskManagersHandler.java           | 62 +++++++++++++--
 .../runtime/webmonitor/metrics/MetricStore.java | 11 +++
 .../taskmanager/taskmanager.metrics.jade        | 79 ++++++++-----------
 .../taskmanager/taskmanager.metrics.html        | 82 ++++++++------------
 flink-runtime/pom.xml                           | 17 ++--
 .../apache/flink/runtime/instance/Instance.java | 10 ---
 .../flink/runtime/instance/InstanceManager.java |  3 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +-
 .../runtime/messages/TaskManagerMessages.scala  |  7 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 66 +---------------
 .../runtime/instance/InstanceManagerTest.java   | 12 +--
 12 files changed, 153 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e907124..a0afba2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -287,8 +287,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
 
-			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
-			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
+			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
+			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
 			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", 
 				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
 					TaskManagerLogHandler.FileMode.LOG, config, enableSSL))

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index c20d4fe..42815ae 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
 import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 import org.apache.flink.util.StringUtils;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -42,9 +44,12 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
 	
 	private final FiniteDuration timeout;
+
+	private final MetricFetcher fetcher;
 	
-	public TaskManagersHandler(FiniteDuration timeout) {
+	public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) {
 		this.timeout = requireNonNull(timeout);
+		this.fetcher = fetcher;
 	}
 
 	@Override
@@ -95,10 +100,57 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
 					// only send metrics when only one task manager requests them.
 					if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-						byte[] report = instance.getLastMetricsReport();
-						if (report != null) {
-							gen.writeFieldName("metrics");
-							gen.writeRawValue(new String(report, "utf-8"));
+						fetcher.update();
+						MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+						if (metrics != null) {
+							gen.writeObjectFieldStart("metrics");
+							long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+							long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+							long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+							gen.writeNumberField("heapCommitted", heapCommitted);
+							gen.writeNumberField("heapUsed", heapUsed);
+							gen.writeNumberField("heapMax", heapTotal);
+
+							long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+							long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+							long nonHeapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+							gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+							gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+							gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+							gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+							gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+							gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+							gen.writeStringField("directCount", metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+							gen.writeStringField("directUsed", metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+							gen.writeStringField("directMax", metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+							gen.writeStringField("mappedCount", metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+							gen.writeStringField("mappedUsed", metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+							gen.writeStringField("mappedMax", metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+							gen.writeStringField("memorySegmentsAvailable", metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+							gen.writeStringField("memorySegmentsTotal", metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+							gen.writeArrayFieldStart("garbageCollectors");
+
+							for (String gcName : metrics.garbageCollectorNames) {
+								String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+								String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+								if (count != null  && time != null) {
+									gen.writeStartObject();
+									gen.writeStringField("name", gcName);
+									gen.writeStringField("count", count);
+									gen.writeStringField("time", time);
+									gen.writeEndObject();
+								}
+							}
+
+							gen.writeEndArray();
+							gen.writeEndObject();
 						}
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index 989145b..51b3b4d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -23,7 +23,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
 import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
@@ -77,6 +79,10 @@ public class MetricStore {
 						tm = new TaskManagerMetricStore();
 						taskManagers.put(tmID, tm);
 					}
+					if (name.contains("GarbageCollector")) {
+						String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
+						tm.addGarbageCollectorName(gcName);
+					}
 					addMetric(tm.metrics, name, metric);
 					break;
 				case INFO_CATEGORY_JOB:
@@ -260,6 +266,11 @@ public class MetricStore {
 	 * Sub-structure containing metrics of a single TaskManager.
 	 */
 	public static class TaskManagerMetricStore extends ComponentMetricStore {
+		public final Set<String> garbageCollectorNames = new HashSet<>();
+		
+		public void addGarbageCollectorName(String name) {
+			garbageCollectorNames.add(name);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
index c546d74..7920178 100644
--- a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
@@ -50,19 +50,19 @@ div(ng-if="metrics.id")
     tbody
       tr
         td Heap
-        td {{metrics.metrics.gauges['memory.heap.committed'].value | humanizeBytes}}
-        td {{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}}
-        td {{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}}
+        td {{ metrics.metrics.heapCommitted | humanizeBytes  }}
+        td {{ metrics.metrics.heapUsed | humanizeBytes  }}
+        td {{ metrics.metrics.heapMax | humanizeBytes  }}
       tr
         td Non-Heap
-        td {{metrics.metrics.gauges['memory.non-heap.committed'].value | humanizeBytes}}
-        td {{metrics.metrics.gauges['memory.non-heap.init'].value | humanizeBytes}}
-        td {{metrics.metrics.gauges['memory.non-heap.max'].value | humanizeBytes}}
+        td {{ metrics.metrics.nonHeapCommitted | humanizeBytes  }}
+        td {{ metrics.metrics.nonHeapUsed | humanizeBytes  }}
+        td {{ metrics.metrics.nonHeapMax | humanizeBytes  }}
       tr
         td Total
-        td {{metrics.metrics.gauges['memory.total.committed'].value | humanizeBytes}}
-        td {{metrics.metrics.gauges['memory.total.init'].value | humanizeBytes}}
-        td {{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}}
+        td {{ metrics.metrics.totalCommitted | humanizeBytes  }}
+        td {{ metrics.metrics.totalUsed | humanizeBytes  }}
+        td {{ metrics.metrics.totalMax | humanizeBytes  }}
 
   h2 Outside JVM
   table.table.table-properties
@@ -75,54 +75,41 @@ div(ng-if="metrics.id")
     tbody
       tr
         td Direct
-        td {{ metrics.metrics.gauges['direct-memory.direct.count'].value }}
-        td {{ metrics.metrics.gauges['direct-memory.direct.used'].value | humanizeBytes }}
-        td {{ metrics.metrics.gauges['direct-memory.direct.capacity'].value | humanizeBytes }}
+        td {{ metrics.metrics.directCount }}
+        td {{ metrics.metrics.directUsed }}
+        td {{ metrics.metrics.directTotal }}
       tr
         td Mapped
-        td {{ metrics.metrics.gauges['direct-memory.mapped.count'].value }}
-        td {{ metrics.metrics.gauges['direct-memory.mapped.used'].value | humanizeBytes }}
-        td {{ metrics.metrics.gauges['direct-memory.mapped.capacity'].value | humanizeBytes }}
+        td {{ metrics.metrics.mappedCount }}
+        td {{ metrics.metrics.mappedUsed }}
+        td {{ metrics.metrics.mappedMax }}
 
-  h1 Garbage Collection
+  h1 Network
+
+  h2 MemorySegments
   table.table.table-properties
     thead
       tr
-        th Collector
+        th Type
         th Count
-        th Time
     tbody
       tr
-        td PS-MarkSweep
-        td(table-property value="metrics.metrics.gauges['gc.PS-MarkSweep.count'].value")
-        td(table-property value="metrics.metrics.gauges['gc.PS-MarkSweep.time'].value | humanizeDuration")
+        td Available
+        td {{ metrics.metrics.memorySegmentsAvailable }}
       tr
-        td PS-Scavenge
-        td(table-property value="metrics.metrics.gauges['gc.PS-Scavenge.count'].value")
-        td(table-property value="metrics.metrics.gauges['gc.PS-Scavenge.time'].value | humanizeDuration")
+        td Total
+        td {{ metrics.metrics.memorySegmentsTotal }}
+
 
-  h1 Other Memory Pools
+  h1 Garbage Collection
   table.table.table-properties
     thead
       tr
-        th Pool
-        td Relative Usage
-    tbody
-      tr
-        td Code Cache
-        td(table-property value="metrics.metrics.gauges['memory.pools.Code-Cache.usage'].value | number:2")
-      tr
-        td Compressed Class Space
-        td(table-property value="metrics.metrics.gauges['memory.pools.Compressed-Class-Space.usage'].value | number:2")
-      tr
-        td Metaspace
-        td(table-property value="metrics.metrics.gauges['memory.pools.Metaspace.usage'].value | number:2")
-      tr
-        td PS Eden Space
-        td(table-property value="metrics.metrics.gauges['memory.pools.PS-Eden-Space.usage'].value | number:2")
-      tr
-        td PS Old Gen
-        td(table-property value="metrics.metrics.gauges['memory.pools.PS-Old-Gen.usage'].value | number:2")
-      tr
-        td PS Survivor Space
-        td(table-property value="metrics.metrics.gauges['memory.pools.PS-Survivor-Space.usage'].value | number:2")
+        th Collector
+        th Count
+        th Time
+    tbody(ng-repeat="g in metrics.metrics.garbageCollectors")
+      tr  
+        td {{ g.name }}
+        td {{ g.count }}
+        td {{ g.time }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
index e1d6670..43687cd 100644
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
@@ -57,21 +57,21 @@ limitations under the License.
     <tbody>
       <tr>
         <td>Heap</td>
-        <td>{{metrics.metrics.gauges['memory.heap.committed'].value | humanizeBytes}}</td>
-        <td>{{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}}</td>
-        <td>{{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}}</td>
+        <td>{{ metrics.metrics.heapCommitted | humanizeBytes  }}</td>
+        <td>{{ metrics.metrics.heapUsed | humanizeBytes  }}</td>
+        <td>{{ metrics.metrics.heapMax | humanizeBytes  }}</td>
       </tr>
       <tr>
         <td>Non-Heap</td>
-        <td>{{metrics.metrics.gauges['memory.non-heap.committed'].value | humanizeBytes}}</td>
-        <td>{{metrics.metrics.gauges['memory.non-heap.init'].value | humanizeBytes}}</td>
-        <td>{{metrics.metrics.gauges['memory.non-heap.max'].value | humanizeBytes}}</td>
+        <td>{{ metrics.metrics.nonHeapCommitted | humanizeBytes  }}</td>
+        <td>{{ metrics.metrics.nonHeapUsed | humanizeBytes  }}</td>
+        <td>{{ metrics.metrics.nonHeapMax | humanizeBytes  }}</td>
       </tr>
       <tr>
         <td>Total</td>
-        <td>{{metrics.metrics.gauges['memory.total.committed'].value | humanizeBytes}}</td>
-        <td>{{metrics.metrics.gauges['memory.total.init'].value | humanizeBytes}}</td>
-        <td>{{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}}</td>
+        <td>{{ metrics.metrics.totalCommitted | humanizeBytes  }}</td>
+        <td>{{ metrics.metrics.totalUsed | humanizeBytes  }}</td>
+        <td>{{ metrics.metrics.totalMax | humanizeBytes  }}</td>
       </tr>
     </tbody>
   </table>
@@ -88,72 +88,52 @@ limitations under the License.
     <tbody>
       <tr>
         <td>Direct</td>
-        <td>{{ metrics.metrics.gauges['direct-memory.direct.count'].value }}</td>
-        <td>{{ metrics.metrics.gauges['direct-memory.direct.used'].value | humanizeBytes }}</td>
-        <td>{{ metrics.metrics.gauges['direct-memory.direct.capacity'].value | humanizeBytes }}</td>
+        <td>{{ metrics.metrics.directCount }}</td>
+        <td>{{ metrics.metrics.directUsed }}</td>
+        <td>{{ metrics.metrics.directTotal }}</td>
       </tr>
       <tr>
         <td>Mapped</td>
-        <td>{{ metrics.metrics.gauges['direct-memory.mapped.count'].value }}</td>
-        <td>{{ metrics.metrics.gauges['direct-memory.mapped.used'].value | humanizeBytes }}</td>
-        <td>{{ metrics.metrics.gauges['direct-memory.mapped.capacity'].value | humanizeBytes }}</td>
+        <td>{{ metrics.metrics.mappedCount }}</td>
+        <td>{{ metrics.metrics.mappedUsed }}</td>
+        <td>{{ metrics.metrics.mappedMax }}</td>
       </tr>
     </tbody>
   </table>
-  <h1>Garbage Collection</h1>
+  <h1>Network</h1>
+  <h2>MemorySegments</h2>
   <table class="table table-properties">
     <thead>
       <tr>
-        <th>Collector</th>
+        <th>Type</th>
         <th>Count</th>
-        <th>Time</th>
       </tr>
     </thead>
     <tbody>
       <tr>
-        <td>PS-MarkSweep</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.count'].value"></td>
-        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.time'].value | humanizeDuration"></td>
+        <td>Available</td>
+        <td>{{ metrics.metrics.memorySegmentsAvailable }}</td>
       </tr>
       <tr>
-        <td>PS-Scavenge</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.count'].value"></td>
-        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.time'].value | humanizeDuration"></td>
+        <td>Total</td>
+        <td>{{ metrics.metrics.memorySegmentsTotal }}</td>
       </tr>
     </tbody>
   </table>
-  <h1>Other Memory Pools</h1>
+  <h1>Garbage Collection</h1>
   <table class="table table-properties">
     <thead>
       <tr>
-        <th>Pool</th>
-        <td>Relative Usage</td>
+        <th>Collector</th>
+        <th>Count</th>
+        <th>Time</th>
       </tr>
     </thead>
-    <tbody>
-      <tr>
-        <td>Code Cache</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Code-Cache.usage'].value | number:2"></td>
-      </tr>
-      <tr>
-        <td>Compressed Class Space</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Compressed-Class-Space.usage'].value | number:2"></td>
-      </tr>
-      <tr>
-        <td>Metaspace</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Metaspace.usage'].value | number:2"></td>
-      </tr>
-      <tr>
-        <td>PS Eden Space</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Eden-Space.usage'].value | number:2"></td>
-      </tr>
-      <tr>
-        <td>PS Old Gen</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Old-Gen.usage'].value | number:2"></td>
-      </tr>
-      <tr>
-        <td>PS Survivor Space</td>
-        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Survivor-Space.usage'].value | number:2"></td>
+    <tbody ng-repeat="g in metrics.metrics.garbageCollectors">
+      <tr> 
+        <td>{{ g.name }}</td>
+        <td>{{ g.count }}</td>
+        <td>{{ g.time }}</td>
       </tr>
     </tbody>
   </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5fea8fb..0a294fa 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -123,20 +123,15 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>${metrics.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-jvm</artifactId>
-			<version>${metrics.version}</version>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-core</artifactId>
+			<version>${jackson.version}</version>
 		</dependency>
 
 		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-json</artifactId>
-			<version>${metrics.version}</version>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<version>${jackson.version}</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 4a8139b..d63d475 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -75,8 +75,6 @@ public class Instance implements SlotOwner {
 	/** Time when last heat beat has been received from the task manager running on this taskManager. */
 	private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
 
-	private byte[] lastMetricsReport;
-
 	/** Flag marking the instance as alive or as dead. */
 	private volatile boolean isDead;
 
@@ -189,14 +187,6 @@ public class Instance implements SlotOwner {
 		this.lastReceivedHeartBeat = System.currentTimeMillis();
 	}
 
-	public void setMetricsReport(byte[] lastMetricsReport) {
-		this.lastMetricsReport = lastMetricsReport;
-	}
-
-	public byte[] getLastMetricsReport() {
-		return lastMetricsReport;
-	}
-
 	/**
 	 * Checks whether the last heartbeat occurred within the last {@code n} milliseconds
 	 * before the given timestamp {@code now}.

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index b0e7e57..132ee6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -96,7 +96,7 @@ public class InstanceManager {
 		}
 	}
 
-	public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport) {
+	public boolean reportHeartBeat(InstanceID instanceId) {
 		if (instanceId == null) {
 			throw new IllegalArgumentException("InstanceID may not be null.");
 		}
@@ -118,7 +118,6 @@ public class InstanceManager {
 			}
 
 			host.reportHeartBeat();
-			host.setMetricsReport(lastMetricsReport);
 
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received heartbeat from TaskManager " + host);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 516bbbe..bcfdd23 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1028,12 +1028,12 @@ class JobManager(
         TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID)))
       )
 
-    case Heartbeat(instanceID, metricsReport, accumulators) =>
+    case Heartbeat(instanceID, accumulators) =>
       log.debug(s"Received heartbeat message from $instanceID.")
 
       updateAccumulators(accumulators)
 
-      instanceManager.reportHeartBeat(instanceID, metricsReport)
+      instanceManager.reportHeartBeat(instanceID)
 
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index b433015..a493b3d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -52,15 +52,12 @@ object TaskManagerMessages {
 
   /**
    * Reports liveliness of the TaskManager instance with the given instance ID to the
-   * This message is sent to the job. This message reports the TaskManagers
-   * metrics, as a byte array.
+   * This message is sent to the job.
    *
    * @param instanceID The instance ID of the reporting TaskManager.
-   * @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry.
    * @param accumulators Accumulators of tasks serialized as Tuple2[internal, user-defined]
    */
-  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte],
-     accumulators: Seq[AccumulatorSnapshot])
+  case class Heartbeat(instanceID: InstanceID, accumulators: Seq[AccumulatorSnapshot])
 
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9727860..7608b87 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, FileInputStream, IOException}
-import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
+import java.lang.management.ManagementFactory
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
@@ -28,10 +28,6 @@ import java.util.concurrent.TimeUnit
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
 import _root_.akka.util.Timeout
-import com.codahale.metrics.json.MetricsModule
-import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet}
-import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
-import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.flink.configuration._
@@ -155,20 +151,8 @@ class TaskManager(
   /** Handler for distributed files cached by this TaskManager */
   protected val fileCache = new FileCache(config.configuration)
 
-  /** Registry of metrics periodically transmitted to the JobManager */
-  private val metricRegistry = TaskManager.createMetricsRegistry()
-
   private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
-  /** Metric serialization */
-  private val metricRegistryMapper: ObjectMapper = new ObjectMapper()
-    .registerModule(
-      new MetricsModule(
-        TimeUnit.SECONDS,
-        TimeUnit.MILLISECONDS,
-        false,
-        MetricFilter.ALL))
-
   /** Actors which want to be notified once this task manager has been
     * registered at the job manager */
   private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
@@ -1332,7 +1316,6 @@ class TaskManager(
   protected def sendHeartbeatToJobManager(): Unit = {
     try {
       log.debug("Sending heartbeat to JobManager")
-      val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
 
       val accumulatorEvents =
         scala.collection.mutable.Buffer[AccumulatorSnapshot]()
@@ -1351,7 +1334,7 @@ class TaskManager(
       }
 
        currentJobManager foreach {
-        jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
+        jm => jm ! decorateMessage(Heartbeat(instanceID, accumulatorEvents))
       }
     }
     catch {
@@ -2481,49 +2464,4 @@ object TaskManager {
       case (_, id) => throw new IllegalArgumentException(s"Temporary file directory #$id is null.")
     }
   }
-
-  /**
-   * Creates the registry of default metrics, including stats about garbage collection, memory
-   * usage, and system CPU load.
-   *
-   * @return The registry with the default metrics.
-   */
-  private def createMetricsRegistry() : MetricRegistry = {
-    val metricRegistry = new MetricRegistry()
-
-    // register default metrics
-    metricRegistry.register("gc", new GarbageCollectorMetricSet)
-    metricRegistry.register("memory", new MemoryUsageGaugeSet)
-    metricRegistry.register("direct-memory", new BufferPoolMetricSet(
-      ManagementFactory.getPlatformMBeanServer))
-    metricRegistry.register("load", new Gauge[Double] {
-      override def getValue: Double =
-        ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
-    })
-
-    val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
-
-    try {
-      val fetchCPULoadMethod = Class.forName("com.sun.management.OperatingSystemMXBean")
-        .getMethods()
-        .find( _.getName() == "getProcessCpuLoad" )
-
-      // verify that we can invoke the method
-      fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
-
-      metricRegistry.register("cpuLoad", new Gauge[Double] {
-        override def getValue: Double = fetchCPULoadMethod
-          .map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
-      })
-    }
-    catch {
-      case t: Throwable =>
-        LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-          " - CPU load metrics will not be available.")
-        metricRegistry.register("cpuLoad", new Gauge[Double] {
-          override def getValue: Double = -1.0
-        })
-    }
-    metricRegistry
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f3644/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index f3747c8..2b10e09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -186,12 +186,12 @@ public class InstanceManagerTest{
 					probe3.getRef(), ici3, hardwareDescription, 1, leaderSessionID);
 
 			// report some immediate heart beats
-			assertTrue(cm.reportHeartBeat(instanceID1, new byte[] {}));
-			assertTrue(cm.reportHeartBeat(instanceID2, new byte[] {}));
-			assertTrue(cm.reportHeartBeat(instanceID3, new byte[] {}));
+			assertTrue(cm.reportHeartBeat(instanceID1));
+			assertTrue(cm.reportHeartBeat(instanceID2));
+			assertTrue(cm.reportHeartBeat(instanceID3));
 
 			// report heart beat for non-existing instance
-			assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {}));
+			assertFalse(cm.reportHeartBeat(new InstanceID()));
 
 			final long WAIT = 200;
 			CommonTestUtils.sleepUninterruptibly(WAIT);
@@ -205,7 +205,7 @@ public class InstanceManagerTest{
 			long h3 = it.next().getLastHeartBeat();
 
 			// send one heart beat again and verify that the
-			assertTrue(cm.reportHeartBeat(instance1.getId(), new byte[] {}));
+			assertTrue(cm.reportHeartBeat(instance1.getId()));
 			long newH1 = instance1.getLastHeartBeat();
 
 			long now = System.currentTimeMillis();
@@ -244,7 +244,7 @@ public class InstanceManagerTest{
 				// expected
 			}
 			
-			assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {}));
+			assertFalse(cm.reportHeartBeat(new InstanceID()));
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());


Mime
View raw message