flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [5/8] flink git commit: [FLINK-3180] [runtime] Log direct memory usage in MemoryLogger
Date Mon, 21 Dec 2015 17:44:08 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/af996301/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
index 691490e..e1d6670 100644
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
@@ -16,123 +16,145 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
-
 -->
-<table ng-if="metrics.id" class="table table-properties">
-  <thead>
-    <tr>
-      <th>Memory type / State</th>
-      <th>Memory - Committed</th>
-      <th>Memory - Initial</th>
-      <th>Memory - Maximum</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>Heap</td>
-      <td title="{{metrics.metrics.gauges['memory.heap.committed'].value}} bytes">{{metrics.metrics.gauges['memory.heap.committed'].value | humanizeBytes}}</td>
-      <td title="{{metrics.metrics.gauges['memory.heap.init'].value}} bytes">{{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}}</td>
-      <td title="{{metrics.metrics.gauges['memory.heap.max'].value}} bytes">{{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}}</td>
-    </tr>
-    <tr>
-      <td>Non-Heap</td>
-      <td title="{{metrics.metrics.gauges['memory.non-heap.committed'].value}} bytes">{{metrics.metrics.gauges['memory.non-heap.committed'].value | humanizeBytes}}</td>
-      <td title="{{metrics.metrics.gauges['memory.non-heap.init'].value}} bytes">{{metrics.metrics.gauges['memory.non-heap.init'].value | humanizeBytes}}</td>
-      <td title="{{metrics.metrics.gauges['memory.non-heap.max'].value}} bytes">{{metrics.metrics.gauges['memory.non-heap.max'].value | humanizeBytes}}</td>
-    </tr>
-    <tr>
-      <td>Total</td>
-      <td title="{{metrics.metrics.gauges['memory.total.committed'].value}} bytes">{{metrics.metrics.gauges['memory.total.committed'].value | humanizeBytes}}</td>
-      <td title="{{metrics.metrics.gauges['memory.total.init'].value}} bytes">{{metrics.metrics.gauges['memory.total.init'].value | humanizeBytes}}</td>
-      <td title="{{metrics.metrics.gauges['memory.total.max'].value}} bytes">{{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}}</td>
-    </tr>
-  </tbody>
-</table>
-<table ng-if="metrics.id" class="table">
-  <thead>
-    <tr>
-      <th>Data Port</th>
-      <th>All Slots</th>
-      <th>Free Slots</th>
-      <th>CPU Cores</th>
-      <th>Physical Memory</th>
-      <th>Free Memory</th>
-      <th>Flink Managed Memory</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>{{ metrics.dataPort }}</td>
-      <td>{{ metrics.slotsNumber }}</td>
-      <td>{{ metrics.freeSlots }}</td>
-      <td>{{ metrics.cpuCores }}</td>
-      <td title="{{metrics.physicalMemory}} bytes">{{ metrics.physicalMemory | humanizeBytes }}</td>
-      <td title="{{metrics.freeMemory}} bytes">{{ metrics.freeMemory | humanizeBytes }}</td>
-      <td title="{{metrics.managedMemory}} bytes">{{ metrics.managedMemory | humanizeBytes }}</td>
-    </tr>
-  </tbody>
-</table>
-<div ng-if="metrics.id" class="row">
-  <div class="col-md-6">
-    <table class="table table-properties">
-      <thead>
-        <tr>
-          <th colspan="2">Memory - Pools</th>
-        </tr>
-      </thead>
-      <tbody>
-        <tr>
-          <td>Code Cache</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Code-Cache.usage'].value | number:2"></td>
-        </tr>
-        <tr>
-          <td>Compressed Class Space</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Compressed-Class-Space.usage'].value | number:2"></td>
-        </tr>
-        <tr>
-          <td>Metaspace</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Metaspace.usage'].value | number:2"></td>
-        </tr>
-        <tr>
-          <td>PS Eden Space</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Eden-Space.usage'].value | number:2"></td>
-        </tr>
-        <tr>
-          <td>PS Old Gen</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Old-Gen.usage'].value | number:2"></td>
-        </tr>
-        <tr>
-          <td>PS Survivor Space</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Survivor-Space.usage'].value | number:2"></td>
-        </tr>
-      </tbody>
-    </table>
-  </div>
-  <div class="col-md-6">
-    <table class="table table-properties">
-      <thead>
-        <tr>
-          <th colspan="2">Garbage Collection</th>
-        </tr>
-      </thead>
-      <tbody>
-        <tr>
-          <td>PS-MarkSweep Count</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.count'].value"></td>
-        </tr>
-        <tr>
-          <td>PS-MarkSweep Time (ms)</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.time'].value"></td>
-        </tr>
-        <tr>
-          <td>PS-Scavenge Count</td>
-          <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.count'].value"></td>
-        </tr>
-        <tr>
-          <td>PS-Scavenge Time (ms)</td>
-          <td table-property="table-property" value="manager.metrics.gauges['gc.PS-Scavenge.time'].value"></td>
-        </tr>
-      </tbody>
-    </table>
-  </div>
+<div ng-if="metrics.id">
+  <h1>Overview</h1>
+  <table class="table">
+    <thead>
+      <tr>
+        <th>Data Port</th>
+        <th>All Slots</th>
+        <th>Free Slots</th>
+        <th>CPU Cores</th>
+        <th>Physical Memory</th>
+        <th>Free Memory</th>
+        <th>Flink Managed Memory</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>{{ metrics.dataPort }}</td>
+        <td>{{ metrics.slotsNumber }}</td>
+        <td>{{ metrics.freeSlots }}</td>
+        <td>{{ metrics.cpuCores }}</td>
+        <td>{{ metrics.physicalMemory | humanizeBytes }}</td>
+        <td>{{ metrics.freeMemory | humanizeBytes }}</td>
+        <td>{{ metrics.managedMemory | humanizeBytes }}</td>
+      </tr>
+    </tbody>
+  </table>
+  <h1>Memory</h1>
+  <h2>JVM (Heap/Non-Heap)</h2>
+  <table class="table table-properties">
+    <thead>
+      <tr>
+        <th>Type</th>
+        <th>Committed</th>
+        <th>Initial</th>
+        <th>Maximum</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>Heap</td>
+        <td>{{metrics.metrics.gauges['memory.heap.committed'].value | humanizeBytes}}</td>
+        <td>{{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}}</td>
+        <td>{{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}}</td>
+      </tr>
+      <tr>
+        <td>Non-Heap</td>
+        <td>{{metrics.metrics.gauges['memory.non-heap.committed'].value | humanizeBytes}}</td>
+        <td>{{metrics.metrics.gauges['memory.non-heap.init'].value | humanizeBytes}}</td>
+        <td>{{metrics.metrics.gauges['memory.non-heap.max'].value | humanizeBytes}}</td>
+      </tr>
+      <tr>
+        <td>Total</td>
+        <td>{{metrics.metrics.gauges['memory.total.committed'].value | humanizeBytes}}</td>
+        <td>{{metrics.metrics.gauges['memory.total.init'].value | humanizeBytes}}</td>
+        <td>{{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}}</td>
+      </tr>
+    </tbody>
+  </table>
+  <h2>Outside JVM</h2>
+  <table class="table table-properties">
+    <thead>
+      <tr>
+        <th>Type</th>
+        <th>Count</th>
+        <th>Used</th>
+        <th>Capacity</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>Direct</td>
+        <td>{{ metrics.metrics.gauges['direct-memory.direct.count'].value }}</td>
+        <td>{{ metrics.metrics.gauges['direct-memory.direct.used'].value | humanizeBytes }}</td>
+        <td>{{ metrics.metrics.gauges['direct-memory.direct.capacity'].value | humanizeBytes }}</td>
+      </tr>
+      <tr>
+        <td>Mapped</td>
+        <td>{{ metrics.metrics.gauges['direct-memory.mapped.count'].value }}</td>
+        <td>{{ metrics.metrics.gauges['direct-memory.mapped.used'].value | humanizeBytes }}</td>
+        <td>{{ metrics.metrics.gauges['direct-memory.mapped.capacity'].value | humanizeBytes }}</td>
+      </tr>
+    </tbody>
+  </table>
+  <h1>Garbage Collection</h1>
+  <table class="table table-properties">
+    <thead>
+      <tr>
+        <th>Collector</th>
+        <th>Count</th>
+        <th>Time</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>PS-MarkSweep</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.count'].value"></td>
+        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-MarkSweep.time'].value | humanizeDuration"></td>
+      </tr>
+      <tr>
+        <td>PS-Scavenge</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.count'].value"></td>
+        <td table-property="table-property" value="metrics.metrics.gauges['gc.PS-Scavenge.time'].value | humanizeDuration"></td>
+      </tr>
+    </tbody>
+  </table>
+  <h1>Other Memory Pools</h1>
+  <table class="table table-properties">
+    <thead>
+      <tr>
+        <th>Pool</th>
+        <th>Relative Usage</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <td>Code Cache</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Code-Cache.usage'].value | number:2"></td>
+      </tr>
+      <tr>
+        <td>Compressed Class Space</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Compressed-Class-Space.usage'].value | number:2"></td>
+      </tr>
+      <tr>
+        <td>Metaspace</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.Metaspace.usage'].value | number:2"></td>
+      </tr>
+      <tr>
+        <td>PS Eden Space</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Eden-Space.usage'].value | number:2"></td>
+      </tr>
+      <tr>
+        <td>PS Old Gen</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Old-Gen.usage'].value | number:2"></td>
+      </tr>
+      <tr>
+        <td>PS Survivor Space</td>
+        <td table-property="table-property" value="metrics.metrics.gauges['memory.pools.PS-Survivor-Space.usage'].value | number:2"></td>
+      </tr>
+    </tbody>
+  </table>
 </div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/af996301/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 9258482..8bb3576 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorSystem;
