eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hdenduk...@apache.org
Subject [9/9] incubator-eagle git commit: EAGLE-167 refactor metric collector script
Date Fri, 26 Feb 2016 21:26:27 GMT
EAGLE-167 refactor metric collector script

https://issues.apache.org/jira/browse/EAGLE-167

Author: @haoch <hao@apache.org>
Reviewer: @haoch <hao@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1965b4af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1965b4af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1965b4af

Branch: refs/heads/branch-0.3.0
Commit: 1965b4aff6f27b971effc7bd5be0fd773a686789
Parents: d37643b
Author: Hao Chen <hao@apache.org>
Authored: Fri Feb 26 16:17:09 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Feb 26 16:17:09 2016 +0800

----------------------------------------------------------------------
 eagle-external/hadoop_jmx_collector/base.py     | 250 -------------
 .../hadoop_jmx_collector/hadoop_ha_checker.py   |  10 +-
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    | 228 ++++--------
 .../hadoop_jmx_collector/metric_collector.py    | 354 +++++++++++++++++++
 .../hadoop_jmx_collector/metric_extensions.py   |  95 -----
 5 files changed, 420 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1965b4af/eagle-external/hadoop_jmx_collector/base.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/base.py b/eagle-external/hadoop_jmx_collector/base.py
deleted file mode 100644
index bbb2344..0000000
--- a/eagle-external/hadoop_jmx_collector/base.py
+++ /dev/null
@@ -1,250 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# !/usr/bin/python
-
-
-# !/usr/bin/python
-
-import os
-import re
-import time
-import json
-import urllib2
-import sys
-import socket
-import types
-import httplib
-import logging
-import threading
-
-# load six
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
-import six
-
-# load kafka-python
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-
-logging.basicConfig(level=logging.INFO,
-                    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
-                    datefmt='%m-%d %H:%M')
-
-class Helper:
-    def __init__(self):
-        pass
-
-    @staticmethod
-    def load_config(config_file="config.json"):
-        """
-
-        :param config_file:
-        :return:
-        """
-        # read the self-defined filters
-        script_dir = os.path.dirname(__file__)
-        rel_path = "./" + config_file
-        abs_file_path = os.path.join(script_dir, rel_path)
-        if not os.path.isfile(abs_file_path):
-            logging.error(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
-            exit(1)
-        f = open(abs_file_path, 'r')
-        json_file = f.read()
-        f.close()
-        config = json.loads(json_file)
-        return config
-
-    @staticmethod
-    def is_number(str):
-        """
-
-        :param str:
-        :return:
-        """
-        try:
-            if str == None or isinstance(str, (bool)):
-                return False
-            float(str)
-            return True
-        except:
-            return False
-
-    @staticmethod
-    def http_get(host, port, https=False, path=None):
-        """
-        Read url by GET method
-
-        :param path:
-        :param url:
-        :param https:
-        :return:
-        """
-        url = ":".join([host, str(port)])
-        result = None
-        response = None
-        try:
-            if https:
-                logging.info("Reading https://" + str(url) + path)
-                c = httplib.HTTPSConnection(url, timeout=57)
-                c.request("GET", path)
-                response = c.getresponse()
-            else:
-                logging.info("Reading http://" + str(url) + path)
-                response = urllib2.urlopen("http://" + str(url) + path, timeout=57)
-            logging.debug("Got response")
-            result = response.read()
-        finally:
-            if response is not None:
-                response.close()
-            return result
-
-
-class JmxReader(object):
-    def __init__(self, host, port, https=False):
-        self.host = host
-        self.port = port
-        self.https = https
-        self.jmx_json = None
-        self.jmx_beans = None
-        self.jmx_raw = None
-
-    def open(self):
-        """
-        :return: JmxReader
-        """
-        self.read_raw()
-        self.set_raw(self.jmx_raw)
-        return self
-
-    def read_raw(self):
-        """
-        transfer the json string into dict
-        :param host:
-        :param port:
-        :param https:
-        :return: text
-        """
-        self.jmx_raw = Helper.http_get(self.host, self.port, self.https, "/jmx?anonymous=true")
-        if self.jmx_raw is None:
-            raise Exception("Response from " + url + " is None")
-        return self
-
-    def set_raw(self, text):
-        self.jmx_json = json.loads(text)
-        self.jmx_beans = self.jmx_json[u'beans']
-        self.jmx_raw = text
-        return self
-
-    def get_jmx_beans(self):
-        return self.jmx_beans
-
-    def get_jmx_bean_by_name(self, name):
-        for bean in self.jmx_beans:
-            if bean.has_key("name") and bean["name"] == name:
-                return bean
-
-
-class YarnWSReader:
-    def __init__(self, host, port, https=False):
-        self.host = host
-        self.port = port
-        self.https = https
-
-    def read_cluster_info(self):
-        cluster_info = Helper.http_get(self.host, self.port, self.https, "/ws/v1/cluster/info")
-        logging.debug(cluster_info)
-        return json.loads(cluster_info)
-
-class MetricSender(object):
-    def __init__(self, config):
-        pass
-
-    def open(self):
-        pass
-
-    def send(self, msg):
-        raise Exception("should be overrode")
-
-    def close(self):
-        pass
-
-
-class KafkaMetricSender(MetricSender):
-    def __init__(self, config):
-        super(KafkaMetricSender, self).__init__(config)
-        kafka_config = config["output"]["kafka"]
-        # default topic
-        self.topic = kafka_config["topic"].encode('utf-8')
-        # producer
-        self.broker_list = kafka_config["brokerList"]
-        self.kafka_client = None
-        self.kafka_producer = None
-
-    def open(self):
-        self.kafka_client = KafkaClient(self.broker_list, timeout=59)
-        self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
-                                             batch_send_every_t=30)
-
-    def send(self, msg):
-        self.kafka_producer.send_messages(self.topic, json.dumps(msg))
-
-    def close(self):
-        if self.kafka_producer is not None:
-            self.kafka_producer.stop()
-        if self.kafka_client is not None:
-            self.kafka_client.close()
-
-
-class MetricCollector(threading.Thread):
-    def __init__(self):
-        threading.Thread.__init__(self)
-
-        self.config = Helper.load_config()
-        self.sender = KafkaMetricSender(self.config)
-
-    def start(self):
-        try:
-            self.sender.open()
-            self.run()
-        finally:
-            self.sender.close()
-
-    def collect(self, msg):
-        if not msg.has_key("timestamp"):
-            msg["timestamp"] = int(round(time.time() * 1000))
-        if msg.has_key("value"):
-            msg["value"] = float(str(msg["value"]))
-        if not msg.has_key("host") or len(msg["host"]) == 0:
-            msg["host"] = socket.getfqdn()
-        if not msg.has_key("site"):
-            msg["site"] = self.config["env"]["site"]
-
-        self.sender.send(msg)
-
-    def run(self):
-        raise Exception("`run` method should be overrode by sub-class before being called")
-
-class Runner(object):
-    @staticmethod
-    def run(*threads):
-        """
-        Execute concurrently
-
-        :param threads:
-        :return:
-        """
-        for thread in threads:
-            thread.start()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1965b4af/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
index 063279b..2a5096b 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
@@ -1,3 +1,5 @@
+# !/usr/bin/python
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,14 +14,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
 
