From: chesnay@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Fri, 01 Jul 2016 13:09:32 -0000
Message-Id: <8f4ec2aac548493f99e0bd2107a55a45@git.apache.org>
In-Reply-To: <4986ca225ce145c995d02b2dd9f1becd@git.apache.org>
References: <4986ca225ce145c995d02b2dd9f1becd@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [5/5] flink git commit: [FLINK-1550] JM JVM Metrics

[FLINK-1550] JM JVM Metrics

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fafb9817
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fafb9817
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fafb9817

Branch: refs/heads/master
Commit: fafb981772bff57a153f02fd171d7d15f3a08379
Parents: a3a9fd1
Author: zentol
Authored: Fri Jul 1 14:14:57 2016 +0200
Committer: zentol
Committed: Fri Jul 1 15:09:16 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala | 138 ++++++++++++++++++-
 1 file changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fafb9817/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index be1caa5..314977a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
 import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
+import java.lang.management.ManagementFactory
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
+import javax.management.ObjectName
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -33,7 +35,7 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -217,6 +219,13 @@ class JobManager(
         log.error("Could not start the savepoint store.", e)
         throw new RuntimeException("Could not start the savepoint store store.", e)
     }
+
+    jobManagerMetricGroup match {
+      case Some(group) =>
+        instantiateMetrics(group)
+      case None =>
+        log.warn("Could not instantiate JobManager metrics.")
+    }
   }
 
   override def postStop(): Unit = {
@@ -1747,6 +1756,133 @@ class JobManager(
     // Shutdown and discard all queued messages
     context.system.shutdown()
   }
+
+  private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
+    instantiateStatusMetrics(jobManagerMetricGroup)
+  }
+
+  private def instantiateStatusMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
+    val jvm = jobManagerMetricGroup
+      .addGroup("Status")
+      .addGroup("JVM")
+
+    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
+    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
+    instantiateMemoryMetrics(jvm.addGroup("Memory"))
+    instantiateThreadMetrics(jvm.addGroup("Threads"))
+    instantiateCPUMetrics(jvm.addGroup("CPU"))
+  }
+
+  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
+    val mxBean = ManagementFactory.getClassLoadingMXBean
+
+    metrics.gauge[Long, Gauge[Long]]("ClassesLoaded", new Gauge[Long] {
+      override def getValue: Long = mxBean.getTotalLoadedClassCount
+    })
+    metrics.gauge[Long, Gauge[Long]]("ClassesUnloaded", new Gauge[Long] {
+      override def getValue: Long = mxBean.getUnloadedClassCount
+    })
+  }
+
+  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
+    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
+
+    for (garbageCollector <- garbageCollectors.asScala) {
+      val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
+      gcGroup.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] {
+        override def getValue: Long = garbageCollector.getCollectionCount
+      })
+      gcGroup.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] {
+        override def getValue: Long = garbageCollector.getCollectionTime
+      })
+    }
+  }
+
+  private def instantiateMemoryMetrics(metrics: MetricGroup) {
+    val mxBean = ManagementFactory.getMemoryMXBean
+    val heap = metrics.addGroup("Heap")
+    heap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
+      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
+    })
+    heap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
+      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
+    })
+    heap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] {
+      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
+    })
+
+    val nonHeap = metrics.addGroup("NonHeap")
+    nonHeap.gauge[Long, Gauge[Long]]("Used", new Gauge[Long] {
+      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
+    })
+    nonHeap.gauge[Long, Gauge[Long]]("Committed", new Gauge[Long] {
+      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
+    })
nonHeap.gauge[Long, Gauge[Long]]("Max", new Gauge[Long] { + override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax + }) + + val con = ManagementFactory.getPlatformMBeanServer; + + val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") + + val direct = metrics.addGroup("Direct") + direct.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] { + override def getValue: Long = con + .getAttribute(directObjectName, "Count").asInstanceOf[Long] + }) + direct.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] { + override def getValue: Long = con + .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long] + }) + direct.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] { + override def getValue: Long = con + .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long] + }) + + val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped") + + val mapped = metrics.addGroup("Mapped") + mapped.gauge[Long, Gauge[Long]]("Count", new Gauge[Long] { + override def getValue: Long = con + .getAttribute(mappedObjectName, "Count").asInstanceOf[Long] + }) + mapped.gauge[Long, Gauge[Long]]("MemoryUsed", new Gauge[Long] { + override def getValue: Long = con + .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long] + }) + mapped.gauge[Long, Gauge[Long]]("TotalCapacity", new Gauge[Long] { + override def getValue: Long = con + .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long] + }) + } + + private def instantiateThreadMetrics(metrics: MetricGroup): Unit = { + val mxBean = ManagementFactory.getThreadMXBean + + metrics.gauge[Int, Gauge[Int]]("Count", new Gauge[Int] { + override def getValue: Int = mxBean.getThreadCount + }) + } + + private def instantiateCPUMetrics(metrics: MetricGroup): Unit = { + try { + val mxBean = ManagementFactory.getOperatingSystemMXBean + .asInstanceOf[com.sun.management.OperatingSystemMXBean] + + metrics.gauge[Double, Gauge[Double]]("Load", new Gauge[Double] { + override def getValue: Double = mxBean.getProcessCpuLoad + }) + metrics.gauge[Long, Gauge[Long]]("Time", new Gauge[Long] { + override def getValue: Long = mxBean.getProcessCpuTime + }) + } + catch { + case t: Throwable => + log.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + + " - CPU load metrics will not be available.") + } + } } /**