-
 import org.slf4j.Logger;
+import javax.management.MBeanServer;
 
+import java.lang.management.BufferPoolMXBean;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -49,6 +50,8 @@ public class MemoryLogger extends Thread {
 	private final List<MemoryPoolMXBean> poolBeans;
 	
 	private final List<GarbageCollectorMXBean> gcBeans;
+
+	private final BufferPoolMXBean directBufferBean;
 	
 	private final ActorSystem monitored;
 	
@@ -57,11 +60,11 @@ public class MemoryLogger extends Thread {
 	/**
 	 * Creates a new memory logger that logs in the given interval and lives as long as the
 	 * given actor system.
-	 * 
+	 *
 	 * @param logger The logger to use for outputting the memory statistics.
 	 * @param interval The interval in which the thread logs.
 	 * @param monitored The actor system to whose life the thread is bound. The thread terminates
-	 *                  once the actor system terminates.   
+	 *                  once the actor system terminates.
 	 */
 	public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
 		super("Memory Logger");
@@ -75,6 +78,22 @@ public class MemoryLogger extends Thread {
 		this.memoryBean = ManagementFactory.getMemoryMXBean();
 		this.poolBeans = ManagementFactory.getMemoryPoolMXBeans();
 		this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+
+		// The direct buffer pool bean needs to be accessed via the bean server
+		MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+		BufferPoolMXBean directBufferBean = null;
+		try {
+			directBufferBean = ManagementFactory.newPlatformMXBeanProxy(
+					beanServer,
+					"java.nio:type=BufferPool,name=direct",
+					BufferPoolMXBean.class);
+		}
+		catch (Exception e) {
+			logger.warn("Failed to initialize direct buffer pool bean.", e);
+		}
+		finally {
+			this.directBufferBean = directBufferBean;
+		}
 	}
 	
 	public void shutdown() {
@@ -89,6 +108,7 @@ public class MemoryLogger extends Thread {
 		try {
 			while (running && (monitored == null || !monitored.isTerminated())) {
 				logger.info(getMemoryUsageStatsAsString(memoryBean));
+				logger.info(getDirectMemoryStatsAsString(directBufferBean));
 				logger.info(getMemoryPoolStatsAsString(poolBeans));
 				logger.info(getGarbageCollectorStatsAsString(gcBeans));
 				
@@ -132,6 +152,27 @@ public class MemoryLogger extends Thread {
 	}
 
 	/**
+	 * Returns a String with the <strong>direct</strong> memory footprint.
+	 *
+	 * <p>These stats are not part of the other memory beans.
+	 *
+	 * @param bufferPoolMxBean The direct buffer pool bean or <code>null</code> if none available.
+	 *
+	 * @return A string with the buffer count, total capacity (bytes), and used direct memory (bytes).
+	 */
+	public static String getDirectMemoryStatsAsString(BufferPoolMXBean bufferPoolMxBean) {
+		if (bufferPoolMxBean == null) {
+			return "Direct memory stats: unavailable";
+		}
+		else {
+			return String.format("Direct memory stats: Count: %d, Total Capacity: %d, Used Memory: %d",
+					bufferPoolMxBean.getCount(),
+					bufferPoolMxBean.getTotalCapacity(),
+					bufferPoolMxBean.getMemoryUsed());
+		}
+	}
+
+	/**
 	 * Gets the memory pool statistics from the JVM.
 	 *
 	 * @param poolBeans The collection of memory pool beans.

http://git-wip-us.apache.org/repos/asf/flink/blob/af996301/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 950b82c..b4665fa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -31,7 +31,7 @@ import _root_.akka.util.Timeout
 
 import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.codahale.metrics.json.MetricsModule
-import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+import com.codahale.metrics.jvm.{BufferPoolMetricSet, MemoryUsageGaugeSet, GarbageCollectorMetricSet}
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
@@ -1986,6 +1986,8 @@ object TaskManager {
     // register default metrics
     metricRegistry.register("gc", new GarbageCollectorMetricSet)
     metricRegistry.register("memory", new MemoryUsageGaugeSet)
+    metricRegistry.register("direct-memory", new BufferPoolMetricSet(
+      ManagementFactory.getPlatformMBeanServer))
     metricRegistry.register("load", new Gauge[Double] {
       override def getValue: Double =
         ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()


Mime
View raw message