-
-# !/usr/bin/python
-
-from base import MetricCollector, JmxReader, YarnWSReader, Runner
+from metric_collector import MetricCollector, JmxReader, YarnWSReader, Runner
 import logging
 
-
 class HadoopNNHAChecker(MetricCollector):
     def run(self):
         if not self.config["env"].has_key("name_node"):
@@ -126,5 +125,4 @@ class HadoopRMHAChecker(MetricCollector):
         })
 
 if __name__ == '__main__':
-    # HadoopNNHAChecker().start()
     Runner.run(HadoopNNHAChecker(), HadoopRMHAChecker())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1965b4af/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index b342a5e..8837c30 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -1,5 +1,5 @@
-#!/usr/bin/python
-
+# !/usr/bin/python
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -14,170 +14,66 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
 
-import os
-import re
-import time
+from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
 import json
-import urllib2
-import sys
-import socket
-import types
-import httplib
-
-# load six
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
-import six
-
-# load kafka-python
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-
-from util_func import *
-from metric_extensions import *
-
-
-DATA_TYPE = "hadoop"
-
-def readUrl(url, https=False):
-    jmxjson = 'error'
-    try:
-        if https:
-            print "Reading https://" + str(url) + "/jmx?anonymous=true"
-            c = httplib.HTTPSConnection(url, timeout=57)
-            c.request("GET", "/jmx?anonymous=true")
-            response = c.getresponse()
-        else:
-            print "Reading http://" + str(url) + "/jmx?anonymous=true"
-            response = urllib2.urlopen("http://" + url + '/jmx?anonymous=true', timeout=57)
-    except Exception, e:
-        print 'Reason: ', e
-    else:
-        # everything is fine
-        jmxjson = response.read()
-        response.close()
-    finally:
-        return jmxjson
-
-
-def get_metric_prefix_name(mbean_attribute, context):
-    mbean_list = list(prop.split("=", 1)
-                      for prop in mbean_attribute.split(","))
-    metric_prefix_name = None
-    if context == "":
-        metric_prefix_name = '.'.join([i[1] for i in mbean_list])
-    else:
-        name_index = [i[0] for i in mbean_list].index('name')
-        mbean_list[name_index][1] = context
-        metric_prefix_name = '.'.join([i[1] for i in mbean_list])
-    return (DATA_TYPE + "." + metric_prefix_name).replace(" ","").lower()
-
-def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean):
-    selected_group = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
-    #print selected_group
-
-    for bean in beans:
-        kafka_dict = dataMap.copy()
-
-        # mbean is of the form "domain:key=value,...,foo=bar"
-        mbean = bean[u'name']
-        mbean_domain, mbean_attribute = mbean.rstrip().split(":", 1)
-        mbean_domain = mbean_domain.lower()
-
-        # print mbean_domain
-        if mbean_domain not in selected_group:
-            # print "Unexpected mbean domain = %s on %s" % (mbean_domain, mbean)
-            continue
-        fat_bean.update(bean)
-        context = bean.get("tag.Context", "")
-        metric_prefix_name = get_metric_prefix_name(mbean_attribute, context)
-
-        # print kafka_dict
-        for key, value in bean.iteritems():
-            #print key, value
-            key = key.lower()
-            if re.match(r'tag.*', key):
-                continue
-
-            if mbean_domain == 'hadoop' and re.match(r'^namespace', key):
-                #print key
-                items = re.split('_table_', key)
-                key = items[1]
-                items = re.split('_region_', key)
-                kafka_dict['table'] = items[0]
-                items = re.split('_metric_', items[1])
-                kafka_dict['region'] = items[0]
-                key = items[1]
-
-            metric = metric_prefix_name + '.' + key
-
-            single_metric_callback(producer, topic, kafka_dict, metric, value)
-
-def get_jmx_beans(host, port, https):
-    # port = inputConfig.get('port')
-    # https = inputConfig.get('https')
-    url = host + ':' + port
-    #print url
-
-    jmxjson = readUrl(url, https)
-
-    if jmxjson == 'error':
-        print 'jmx load error'
-
-    # transfer the json string into dict
-    jmx = json.loads(jmxjson)
-    beans = jmx[u'beans']
-
-    return beans
-
-def main():
-    kafka = None
-    producer = None
-    topic = None
-    brokerList = None
-
-    try:
-        #start = time.clock()
-
-        # read the kafka.ini
-        if (len(sys.argv) > 1):
-            config = load_config(sys.argv[1])
-        else:
-            config = load_config('config.json')
-        #print config
-
-        site = config[u'env'].get('site').encode('utf-8')
-        component = config[u'input'].get('component').encode('utf-8')
-
-        if config[u'input'].has_key("host"):
-            host = config[u'input'].get("host").encode('utf-8')
-        else:
-            host = socket.getfqdn()
-
-        port = config[u'input'].get('port')
-        https = config[u'input'].get('https')
-        kafkaConfig = config[u'output'].get(u'kafka')
-        if kafkaConfig != None :
-            brokerList = kafkaConfig.get('brokerList')
-            topic = kafkaConfig.get('topic').encode('utf-8')
-
-        beans = get_jmx_beans(host, port, https)
-        #print brokerList
-        if brokerList != None:
-            kafka, producer = kafka_connect(brokerList)
-
-        default_metric = {"site": site, "host": host, "timestamp": '', "component": component, "metric": '', "value": ''}
-        fat_bean = dict()
-        parse_hadoop_jmx(producer, topic, config, beans, default_metric, fat_bean)
-        metrics_bean_callback(producer, topic, default_metric, fat_bean)
-    # except Exception, e:
-    #     print 'main except: ', e
-    finally:
-        if kafka != None and producer != None:
-            kafka_close(kafka, producer)
-
-        #elapsed = (time.clock() - start)
-        #print("Time used:",elapsed)
 
