eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject incubator-eagle git commit: EAGLE-438 Multiple Inputs for Hadoop JMX Collector Python Script make jmx collection applicable to all hadoop components
Date Fri, 12 Aug 2016 21:50:59 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop cc1261553 -> 502c7e37f


EAGLE-438 Multiple Inputs for Hadoop JMX Collector Python Script
make jmx collection applicable to all hadoop components

https://issues.apache.org/jira/browse/EAGLE-438

Author: @peterkim95 <Peter Kim>
Reviewer: Ralph Su

Closes: #316


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/502c7e37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/502c7e37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/502c7e37

Branch: refs/heads/develop
Commit: 502c7e37f842eabc4021c47053dd95bdd1c7efde
Parents: cc12615
Author: yonzhang <yonzhang2012@gmail.com>
Authored: Fri Aug 12 14:54:52 2016 -0700
Committer: yonzhang <yonzhang2012@gmail.com>
Committed: Fri Aug 12 14:54:52 2016 -0700

----------------------------------------------------------------------
 eagle-external/hadoop_jmx_collector/README.md   | 75 +++++++++++++-------
 .../hadoop_jmx_collector/config-sample.json     | 33 ++++++---
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    | 23 +++---
 .../hadoop_jmx_collector/metric_collector.py    | 37 +++++-----
 4 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/README.md
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/README.md b/eagle-external/hadoop_jmx_collector/README.md
index cd8d887..bd89600 100644
--- a/eagle-external/hadoop_jmx_collector/README.md
+++ b/eagle-external/hadoop_jmx_collector/README.md
@@ -18,58 +18,81 @@ limitations under the License.
 -->
 
 
-# Hadoop Jmx Collector
+# Hadoop JMX Collector to Kafka
 
-These scripts help to collect Hadoop jmx and evently sent the metrics to stdout or Kafka. Tested with Python 2.7.
+Python script to collect JMX metrics for any Hadoop component, and send it to Kafka topic
 
 ### How to use it
 
-  1. Edit the configuration file (json file). For example:
-  
+  1. Edit the configuration file config.json. For example:
+            ```
             {
              "env": {
               "site": "sandbox"
              },
-             "input": {
-              "component": "namenode",
-              "port": "50070",
-              "https": false
-             },
+             "inputs": [
+                {
+                  "component": "namenode",
+                  "host": "127.0.0.1",
+                  "port": "50070",
+                  "https": false,
+                  "kafka_topic": "nn_jmx_metric_sandbox"
+                },
+                {
+                  "component": "resourcemanager",
+                  "host": "127.0.0.1",
+                  "port": "8088",
+                  "https": false,
+                  "kafka_topic": "rm_jmx_metric_sandbox"
+                },
+                {
+                  "component": "datanode",
+                  "host": "127.0.0.1",
+                  "port": "50075",
+                  "https": false,
+                  "kafka_topic": "dn_jmx_metric_sandbox"
+                }
+             ],
              "filter": {
               "monitoring.group.selected": ["hadoop", "java.lang"]
              },
              "output": {
              }
             }
-
+            ```
   2. Run the scripts
-  
-        # for general use
-        python hadoop_jmx_kafka.py > 1.txt
 
+        ```
+        python hadoop_jmx_kafka.py
+        ```
+
+### Editing config.json
 
-### Edit `eagle-collector.conf`
+* inputs
 
-* input
+  "port" defines the hadoop service port, such as 50070 => "namenode", 16010 => "hbasemaster".
+  Like the example above, you can specify multiple hadoop components to collect
 
-  "port" defines the hadoop service port, such as 50070 => "namenode", 60010 => "hbase master".
+  "https" is whether or not you want to use SSL protocol in your connection to the host+port
+
+  "kafka_topic" is the kafka topic that you want to populate with the jmx data from the respective component
 
 * filter
 
   "monitoring.group.selected" can filter out beans which we care about.
 
-* output 
-  
-  if we left it empty, then the output is stdout by default. 
-
-        "output": {}
-        
-  It also supports Kafka as its output. 
+* output
 
+  You can specify the kafka broker list
+        ```
         "output": {
           "kafka": {
-            "topic": "test_topic",
-            "brokerList": [ "sandbox.hortonworks.com:6667"]
+            "brokerList": [ "localhost:9092"]
           }
         }
-      
+        ```
+
+  To check that the a desired kafka topic is being populated:
+    ```
+    kafka-console-consumer --zookeeper localhost:2181 --topic nn_jmx_metric_sandbox
+    ```

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json
index 57c9aae..488b8e0 100644
--- a/eagle-external/hadoop_jmx_collector/config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/config-sample.json
@@ -12,18 +12,35 @@
        "https":false
      }
    },
