eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject incubator-eagle git commit: EAGLE-138 Extend JMX Collector to support "hadoop.namenode.JournalTransaction"
Date Fri, 22 Jan 2016 14:42:06 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 13bb16cf4 -> 4688a9ccb


EAGLE-138 Extend JMX Collector to support "hadoop.namenode.JournalTransaction"

https://issues.apache.org/jira/browse/EAGLE-138

Closes #72


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4688a9cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4688a9cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4688a9cc

Branch: refs/heads/master
Commit: 4688a9ccbf8a906ab9f394d456c7c46422cae4b3
Parents: 13bb16c
Author: Hao Chen <hao@apache.org>
Authored: Fri Jan 22 22:41:03 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Jan 22 22:41:03 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../add_extended_metrics.py                     |  39 -------------
 .../add_extended_metrics.pyc                    | Bin 1432 -> 0 bytes
 eagle-external/hadoop_jmx_collector/config.json |  19 +++++++
 .../hadoop_jmx_collector/eagle-collector.conf   |  29 ----------
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  57 ++++++++-----------
 .../hadoop_jmx_collector/metric_extensions.py   |  52 +++++++++++++++++
 .../hadoop_jmx_collector/system_metric_kafka.py |   2 +-
 .../hadoop_jmx_collector/util_func.py           |  15 ++---
 .../hadoop_jmx_collector/util_func.pyc          | Bin 2619 -> 0 bytes
 10 files changed, 102 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c51d955..8a40f3a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -75,3 +75,4 @@ logs/
 *.cache-tests
 
 *.orig