-if __name__ == "__main__":
-    main()
+class NNSafeModeMetric(JmxMetricListener):
+    def on_metric(self, metric):
+        if metric["metric"] == "hadoop.namenode.fsnamesystemstate.fsstate":
+            if metric["value"] == "safeMode":
+                metric["value"] = 1
+            else:
+                metric["value"] = 0
+            self.collector.collect(metric)
+
+
+class NNHAMetric(JmxMetricListener):
+    PREFIX = "hadoop.namenode.fsnamesystem"
+
+    def on_bean(self, bean):
+        if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystem":
+            if bean[u"tag.HAState"] == "active":
+                self.collector.on_bean_kv(self.PREFIX, "hastate", 0)
+            else:
+                self.collector.on_bean_kv(self.PREFIX, "hastate", 1)
+
+
+class MemortUsageMetric(JmxMetricListener):
+    PREFIX = "hadoop.namenode.jvm"
+
+    def on_bean(self, bean):
+        if bean["name"] == "Hadoop:service=NameNode,name=JvmMetrics":
+            memnonheapusedusage = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2)
+            self.collector.on_bean_kv(self.PREFIX, "memnonheapusedusage", memnonheapusedusage)
+            memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100,
+                                             2)
+            self.collector.on_bean_kv(self.PREFIX, "memnonheapcommittedusage", memnonheapcommittedusage)
+            memheapusedusage = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2)
+            self.collector.on_bean_kv(self.PREFIX, "memheapusedusage", memheapusedusage)
+            memheapcommittedusage = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2)
+            self.collector.on_bean_kv(self.PREFIX, "memheapcommittedusage", memheapcommittedusage)
+
+
+class JournalTransactionInfoMetric(JmxMetricListener):
+    PREFIX = "hadoop.namenode.journaltransaction"
+
+    def on_bean(self, bean):
+        if bean.has_key("JournalTransactionInfo"):
+            JournalTransactionInfo = json.loads(bean.get("JournalTransactionInfo"))
+            LastAppliedOrWrittenTxId = float(JournalTransactionInfo.get("LastAppliedOrWrittenTxId"))
+            MostRecentCheckpointTxId = float(JournalTransactionInfo.get("MostRecentCheckpointTxId"))
+            self.collector.on_bean_kv(self.PREFIX, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
+            self.collector.on_bean_kv(self.PREFIX, "MostRecentCheckpointTxId", MostRecentCheckpointTxId)
+
+
+if __name__ == '__main__':
+    collector = JmxMetricCollector()
+    collector.register(
+            NNSafeModeMetric(),
+            NNHAMetric(),
+            MemortUsageMetric(),
+            JournalTransactionInfoMetric()
+    )
+    Runner.run(collector)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1965b4af/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
new file mode 100644
index 0000000..939b811
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -0,0 +1,354 @@
+# !/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import re
+import time
+import json
+import urllib2
+import sys
+import socket
+import types
+import httplib
+import logging
+import threading
+
+# load six
+sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
+import six
+
+# load kafka-python
+sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
+
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
+                    datefmt='%m-%d %H:%M')
+
+
+class Helper:
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def load_config(config_file="config.json"):
+        """
+
+        :param config_file:
+        :return:
+        """
+        # read the self-defined filters
+        script_dir = os.path.dirname(__file__)
+        rel_path = "./" + config_file
+        abs_file_path = os.path.join(script_dir, rel_path)
+        if not os.path.isfile(abs_file_path):
+            logging.error(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
+            exit(1)
+        f = open(abs_file_path, 'r')
+        json_file = f.read()
+        f.close()
+        config = json.loads(json_file)
+        return config
+
+    @staticmethod
+    def is_number(str):
+        """
+
+        :param str:
+        :return:
+        """
+        try:
+            if str == None or isinstance(str, (bool)):
+                return False
+            float(str)
+            return True
+        except:
+            return False
+
+    @staticmethod
+    def http_get(host, port, https=False, path=None):
+        """
+        Read url by GET method
+
+        :param path:
+        :param url:
+        :param https:
+        :return:
+        """
+        url = ":".join([host, str(port)])
+        result = None
+        response = None
+        try:
+            if https:
+                logging.info("Reading https://" + str(url) + path)
+                c = httplib.HTTPSConnection(url, timeout=57)
+                c.request("GET", path)
+                response = c.getresponse()
+            else:
+                logging.info("Reading http://" + str(url) + path)
+                response = urllib2.urlopen("http://" + str(url) + path, timeout=57)
+            logging.debug("Got response")
+            result = response.read()
+        finally:
+            if response is not None:
+                response.close()
+            return result
+
+
+class JmxReader(object):
+    def __init__(self, host, port, https=False):
+        self.host = host
+        self.port = port
+        self.https = https
+        self.jmx_json = None
+        self.jmx_beans = None
+        self.jmx_raw = None
+
+    def open(self):
+        """
+        :return: JmxReader
+        """
+        self.read_raw()
+        self.set_raw(self.jmx_raw)
+        return self
+
+    def read_raw(self):
+        """
+        transfer the json string into dict
+        :param host:
+        :param port:
+        :param https:
+        :return: text
+        """
+        self.jmx_raw = Helper.http_get(self.host, self.port, self.https, "/jmx?anonymous=true")
+        if self.jmx_raw is None:
+            raise Exception("Response from " + url + " is None")
+        return self
+
+    def set_raw(self, text):
+        self.jmx_json = json.loads(text)
+        self.jmx_beans = self.jmx_json[u'beans']
+        self.jmx_raw = text
+        return self
+
+    def get_jmx_beans(self):
+        return self.jmx_beans
+
+    def get_jmx_bean_by_name(self, name):
+        for bean in self.jmx_beans:
+            if bean.has_key("name") and bean["name"] == name:
+                return bean
+
+
+class YarnWSReader:
+    def __init__(self, host, port, https=False):
+        self.host = host
+        self.port = port
+        self.https = https
+
+    def read_cluster_info(self):
+        cluster_info = Helper.http_get(self.host, self.port, self.https, "/ws/v1/cluster/info")
+        logging.debug(cluster_info)
+        return json.loads(cluster_info)
+
+
+class MetricSender(object):
+    def __init__(self, config):
+        pass
+
+    def open(self):
+        pass
+
+    def send(self, msg):
+        raise Exception("should be overrode")
+
+    def close(self):
+        pass
+
+
+class KafkaMetricSender(MetricSender):
+    def __init__(self, config):
+        super(KafkaMetricSender, self).__init__(config)
+        kafka_config = config["output"]["kafka"]
+        # default topic
+        self.topic = kafka_config["topic"].encode('utf-8')
+        # producer
+        self.broker_list = kafka_config["brokerList"]
+        self.kafka_client = None
+        self.kafka_producer = None
+
+    def open(self):
+        self.kafka_client = KafkaClient(self.broker_list, timeout=59)
+        self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
+                                             batch_send_every_t=30)
+
+    def send(self, msg):
+        self.kafka_producer.send_messages(self.topic, json.dumps(msg))
+
+    def close(self):
+        if self.kafka_producer is not None:
+            self.kafka_producer.stop()
+        if self.kafka_client is not None:
+            self.kafka_client.close()
+
+
+class MetricCollector(threading.Thread):
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self.config = Helper.load_config()
+        self.sender = KafkaMetricSender(self.config)
+        self.fqdn = socket.getfqdn()
+        self.init(self.config)
+
+    def init(self, config):
+        pass
+
+    def start(self):
+        try:
+            self.sender.open()
+            self.run()
+        finally:
+            self.sender.close()
+
+    def collect(self, msg):
+        if not msg.has_key("timestamp"):
+            msg["timestamp"] = int(round(time.time() * 1000))
+        if msg.has_key("value"):
+            msg["value"] = float(str(msg["value"]))
+        if not msg.has_key("host") or len(msg["host"]) == 0:
+            msg["host"] = self.fqdn
+        if not msg.has_key("site"):
+            msg["site"] = self.config["env"]["site"]
+
+        self.sender.send(msg)
+
+    def run(self):
+        raise Exception("`run` method should be overrode by sub-class before being called")
+
+
+class Runner(object):
+    @staticmethod
+    def run(*threads):
+        """
+        Execute concurrently
+
+        :param threads:
+        :return:
+        """
+        for thread in threads:
+            thread.start()
+
+class JmxMetricCollector(MetricCollector):
+    selected_domain = None
+    component = None
+    https = False
+    port = None
+    listeners = []
+
+    def init(self, config):
+        if config["input"].has_key("host"):
+            self.host = config["input"]["host"]
+        else:
+            self.host = self.fqdn
+        self.port = config["input"]["port"]
+        self.https = config["input"]["https"]
+        self.component = config["input"]["component"]
+        self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
+        self.listeners = []
+
+    def register(self, *listeners):
+        """
+        :param listeners: type of HadoopJmxListener
+        :return:
+        """
+        for listener in listeners:
+            listener.init(self)
+            self.listeners.append(listener)
+
+    def run(self):
+        try:
+            beans = JmxReader(self.host, self.port, self.https).open().get_jmx_beans()
+            self.on_beans(beans)
+        except Exception as e:
+            logging.exception("Failed to read jmx for " + self.host)
+
+    def filter_bean(self, bean, mbean_domain):
+        return mbean_domain in self.selected_domain
+
+    def on_beans(self, beans):
+        for bean in beans:
+            self.on_bean(bean)
+
+    def on_bean(self, bean):
+        # mbean is of the form "domain:key=value,...,foo=bar"
+        mbean = bean[u'name']
+        mbean_domain, mbean_attribute = mbean.rstrip().split(":", 1)
+        mbean_domain = mbean_domain.lower()
+
+        if not self.filter_bean(bean, mbean_domain):
+            return
+
+        context = bean.get("tag.Context", "")
+        metric_prefix_name = self.__build_metric_prefix(mbean_attribute, context)
+
+        # print kafka_dict
+        for key, value in bean.iteritems():
+            self.on_bean_kv(metric_prefix_name, key, value)
+
+        for listener in self.listeners:
+            listener.on_bean(bean.copy())
+
+    def on_bean_kv(self, prefix, key, value):
+        # Skip Tags
+        if re.match(r'tag.*', key):
+            return
+        metric_name = (prefix + '.' + key).lower()
+        self.on_metric({
+            "metric": metric_name,
+            "value": value
+        })
+
+    def on_metric(self, metric):
+        metric["component"] = self.component
+
+        if Helper.is_number(metric["value"]):
+            self.collect(metric)
+
+        for listener in self.listeners:
+            listener.on_metric(metric.copy())
+
+    def __build_metric_prefix(self, mbean_attribute, context):
+        mbean_list = list(prop.split("=", 1) for prop in mbean_attribute.split(","))
+        if context == "":
+            metric_prefix_name = '.'.join([i[1] for i in mbean_list])
+        else:
+            name_index = [i[0] for i in mbean_list].index('name')
+            mbean_list[name_index][1] = context
+            metric_prefix_name = '.'.join([i[1] for i in mbean_list])
+        return ("hadoop." + metric_prefix_name).replace(" ", "").lower()
+
+
+class JmxMetricListener:
+    def init(self, collector):
+        self.collector = collector
+
+    def on_bean(self, bean):
+        pass
+
+    def on_metric(self, metric):
+        pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1965b4af/eagle-external/hadoop_jmx_collector/metric_extensions.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_extensions.py b/eagle-external/hadoop_jmx_collector/metric_extensions.py
deleted file mode 100644
index 0028204..0000000
--- a/eagle-external/hadoop_jmx_collector/metric_extensions.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# !/usr/bin/python
-
-from util_func import *
-import json
-
-
-# Metric Parsing Callback Entry
-def single_metric_callback(producer, topic, kafka_dict, metric, value):
-    if isNumber(value):
-        numeric_metric_callack(producer, topic, kafka_dict, metric, value)
-    else:
-        nonnumeric_metric_callack(producer, topic, kafka_dict, metric, value)
-
-
-def metrics_bean_callback(producer, topic, metric, bean):
-    cal_mem_usage(producer, topic, bean, metric, "hadoop.namenode.jvm")
-    journal_transaction_info(producer, topic, bean, metric, "hadoop.namenode.journaltransaction")
-    nn_hastate(producer,topic,bean,metric,"hadoop.namenode.fsnamesystem")
-
-#################################################
-# Metric Parsing Extensions
-#################################################
-
-def numeric_metric_callack(producer, topic, kafka_dict, metric, value):
-    # Send out numeric value directly
-    send_output_message(producer, topic, kafka_dict, metric, value)
-
-
-def nonnumeric_metric_callack(producer, topic, kafka_dict, metric, value):
-    nn_safe_mode_metric(producer, topic, kafka_dict, metric, value)
-
-def nn_safe_mode_metric(producer, topic, kafka_dict, metric, value):
-    if metric == "hadoop.namenode.fsnamesystemstate.fsstate":
-        if value == "safeMode":
-            value = 1
-        else:
-            value = 0
-
-        send_output_message(producer, topic, kafka_dict, metric, value)
-
-def nn_hastate(producer, topic, bean, metricMap, metric_prefix_name="hadoop.namenode.fsnamesystem"):
-    kafka_dict = metricMap.copy()
-    if bean[u'tag.HAState'] == "active":
-        value = 0
-    else:
-        value = 1
-
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".hastate", value)
-
-def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name):
-    kafka_dict = metricMap.copy()
-    PercentVal = None
-    PercentVal = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0,
2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemNonHeapUsedUsage",
PercentVal)
-
-    PercentVal = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM'])
* 100, 2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemNonHeapCommittedUsage",
PercentVal)
-
-    PercentVal = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapUsedUsage",
PercentVal)
-
-    PercentVal = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100,
2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapCommittedUsage",
PercentVal)
-
-
-def journal_transaction_info(producer, topic, bean, metric, metric_prefix_name):
-    new_metric = metric.copy()
-    if bean.has_key("JournalTransactionInfo"):
-        JournalTransactionInfo = json.loads(bean.get("JournalTransactionInfo"))
-
-        LastAppliedOrWrittenTxId = int(JournalTransactionInfo.get("LastAppliedOrWrittenTxId"))
-        MostRecentCheckpointTxId = int(JournalTransactionInfo.get("MostRecentCheckpointTxId"))
-
-        send_output_message(producer, topic, new_metric, metric_prefix_name + ".LastAppliedOrWrittenTxId",
-                            LastAppliedOrWrittenTxId)
-        send_output_message(producer, topic, new_metric, metric_prefix_name + ".MostRecentCheckpointTxId",
-                            MostRecentCheckpointTxId)
-    else:
-        raise Exception("JournalTransactionInfo not found")


Mime
View raw message