-   "input": {
-    "component": "namenode",
-    "port": "50070",
-    "https": false
-   },
+   "inputs": [
+      {
+        "component": "namenode",
+        "host": "server.eagle.apache.org",
+        "port": "50070",
+        "https": false,
+        "kafka_topic": "nn_jmx_metric_sandbox"
+      },
+      {
+        "component": "resourcemanager",
+        "host": "server.eagle.apache.org",
+        "port": "8088",
+        "https": false,
+        "kafka_topic": "rm_jmx_metric_sandbox"
+      },
+      {
+        "component": "datanode",
+        "host": "server.eagle.apache.org",
+        "port": "50075",
+        "https": false,
+        "kafka_topic": "dn_jmx_metric_sandbox"
+      }
+   ],
    "filter": {
     "monitoring.group.selected": ["hadoop", "java.lang"]
    },
    "output": {
      "kafka": {
-       "topic": "nn_jmx_metric_sandbox",
-       "brokerList": ["sandbox.hortonworks.com:6667"]
+       "brokerList": ["localhost:9092"]
      }
    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 286d0be..6eca7d9 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -16,8 +16,11 @@
 # limitations under the License.
 #
 
-from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
+from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricCollector,Helper
 import json
+import logging
+
+logging.basicConfig(level=logging.INFO,format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',datefmt='%m-%d %H:%M')
 
 class NNSafeModeMetric(JmxMetricListener):
     def on_metric(self, metric):
@@ -47,8 +50,7 @@ class MemortUsageMetric(JmxMetricListener):
         if bean["name"] == "Hadoop:service=NameNode,name=JvmMetrics":
             memnonheapusedusage = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2)
             self.collector.on_bean_kv(self.PREFIX, "memnonheapusedusage", memnonheapusedusage)
-            memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100,
-                                             2)
+            memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, 2)
             self.collector.on_bean_kv(self.PREFIX, "memnonheapcommittedusage", memnonheapcommittedusage)
             memheapusedusage = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2)
             self.collector.on_bean_kv(self.PREFIX, "memheapusedusage", memheapusedusage)
@@ -77,12 +79,9 @@ class JournalTransactionInfoMetric(JmxMetricListener):
 
 
 if __name__ == '__main__':
-    collector = JmxMetricCollector()
-    collector.register(
-            NNSafeModeMetric(),
-            NNHAMetric(),
-            MemortUsageMetric(),
-            JournalTransactionInfoMetric(),
-            NNCapacityUsageMetric()
-    )
-    Runner.run(collector)
+    config = Helper.load_config()
+
+    for ip in config['inputs']:
+        collector = JmxMetricCollector(ip['component'], ip['host'], ip['port'], ip['https'], ip['kafka_topic'])
+        collector.register(NNSafeModeMetric(), NNHAMetric(), MemortUsageMetric(), JournalTransactionInfoMetric(), NNCapacityUsageMetric())
+        Runner.run(collector)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 939b811..79320cc 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -186,7 +186,7 @@ class KafkaMetricSender(MetricSender):
         super(KafkaMetricSender, self).__init__(config)
         kafka_config = config["output"]["kafka"]
         # default topic
-        self.topic = kafka_config["topic"].encode('utf-8')
+        # self.topic = kafka_config["topic"].encode('utf-8')
         # producer
         self.broker_list = kafka_config["brokerList"]
         self.kafka_client = None
@@ -197,8 +197,8 @@ class KafkaMetricSender(MetricSender):
         self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
                                              batch_send_every_t=30)
 
-    def send(self, msg):
-        self.kafka_producer.send_messages(self.topic, json.dumps(msg))
+    def send(self, msg, topic):
+        self.kafka_producer.send_messages(topic, json.dumps(msg))
 
     def close(self):
         if self.kafka_producer is not None:
@@ -208,14 +208,21 @@ class KafkaMetricSender(MetricSender):
 
 
 class MetricCollector(threading.Thread):
-    def __init__(self):
+    def __init__(self, comp, host, port, https, topic):
         threading.Thread.__init__(self)
+
+        self.comp = comp
+        self.host = host
+        self.port = port
+        self.https = https
+        self.topic = topic
+
         self.config = Helper.load_config()
         self.sender = KafkaMetricSender(self.config)
         self.fqdn = socket.getfqdn()
-        self.init(self.config)
+        self.init(self.config, self.comp, self.host, self.port, self.https, self.topic)
 
-    def init(self, config):
+    def init(self, config, comp, host, port, https, topic):
         pass
 
     def start(self):
@@ -235,7 +242,7 @@ class MetricCollector(threading.Thread):
         if not msg.has_key("site"):
             msg["site"] = self.config["env"]["site"]
 
-        self.sender.send(msg)
+        self.sender.send(msg, self.topic)
 
     def run(self):
         raise Exception("`run` method should be overrode by sub-class before being called")
@@ -260,14 +267,12 @@ class JmxMetricCollector(MetricCollector):
     port = None
     listeners = []
 
-    def init(self, config):
-        if config["input"].has_key("host"):
-            self.host = config["input"]["host"]
-        else:
-            self.host = self.fqdn
-        self.port = config["input"]["port"]
-        self.https = config["input"]["https"]
-        self.component = config["input"]["component"]
+    def init(self, config, comp, host, port, https, topic):
+        self.host = host
+        self.port = port
+        self.https = https
+        self.component = comp
+        self.topic = topic
         self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
         self.listeners = []
 
@@ -351,4 +356,4 @@ class JmxMetricListener:
         pass
 
     def on_metric(self, metric):
-        pass
\ No newline at end of file
+        pass


Mime
View raw message