flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [5/5] flink git commit: [FLINK-1550] JM JVM Metrics
Date Fri, 01 Jul 2016 13:09:32 GMT
[FLINK-1550] JM JVM Metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fafb9817
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fafb9817
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fafb9817

Branch: refs/heads/master
Commit: fafb981772bff57a153f02fd171d7d15f3a08379
Parents: a3a9fd1
Author: zentol <chesnay@apache.org>
Authored: Fri Jul 1 14:14:57 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Jul 1 15:09:16 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 138 ++++++++++++++++++-
 1 file changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fafb9817/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index be1caa5..314977a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
 import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
+import java.lang.management.ManagementFactory
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import javax.management.ObjectName
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -33,7 +35,7 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -217,6 +219,13 @@ class JobManager(
         log.error("Could not start the savepoint store.", e)
         throw new RuntimeException("Could not start the  savepoint store store.", e)
     }
+
+    jobManagerMetricGroup match {
+      case Some(group) =>
+        instantiateMetrics(group)
+      case None =>
+        log.warn("Could not instantiate JobManager metrics.")
+    }
   }
 
   override def postStop(): Unit = {
@@ -1747,6 +1756,133 @@ class JobManager(
     // Shutdown and discard all queued messages
     context.system.shutdown()
   }
+
+  /**
+   * Entry point for registering all JobManager metrics.
+   *
+   * At the moment this only registers the JVM status metrics.
+   *
+   * @param jobManagerMetricGroup group under which the metrics are registered
+   */
+  private def instantiateMetrics(jobManagerMetricGroup: MetricGroup): Unit =
+    instantiateStatusMetrics(jobManagerMetricGroup)
+
+  /**
+   * Creates the "Status" -> "JVM" group path below the given group and hands each JVM
+   * subsystem (class loading, GC, memory, threads, CPU) its own subgroup to register into.
+   *
+   * @param jobManagerMetricGroup the group under which "Status" is created
+   */
+  private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup): Unit = {
+    val jvmGroup = jobManagerMetricGroup.addGroup("Status").addGroup("JVM")
+
+    // (subgroup name, registration method), listed in registration order
+    val registrations: Seq[(String, MetricGroup => Unit)] = Seq(
+      ("ClassLoader", instantiateClassLoaderMetrics _),
+      ("GarbageCollector", instantiateGarbageCollectorMetrics _),
+      ("Memory", instantiateMemoryMetrics _),
+      ("Threads", instantiateThreadMetrics _),
+      ("CPU", instantiateCPUMetrics _))
+
+    registrations.foreach { case (groupName, register) =>
+      register(jvmGroup.addGroup(groupName))
+    }
+  }
+
+  /**
+   * Registers class-loading gauges backed by the platform ClassLoadingMXBean.
+   * Each gauge queries the bean on every read.
+   *
+   * @param metrics group receiving "ClassesLoaded" (getTotalLoadedClassCount) and
+   *                "ClassesUnloaded" (getUnloadedClassCount)
+   */
+  private def instantiateClassLoaderMetrics(metrics: MetricGroup): Unit = {
+    val mxBean = ManagementFactory.getClassLoadingMXBean
+
+    metrics.gauge[Long, Gauge[Long]]("ClassesLoaded", new Gauge[Long] {
+      override def getValue: Long = mxBean.getTotalLoadedClassCount
+    })
+    metrics.gauge[Long, Gauge[Long]]("ClassesUnloaded", new Gauge[Long] {
+      override def getValue: Long = mxBean.getUnloadedClassCount
+    })
+  }
+
+  /**
+   * Registers a "Count" and a "Time" gauge for every garbage collector MXBean reported
+   * by the JVM. The gauges return getCollectionCount and getCollectionTime respectively.
+   *
+   * Each collector gets its own subgroup whose name is the collector name wrapped in
+   * double quotes. Presumably this is because collector names can contain spaces
+   * (e.g. "PS Scavenge"); TODO confirm against the metric reporters.
+   *
+   * @param metrics group under which the per-collector subgroups are created
+   */
+  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup): Unit = {
+    for (garbageCollector <- ManagementFactory.getGarbageCollectorMXBeans.asScala) {
+      val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
+
+      gcGroup.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
+        override def getValue: Long = garbageCollector.getCollectionCount
+      })
+      gcGroup.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
+        override def getValue: Long = garbageCollector.getCollectionTime
+      })
+    }
+  }
+
+  /**
+   * Registers JVM memory gauges under four subgroups:
+   *
+   *  - "Heap" / "NonHeap": "Used", "Committed" and "Max" from the platform MemoryMXBean.
+   *  - "Direct" / "Mapped": "Count", "MemoryUsed" and "TotalCapacity", read from the
+   *    `java.nio:type=BufferPool` MBeans of the platform MBean server.
+   *
+   * Every gauge queries its bean on each read.
+   *
+   * @param metrics group under which the memory subgroups are created
+   */
+  private def instantiateMemoryMetrics(metrics: MetricGroup): Unit = {
+    val mxBean = ManagementFactory.getMemoryMXBean
+
+    // Registers Used/Committed/Max gauges. `usage` is invoked on every read so each
+    // gauge sees a fresh MemoryUsage snapshot.
+    def instantiateUsageMetrics(
+        group: MetricGroup,
+        usage: () => java.lang.management.MemoryUsage): Unit = {
+      group.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
+        override def getValue: Long = usage().getUsed
+      })
+      group.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
+        override def getValue: Long = usage().getCommitted
+      })
+      group.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
+        override def getValue: Long = usage().getMax
+      })
+    }
+
+    instantiateUsageMetrics(metrics.addGroup("Heap"), () => mxBean.getHeapMemoryUsage)
+    instantiateUsageMetrics(metrics.addGroup("NonHeap"), () => mxBean.getNonHeapMemoryUsage)
+
+    val con = ManagementFactory.getPlatformMBeanServer
+
+    // Registers one gauge per BufferPool MBean attribute. getAttribute returns a boxed
+    // Object, hence the cast to Long.
+    def instantiateBufferPoolMetrics(group: MetricGroup, pool: ObjectName): Unit = {
+      for (attribute <- Seq("Count", "MemoryUsed", "TotalCapacity")) {
+        group.gauge[Long, Gauge[Long]](attribute, new Gauge[Long] {
+          override def getValue: Long = con.getAttribute(pool, attribute).asInstanceOf[Long]
+        })
+      }
+    }
+
+    instantiateBufferPoolMetrics(
+      metrics.addGroup("Direct"), new ObjectName("java.nio:type=BufferPool,name=direct"))
+    instantiateBufferPoolMetrics(
+      metrics.addGroup("Mapped"), new ObjectName("java.nio:type=BufferPool,name=mapped"))
+  }
+
+  /**
+   * Registers a "Count" gauge returning the thread count reported by the platform
+   * ThreadMXBean (getThreadCount), queried on every read.
+   *
+   * @param metrics group receiving the "Count" gauge
+   */
+  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
+    val threads = ManagementFactory.getThreadMXBean
+    val threadCount = new Gauge[Int] {
+      override def getValue: Int = threads.getThreadCount
+    }
+
+    metrics.gauge[Int, Gauge[Int]]("Count", threadCount)
+  }
+
+  /**
+   * Registers process CPU gauges backed by com.sun.management.OperatingSystemMXBean:
+   * "Load" (getProcessCpuLoad) and "Time" (getProcessCpuTime).
+   *
+   * That bean type is not available on every JVM. The cast fails with a
+   * ClassCastException if the platform bean has a different type. It fails with a
+   * LinkageError (e.g. NoClassDefFoundError) if com.sun.management is missing entirely.
+   * In both cases a warning is logged and no CPU gauges are registered.
+   *
+   * @param metrics group receiving the "Load" and "Time" gauges
+   */
+  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
+    try {
+      val mxBean = ManagementFactory.getOperatingSystemMXBean
+        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
+
+      metrics.gauge[Double, Gauge[Double]]("Load", new Gauge[Double] {
+        override def getValue: Double = mxBean.getProcessCpuLoad
+      })
+      metrics.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
+        override def getValue: Long = mxBean.getProcessCpuTime
+      })
+    } catch {
+      // LinkageError is caught explicitly because NonFatal excludes it, yet a missing
+      // com.sun.management class is exactly the case to tolerate here. Truly fatal
+      // errors (e.g. OutOfMemoryError) still propagate.
+      case scala.util.control.NonFatal(_) | _: LinkageError =>
+        log.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+          " - CPU load metrics will not be available.")
+    }
+  }
 }
 
 /**


Mime
View raw message