+**/*.pyc

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/add_extended_metrics.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/add_extended_metrics.py b/eagle-external/hadoop_jmx_collector/add_extended_metrics.py
deleted file mode 100644
index 9e0f105..0000000
--- a/eagle-external/hadoop_jmx_collector/add_extended_metrics.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-#!/usr/bin/python
-
-from util_func import *
-
-
-def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name):
-    kafka_dict = metricMap.copy()
-    PercentVal = None
-    PercentVal = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemNonHeapUsedUsage", PercentVal)
-
-    PercentVal = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, 2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemNonHeapCommittedUsage", PercentVal)
-
-    PercentVal = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapUsedUsage", PercentVal)
-
-    PercentVal = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2)
-    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapCommittedUsage", PercentVal)
-
-
-def add_extended_metrics(producer, topic, metricMap, fat_bean):
-    cal_mem_usage(producer, topic, fat_bean, metricMap, "hadoop.namenode.jvm")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/add_extended_metrics.pyc
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/add_extended_metrics.pyc b/eagle-external/hadoop_jmx_collector/add_extended_metrics.pyc
deleted file mode 100644
index f8ed8f2..0000000
Binary files a/eagle-external/hadoop_jmx_collector/add_extended_metrics.pyc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/config.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config.json b/eagle-external/hadoop_jmx_collector/config.json
new file mode 100644
index 0000000..7ca926e
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/config.json
@@ -0,0 +1,19 @@
+{
+   "env": {
+    "site": "sandbox"
+   },
+   "input": {
+    "component": "namenode",
+    "port": "50070",
+    "https": false
+   },
+   "filter": {
+    "monitoring.group.selected": ["hadoop", "java.lang"]
+   },
+   "output": {
+     "kafka": {
+       "topic": "nn_jmx_metric_sandbox",
+       "brokerList": ["localhost:9092"]
+     }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/eagle-collector.conf
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/eagle-collector.conf b/eagle-external/hadoop_jmx_collector/eagle-collector.conf
deleted file mode 100644
index 24fac51..0000000
--- a/eagle-external/hadoop_jmx_collector/eagle-collector.conf
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-{
-   "env": {
-    "site": "sandbox"
-   },
-   "input": {
-    "component": "namenode",
-    "port": "50070",
-    "https": false
-   },
-   "filter": {
-    "monitoring.group.selected": ["hadoop", "java.lang"]
-   },
-   "output": {
-   }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 95569e5..8e51556 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -34,7 +34,7 @@ sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'li
 from kafka import KafkaClient, SimpleProducer, SimpleConsumer
 
 from util_func import *
-from add_extended_metrics import *
+from metric_extensions import *
 
 
 DATA_TYPE = "hadoop"
@@ -73,7 +73,7 @@ def get_metric_prefix_name(mbean_attribute, context):
     return DATA_TYPE + "." + metric_prefix_name
 
 
-def getHadoopData(producer, topic, config, beans, dataMap, fat_bean):
+def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean):
     selected_group = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
     #print selected_group
 
@@ -89,9 +89,7 @@ def getHadoopData(producer, topic, config, beans, dataMap, fat_bean):
         if mbean_domain not in selected_group:
             # print "Unexpected mbean domain = %s on %s" % (mbean_domain, mbean)
             continue
-
         fat_bean.update(bean)
-
         context = bean.get("tag.Context", "")
         metric_prefix_name = get_metric_prefix_name(mbean_attribute, context)
 
@@ -116,10 +114,9 @@ def getHadoopData(producer, topic, config, beans, dataMap, fat_bean):
             send_output_message(producer, topic, kafka_dict, metric, value)
 
 
-def loadJmxData(host, inputConfig):
-    port = inputConfig.get('port')
-    https = inputConfig.get('https')
-
+def get_jmx_beans(host, port, https):
+    # port = inputConfig.get('port')
+    # https = inputConfig.get('https')
     url = host + ':' + port
     #print url
 
@@ -134,7 +131,6 @@ def loadJmxData(host, inputConfig):
 
     return beans
 
-
 def main():
     kafka = None
     producer = None
@@ -144,34 +140,32 @@ def main():
         #start = time.clock()
 
         # read the kafka.ini
-        config = loadConfigFile('eagle-collector.conf')
+        config = load_config('config.json')
         #print config
 
         site = config[u'env'].get('site').encode('utf-8')
         component = config[u'input'].get('component').encode('utf-8')
-        host = socket.getfqdn()
-        #host="10.249.66.185"
-
-        beans = loadJmxData(host, config[u'input'])
-
-        outputs = [s.encode('utf-8') for s in config[u'output']]
-        #print outputs
-
-        if('kafka' in outputs):
-            kafkaConfig = config[u'output'].get(u'kafka')
-            brokerList = kafkaConfig.get('brokerList')
-            topic = kafkaConfig.get('topic')
-            #print brokerList
-            kafka, producer = kafka_connect(brokerList)
 
-        dataMap = {"site": site, "host": host, "timestamp": '', "component": component, "metric": '', "value": ''}
+        if config[u'input'].has_key("host"):
+            host = config[u'input'].get("host")
+        else:
+            host = socket.getfqdn()
+
+        port = config[u'input'].get('port')
+        https = config[u'input'].get('https')
+        kafkaConfig = config[u'output'].get(u'kafka')
+        brokerList = kafkaConfig.get('brokerList')
+        topic = kafkaConfig.get('topic')
+
+        beans = get_jmx_beans(host, port, https)
+        #print brokerList
+        kafka, producer = kafka_connect(brokerList)
+        default_metric = {"site": site, "host": host, "timestamp": '', "component": component, "metric": '', "value": ''}
         fat_bean = dict()
-        getHadoopData(producer, topic, config, beans, dataMap, fat_bean)
-        add_extended_metrics(producer, topic, dataMap, fat_bean)
-
+        parse_hadoop_jmx(producer, topic, config, beans, default_metric, fat_bean)
+        extend_jmx_metrics(producer, topic, default_metric, fat_bean)
     except Exception, e:
         print 'main except: ', e
-
     finally:
         if kafka != None and producer != None:
             kafka_close(kafka, producer)
@@ -179,8 +173,5 @@ def main():
         #elapsed = (time.clock() - start)
         #print("Time used:",elapsed)
 
-
 if __name__ == "__main__":
-    main()
-
-
+    main()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/metric_extensions.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_extensions.py b/eagle-external/hadoop_jmx_collector/metric_extensions.py
new file mode 100644
index 0000000..4ea89c9
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/metric_extensions.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#!/usr/bin/python
+
+from util_func import *
+import json
+
+def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name):
+    kafka_dict = metricMap.copy()
+    PercentVal = None
+    PercentVal = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2)
+    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemNonHeapUsedUsage", PercentVal)
+
+    PercentVal = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, 2)
+    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemNonHeapCommittedUsage", PercentVal)
+
+    PercentVal = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2)
+    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapUsedUsage", PercentVal)
+
+    PercentVal = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2)
+    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapCommittedUsage", PercentVal)
+
+def journal_transaction_info(producer, topic, bean, metric, metric_prefix_name):
+    new_metric = metric.copy()
+    if bean.has_key("JournalTransactionInfo"):
+        JournalTransactionInfo = json.loads(bean.get("JournalTransactionInfo"))
+
+        LastAppliedOrWrittenTxId = int(JournalTransactionInfo.get("LastAppliedOrWrittenTxId"))
+        MostRecentCheckpointTxId = int(JournalTransactionInfo.get("MostRecentCheckpointTxId"))
+
+        send_output_message(producer, topic, new_metric, metric_prefix_name + ".LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
+        send_output_message(producer, topic, new_metric, metric_prefix_name + ".MostRecentCheckpointTxId", MostRecentCheckpointTxId)
+    else:
+        raise Exception("JournalTransactionInfo not found")
+
+def extend_jmx_metrics(producer, topic, metric, bean):
+    cal_mem_usage(producer, topic, bean, metric, "hadoop.namenode.jvm")
+    journal_transaction_info(producer,topic,bean,metric,"hadoop.namenode.JournalTransaction")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py b/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
index 40197eb..7d805d7 100644
--- a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
@@ -355,7 +355,7 @@ def main(argv):
     topic = None
     try:
         # read the kafka.ini
-        config = loadConfigFile('eagle-collector.conf')
+        config = load_config('config.json')
         print config
 
         site = config[u'env'].get('site').encode('utf-8')

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/util_func.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/util_func.py b/eagle-external/hadoop_jmx_collector/util_func.py
index 47ca240..5994a36 100644
--- a/eagle-external/hadoop_jmx_collector/util_func.py
+++ b/eagle-external/hadoop_jmx_collector/util_func.py
@@ -21,6 +21,7 @@ import re
 import time
 import json
 
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
 
 def kafka_connect(host):
     # To send messages synchronously
@@ -48,21 +49,15 @@ def send_output_message(producer, topic, kafka_dict, metric, value):
     else:
         print(kafka_json)
 
-
-def readFile(filename):
-    f = open(filename, 'r')
-    s = f.read()
-    f.close()
-    return s
-
-
-def loadConfigFile(filename):
+def load_config(filename):
     # read the self-defined filters
 
     script_dir = os.path.dirname(__file__)
     rel_path = "./" + filename
     abs_file_path = os.path.join(script_dir, rel_path)
-    json_file = readFile(abs_file_path)
+    f = open(filename, 'r')
+    json_file = f.read()
+    f.close()
     #print json_file
 
     try:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4688a9cc/eagle-external/hadoop_jmx_collector/util_func.pyc
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/util_func.pyc b/eagle-external/hadoop_jmx_collector/util_func.pyc
deleted file mode 100644
index 9fca678..0000000
Binary files a/eagle-external/hadoop_jmx_collector/util_func.pyc and /dev/null differ


Mime
View raw message